Skip to content

Commit

Permalink
mail read schedule stop mechanism, filters and their related tests
Browse files Browse the repository at this point in the history
  • Loading branch information
spandan_mondal committed Dec 12, 2024
1 parent ebcdd78 commit 4116ea2
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 18 deletions.
8 changes: 7 additions & 1 deletion kairon/events/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,10 @@ def dispatch_scheduled_event(event_id: Text = Path(description="Event id")):
@app.get('/api/mail/schedule/{bot}', response_model=Response)
def request_epoch(bot: Text = Path(description="Bot id")):
EventUtility.schedule_channel_mail_reading(bot)
return {"data": None, "message": "Mail scheduler epoch request!"}
return {"data": None, "message": "Mail scheduler epoch request!"}


@app.get('/api/mail/stop/{bot}', response_model=Response)
def stop_mail_reading(bot: Text = Path(description="Bot id")):
EventUtility.stop_channel_mail_reading(bot)
return {"data": None, "message": "Mail scheduler stopped!"}
16 changes: 14 additions & 2 deletions kairon/events/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from kairon.events.executors.factory import ExecutorFactory
from kairon.events.scheduler.kscheduler import KScheduler
from kairon.exceptions import AppException
from kairon.shared.chat.data_objects import Channels
from kairon.shared.constants import EventClass, ChannelTypes
from kairon.shared.constants import EventClass
from kairon.shared.data.constant import TASK_TYPE
from loguru import logger

Expand Down Expand Up @@ -60,3 +59,16 @@ def schedule_channel_mail_reading(bot: str):
EventClass.mail_channel_read_mails, {"bot": bot, "user": mail_processor.bot_settings.user})
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}")

@staticmethod
def stop_channel_mail_reading(bot: str):
from kairon.shared.channels.mail.processor import MailProcessor

try:
mail_processor = MailProcessor(bot)
event_id = mail_processor.state.event_id
if event_id:
mail_processor.update_event_id(None)
KScheduler().delete_job(event_id)
except Exception as e:
raise logger.error(f"Failed to stop mail reading for bot {bot}. Error: {str(e)}")
95 changes: 84 additions & 11 deletions kairon/shared/channels/mail/processor.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import asyncio
import time
from typing import List

from loguru import logger
from pydantic.schema import timedelta
from pydantic.validators import datetime
from imap_tools import MailBox, AND
from imap_tools import MailBox, AND, OR, NOT
from kairon.exceptions import AppException
from kairon.shared.account.data_objects import Bot
from kairon.shared.channels.mail.constants import MailConstants
from kairon.shared.channels.mail.data_objects import MailResponseLog, MailStatus, MailChannelStateData
from kairon.shared.chat.data_objects import Channels
from kairon.shared.chat.processor import ChatDataProcessor
from kairon.shared.constants import ChannelTypes
from kairon.shared.data.data_objects import BotSettings
Expand All @@ -35,8 +37,18 @@ def update_event_id(self, event_id):
self.state.event_id = event_id
self.state.save()


@staticmethod
def does_mail_channel_exist(bot:str):
"""
Check if mail channel exists
"""
if Channels.objects(bot=bot, connector_type=ChannelTypes.MAIL.value).first():
return True
return False

@staticmethod
def get_mail_channel_state_data(bot):
def get_mail_channel_state_data(bot:str):
"""
Get mail channel state data
"""
Expand Down Expand Up @@ -227,6 +239,65 @@ def process_message_task(bot: str, message_batch: [dict]):
"""
asyncio.run(MailProcessor.process_messages(bot, message_batch))

def generate_criteria(self, subjects=None, ignore_subjects=None, from_addresses=None, ignore_from=None,
read_status="all"):
"""
Generate IMAP criteria for fetching emails.
Args:
subjects (list[str], optional): List of subjects to include.
ignore_subjects (list[str], optional): List of subjects to exclude.
from_addresses (list[str], optional): List of senders to include.
ignore_from (list[str], optional): List of senders to exclude.
read_status (str): Read status filter ('all', 'seen', 'unseen').
Returns:
imap_tools.query.AND: IMAP criteria object.
"""
criteria = []

if read_status.lower() == "seen":
criteria.append(AND(seen=True))
elif read_status.lower() == "unseen":
criteria.append(AND(seen=False))

if subjects:
criteria.append(OR(subject=subjects))

if ignore_subjects:
for subject in ignore_subjects:
criteria.append(NOT(AND(subject=subject)))

if from_addresses:
criteria.append(OR(from_=from_addresses))

if ignore_from:
for sender in ignore_from:
criteria.append(NOT(AND(from_=sender)))

last_processed_uid = self.state.last_email_uid
base_read_criteria = None
if last_processed_uid == 0:
time_shift = int(self.config.get('interval', 20 * 60))
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
base_read_criteria = AND(date_gte=last_read_timestamp.date())
else:
query = f'{int(last_processed_uid) + 1}:*'
base_read_criteria = AND(uid=query)

criteria.append(base_read_criteria)

# Combine all criteria with AND
return AND(*criteria)

@staticmethod
def comma_sep_string_to_list(comma_sep_string: str) -> List[str]:
"""
Convert comma separated string to list
"""
if not comma_sep_string:
return []
return [item.strip() for item in comma_sep_string.split(",") if item.strip()]

@staticmethod
def read_mails(bot: str) -> ([dict], str):
Expand All @@ -253,15 +324,17 @@ def read_mails(bot: str) -> ([dict], str):
try:
mp.login_imap()
is_logged_in = True
msgs = []
if last_processed_uid == 0:
time_shift = int(mp.config.get('interval', 20 * 60))
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date()), mark_seen=False)
else:
query = f'{int(last_processed_uid) + 1}:*'
msgs = mp.mailbox.fetch(AND(uid=query), mark_seen=False)
for msg in msgs:
subject = mp.config.get('subjects', "")
subject = MailProcessor.comma_sep_string_to_list(subject)
ignore_subject = mp.config.get('ignore_subjects', "")
ignore_subject = MailProcessor.comma_sep_string_to_list(ignore_subject)
from_addresses = mp.config.get('from_emails', "")
from_addresses = MailProcessor.comma_sep_string_to_list(from_addresses)
ignore_from = mp.config.get('ignore_from_emails', "")
ignore_from = MailProcessor.comma_sep_string_to_list(ignore_from)
read_status = mp.config.get('seen_status', 'all')
criteria = mp.generate_criteria(subject, ignore_subject, from_addresses, ignore_from, read_status)
for msg in mp.mailbox.fetch(criteria, mark_seen=False):
if int(msg.uid) <= last_processed_uid:
continue
last_processed_uid = int(msg.uid)
Expand Down
19 changes: 17 additions & 2 deletions kairon/shared/channels/mail/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from urllib.parse import urljoin


from kairon import Utility
from kairon.exceptions import AppException
from kairon.shared.channels.mail.processor import MailProcessor
Expand Down Expand Up @@ -28,6 +26,23 @@ def request_epoch(bot: str):
if not resp['success']:
raise AppException("Failed to request email channel epoch")

@staticmethod
def request_stop(bot: str):
event_server_url = Utility.get_event_server_url()
if MailProcessor.does_mail_channel_exist(bot):
resp = Utility.execute_http_request(
"GET",
urljoin(
event_server_url,
f"/api/mail/stop/{bot}",
),
err_msg="Failed to request epoch",
)
if not resp['success']:
raise AppException("Failed to stop email channel reading")
else:
raise AppException("Mail channel does not exist")




Expand Down
6 changes: 6 additions & 0 deletions kairon/shared/chat/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ def delete_channel_config(bot: Text, **kwargs):
:return: None
"""
kwargs.update({"bot": bot})
if kwargs.get("connector_type") != ChannelTypes.SLACK.value:
try:
from kairon.shared.channels.mail.scheduler import MailScheduler
MailScheduler.request_stop(bot)
except Exception as e:
logger.error(f"Error while stopping mail scheduler for bot {bot}. Error: {str(e)}")
Utility.hard_delete_document([Channels], **kwargs)

@staticmethod
Expand Down
5 changes: 5 additions & 0 deletions metadata/integrations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ channels:
optional_fields:
- interval
- intent
- subjects
- ignore_subjects
- from_emails
- ignore_from_emails
- seen_status

actions:
pipedrive:
Expand Down
89 changes: 87 additions & 2 deletions tests/unit_test/channels/mail_channel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from unittest.mock import patch, MagicMock

import pytest
from imap_tools import MailMessage
from imap_tools import MailMessage, AND

from mongoengine import connect, disconnect
from uuid6 import uuid7
Expand Down Expand Up @@ -276,6 +276,53 @@ def test_process_mail(self, mock_get_channel_config):
assert result == "Hello John Doe, How can I help you today?"



@patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config")
@pytest.mark.asyncio
async def test_generate_criteria(self, mock_get_channel_config):
bot_id = pytest.mail_test_bot
mock_get_channel_config.return_value = {
'config': {
'email_account': "[email protected]",
'email_password': "password",
'imap_server': "imap.testuser.com",
}
}

mp = MailProcessor(bot=bot_id)
mp.state.last_email_uid = 123
#seen
criteria = mp.generate_criteria(read_status="seen")
print(criteria)
assert criteria == '((SEEN) (UID 124:*))'

#unseen
criteria = mp.generate_criteria(read_status="unseen")
assert criteria == '((UNSEEN) (UID 124:*))'

#default
criteria = mp.generate_criteria()
assert criteria == '((UID 124:*))'

#subjects
criteria = mp.generate_criteria(subjects=["Test Subject", "another test subject"])
assert criteria == '((OR SUBJECT "Test Subject" SUBJECT "another test subject") (UID 124:*))'

#from
criteria = mp.generate_criteria(from_addresses=["info", "[email protected]", "[email protected]"])
assert criteria == '((OR OR FROM "[email protected]" FROM "[email protected]" FROM "info") (UID 124:*))'

#mix
criteria = mp.generate_criteria(read_status="unseen",
subjects=["Test Subject", "another test subject", "happy"],
ignore_subjects=['cat'],
ignore_from=["info", "nomreply"],
from_addresses=["@digite.com", "@nimblework.com"])

assert criteria == '((UNSEEN) (OR OR SUBJECT "Test Subject" SUBJECT "another test subject" SUBJECT "happy") NOT ((SUBJECT "cat")) (OR FROM "@digite.com" FROM "@nimblework.com") NOT ((FROM "info")) NOT ((FROM "nomreply")) (UID 124:*))'



@patch("kairon.shared.channels.mail.processor.MailProcessor.logout_imap")
@patch("kairon.shared.channels.mail.processor.MailProcessor.process_message_task")
@patch("kairon.shared.channels.mail.processor.MailBox")
Expand Down Expand Up @@ -538,4 +585,42 @@ def test_get_log_exception(self):
BotSettings.objects(user="mail_channel_test_user_acc").delete()
Bot.objects(user="mail_channel_test_user_acc").delete()
Account.objects(user="mail_channel_test_user_acc").delete()
Channels.objects(connector_type=ChannelTypes.MAIL.value).delete()
Channels.objects(connector_type=ChannelTypes.MAIL.value).delete()


def test_comma_sep_string_to_list(self):
# Test input with empty elements
assert MailProcessor.comma_sep_string_to_list("apple, , banana, , orange") == ["apple", "banana", "orange"]

# Test input with only commas
assert MailProcessor.comma_sep_string_to_list(",,,") == []

# Test input with no commas (single element)
assert MailProcessor.comma_sep_string_to_list("apple") == ["apple"]

# Test input with empty string
assert MailProcessor.comma_sep_string_to_list("") == []

# Test input with None
assert MailProcessor.comma_sep_string_to_list(None) == []

# Test input with special characters
assert MailProcessor.comma_sep_string_to_list("apple,@banana, #orange$") == ["apple", "@banana", "#orange$"]

# Test input with numeric values
assert MailProcessor.comma_sep_string_to_list("1, 2, 3, 4") == ["1", "2", "3", "4"]

# Test input with spaces
assert MailProcessor.comma_sep_string_to_list("apple, banana, orange") == ["apple", "banana", "orange"]


@patch('kairon.shared.channels.mail.processor.Channels')
def test_mail_channel_exist(self, mock_channels):
mock_channels.objects.return_value.first.return_value = True
assert MailProcessor.does_mail_channel_exist("test_bot") is True

@patch('kairon.shared.channels.mail.processor.Channels')
def test_mail_channel_not_exist(self, mock_channels):
mock_channels.objects.return_value.first.return_value = False
assert MailProcessor.does_mail_channel_exist("test_bot") is False

Loading

0 comments on commit 4116ea2

Please sign in to comment.