Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mail channel filter #1651

Merged
merged 4 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion kairon/events/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,10 @@ def dispatch_scheduled_event(event_id: Text = Path(description="Event id")):
@app.get('/api/mail/schedule/{bot}', response_model=Response)
def request_epoch(bot: Text = Path(description="Bot id")):
EventUtility.schedule_channel_mail_reading(bot)
return {"data": None, "message": "Mail scheduler epoch request!"}
return {"data": None, "message": "Mail scheduler epoch request!"}


@app.get('/api/mail/stop/{bot}', response_model=Response)
def stop_mail_reading(bot: Text = Path(description="Bot id")):
EventUtility.stop_channel_mail_reading(bot)
return {"data": None, "message": "Mail scheduler stopped!"}
16 changes: 14 additions & 2 deletions kairon/events/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from kairon.events.executors.factory import ExecutorFactory
from kairon.events.scheduler.kscheduler import KScheduler
from kairon.exceptions import AppException
from kairon.shared.chat.data_objects import Channels
from kairon.shared.constants import EventClass, ChannelTypes
from kairon.shared.constants import EventClass
from kairon.shared.data.constant import TASK_TYPE
from loguru import logger

Expand Down Expand Up @@ -60,3 +59,16 @@ def schedule_channel_mail_reading(bot: str):
EventClass.mail_channel_read_mails, {"bot": bot, "user": mail_processor.bot_settings.user})
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}")

@staticmethod
def stop_channel_mail_reading(bot: str):
from kairon.shared.channels.mail.processor import MailProcessor

try:
mail_processor = MailProcessor(bot)
event_id = mail_processor.state.event_id
if event_id:
mail_processor.update_event_id(None)
KScheduler().delete_job(event_id)
except Exception as e:
raise AppException(f"Failed to stop mail reading for bot {bot}. Error: {str(e)}")
78 changes: 67 additions & 11 deletions kairon/shared/channels/mail/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from loguru import logger
from pydantic.schema import timedelta
from pydantic.validators import datetime
from imap_tools import MailBox, AND
from imap_tools import MailBox, AND, OR, NOT

from kairon import Utility
from kairon.exceptions import AppException
from kairon.shared.account.data_objects import Bot
from kairon.shared.channels.mail.constants import MailConstants
Expand Down Expand Up @@ -35,8 +37,9 @@ def update_event_id(self, event_id):
self.state.event_id = event_id
self.state.save()


@staticmethod
def get_mail_channel_state_data(bot):
def get_mail_channel_state_data(bot:str):
"""
Get mail channel state data
"""
Expand Down Expand Up @@ -227,6 +230,57 @@ def process_message_task(bot: str, message_batch: [dict]):
"""
asyncio.run(MailProcessor.process_messages(bot, message_batch))

def generate_criteria(self, subjects=None, ignore_subjects=None, from_addresses=None, ignore_from=None,
read_status="all"):
"""
Generate IMAP criteria for fetching emails.

Args:
subjects (list[str], optional): List of subjects to include.
ignore_subjects (list[str], optional): List of subjects to exclude.
from_addresses (list[str], optional): List of senders to include.
ignore_from (list[str], optional): List of senders to exclude.
read_status (str): Read status filter ('all', 'seen', 'unseen').

Returns:
imap_tools.query.AND: IMAP criteria object.
"""
criteria = []

if read_status.lower() == "seen":
criteria.append(AND(seen=True))
elif read_status.lower() == "unseen":
criteria.append(AND(seen=False))

if subjects:
criteria.append(OR(subject=subjects))

if ignore_subjects:
for subject in ignore_subjects:
criteria.append(NOT(AND(subject=subject)))

if from_addresses:
criteria.append(OR(from_=from_addresses))

if ignore_from:
for sender in ignore_from:
criteria.append(NOT(AND(from_=sender)))

last_processed_uid = self.state.last_email_uid
base_read_criteria = None
if last_processed_uid == 0:
time_shift = int(self.config.get('interval', 20 * 60))
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
base_read_criteria = AND(date_gte=last_read_timestamp.date())
else:
query = f'{int(last_processed_uid) + 1}:*'
base_read_criteria = AND(uid=query)

criteria.append(base_read_criteria)

# Combine all criteria with AND
return AND(*criteria)


@staticmethod
def read_mails(bot: str) -> ([dict], str):
Expand All @@ -253,15 +307,17 @@ def read_mails(bot: str) -> ([dict], str):
try:
mp.login_imap()
is_logged_in = True
msgs = []
if last_processed_uid == 0:
time_shift = int(mp.config.get('interval', 20 * 60))
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date()), mark_seen=False)
else:
query = f'{int(last_processed_uid) + 1}:*'
msgs = mp.mailbox.fetch(AND(uid=query), mark_seen=False)
for msg in msgs:
subject = mp.config.get('subjects', "")
subject = Utility.string_to_list(subject)
ignore_subject = mp.config.get('ignore_subjects', "")
ignore_subject = Utility.string_to_list(ignore_subject)
from_addresses = mp.config.get('from_emails', "")
from_addresses = Utility.string_to_list(from_addresses)
ignore_from = mp.config.get('ignore_from_emails', "")
ignore_from = Utility.string_to_list(ignore_from)
read_status = mp.config.get('seen_status', 'all')
criteria = mp.generate_criteria(subject, ignore_subject, from_addresses, ignore_from, read_status)
for msg in mp.mailbox.fetch(criteria, mark_seen=False):
if int(msg.uid) <= last_processed_uid:
continue
last_processed_uid = int(msg.uid)
Expand Down
21 changes: 19 additions & 2 deletions kairon/shared/channels/mail/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from urllib.parse import urljoin


from kairon import Utility
from kairon.exceptions import AppException
from kairon.shared.channels.mail.processor import MailProcessor
from kairon.shared.chat.data_objects import Channels
from kairon.shared.constants import ChannelTypes


class MailScheduler:
Expand All @@ -28,6 +28,23 @@ def request_epoch(bot: str):
if not resp['success']:
raise AppException("Failed to request email channel epoch")

@staticmethod
def request_stop(bot: str):
event_server_url = Utility.get_event_server_url()
if Utility.is_exist(Channels, raise_error=False, bot=bot, connector_type=ChannelTypes.MAIL.value):
resp = Utility.execute_http_request(
"GET",
urljoin(
event_server_url,
f"/api/mail/stop/{bot}",
),
err_msg="Failed to request epoch",
)
if not resp['success']:
raise AppException("Failed to stop email channel reading")
else:
raise AppException("Mail channel does not exist")




Expand Down
5 changes: 5 additions & 0 deletions kairon/shared/chat/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ def delete_channel_config(bot: Text, **kwargs):
:return: None
"""
kwargs.update({"bot": bot})
try:
from kairon.shared.channels.mail.scheduler import MailScheduler
MailScheduler.request_stop(bot)
except Exception as e:
logger.error(f"Error while stopping mail scheduler for bot {bot}. Error: {str(e)}")
Utility.hard_delete_document([Channels], **kwargs)

@staticmethod
Expand Down
12 changes: 11 additions & 1 deletion kairon/shared/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2223,6 +2223,15 @@ def delete_documents(document: Document, user: str = "System"):
document["user"] = user
document.delete()

@staticmethod
def string_to_list(comma_sep_string: str, delimilter: str = ",") -> List[str]:
"""
Convert comma separated string to list
"""
if not comma_sep_string:
return []
return [item.strip() for item in comma_sep_string.split(delimilter) if item.strip()]


class StoryValidator:
@staticmethod
Expand Down Expand Up @@ -2755,4 +2764,5 @@ def __handle_member_left_bot(**kwargs):
body = body.replace("MEMBER_NAME", user_name)
body = body.replace("BOT_NAME", bot_name)
subject = f"Notification: {user_name} has left the {bot_name} bot"
return body, subject
return body, subject

5 changes: 5 additions & 0 deletions metadata/integrations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ channels:
optional_fields:
- interval
- intent
- subjects
- ignore_subjects
- from_emails
- ignore_from_emails
- seen_status

actions:
pipedrive:
Expand Down
54 changes: 52 additions & 2 deletions tests/unit_test/channels/mail_channel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from unittest.mock import patch, MagicMock

import pytest
from imap_tools import MailMessage
from imap_tools import MailMessage, AND

from mongoengine import connect, disconnect
from uuid6 import uuid7
Expand Down Expand Up @@ -276,6 +276,53 @@ def test_process_mail(self, mock_get_channel_config):
assert result == "Hello John Doe, How can I help you today?"



@patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config")
@pytest.mark.asyncio
async def test_generate_criteria(self, mock_get_channel_config):
bot_id = pytest.mail_test_bot
mock_get_channel_config.return_value = {
'config': {
'email_account': "[email protected]",
'email_password': "password",
'imap_server': "imap.testuser.com",
}
}

mp = MailProcessor(bot=bot_id)
mp.state.last_email_uid = 123
#seen
criteria = mp.generate_criteria(read_status="seen")
print(criteria)
assert criteria == '((SEEN) (UID 124:*))'

#unseen
criteria = mp.generate_criteria(read_status="unseen")
assert criteria == '((UNSEEN) (UID 124:*))'

#default
criteria = mp.generate_criteria()
assert criteria == '((UID 124:*))'

#subjects
criteria = mp.generate_criteria(subjects=["Test Subject", "another test subject"])
assert criteria == '((OR SUBJECT "Test Subject" SUBJECT "another test subject") (UID 124:*))'

#from
criteria = mp.generate_criteria(from_addresses=["info", "[email protected]", "[email protected]"])
assert criteria == '((OR OR FROM "[email protected]" FROM "[email protected]" FROM "info") (UID 124:*))'

#mix
criteria = mp.generate_criteria(read_status="unseen",
subjects=["Test Subject", "another test subject", "happy"],
ignore_subjects=['cat'],
ignore_from=["info", "nomreply"],
from_addresses=["@digite.com", "@nimblework.com"])

assert criteria == '((UNSEEN) (OR OR SUBJECT "Test Subject" SUBJECT "another test subject" SUBJECT "happy") NOT ((SUBJECT "cat")) (OR FROM "@digite.com" FROM "@nimblework.com") NOT ((FROM "info")) NOT ((FROM "nomreply")) (UID 124:*))'



@patch("kairon.shared.channels.mail.processor.MailProcessor.logout_imap")
@patch("kairon.shared.channels.mail.processor.MailProcessor.process_message_task")
@patch("kairon.shared.channels.mail.processor.MailBox")
Expand Down Expand Up @@ -538,4 +585,7 @@ def test_get_log_exception(self):
BotSettings.objects(user="mail_channel_test_user_acc").delete()
Bot.objects(user="mail_channel_test_user_acc").delete()
Account.objects(user="mail_channel_test_user_acc").delete()
Channels.objects(connector_type=ChannelTypes.MAIL.value).delete()
Channels.objects(connector_type=ChannelTypes.MAIL.value).delete()



53 changes: 53 additions & 0 deletions tests/unit_test/channels/mail_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,56 @@ def test_schedule_channel_mail_reading_exception(mock_mongo_client, mock_mail_pr
EventUtility.schedule_channel_mail_reading(bot)
assert str(excinfo.value) == f"Failed to schedule mail reading for bot {bot}. Error: Test Exception"

@patch('kairon.events.utility.KScheduler.delete_job')
@patch('kairon.events.utility.KScheduler.__init__', return_value=None)
@patch('kairon.shared.channels.mail.processor.MailProcessor')
@patch('pymongo.MongoClient', autospec=True)
def test_stop_channel_mail_reading(mock_mongo, mock_mail_processor, mock_kscheduler, mock_delete_job):
from kairon.events.utility import EventUtility

bot = "test_bot"
mock_mail_processor_instance = mock_mail_processor.return_value
mock_mail_processor_instance.config = {"interval": 1}
mock_mail_processor_instance.state.event_id = 'existing_event_id'
mock_mail_processor_instance.bot_settings.user = "test_user"

# # Test case when event_id is None
EventUtility.stop_channel_mail_reading(bot)
mock_delete_job.assert_called_once()

@patch('kairon.shared.utils.Utility.is_exist')
@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url')
@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request')
def test_request_stop_success(mock_execute_http_request, mock_get_event_server_url, mock_imp):
mock_get_event_server_url.return_value = "http://localhost"
mock_execute_http_request.return_value = {'success': True}
mock_imp.return_value = True
bot = "test_bot"
try:
MailScheduler.request_stop(bot)
except AppException:
pytest.fail("request_epoch() raised AppException unexpectedly!")


@patch('kairon.shared.utils.Utility.is_exist')
@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url')
@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request')
def test_request_stop_response_not_success(mock_execute_http_request, mock_get_event_server_url, mock_imp):
mock_get_event_server_url.return_value = "http://localhost"
mock_execute_http_request.return_value = {'success': False}
mock_imp.return_value = True
bot = "test_bot"
with pytest.raises(AppException):
MailScheduler.request_stop(bot)

@patch('kairon.shared.utils.Utility.is_exist')
@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url')
@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request')
def test_request_stop_no_channel_exist_exception(mock_execute_http_request, mock_get_event_server_url, mock_imp):
mock_get_event_server_url.return_value = "http://localhost"
mock_execute_http_request.return_value = {'success': True}
mock_imp.return_value = False
bot = "test_bot"
with pytest.raises(AppException):
MailScheduler.request_stop(bot)

29 changes: 28 additions & 1 deletion tests/unit_test/utility_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3305,4 +3305,31 @@ async def test_handle_first_name_member_update_email_template(self, validate_and
)
validate_and_send_mail_mock.assert_called_once_with(
email, expected_subject, expected_body
)
)


def test_comma_sep_string_to_list(self):
# Test input with empty elements
assert Utility.string_to_list("apple, , banana, , orange") == ["apple", "banana", "orange"]

# Test input with only commas
assert Utility.string_to_list(",,,") == []

# Test input with no commas (single element)
assert Utility.string_to_list("apple") == ["apple"]

# Test input with empty string
assert Utility.string_to_list("") == []

# Test input with None
assert Utility.string_to_list(None) == []

# Test input with special characters
assert Utility.string_to_list("apple,@banana, #orange$") == ["apple", "@banana", "#orange$"]

# Test input with numeric values
assert Utility.string_to_list("1, 2, 3, 4") == ["1", "2", "3", "4"]

# Test input with spaces
assert Utility.string_to_list("apple, banana, orange") == ["apple", "banana", "orange"]

Loading