Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes - #1635

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions kairon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

def create_argument_parser():
from kairon.cli import importer, training, testing, conversations_deletion, translator, delete_logs,\
message_broadcast,content_importer, mail_channel_process, mail_channel_read
message_broadcast,content_importer, mail_channel_read

parser = ArgumentParser(
prog="kairon",
Expand All @@ -63,7 +63,6 @@ def create_argument_parser():
delete_logs.add_subparser(subparsers, parents=parent_parsers)
message_broadcast.add_subparser(subparsers, parents=parent_parsers)
content_importer.add_subparser(subparsers, parents=parent_parsers)
mail_channel_process.add_subparser(subparsers, parents=parent_parsers)
mail_channel_read.add_subparser(subparsers, parents=parent_parsers)
return parser

Expand Down
36 changes: 0 additions & 36 deletions kairon/cli/mail_channel_process.py

This file was deleted.

3 changes: 1 addition & 2 deletions kairon/events/definitions/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from kairon.events.definitions.data_importer import TrainingDataImporterEvent
from kairon.events.definitions.faq_importer import FaqDataImporterEvent
from kairon.events.definitions.history_delete import DeleteHistoryEvent
from kairon.events.definitions.mail_channel import MailProcessEvent, MailReadEvent
from kairon.events.definitions.mail_channel import MailReadEvent
from kairon.events.definitions.message_broadcast import MessageBroadcastEvent
from kairon.events.definitions.model_testing import ModelTestingEvent
from kairon.events.definitions.model_training import ModelTrainingEvent
Expand All @@ -23,7 +23,6 @@ class EventFactory:
EventClass.message_broadcast: MessageBroadcastEvent,
EventClass.content_importer: DocContentImporterEvent,
EventClass.mail_channel_read_mails: MailReadEvent,
EventClass.mail_channel_process_mails: MailProcessEvent
}

@staticmethod
Expand Down
63 changes: 9 additions & 54 deletions kairon/events/definitions/mail_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,11 @@
from kairon import Utility
from kairon.events.definitions.base import EventsBase
from kairon.exceptions import AppException
from kairon.shared.channels.mail.constants import MailConstants
from kairon.shared.channels.mail.processor import MailProcessor
from kairon.shared.constants import EventClass


class MailProcessEvent(EventsBase):
"""
Event to start mail channel scheduler if not already running.
"""

def __init__(self, bot: Text, user: Text, **kwargs):
"""
Initialise event.
"""
self.bot = bot
self.user = user

def validate(self):
"""
validate mail channel exists
"""
return MailProcessor.validate_smtp_connection(self.bot)


def enqueue(self, **kwargs):
"""
Send event to event server.
"""
try:
mails: list = kwargs.get('mails', [])
payload = {'bot': self.bot, 'user': self.user, 'mails': mails}
Utility.request_event_server(EventClass.mail_channel_process_mails, payload)
except Exception as e:
logger.error(str(e))
raise AppException(e)

def execute(self, **kwargs):
"""
Execute the event.
"""
try:
mails = kwargs.get('mails')
if mails:
MailProcessor.process_message_task(self.bot, mails)
except Exception as e:
logger.error(str(e))
raise AppException(e)



class MailReadEvent(EventsBase):
"""
Event to read mails from mail channel and create events for each mail tp process them via bot
Expand All @@ -66,16 +22,17 @@ def __init__(self, bot: Text, user: Text, **kwargs):

def validate(self):
"""
validate mail channel exists
validate mail channel exists and works properly
"""
return MailProcessor.validate_imap_connection(self.bot)
return MailProcessor.validate_imap_connection(self.bot) and MailProcessor.validate_imap_connection(self.bot)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix duplicate validation call

The validate method contains a duplicate call to validate_imap_connection:

-        return MailProcessor.validate_imap_connection(self.bot) and MailProcessor.validate_imap_connection(self.bot)
+        return MailProcessor.validate_imap_connection(self.bot)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return MailProcessor.validate_imap_connection(self.bot) and MailProcessor.validate_imap_connection(self.bot)
return MailProcessor.validate_imap_connection(self.bot)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to smtp and imap validation


def enqueue(self, **kwargs):
"""
Send event to event server.
"""
try:
payload = {'bot': self.bot, 'user': self.user}
self.validate()
Utility.request_event_server(EventClass.mail_channel_read_mails, payload)
except Exception as e:
logger.error(str(e))
Expand All @@ -87,12 +44,10 @@ def execute(self, **kwargs):
"""
try:
vals = MailProcessor.read_mails(self.bot)
print(vals)
emails, _, _ = vals
for email in emails:
ev = MailProcessEvent(self.bot, self.user)
ev.validate()
ev.enqueue(mails=[email])

emails, _ = vals
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove extra veriable vals (line)

batch_size = MailConstants.PROCESS_MESSAGE_BATCH_SIZE
for i in range(0, len(emails), batch_size):
batch = emails[i:i + batch_size]
MailProcessor.process_message_task(self.bot, batch)
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {self.bot}. Error: {str(e)}")
1 change: 0 additions & 1 deletion kairon/events/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ async def lifespan(app: FastAPI):
""" MongoDB is connected on the bot trainer startup """
config: dict = Utility.mongoengine_connection(Utility.environment['database']["url"])
connect(**config)
EventUtility.reschedule_all_bots_channel_mail_reading()
yield
disconnect()

Expand Down
10 changes: 0 additions & 10 deletions kairon/events/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,3 @@ def schedule_channel_mail_reading(bot: str):
EventClass.mail_channel_read_mails, {"bot": bot, "user": mail_processor.bot_settings.user})
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}")

@staticmethod
def reschedule_all_bots_channel_mail_reading():
try:
bots = list(Channels.objects(connector_type= ChannelTypes.MAIL.value).distinct("bot"))
for bot in bots:
logger.info(f"Rescheduling mail reading for bot {bot}")
EventUtility.schedule_channel_mail_reading(bot)
except Exception as e:
raise AppException(f"Failed to reschedule mail reading events. Error: {str(e)}")
2 changes: 1 addition & 1 deletion kairon/shared/channels/mail/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ class MailConstants:

DEFAULT_TEMPLATE = "<p>{bot_response}</p> <br/><br/><span style='color:#999;'> Generated by kAIron AI.</span>\n"

PROCESS_MESSAGE_BATCH_SIZE = 4
PROCESS_MESSAGE_BATCH_SIZE = 8

3 changes: 1 addition & 2 deletions kairon/shared/channels/mail/data_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ class MailResponseLog(Auditlog):
Mail response log
"""
sender_id = StringField(required=True)
subject = StringField()
body = StringField()
uid = IntField(required=True)
responses = ListField()
slots = DictField()
bot = StringField(required=True)
Expand Down
21 changes: 12 additions & 9 deletions kairon/shared/channels/mail/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def process_message_task(bot: str, message_batch: [dict]):


@staticmethod
def read_mails(bot: str) -> ([dict], str, int):
def read_mails(bot: str) -> ([dict], str):
"""
Read mails from the mailbox
Parameters:
Expand All @@ -247,16 +247,20 @@ def read_mails(bot: str) -> ([dict], str, int):
"""
logger.info(f"reading mails for {bot}")
mp = MailProcessor(bot)
time_shift = int(mp.config.get('interval', 20 * 60))
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
messages = []
is_logged_in = False
last_processed_uid = mp.state.last_email_uid
query = f'{int(last_processed_uid) + 1}:*'
try:
mp.login_imap()
is_logged_in = True
msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date(), uid=query), mark_seen=False)
msgs = []
if last_processed_uid == 0:
time_shift = int(mp.config.get('interval', 20 * 60))
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date()), mark_seen=False)
else:
query = f'{int(last_processed_uid) + 1}:*'
msgs = mp.mailbox.fetch(AND(uid=query), mark_seen=False)
for msg in msgs:
if int(msg.uid) <= last_processed_uid:
continue
Expand All @@ -267,8 +271,7 @@ def read_mails(bot: str) -> ([dict], str, int):
body = msg.text or msg.html or ""
#attachments = msg.attachments
mail_log = MailResponseLog(sender_id = sender_id,
subject = subject,
body = body,
uid = last_processed_uid,
bot = bot,
user = mp.bot_settings.user,
status=MailStatus.Processing.value,
Expand All @@ -288,11 +291,11 @@ def read_mails(bot: str) -> ([dict], str, int):
mp.state.save()

is_logged_in = False
return messages, mp.bot_settings.user, time_shift
return messages, mp.bot_settings.user
except Exception as e:
logger.exception(e)
if is_logged_in:
mp.logout_imap()
return [], mp.bot_settings.user, time_shift
return [], mp.bot_settings.user


1 change: 0 additions & 1 deletion kairon/shared/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class EventClass(str, Enum):
web_search = "web_search"
scheduler_evaluator = "scheduler_evaluator"
content_importer = "content_importer"
mail_channel_process_mails = "email_channel_process_mails"
mail_channel_read_mails = "email_channel_read_mails"


Expand Down
1 change: 0 additions & 1 deletion system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ events:
scheduler_evaluator: ${SCHEDULER_TASK_DEFINITION}
content_importer: ${DOC_CONTENT_IMPORTER_TASK_DEFINITION}
email_channel_read_mails: ${MAIL_CHANNEL_READ_TASK_DEFINITION}
email_channel_process_mails: ${MAIL_CHANNEL_PROCESS_TASK_DEFINITION}
scheduler:
collection: ${EVENT_SCHEDULER_COLLECTION:"kscheduler"}
type: ${EVENT_SCHEDULER_TYPE:"kscheduler"}
Expand Down
18 changes: 6 additions & 12 deletions tests/unit_test/channels/mail_channel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ async def test_send_mail(self, mock_get_channel_config, mock_smtp):
mail_response_log = MailResponseLog(bot=pytest.mail_test_bot,
sender_id="[email protected]",
user="mail_channel_test_user_acc",
subject="Test Subject",
body="Test Body",
uid=123
)
mail_response_log.save()
mail_response_log.save()
Expand Down Expand Up @@ -210,8 +209,7 @@ async def test_send_mail_exception(self, mock_get_channel_config, mock_smtp):
mail_response_log = MailResponseLog(bot=pytest.mail_test_bot,
sender_id="[email protected]",
user="mail_channel_test_user_acc",
subject="Test Subject",
body="Test Body",
uid=123
)
mail_response_log.save()

Expand Down Expand Up @@ -253,8 +251,7 @@ def test_process_mail(self, mock_get_channel_config):
mail_response_log = MailResponseLog(bot=pytest.mail_test_bot,
sender_id="[email protected]",
user="mail_channel_test_user_acc",
subject="Test Subject",
body="Test Body",
uid=123
)
mail_response_log.save()

Expand Down Expand Up @@ -311,15 +308,14 @@ async def test_read_mails(self, mock_get_channel_config,
mock_mailbox_instance.login.return_value = mock_mailbox_instance
mock_mailbox_instance.fetch.return_value = [mock_mail_message]

mails, user, time_shift = MailProcessor.read_mails(bot_id)
mails, user = MailProcessor.read_mails(bot_id)
print(mails)
assert len(mails) == 1
assert mails[0]["subject"] == "Test Subject"
assert mails[0]["mail_id"] == "[email protected]"
assert mails[0]["date"] == "2023-10-10"
assert mails[0]["body"] == "Test Body"
assert user == 'mail_channel_test_user_acc'
assert time_shift == 1200



Expand Down Expand Up @@ -349,10 +345,9 @@ async def test_read_mails_no_messages(self, mock_get_channel_config,
mock_mailbox_instance.login.return_value = mock_mailbox_instance
mock_mailbox_instance.fetch.return_value = []

mails, user, time_shift = MailProcessor.read_mails(bot_id)
mails, user = MailProcessor.read_mails(bot_id)
assert len(mails) == 0
assert user == 'mail_channel_test_user_acc'
assert time_shift == 1200

mock_logout_imap.assert_called_once()

Expand All @@ -369,8 +364,7 @@ async def test_process_messages(self, mock_process_messages_via_bot, mock_send_m
mail_response_log = MailResponseLog(bot=pytest.mail_test_bot,
sender_id="[email protected]",
user="mail_channel_test_user_acc",
subject="Test Subject",
body="Test Body",
uid=123
)
mail_response_log.save()

Expand Down
29 changes: 0 additions & 29 deletions tests/unit_test/channels/mail_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,3 @@ def test_schedule_channel_mail_reading_exception(mock_mongo_client, mock_mail_pr
EventUtility.schedule_channel_mail_reading(bot)
assert str(excinfo.value) == f"Failed to schedule mail reading for bot {bot}. Error: Test Exception"


@patch('kairon.events.utility.EventUtility.schedule_channel_mail_reading')
@patch('kairon.shared.chat.data_objects.Channels.objects')
def test_reschedule_all_bots_channel_mail_reading(mock_channels_objects, mock_schedule_channel_mail_reading):
from kairon.events.utility import EventUtility

mock_channels_objects.return_value.distinct.return_value = ['bot1', 'bot2']

EventUtility.reschedule_all_bots_channel_mail_reading()

mock_channels_objects.return_value.distinct.assert_called_once_with("bot")
assert mock_schedule_channel_mail_reading.call_count == 2
mock_schedule_channel_mail_reading.assert_any_call('bot1')
mock_schedule_channel_mail_reading.assert_any_call('bot2')

@patch('kairon.events.utility.EventUtility.schedule_channel_mail_reading')
@patch('kairon.shared.chat.data_objects.Channels.objects')
def test_reschedule_all_bots_channel_mail_reading_exception(mock_channels_objects, mock_schedule_channel_mail_reading):
from kairon.events.utility import EventUtility

mock_channels_objects.return_value.distinct.return_value = ['bot1', 'bot2']
mock_schedule_channel_mail_reading.side_effect = Exception("Test Exception")

with pytest.raises(AppException) as excinfo:
EventUtility.reschedule_all_bots_channel_mail_reading()

assert str(excinfo.value) == "Failed to reschedule mail reading events. Error: Test Exception"
mock_channels_objects.return_value.distinct.assert_called_once_with("bot")
assert mock_schedule_channel_mail_reading.call_count == 1
Loading
Loading