Skip to content

Commit

Permalink
changes -
Browse files Browse the repository at this point in the history
removed process action event
updated mail log
removed reschedule logic
  • Loading branch information
spandan_mondal committed Dec 11, 2024
1 parent cbcccc2 commit b52635b
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 282 deletions.
3 changes: 1 addition & 2 deletions kairon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

def create_argument_parser():
from kairon.cli import importer, training, testing, conversations_deletion, translator, delete_logs,\
message_broadcast,content_importer, mail_channel_process, mail_channel_read
message_broadcast,content_importer, mail_channel_read

parser = ArgumentParser(
prog="kairon",
Expand All @@ -63,7 +63,6 @@ def create_argument_parser():
delete_logs.add_subparser(subparsers, parents=parent_parsers)
message_broadcast.add_subparser(subparsers, parents=parent_parsers)
content_importer.add_subparser(subparsers, parents=parent_parsers)
mail_channel_process.add_subparser(subparsers, parents=parent_parsers)
mail_channel_read.add_subparser(subparsers, parents=parent_parsers)
return parser

Expand Down
36 changes: 0 additions & 36 deletions kairon/cli/mail_channel_process.py

This file was deleted.

3 changes: 1 addition & 2 deletions kairon/events/definitions/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from kairon.events.definitions.data_importer import TrainingDataImporterEvent
from kairon.events.definitions.faq_importer import FaqDataImporterEvent
from kairon.events.definitions.history_delete import DeleteHistoryEvent
from kairon.events.definitions.mail_channel import MailProcessEvent, MailReadEvent
from kairon.events.definitions.mail_channel import MailReadEvent
from kairon.events.definitions.message_broadcast import MessageBroadcastEvent
from kairon.events.definitions.model_testing import ModelTestingEvent
from kairon.events.definitions.model_training import ModelTrainingEvent
Expand All @@ -23,7 +23,6 @@ class EventFactory:
EventClass.message_broadcast: MessageBroadcastEvent,
EventClass.content_importer: DocContentImporterEvent,
EventClass.mail_channel_read_mails: MailReadEvent,
EventClass.mail_channel_process_mails: MailProcessEvent
}

@staticmethod
Expand Down
63 changes: 9 additions & 54 deletions kairon/events/definitions/mail_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,11 @@
from kairon import Utility
from kairon.events.definitions.base import EventsBase
from kairon.exceptions import AppException
from kairon.shared.channels.mail.constants import MailConstants
from kairon.shared.channels.mail.processor import MailProcessor
from kairon.shared.constants import EventClass


class MailProcessEvent(EventsBase):
"""
Event to start mail channel scheduler if not already running.
"""

def __init__(self, bot: Text, user: Text, **kwargs):
"""
Initialise event.
"""
self.bot = bot
self.user = user

def validate(self):
"""
validate mail channel exists
"""
return MailProcessor.validate_smtp_connection(self.bot)


def enqueue(self, **kwargs):
"""
Send event to event server.
"""
try:
mails: list = kwargs.get('mails', [])
payload = {'bot': self.bot, 'user': self.user, 'mails': mails}
Utility.request_event_server(EventClass.mail_channel_process_mails, payload)
except Exception as e:
logger.error(str(e))
raise AppException(e)

def execute(self, **kwargs):
"""
Execute the event.
"""
try:
mails = kwargs.get('mails')
if mails:
MailProcessor.process_message_task(self.bot, mails)
except Exception as e:
logger.error(str(e))
raise AppException(e)



class MailReadEvent(EventsBase):
"""
Event to read mails from mail channel and create events for each mail tp process them via bot
Expand All @@ -66,16 +22,17 @@ def __init__(self, bot: Text, user: Text, **kwargs):

def validate(self):
"""
validate mail channel exists
validate mail channel exists and works properly
"""
return MailProcessor.validate_imap_connection(self.bot)
return MailProcessor.validate_imap_connection(self.bot) and MailProcessor.validate_imap_connection(self.bot)

def enqueue(self, **kwargs):
"""
Send event to event server.
"""
try:
payload = {'bot': self.bot, 'user': self.user}
self.validate()
Utility.request_event_server(EventClass.mail_channel_read_mails, payload)
except Exception as e:
logger.error(str(e))
Expand All @@ -87,12 +44,10 @@ def execute(self, **kwargs):
"""
try:
vals = MailProcessor.read_mails(self.bot)
print(vals)
emails, _, _ = vals
for email in emails:
ev = MailProcessEvent(self.bot, self.user)
ev.validate()
ev.enqueue(mails=[email])

emails, _ = vals
batch_size = MailConstants.PROCESS_MESSAGE_BATCH_SIZE
for i in range(0, len(emails), batch_size):
batch = emails[i:i + batch_size]
MailProcessor.process_message_task(self.bot, batch)
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {self.bot}. Error: {str(e)}")
1 change: 0 additions & 1 deletion kairon/events/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ async def lifespan(app: FastAPI):
""" MongoDB is connected on the bot trainer startup """
config: dict = Utility.mongoengine_connection(Utility.environment['database']["url"])
connect(**config)
EventUtility.reschedule_all_bots_channel_mail_reading()
yield
disconnect()

Expand Down
10 changes: 0 additions & 10 deletions kairon/events/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,3 @@ def schedule_channel_mail_reading(bot: str):
EventClass.mail_channel_read_mails, {"bot": bot, "user": mail_processor.bot_settings.user})
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}")

@staticmethod
def reschedule_all_bots_channel_mail_reading():
try:
bots = list(Channels.objects(connector_type= ChannelTypes.MAIL.value).distinct("bot"))
for bot in bots:
logger.info(f"Rescheduling mail reading for bot {bot}")
EventUtility.schedule_channel_mail_reading(bot)
except Exception as e:
raise AppException(f"Failed to reschedule mail reading events. Error: {str(e)}")
2 changes: 1 addition & 1 deletion kairon/shared/channels/mail/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ class MailConstants:

DEFAULT_TEMPLATE = "<p>{bot_response}</p> <br/><br/><span style='color:#999;'> Generated by kAIron AI.</span>\n"

PROCESS_MESSAGE_BATCH_SIZE = 4
PROCESS_MESSAGE_BATCH_SIZE = 8

3 changes: 1 addition & 2 deletions kairon/shared/channels/mail/data_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ class MailResponseLog(Auditlog):
Mail response log
"""
sender_id = StringField(required=True)
subject = StringField()
body = StringField()
uid = IntField(required=True)
responses = ListField()
slots = DictField()
bot = StringField(required=True)
Expand Down
21 changes: 12 additions & 9 deletions kairon/shared/channels/mail/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def process_message_task(bot: str, message_batch: [dict]):


@staticmethod
def read_mails(bot: str) -> ([dict], str, int):
def read_mails(bot: str) -> ([dict], str):
"""
Read mails from the mailbox
Parameters:
Expand All @@ -247,16 +247,20 @@ def read_mails(bot: str) -> ([dict], str, int):
"""
logger.info(f"reading mails for {bot}")
mp = MailProcessor(bot)
time_shift = int(mp.config.get('interval', 20 * 60))
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
messages = []
is_logged_in = False
last_processed_uid = mp.state.last_email_uid
query = f'{int(last_processed_uid) + 1}:*'
try:
mp.login_imap()
is_logged_in = True
msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date(), uid=query), mark_seen=False)
msgs = []
if last_processed_uid == 0:
time_shift = int(mp.config.get('interval', 20 * 60))
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date()), mark_seen=False)
else:
query = f'{int(last_processed_uid) + 1}:*'
msgs = mp.mailbox.fetch(AND(uid=query), mark_seen=False)
for msg in msgs:
if int(msg.uid) <= last_processed_uid:
continue
Expand All @@ -267,8 +271,7 @@ def read_mails(bot: str) -> ([dict], str, int):
body = msg.text or msg.html or ""
#attachments = msg.attachments
mail_log = MailResponseLog(sender_id = sender_id,
subject = subject,
body = body,
uid = last_processed_uid,
bot = bot,
user = mp.bot_settings.user,
status=MailStatus.Processing.value,
Expand All @@ -288,11 +291,11 @@ def read_mails(bot: str) -> ([dict], str, int):
mp.state.save()

is_logged_in = False
return messages, mp.bot_settings.user, time_shift
return messages, mp.bot_settings.user
except Exception as e:
logger.exception(e)
if is_logged_in:
mp.logout_imap()
return [], mp.bot_settings.user, time_shift
return [], mp.bot_settings.user


1 change: 0 additions & 1 deletion kairon/shared/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class EventClass(str, Enum):
web_search = "web_search"
scheduler_evaluator = "scheduler_evaluator"
content_importer = "content_importer"
mail_channel_process_mails = "email_channel_process_mails"
mail_channel_read_mails = "email_channel_read_mails"


Expand Down
1 change: 0 additions & 1 deletion system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ events:
scheduler_evaluator: ${SCHEDULER_TASK_DEFINITION}
content_importer: ${DOC_CONTENT_IMPORTER_TASK_DEFINITION}
email_channel_read_mails: ${MAIL_CHANNEL_READ_TASK_DEFINITION}
email_channel_process_mails: ${MAIL_CHANNEL_PROCESS_TASK_DEFINITION}
scheduler:
collection: ${EVENT_SCHEDULER_COLLECTION:"kscheduler"}
type: ${EVENT_SCHEDULER_TYPE:"kscheduler"}
Expand Down
18 changes: 6 additions & 12 deletions tests/unit_test/channels/mail_channel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ async def test_send_mail(self, mock_get_channel_config, mock_smtp):
mail_response_log = MailResponseLog(bot=pytest.mail_test_bot,
sender_id="[email protected]",
user="mail_channel_test_user_acc",
subject="Test Subject",
body="Test Body",
uid=123
)
mail_response_log.save()
mail_response_log.save()
Expand Down Expand Up @@ -210,8 +209,7 @@ async def test_send_mail_exception(self, mock_get_channel_config, mock_smtp):
mail_response_log = MailResponseLog(bot=pytest.mail_test_bot,
sender_id="[email protected]",
user="mail_channel_test_user_acc",
subject="Test Subject",
body="Test Body",
uid=123
)
mail_response_log.save()

Expand Down Expand Up @@ -253,8 +251,7 @@ def test_process_mail(self, mock_get_channel_config):
mail_response_log = MailResponseLog(bot=pytest.mail_test_bot,
sender_id="[email protected]",
user="mail_channel_test_user_acc",
subject="Test Subject",
body="Test Body",
uid=123
)
mail_response_log.save()

Expand Down Expand Up @@ -311,15 +308,14 @@ async def test_read_mails(self, mock_get_channel_config,
mock_mailbox_instance.login.return_value = mock_mailbox_instance
mock_mailbox_instance.fetch.return_value = [mock_mail_message]

mails, user, time_shift = MailProcessor.read_mails(bot_id)
mails, user = MailProcessor.read_mails(bot_id)
print(mails)
assert len(mails) == 1
assert mails[0]["subject"] == "Test Subject"
assert mails[0]["mail_id"] == "[email protected]"
assert mails[0]["date"] == "2023-10-10"
assert mails[0]["body"] == "Test Body"
assert user == 'mail_channel_test_user_acc'
assert time_shift == 1200



Expand Down Expand Up @@ -349,10 +345,9 @@ async def test_read_mails_no_messages(self, mock_get_channel_config,
mock_mailbox_instance.login.return_value = mock_mailbox_instance
mock_mailbox_instance.fetch.return_value = []

mails, user, time_shift = MailProcessor.read_mails(bot_id)
mails, user = MailProcessor.read_mails(bot_id)
assert len(mails) == 0
assert user == 'mail_channel_test_user_acc'
assert time_shift == 1200

mock_logout_imap.assert_called_once()

Expand All @@ -369,8 +364,7 @@ async def test_process_messages(self, mock_process_messages_via_bot, mock_send_m
mail_response_log = MailResponseLog(bot=pytest.mail_test_bot,
sender_id="[email protected]",
user="mail_channel_test_user_acc",
subject="Test Subject",
body="Test Body",
uid=123
)
mail_response_log.save()

Expand Down
29 changes: 0 additions & 29 deletions tests/unit_test/channels/mail_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,3 @@ def test_schedule_channel_mail_reading_exception(mock_mongo_client, mock_mail_pr
EventUtility.schedule_channel_mail_reading(bot)
assert str(excinfo.value) == f"Failed to schedule mail reading for bot {bot}. Error: Test Exception"


@patch('kairon.events.utility.EventUtility.schedule_channel_mail_reading')
@patch('kairon.shared.chat.data_objects.Channels.objects')
def test_reschedule_all_bots_channel_mail_reading(mock_channels_objects, mock_schedule_channel_mail_reading):
from kairon.events.utility import EventUtility

mock_channels_objects.return_value.distinct.return_value = ['bot1', 'bot2']

EventUtility.reschedule_all_bots_channel_mail_reading()

mock_channels_objects.return_value.distinct.assert_called_once_with("bot")
assert mock_schedule_channel_mail_reading.call_count == 2
mock_schedule_channel_mail_reading.assert_any_call('bot1')
mock_schedule_channel_mail_reading.assert_any_call('bot2')

@patch('kairon.events.utility.EventUtility.schedule_channel_mail_reading')
@patch('kairon.shared.chat.data_objects.Channels.objects')
def test_reschedule_all_bots_channel_mail_reading_exception(mock_channels_objects, mock_schedule_channel_mail_reading):
from kairon.events.utility import EventUtility

mock_channels_objects.return_value.distinct.return_value = ['bot1', 'bot2']
mock_schedule_channel_mail_reading.side_effect = Exception("Test Exception")

with pytest.raises(AppException) as excinfo:
EventUtility.reschedule_all_bots_channel_mail_reading()

assert str(excinfo.value) == "Failed to reschedule mail reading events. Error: Test Exception"
mock_channels_objects.return_value.distinct.assert_called_once_with("bot")
assert mock_schedule_channel_mail_reading.call_count == 1
Loading

0 comments on commit b52635b

Please sign in to comment.