diff --git a/kairon/__init__.py b/kairon/__init__.py index 7f04b40b1..b43c09932 100644 --- a/kairon/__init__.py +++ b/kairon/__init__.py @@ -45,7 +45,7 @@ def create_argument_parser(): from kairon.cli import importer, training, testing, conversations_deletion, translator, delete_logs,\ - message_broadcast,content_importer, mail_channel_process, mail_channel_read + message_broadcast,content_importer, mail_channel_read parser = ArgumentParser( prog="kairon", @@ -63,7 +63,6 @@ def create_argument_parser(): delete_logs.add_subparser(subparsers, parents=parent_parsers) message_broadcast.add_subparser(subparsers, parents=parent_parsers) content_importer.add_subparser(subparsers, parents=parent_parsers) - mail_channel_process.add_subparser(subparsers, parents=parent_parsers) mail_channel_read.add_subparser(subparsers, parents=parent_parsers) return parser diff --git a/kairon/cli/mail_channel_process.py b/kairon/cli/mail_channel_process.py deleted file mode 100644 index 21a734b32..000000000 --- a/kairon/cli/mail_channel_process.py +++ /dev/null @@ -1,36 +0,0 @@ -import json -from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter -from typing import List -from rasa.cli import SubParsersAction - -from kairon.events.definitions.mail_channel import MailProcessEvent - - -def process_channel_mails(args): - mails = json.loads(args.mails) - if not isinstance(mails, list): - raise ValueError("Mails should be a list") - MailProcessEvent(args.bot, args.user).execute(mails=mails) - - -def add_subparser(subparsers: SubParsersAction, parents: List[ArgumentParser]): - mail_parser = subparsers.add_parser( - "mail-channel-process", - conflict_handler="resolve", - formatter_class=ArgumentDefaultsHelpFormatter, - parents=parents, - help="Mail channel process mails" - ) - mail_parser.add_argument('bot', - type=str, - help="Bot id for which command is executed", action='store') - - mail_parser.add_argument('user', - type=str, - help="Kairon user who is initiating the command", action='store') - - mail_parser.add_argument('mails', - type=str, - help="json representing List of mails to be processed", action='store') - - mail_parser.set_defaults(func=process_channel_mails) \ No newline at end of file diff --git a/kairon/events/definitions/factory.py b/kairon/events/definitions/factory.py index 31adda787..5ce3696a2 100644 --- a/kairon/events/definitions/factory.py +++ b/kairon/events/definitions/factory.py @@ -2,7 +2,7 @@ from kairon.events.definitions.data_importer import TrainingDataImporterEvent from kairon.events.definitions.faq_importer import FaqDataImporterEvent from kairon.events.definitions.history_delete import DeleteHistoryEvent -from kairon.events.definitions.mail_channel import MailProcessEvent, MailReadEvent +from kairon.events.definitions.mail_channel import MailReadEvent from kairon.events.definitions.message_broadcast import MessageBroadcastEvent from kairon.events.definitions.model_testing import ModelTestingEvent from kairon.events.definitions.model_training import ModelTrainingEvent @@ -23,7 +23,6 @@ class EventFactory: EventClass.message_broadcast: MessageBroadcastEvent, EventClass.content_importer: DocContentImporterEvent, EventClass.mail_channel_read_mails: MailReadEvent, - EventClass.mail_channel_process_mails: MailProcessEvent } @staticmethod diff --git a/kairon/events/definitions/mail_channel.py b/kairon/events/definitions/mail_channel.py index 61b09fd7e..87baa67f5 100644 --- a/kairon/events/definitions/mail_channel.py +++ b/kairon/events/definitions/mail_channel.py @@ -3,55 +3,11 @@ from kairon import Utility from kairon.events.definitions.base import EventsBase from kairon.exceptions import AppException +from kairon.shared.channels.mail.constants import MailConstants from kairon.shared.channels.mail.processor import MailProcessor from kairon.shared.constants import EventClass -class MailProcessEvent(EventsBase): - """ - Event to start mail channel scheduler if not already running. - """ - - def __init__(self, bot: Text, user: Text, **kwargs): - """ - Initialise event. - """ - self.bot = bot - self.user = user - - def validate(self): - """ - validate mail channel exists - """ - return MailProcessor.validate_smtp_connection(self.bot) - - - def enqueue(self, **kwargs): - """ - Send event to event server. - """ - try: - mails: list = kwargs.get('mails', []) - payload = {'bot': self.bot, 'user': self.user, 'mails': mails} - Utility.request_event_server(EventClass.mail_channel_process_mails, payload) - except Exception as e: - logger.error(str(e)) - raise AppException(e) - - def execute(self, **kwargs): - """ - Execute the event. - """ - try: - mails = kwargs.get('mails') - if mails: - MailProcessor.process_message_task(self.bot, mails) - except Exception as e: - logger.error(str(e)) - raise AppException(e) - - - class MailReadEvent(EventsBase): """ Event to read mails from mail channel and create events for each mail tp process them via bot @@ -66,9 +22,9 @@ def __init__(self, bot: Text, user: Text, **kwargs): def validate(self): """ - validate mail channel exists + validate mail channel exists and works properly """ - return MailProcessor.validate_imap_connection(self.bot) + return MailProcessor.validate_imap_connection(self.bot) and MailProcessor.validate_imap_connection(self.bot) def enqueue(self, **kwargs): """ @@ -76,6 +32,7 @@ def enqueue(self, **kwargs): """ try: payload = {'bot': self.bot, 'user': self.user} + self.validate() Utility.request_event_server(EventClass.mail_channel_read_mails, payload) except Exception as e: logger.error(str(e)) @@ -87,12 +44,10 @@ def execute(self, **kwargs): """ try: vals = MailProcessor.read_mails(self.bot) - print(vals) - emails, _, _ = vals - for email in emails: - ev = MailProcessEvent(self.bot, self.user) - ev.validate() - ev.enqueue(mails=[email]) - + emails, _ = vals + batch_size = MailConstants.PROCESS_MESSAGE_BATCH_SIZE + for i in range(0, len(emails), batch_size): + batch = emails[i:i + batch_size] + MailProcessor.process_message_task(self.bot, batch) except Exception as e: raise AppException(f"Failed to schedule mail reading for bot {self.bot}. Error: {str(e)}") diff --git a/kairon/events/server.py b/kairon/events/server.py index 781c5c2fc..3afa312b4 100644 --- a/kairon/events/server.py +++ b/kairon/events/server.py @@ -56,7 +56,6 @@ async def lifespan(app: FastAPI): """ MongoDB is connected on the bot trainer startup """ config: dict = Utility.mongoengine_connection(Utility.environment['database']["url"]) connect(**config) - EventUtility.reschedule_all_bots_channel_mail_reading() yield disconnect() diff --git a/kairon/events/utility.py b/kairon/events/utility.py index 1d4f0be24..85fe35493 100644 --- a/kairon/events/utility.py +++ b/kairon/events/utility.py @@ -60,13 +60,3 @@ def schedule_channel_mail_reading(bot: str): EventClass.mail_channel_read_mails, {"bot": bot, "user": mail_processor.bot_settings.user}) except Exception as e: raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}") - - @staticmethod - def reschedule_all_bots_channel_mail_reading(): - try: - bots = list(Channels.objects(connector_type= ChannelTypes.MAIL.value).distinct("bot")) - for bot in bots: - logger.info(f"Rescheduling mail reading for bot {bot}") - EventUtility.schedule_channel_mail_reading(bot) - except Exception as e: - raise AppException(f"Failed to reschedule mail reading events. Error: {str(e)}") \ No newline at end of file diff --git a/kairon/shared/channels/mail/constants.py b/kairon/shared/channels/mail/constants.py index 737448543..37558f220 100644 --- a/kairon/shared/channels/mail/constants.py +++ b/kairon/shared/channels/mail/constants.py @@ -7,5 +7,5 @@ class MailConstants: DEFAULT_TEMPLATE = "

{bot_response}



Generated by kAIron AI.\n" - PROCESS_MESSAGE_BATCH_SIZE = 4 + PROCESS_MESSAGE_BATCH_SIZE = 8 diff --git a/kairon/shared/channels/mail/data_objects.py b/kairon/shared/channels/mail/data_objects.py index ab387e0eb..f587cd584 100644 --- a/kairon/shared/channels/mail/data_objects.py +++ b/kairon/shared/channels/mail/data_objects.py @@ -28,8 +28,7 @@ class MailResponseLog(Auditlog): Mail response log """ sender_id = StringField(required=True) - subject = StringField() - body = StringField() + uid = IntField(required=True) responses = ListField() slots = DictField() bot = StringField(required=True) diff --git a/kairon/shared/channels/mail/processor.py b/kairon/shared/channels/mail/processor.py index f68f286bb..e19533258 100644 --- a/kairon/shared/channels/mail/processor.py +++ b/kairon/shared/channels/mail/processor.py @@ -229,7 +229,7 @@ def process_message_task(bot: str, message_batch: [dict]): @staticmethod - def read_mails(bot: str) -> ([dict], str, int): + def read_mails(bot: str) -> ([dict], str): """ Read mails from the mailbox Parameters: @@ -247,16 +247,20 @@ def read_mails(bot: str) -> ([dict], str, int): """ logger.info(f"reading mails for {bot}") mp = MailProcessor(bot) - time_shift = int(mp.config.get('interval', 20 * 60)) - last_read_timestamp = datetime.now() - timedelta(seconds=time_shift) messages = [] is_logged_in = False last_processed_uid = mp.state.last_email_uid - query = f'{int(last_processed_uid) + 1}:*' try: mp.login_imap() is_logged_in = True - msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date(), uid=query), mark_seen=False) + msgs = [] + if last_processed_uid == 0: + time_shift = int(mp.config.get('interval', 20 * 60)) + last_read_timestamp = datetime.now() - timedelta(seconds=time_shift) + msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date()), mark_seen=False) + else: + query = f'{int(last_processed_uid) + 1}:*' + msgs = mp.mailbox.fetch(AND(uid=query), mark_seen=False) for msg in msgs: if int(msg.uid) <= last_processed_uid: continue @@ -267,8 +271,7 @@ def read_mails(bot: str) -> ([dict], str, int): body = msg.text or msg.html or "" #attachments = msg.attachments mail_log = MailResponseLog(sender_id = sender_id, - subject = subject, - body = body, + uid = last_processed_uid, bot = bot, user = mp.bot_settings.user, status=MailStatus.Processing.value, @@ -288,11 +291,11 @@ def read_mails(bot: str) -> ([dict], str, int): mp.state.save() is_logged_in = False - return messages, mp.bot_settings.user, time_shift + return messages, mp.bot_settings.user except Exception as e: logger.exception(e) if is_logged_in: mp.logout_imap() - return [], mp.bot_settings.user, time_shift + return [], mp.bot_settings.user diff --git a/kairon/shared/constants.py b/kairon/shared/constants.py index b4151ea8d..36e23ee55 100644 --- a/kairon/shared/constants.py +++ b/kairon/shared/constants.py @@ -80,7 +80,6 @@ class EventClass(str, Enum): web_search = "web_search" scheduler_evaluator = "scheduler_evaluator" content_importer = "content_importer" - mail_channel_process_mails = "email_channel_process_mails" mail_channel_read_mails = "email_channel_read_mails" diff --git a/system.yaml b/system.yaml index 5c6ad8ebf..36ac27f94 100644 --- a/system.yaml +++ b/system.yaml @@ -140,7 +140,6 @@ events: scheduler_evaluator: ${SCHEDULER_TASK_DEFINITION} content_importer: ${DOC_CONTENT_IMPORTER_TASK_DEFINITION} email_channel_read_mails: ${MAIL_CHANNEL_READ_TASK_DEFINITION} - email_channel_process_mails: ${MAIL_CHANNEL_PROCESS_TASK_DEFINITION} scheduler: collection: ${EVENT_SCHEDULER_COLLECTION:"kscheduler"} type: ${EVENT_SCHEDULER_TYPE:"kscheduler"} diff --git a/tests/unit_test/channels/mail_channel_test.py b/tests/unit_test/channels/mail_channel_test.py index c4cd2f127..7a6e7f103 100644 --- a/tests/unit_test/channels/mail_channel_test.py +++ b/tests/unit_test/channels/mail_channel_test.py @@ -172,8 +172,7 @@ async def test_send_mail(self, mock_get_channel_config, mock_smtp): mail_response_log = MailResponseLog(bot=pytest.mail_test_bot, sender_id="recipient@test.com", user="mail_channel_test_user_acc", - subject="Test Subject", - body="Test Body", + uid=123 ) mail_response_log.save() mail_response_log.save() @@ -210,8 +209,7 @@ async def test_send_mail_exception(self, mock_get_channel_config, mock_smtp): mail_response_log = MailResponseLog(bot=pytest.mail_test_bot, sender_id="recipient@test.com", user="mail_channel_test_user_acc", - subject="Test Subject", - body="Test Body", + uid=123 ) mail_response_log.save() @@ -253,8 +251,7 @@ def test_process_mail(self, mock_get_channel_config): mail_response_log = MailResponseLog(bot=pytest.mail_test_bot, sender_id="recipient@test.com", user="mail_channel_test_user_acc", - subject="Test Subject", - body="Test Body", + uid=123 ) mail_response_log.save() @@ -311,7 +308,7 @@ async def test_read_mails(self, mock_get_channel_config, mock_mailbox_instance.login.return_value = mock_mailbox_instance mock_mailbox_instance.fetch.return_value = [mock_mail_message] - mails, user, time_shift = MailProcessor.read_mails(bot_id) + mails, user = MailProcessor.read_mails(bot_id) print(mails) assert len(mails) == 1 assert mails[0]["subject"] == "Test Subject" @@ -319,7 +316,6 @@ async def test_read_mails(self, mock_get_channel_config, assert mails[0]["date"] == "2023-10-10" assert mails[0]["body"] == "Test Body" assert user == 'mail_channel_test_user_acc' - assert time_shift == 1200 @@ -349,10 +345,9 @@ async def test_read_mails_no_messages(self, mock_get_channel_config, mock_mailbox_instance.login.return_value = mock_mailbox_instance mock_mailbox_instance.fetch.return_value = [] - mails, user, time_shift = MailProcessor.read_mails(bot_id) + mails, user = MailProcessor.read_mails(bot_id) assert len(mails) == 0 assert user == 'mail_channel_test_user_acc' - assert time_shift == 1200 mock_logout_imap.assert_called_once() @@ -369,8 +364,7 @@ async def test_process_messages(self, mock_process_messages_via_bot, mock_send_m mail_response_log = MailResponseLog(bot=pytest.mail_test_bot, sender_id="recipient@test.com", user="mail_channel_test_user_acc", - subject="Test Subject", - body="Test Body", + uid=123 ) mail_response_log.save() diff --git a/tests/unit_test/channels/mail_scheduler_test.py b/tests/unit_test/channels/mail_scheduler_test.py index 8df57f86d..f7073edc9 100644 --- a/tests/unit_test/channels/mail_scheduler_test.py +++ b/tests/unit_test/channels/mail_scheduler_test.py @@ -108,32 +108,3 @@ def test_schedule_channel_mail_reading_exception(mock_mongo_client, mock_mail_pr EventUtility.schedule_channel_mail_reading(bot) assert str(excinfo.value) == f"Failed to schedule mail reading for bot {bot}. Error: Test Exception" - -@patch('kairon.events.utility.EventUtility.schedule_channel_mail_reading') -@patch('kairon.shared.chat.data_objects.Channels.objects') -def test_reschedule_all_bots_channel_mail_reading(mock_channels_objects, mock_schedule_channel_mail_reading): - from kairon.events.utility import EventUtility - - mock_channels_objects.return_value.distinct.return_value = ['bot1', 'bot2'] - - EventUtility.reschedule_all_bots_channel_mail_reading() - - mock_channels_objects.return_value.distinct.assert_called_once_with("bot") - assert mock_schedule_channel_mail_reading.call_count == 2 - mock_schedule_channel_mail_reading.assert_any_call('bot1') - mock_schedule_channel_mail_reading.assert_any_call('bot2') - -@patch('kairon.events.utility.EventUtility.schedule_channel_mail_reading') -@patch('kairon.shared.chat.data_objects.Channels.objects') -def test_reschedule_all_bots_channel_mail_reading_exception(mock_channels_objects, mock_schedule_channel_mail_reading): - from kairon.events.utility import EventUtility - - mock_channels_objects.return_value.distinct.return_value = ['bot1', 'bot2'] - mock_schedule_channel_mail_reading.side_effect = Exception("Test Exception") - - with pytest.raises(AppException) as excinfo: - EventUtility.reschedule_all_bots_channel_mail_reading() - - assert str(excinfo.value) == "Failed to reschedule mail reading events. Error: Test Exception" - mock_channels_objects.return_value.distinct.assert_called_once_with("bot") - assert mock_schedule_channel_mail_reading.call_count == 1 \ No newline at end of file diff --git a/tests/unit_test/cli_test.py b/tests/unit_test/cli_test.py index 3dce1d99f..608b4c11c 100644 --- a/tests/unit_test/cli_test.py +++ b/tests/unit_test/cli_test.py @@ -324,32 +324,6 @@ def init_connection(self): Utility.load_environment() connect(**Utility.mongoengine_connection(Utility.environment['database']["url"])) - @mock.patch("kairon.cli.mail_channel_process.MailProcessEvent.execute") - def test_start_mail_channel_process(self, mock_execute): - from kairon.cli.mail_channel_process import process_channel_mails - data = [{"mail": "test_mail"}] - data = json.dumps(data) - with patch('argparse.ArgumentParser.parse_args', - return_value=argparse.Namespace(func=process_channel_mails, - bot="test_bot", - user="test_user", mails=data)): - cli() - mock_execute.assert_called_once() - - - @mock.patch("kairon.cli.mail_channel_process.MailProcessEvent.execute") - def test_start_mail_channel_process_wrong_format(self, mock_execute): - from kairon.cli.mail_channel_process import process_channel_mails - data = {"mail": "test_mail"} - data = json.dumps(data) - with patch('argparse.ArgumentParser.parse_args', - return_value=argparse.Namespace(func=process_channel_mails, - bot="test_bot", - user="test_user", mails=data)): - with pytest.raises(ValueError): - cli() - mock_execute.assert_not_called() - @mock.patch("kairon.cli.mail_channel_read.MailReadEvent.execute") def test_start_mail_channel_read(self, mock_execute): from kairon.cli.mail_channel_read import read_channel_mails diff --git a/tests/unit_test/events/definitions_test.py b/tests/unit_test/events/definitions_test.py index da5d630a7..7a3f179fa 100644 --- a/tests/unit_test/events/definitions_test.py +++ b/tests/unit_test/events/definitions_test.py @@ -1200,94 +1200,6 @@ def test_delete_message_broadcast(self): MessageBroadcastProcessor.get_settings(setting_id, bot) - - @responses.activate - def test_validate_mail_channel_schedule_event(self): - from kairon.events.definitions.mail_channel import MailProcessEvent - bot = "test_add_schedule_event" - user = "test_user" - url = f"http://localhost:5001/api/events/execute/{EventClass.mail_channel_process_mails}?is_scheduled=False" - responses.add( - "POST", url, - json={"message": "test msg", "success": True, "error_code": 400, "data": None} - ) - with patch('kairon.shared.channels.mail.processor.MailProcessor.__init__', return_value=None) as mp: - with patch('kairon.shared.channels.mail.processor.MailProcessor.login_smtp', return_value=None) as mock_login: - with patch('kairon.shared.channels.mail.processor.MailProcessor.logout_smtp', return_value=None) as mock_logout: - - event = MailProcessEvent(bot, user) - status = event.validate() - assert status - - @responses.activate - def test_validate_mail_channel_schedule_event_fail(self): - from kairon.events.definitions.mail_channel import MailProcessEvent - bot = "test_add_schedule_event" - user = "test_user" - url = f"http://localhost:5001/api/events/execute/{EventClass.mail_channel_process_mails}?is_scheduled=False" - responses.add( - "POST", url, - json={"message": "test msg", "success": True, "error_code": 400, "data": None} - ) - event = MailProcessEvent(bot, user) - status = event.validate() - assert not status - - - - @responses.activate - def test_trigger_mail_channel_process_event_enqueue(self): - from kairon.events.definitions.mail_channel import MailProcessEvent - bot = "test_add_schedule_event" - user = "test_user" - url = f"http://localhost:5001/api/events/execute/{EventClass.mail_channel_process_mails}?is_scheduled=False" - responses.add( - "POST", url, - json={"message": "test msg", "success": True, "error_code": 400, "data": None} - ) - event = MailProcessEvent(bot, user) - try: - event.enqueue() - except AppException as e: - pytest.fail(f"Unexpected exception: {e}") - - @responses.activate - def test_trigger_mail_channel_process_event_enqueue_exception(self): - from kairon.events.definitions.mail_channel import MailProcessEvent - from kairon.exceptions import AppException - from unittest.mock import patch - - bot = "test_add_schedule_event" - user = "test_user" - url = f"http://localhost:5001/api/events/execute/{EventClass.mail_channel_process_mails}?is_scheduled=False" - responses.add( - "POST", url, - json={"message": "test msg", "success": False, "error_code": 400, "data": None} - ) - event = MailProcessEvent(bot, user) - with pytest.raises(AppException, match="Failed to trigger email_channel_process_mails event: test msg"): - event.enqueue() - - - @responses.activate - def test_trigger_mail_channel_process_event_execute(self): - from kairon.events.definitions.mail_channel import MailProcessEvent - try: - MailProcessEvent("", "").execute() - except AppException as e: - pytest.fail(f"Unexpected exception: {e}") - - @responses.activate - def test_trigger_mail_channel_process_event_execute_exception(self): - from kairon.events.definitions.mail_channel import MailProcessEvent - from kairon.exceptions import AppException - from unittest.mock import patch - - with patch("kairon.shared.channels.mail.processor.MailProcessor.process_message_task", - side_effect=Exception("Test")): - with pytest.raises(AppException, match="Test"): - MailProcessEvent("", "").execute(mails=["test@mail.com"]) - @responses.activate def test_mail_channel_read_event_enqueue(self): from kairon.events.definitions.mail_channel import MailReadEvent @@ -1300,26 +1212,32 @@ def test_mail_channel_read_event_enqueue(self): ) event = MailReadEvent(bot, user) try: - event.enqueue() + with patch('kairon.shared.channels.mail.processor.MailProcessor.__init__', return_value=None) as mp: + with patch('kairon.shared.channels.mail.processor.MailProcessor.login_smtp', + return_value=None) as mock_login: + with patch('kairon.shared.channels.mail.processor.MailProcessor.logout_smtp', + return_value=None) as mock_logout: + with patch('kairon.shared.channels.mail.processor.MailProcessor.login_imap', + return_value=None) as mock_login_imap: + with patch('kairon.shared.channels.mail.processor.MailProcessor.logout_imap', + return_value=None) as mock_logout_imap: + event.enqueue() except AppException as e: pytest.fail(f"Unexpected exception: {e}") @patch('kairon.shared.channels.mail.processor.MailProcessor.read_mails') - @patch('kairon.events.definitions.mail_channel.MailProcessEvent.enqueue') - @patch('kairon.events.definitions.mail_channel.MailProcessEvent.validate') - def test_mail_read_event_execute(self, mock_validate, mock_enqueue, mock_read_mails): + @patch('kairon.events.definitions.mail_channel.MailProcessor.process_message_task') + def test_mail_read_event_execute(self, mock_process_message_task, mock_read_mails): from kairon.events.definitions.mail_channel import MailReadEvent bot = "test_add_schedule_event" user = "test_user" - mock_read_mails.return_value = (["test@mail.com"], user, 10) - mock_validate.return_value = True + mock_read_mails.return_value = (["test@mail.com"], user) event = MailReadEvent(bot, user) event.execute() mock_read_mails.assert_called_once_with(bot) - mock_validate.assert_called_once() - mock_enqueue.assert_called_once_with(mails=["test@mail.com"]) + mock_process_message_task.assert_called_once_with('test_add_schedule_event', ["test@mail.com"]) def test_mail_read_event_execute_exception(self): bot = "test_add_schedule_event"