diff --git a/kairon/events/server.py b/kairon/events/server.py index 3afa312b4..bbcb89f1f 100644 --- a/kairon/events/server.py +++ b/kairon/events/server.py @@ -149,4 +149,10 @@ def dispatch_scheduled_event(event_id: Text = Path(description="Event id")): @app.get('/api/mail/schedule/{bot}', response_model=Response) def request_epoch(bot: Text = Path(description="Bot id")): EventUtility.schedule_channel_mail_reading(bot) - return {"data": None, "message": "Mail scheduler epoch request!"} \ No newline at end of file + return {"data": None, "message": "Mail scheduler epoch request!"} + + +@app.get('/api/mail/stop/{bot}', response_model=Response) +def stop_mail_reading(bot: Text = Path(description="Bot id")): + EventUtility.stop_channel_mail_reading(bot) + return {"data": None, "message": "Mail scheduler stopped!"} \ No newline at end of file diff --git a/kairon/events/utility.py b/kairon/events/utility.py index 85fe35493..f201b2613 100644 --- a/kairon/events/utility.py +++ b/kairon/events/utility.py @@ -5,8 +5,7 @@ from kairon.events.executors.factory import ExecutorFactory from kairon.events.scheduler.kscheduler import KScheduler from kairon.exceptions import AppException -from kairon.shared.chat.data_objects import Channels -from kairon.shared.constants import EventClass, ChannelTypes +from kairon.shared.constants import EventClass from kairon.shared.data.constant import TASK_TYPE from loguru import logger @@ -60,3 +59,16 @@ def schedule_channel_mail_reading(bot: str): EventClass.mail_channel_read_mails, {"bot": bot, "user": mail_processor.bot_settings.user}) except Exception as e: raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}") + + @staticmethod + def stop_channel_mail_reading(bot: str): + from kairon.shared.channels.mail.processor import MailProcessor + + try: + mail_processor = MailProcessor(bot) + event_id = mail_processor.state.event_id + if event_id: + mail_processor.update_event_id(None) + KScheduler().delete_job(event_id) + except Exception as e: + raise AppException(f"Failed to stop mail reading for bot {bot}. Error: {str(e)}") diff --git a/kairon/shared/channels/mail/processor.py b/kairon/shared/channels/mail/processor.py index e19533258..5bc37bed3 100644 --- a/kairon/shared/channels/mail/processor.py +++ b/kairon/shared/channels/mail/processor.py @@ -1,14 +1,16 @@ import asyncio import time +from typing import List from loguru import logger from pydantic.schema import timedelta from pydantic.validators import datetime -from imap_tools import MailBox, AND +from imap_tools import MailBox, AND, OR, NOT from kairon.exceptions import AppException from kairon.shared.account.data_objects import Bot from kairon.shared.channels.mail.constants import MailConstants from kairon.shared.channels.mail.data_objects import MailResponseLog, MailStatus, MailChannelStateData +from kairon.shared.chat.data_objects import Channels from kairon.shared.chat.processor import ChatDataProcessor from kairon.shared.constants import ChannelTypes from kairon.shared.data.data_objects import BotSettings @@ -35,8 +37,18 @@ def update_event_id(self, event_id): self.state.event_id = event_id self.state.save() + + @staticmethod + def does_mail_channel_exist(bot:str): + """ + Check if mail channel exists + """ + if Channels.objects(bot=bot, connector_type=ChannelTypes.MAIL.value).first(): + return True + return False + @staticmethod - def get_mail_channel_state_data(bot): + def get_mail_channel_state_data(bot:str): """ Get mail channel state data """ @@ -227,6 +239,65 @@ def process_message_task(bot: str, message_batch: [dict]): """ asyncio.run(MailProcessor.process_messages(bot, message_batch)) + def generate_criteria(self, subjects=None, ignore_subjects=None, from_addresses=None, ignore_from=None, + read_status="all"): + """ + Generate IMAP criteria for fetching emails. + + Args: + subjects (list[str], optional): List of subjects to include. + ignore_subjects (list[str], optional): List of subjects to exclude. + from_addresses (list[str], optional): List of senders to include. + ignore_from (list[str], optional): List of senders to exclude. + read_status (str): Read status filter ('all', 'seen', 'unseen'). + + Returns: + imap_tools.query.AND: IMAP criteria object. + """ + criteria = [] + + if read_status.lower() == "seen": + criteria.append(AND(seen=True)) + elif read_status.lower() == "unseen": + criteria.append(AND(seen=False)) + + if subjects: + criteria.append(OR(subject=subjects)) + + if ignore_subjects: + for subject in ignore_subjects: + criteria.append(NOT(AND(subject=subject))) + + if from_addresses: + criteria.append(OR(from_=from_addresses)) + + if ignore_from: + for sender in ignore_from: + criteria.append(NOT(AND(from_=sender))) + + last_processed_uid = self.state.last_email_uid + base_read_criteria = None + if last_processed_uid == 0: + time_shift = int(self.config.get('interval', 20 * 60)) + last_read_timestamp = datetime.now() - timedelta(seconds=time_shift) + base_read_criteria = AND(date_gte=last_read_timestamp.date()) + else: + query = f'{int(last_processed_uid) + 1}:*' + base_read_criteria = AND(uid=query) + + criteria.append(base_read_criteria) + + # Combine all criteria with AND + return AND(*criteria) + + @staticmethod + def comma_sep_string_to_list(comma_sep_string: str) -> List[str]: + """ + Convert comma separated string to list + """ + if not comma_sep_string: + return [] + return [item.strip() for item in comma_sep_string.split(",") if item.strip()] @staticmethod def read_mails(bot: str) -> ([dict], str): @@ -253,15 +324,17 @@ def read_mails(bot: str) -> ([dict], str): try: mp.login_imap() is_logged_in = True - msgs = [] - if last_processed_uid == 0: - time_shift = int(mp.config.get('interval', 20 * 60)) - last_read_timestamp = datetime.now() - timedelta(seconds=time_shift) - msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date()), mark_seen=False) - else: - query = f'{int(last_processed_uid) + 1}:*' - msgs = mp.mailbox.fetch(AND(uid=query), mark_seen=False) - for msg in msgs: + subject = mp.config.get('subjects', "") + subject = MailProcessor.comma_sep_string_to_list(subject) + ignore_subject = mp.config.get('ignore_subjects', "") + ignore_subject = MailProcessor.comma_sep_string_to_list(ignore_subject) + from_addresses = mp.config.get('from_emails', "") + from_addresses = MailProcessor.comma_sep_string_to_list(from_addresses) + ignore_from = mp.config.get('ignore_from_emails', "") + ignore_from = MailProcessor.comma_sep_string_to_list(ignore_from) + read_status = mp.config.get('seen_status', 'all') + criteria = mp.generate_criteria(subject, ignore_subject, from_addresses, ignore_from, read_status) + for msg in mp.mailbox.fetch(criteria, mark_seen=False): if int(msg.uid) <= last_processed_uid: continue last_processed_uid = int(msg.uid) diff --git a/kairon/shared/channels/mail/scheduler.py b/kairon/shared/channels/mail/scheduler.py index adff58319..d87a77fcc 100644 --- a/kairon/shared/channels/mail/scheduler.py +++ b/kairon/shared/channels/mail/scheduler.py @@ -1,6 +1,4 @@ from urllib.parse import urljoin - - from kairon import Utility from kairon.exceptions import AppException from kairon.shared.channels.mail.processor import MailProcessor @@ -28,6 +26,23 @@ def request_epoch(bot: str): if not resp['success']: raise AppException("Failed to request email channel epoch") + @staticmethod + def request_stop(bot: str): + event_server_url = Utility.get_event_server_url() + if MailProcessor.does_mail_channel_exist(bot): + resp = Utility.execute_http_request( + "GET", + urljoin( + event_server_url, + f"/api/mail/stop/{bot}", + ), + err_msg="Failed to request epoch", + ) + if not resp['success']: + raise AppException("Failed to stop email channel reading") + else: + raise AppException("Mail channel does not exist") + diff --git a/kairon/shared/chat/processor.py b/kairon/shared/chat/processor.py index 77fc10500..77b4d305c 100644 --- a/kairon/shared/chat/processor.py +++ b/kairon/shared/chat/processor.py @@ -70,6 +70,12 @@ def delete_channel_config(bot: Text, **kwargs): :return: None """ kwargs.update({"bot": bot}) + if kwargs.get("connector_type") != ChannelTypes.SLACK.value: + try: + from kairon.shared.channels.mail.scheduler import MailScheduler + MailScheduler.request_stop(bot) + except Exception as e: + logger.error(f"Error while stopping mail scheduler for bot {bot}. Error: {str(e)}") Utility.hard_delete_document([Channels], **kwargs) @staticmethod diff --git a/metadata/integrations.yml b/metadata/integrations.yml index d433d18f4..5b8b5755d 100644 --- a/metadata/integrations.yml +++ b/metadata/integrations.yml @@ -85,6 +85,11 @@ channels: optional_fields: - interval - intent + - subjects + - ignore_subjects + - from_emails + - ignore_from_emails + - seen_status actions: pipedrive: diff --git a/tests/unit_test/channels/mail_channel_test.py b/tests/unit_test/channels/mail_channel_test.py index 7a6e7f103..c02b51da6 100644 --- a/tests/unit_test/channels/mail_channel_test.py +++ b/tests/unit_test/channels/mail_channel_test.py @@ -3,7 +3,7 @@ from unittest.mock import patch, MagicMock import pytest -from imap_tools import MailMessage +from imap_tools import MailMessage, AND from mongoengine import connect, disconnect from uuid6 import uuid7 @@ -276,6 +276,53 @@ def test_process_mail(self, mock_get_channel_config): assert result == "Hello John Doe, How can I help you today?" + + @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") + @pytest.mark.asyncio + async def test_generate_criteria(self, mock_get_channel_config): + bot_id = pytest.mail_test_bot + mock_get_channel_config.return_value = { + 'config': { + 'email_account': "mail_channel_test_user_acc@testuser.com", + 'email_password': "password", + 'imap_server': "imap.testuser.com", + } + } + + mp = MailProcessor(bot=bot_id) + mp.state.last_email_uid = 123 + #seen + criteria = mp.generate_criteria(read_status="seen") + print(criteria) + assert criteria == '((SEEN) (UID 124:*))' + + #unseen + criteria = mp.generate_criteria(read_status="unseen") + assert criteria == '((UNSEEN) (UID 124:*))' + + #default + criteria = mp.generate_criteria() + assert criteria == '((UID 124:*))' + + #subjects + criteria = mp.generate_criteria(subjects=["Test Subject", "another test subject"]) + assert criteria == '((OR SUBJECT "Test Subject" SUBJECT "another test subject") (UID 124:*))' + + #from + criteria = mp.generate_criteria(from_addresses=["info", "important1@gmail.com", "anotherparrtern@gmail.com"]) + assert criteria == '((OR OR FROM "anotherparrtern@gmail.com" FROM "important1@gmail.com" FROM "info") (UID 124:*))' + + #mix + criteria = mp.generate_criteria(read_status="unseen", + subjects=["Test Subject", "another test subject", "happy"], + ignore_subjects=['cat'], + ignore_from=["info", "nomreply"], + from_addresses=["@digite.com", "@nimblework.com"]) + + assert criteria == '((UNSEEN) (OR OR SUBJECT "Test Subject" SUBJECT "another test subject" SUBJECT "happy") NOT ((SUBJECT "cat")) (OR FROM "@digite.com" FROM "@nimblework.com") NOT ((FROM "info")) NOT ((FROM "nomreply")) (UID 124:*))' + + + @patch("kairon.shared.channels.mail.processor.MailProcessor.logout_imap") @patch("kairon.shared.channels.mail.processor.MailProcessor.process_message_task") @patch("kairon.shared.channels.mail.processor.MailBox") @@ -538,4 +585,42 @@ def test_get_log_exception(self): BotSettings.objects(user="mail_channel_test_user_acc").delete() Bot.objects(user="mail_channel_test_user_acc").delete() Account.objects(user="mail_channel_test_user_acc").delete() - Channels.objects(connector_type=ChannelTypes.MAIL.value).delete() \ No newline at end of file + Channels.objects(connector_type=ChannelTypes.MAIL.value).delete() + + + def test_comma_sep_string_to_list(self): + # Test input with empty elements + assert MailProcessor.comma_sep_string_to_list("apple, , banana, , orange") == ["apple", "banana", "orange"] + + # Test input with only commas + assert MailProcessor.comma_sep_string_to_list(",,,") == [] + + # Test input with no commas (single element) + assert MailProcessor.comma_sep_string_to_list("apple") == ["apple"] + + # Test input with empty string + assert MailProcessor.comma_sep_string_to_list("") == [] + + # Test input with None + assert MailProcessor.comma_sep_string_to_list(None) == [] + + # Test input with special characters + assert MailProcessor.comma_sep_string_to_list("apple,@banana, #orange$") == ["apple", "@banana", "#orange$"] + + # Test input with numeric values + assert MailProcessor.comma_sep_string_to_list("1, 2, 3, 4") == ["1", "2", "3", "4"] + + # Test input with spaces + assert MailProcessor.comma_sep_string_to_list("apple, banana, orange") == ["apple", "banana", "orange"] + + + @patch('kairon.shared.channels.mail.processor.Channels') + def test_mail_channel_exist(self, mock_channels): + mock_channels.objects.return_value.first.return_value = True + assert MailProcessor.does_mail_channel_exist("test_bot") is True + + @patch('kairon.shared.channels.mail.processor.Channels') + def test_mail_channel_not_exist(self, mock_channels): + mock_channels.objects.return_value.first.return_value = False + assert MailProcessor.does_mail_channel_exist("test_bot") is False + diff --git a/tests/unit_test/channels/mail_scheduler_test.py b/tests/unit_test/channels/mail_scheduler_test.py index f7073edc9..60b634b33 100644 --- a/tests/unit_test/channels/mail_scheduler_test.py +++ b/tests/unit_test/channels/mail_scheduler_test.py @@ -108,3 +108,57 @@ def test_schedule_channel_mail_reading_exception(mock_mongo_client, mock_mail_pr EventUtility.schedule_channel_mail_reading(bot) assert str(excinfo.value) == f"Failed to schedule mail reading for bot {bot}. Error: Test Exception" +@patch('kairon.events.utility.KScheduler.delete_job') +@patch('kairon.events.utility.KScheduler.__init__', return_value=None) +@patch('kairon.shared.channels.mail.processor.MailProcessor') +@patch('pymongo.MongoClient', autospec=True) +def test_stop_channel_mail_reading(mock_mongo, mock_mail_processor, mock_kscheduler, mock_delete_job): + from kairon.events.utility import EventUtility + + bot = "test_bot" + mock_mail_processor_instance = mock_mail_processor.return_value + mock_mail_processor_instance.config = {"interval": 1} + mock_mail_processor_instance.state.event_id = 'existing_event_id' + mock_mail_processor_instance.bot_settings.user = "test_user" + +# # Test case when event_id is None + EventUtility.stop_channel_mail_reading(bot) + mock_delete_job.assert_called_once() + + +@patch('kairon.shared.channels.mail.processor.MailProcessor.does_mail_channel_exist') +@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url') +@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request') +def test_request_stop_success(mock_execute_http_request, mock_get_event_server_url, mock_imp): + mock_get_event_server_url.return_value = "http://localhost" + mock_execute_http_request.return_value = {'success': True} + mock_imp.return_value = True + bot = "test_bot" + try: + MailScheduler.request_stop(bot) + except AppException: + pytest.fail("request_epoch() raised AppException unexpectedly!") + + +@patch('kairon.shared.channels.mail.processor.MailProcessor.does_mail_channel_exist') +@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url') +@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request') +def test_request_stop__response_not_success(mock_execute_http_request, mock_get_event_server_url, mock_imp): + mock_get_event_server_url.return_value = "http://localhost" + mock_execute_http_request.return_value = {'success': False} + mock_imp.return_value = True + bot = "test_bot" + with pytest.raises(AppException): + MailScheduler.request_stop(bot) + +@patch('kairon.shared.channels.mail.processor.MailProcessor.does_mail_channel_exist') +@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url') +@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request') +def test_request_stop_no_channel_exist_exception(mock_execute_http_request, mock_get_event_server_url, mock_imp): + mock_get_event_server_url.return_value = "http://localhost" + mock_execute_http_request.return_value = {'success': True} + mock_imp.return_value = False + bot = "test_bot" + with pytest.raises(AppException): + MailScheduler.request_stop(bot) +