Skip to content

Commit

Permalink
Mail channel filter (#1651)
Browse files Browse the repository at this point in the history
* mail read schedule stop mechanism, filters and their related tests

* mail read schedule stop mechanism, filters and their related tests

* mail read schedule stop mechanism, filters and their related tests

* mail read schedule stop mechanism, filters and their related tests

---------

Co-authored-by: spandan_mondal <[email protected]>
  • Loading branch information
hasinaxp and spandan_mondal authored Dec 12, 2024
1 parent 3586fe9 commit fc6a5e9
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 20 deletions.
8 changes: 7 additions & 1 deletion kairon/events/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,10 @@ def dispatch_scheduled_event(event_id: Text = Path(description="Event id")):
@app.get('/api/mail/schedule/{bot}', response_model=Response)
def request_epoch(bot: Text = Path(description="Bot id")):
EventUtility.schedule_channel_mail_reading(bot)
return {"data": None, "message": "Mail scheduler epoch request!"}
return {"data": None, "message": "Mail scheduler epoch request!"}


@app.get('/api/mail/stop/{bot}', response_model=Response)
def stop_mail_reading(bot: Text = Path(description="Bot id")):
EventUtility.stop_channel_mail_reading(bot)
return {"data": None, "message": "Mail scheduler stopped!"}
16 changes: 14 additions & 2 deletions kairon/events/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from kairon.events.executors.factory import ExecutorFactory
from kairon.events.scheduler.kscheduler import KScheduler
from kairon.exceptions import AppException
from kairon.shared.chat.data_objects import Channels
from kairon.shared.constants import EventClass, ChannelTypes
from kairon.shared.constants import EventClass
from kairon.shared.data.constant import TASK_TYPE
from loguru import logger

Expand Down Expand Up @@ -60,3 +59,16 @@ def schedule_channel_mail_reading(bot: str):
EventClass.mail_channel_read_mails, {"bot": bot, "user": mail_processor.bot_settings.user})
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}")

@staticmethod
def stop_channel_mail_reading(bot: str):
from kairon.shared.channels.mail.processor import MailProcessor

try:
mail_processor = MailProcessor(bot)
event_id = mail_processor.state.event_id
if event_id:
mail_processor.update_event_id(None)
KScheduler().delete_job(event_id)
except Exception as e:
raise AppException(f"Failed to stop mail reading for bot {bot}. Error: {str(e)}")
78 changes: 67 additions & 11 deletions kairon/shared/channels/mail/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from loguru import logger
from pydantic.schema import timedelta
from pydantic.validators import datetime
from imap_tools import MailBox, AND
from imap_tools import MailBox, AND, OR, NOT

from kairon import Utility
from kairon.exceptions import AppException
from kairon.shared.account.data_objects import Bot
from kairon.shared.channels.mail.constants import MailConstants
Expand Down Expand Up @@ -35,8 +37,9 @@ def update_event_id(self, event_id):
self.state.event_id = event_id
self.state.save()


@staticmethod
def get_mail_channel_state_data(bot):
def get_mail_channel_state_data(bot:str):
"""
Get mail channel state data
"""
Expand Down Expand Up @@ -227,6 +230,57 @@ def process_message_task(bot: str, message_batch: [dict]):
"""
asyncio.run(MailProcessor.process_messages(bot, message_batch))

def generate_criteria(self, subjects=None, ignore_subjects=None, from_addresses=None, ignore_from=None,
read_status="all"):
"""
Generate IMAP criteria for fetching emails.
Args:
subjects (list[str], optional): List of subjects to include.
ignore_subjects (list[str], optional): List of subjects to exclude.
from_addresses (list[str], optional): List of senders to include.
ignore_from (list[str], optional): List of senders to exclude.
read_status (str): Read status filter ('all', 'seen', 'unseen').
Returns:
imap_tools.query.AND: IMAP criteria object.
"""
criteria = []

if read_status.lower() == "seen":
criteria.append(AND(seen=True))
elif read_status.lower() == "unseen":
criteria.append(AND(seen=False))

if subjects:
criteria.append(OR(subject=subjects))

if ignore_subjects:
for subject in ignore_subjects:
criteria.append(NOT(AND(subject=subject)))

if from_addresses:
criteria.append(OR(from_=from_addresses))

if ignore_from:
for sender in ignore_from:
criteria.append(NOT(AND(from_=sender)))

last_processed_uid = self.state.last_email_uid
base_read_criteria = None
if last_processed_uid == 0:
time_shift = int(self.config.get('interval', 20 * 60))
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
base_read_criteria = AND(date_gte=last_read_timestamp.date())
else:
query = f'{int(last_processed_uid) + 1}:*'
base_read_criteria = AND(uid=query)

criteria.append(base_read_criteria)

# Combine all criteria with AND
return AND(*criteria)


@staticmethod
def read_mails(bot: str) -> ([dict], str):
Expand All @@ -253,15 +307,17 @@ def read_mails(bot: str) -> ([dict], str):
try:
mp.login_imap()
is_logged_in = True
msgs = []
if last_processed_uid == 0:
time_shift = int(mp.config.get('interval', 20 * 60))
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date()), mark_seen=False)
else:
query = f'{int(last_processed_uid) + 1}:*'
msgs = mp.mailbox.fetch(AND(uid=query), mark_seen=False)
for msg in msgs:
subject = mp.config.get('subjects', "")
subject = Utility.string_to_list(subject)
ignore_subject = mp.config.get('ignore_subjects', "")
ignore_subject = Utility.string_to_list(ignore_subject)
from_addresses = mp.config.get('from_emails', "")
from_addresses = Utility.string_to_list(from_addresses)
ignore_from = mp.config.get('ignore_from_emails', "")
ignore_from = Utility.string_to_list(ignore_from)
read_status = mp.config.get('seen_status', 'all')
criteria = mp.generate_criteria(subject, ignore_subject, from_addresses, ignore_from, read_status)
for msg in mp.mailbox.fetch(criteria, mark_seen=False):
if int(msg.uid) <= last_processed_uid:
continue
last_processed_uid = int(msg.uid)
Expand Down
21 changes: 19 additions & 2 deletions kairon/shared/channels/mail/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from urllib.parse import urljoin


from kairon import Utility
from kairon.exceptions import AppException
from kairon.shared.channels.mail.processor import MailProcessor
from kairon.shared.chat.data_objects import Channels
from kairon.shared.constants import ChannelTypes


class MailScheduler:
Expand All @@ -28,6 +28,23 @@ def request_epoch(bot: str):
if not resp['success']:
raise AppException("Failed to request email channel epoch")

@staticmethod
def request_stop(bot: str):
event_server_url = Utility.get_event_server_url()
if Utility.is_exist(Channels, raise_error=False, bot=bot, connector_type=ChannelTypes.MAIL.value):
resp = Utility.execute_http_request(
"GET",
urljoin(
event_server_url,
f"/api/mail/stop/{bot}",
),
err_msg="Failed to request epoch",
)
if not resp['success']:
raise AppException("Failed to stop email channel reading")
else:
raise AppException("Mail channel does not exist")




Expand Down
5 changes: 5 additions & 0 deletions kairon/shared/chat/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ def delete_channel_config(bot: Text, **kwargs):
:return: None
"""
kwargs.update({"bot": bot})
try:
from kairon.shared.channels.mail.scheduler import MailScheduler
MailScheduler.request_stop(bot)
except Exception as e:
logger.error(f"Error while stopping mail scheduler for bot {bot}. Error: {str(e)}")
Utility.hard_delete_document([Channels], **kwargs)

@staticmethod
Expand Down
12 changes: 11 additions & 1 deletion kairon/shared/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2223,6 +2223,15 @@ def delete_documents(document: Document, user: str = "System"):
document["user"] = user
document.delete()

@staticmethod
def string_to_list(comma_sep_string: str, delimilter: str = ",") -> List[str]:
"""
Convert comma separated string to list
"""
if not comma_sep_string:
return []
return [item.strip() for item in comma_sep_string.split(delimilter) if item.strip()]


class StoryValidator:
@staticmethod
Expand Down Expand Up @@ -2755,4 +2764,5 @@ def __handle_member_left_bot(**kwargs):
body = body.replace("MEMBER_NAME", user_name)
body = body.replace("BOT_NAME", bot_name)
subject = f"Notification: {user_name} has left the {bot_name} bot"
return body, subject
return body, subject

5 changes: 5 additions & 0 deletions metadata/integrations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ channels:
optional_fields:
- interval
- intent
- subjects
- ignore_subjects
- from_emails
- ignore_from_emails
- seen_status

actions:
pipedrive:
Expand Down
54 changes: 52 additions & 2 deletions tests/unit_test/channels/mail_channel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from unittest.mock import patch, MagicMock

import pytest
from imap_tools import MailMessage
from imap_tools import MailMessage, AND

from mongoengine import connect, disconnect
from uuid6 import uuid7
Expand Down Expand Up @@ -276,6 +276,53 @@ def test_process_mail(self, mock_get_channel_config):
assert result == "Hello John Doe, How can I help you today?"



@patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config")
@pytest.mark.asyncio
async def test_generate_criteria(self, mock_get_channel_config):
bot_id = pytest.mail_test_bot
mock_get_channel_config.return_value = {
'config': {
'email_account': "[email protected]",
'email_password': "password",
'imap_server': "imap.testuser.com",
}
}

mp = MailProcessor(bot=bot_id)
mp.state.last_email_uid = 123
#seen
criteria = mp.generate_criteria(read_status="seen")
print(criteria)
assert criteria == '((SEEN) (UID 124:*))'

#unseen
criteria = mp.generate_criteria(read_status="unseen")
assert criteria == '((UNSEEN) (UID 124:*))'

#default
criteria = mp.generate_criteria()
assert criteria == '((UID 124:*))'

#subjects
criteria = mp.generate_criteria(subjects=["Test Subject", "another test subject"])
assert criteria == '((OR SUBJECT "Test Subject" SUBJECT "another test subject") (UID 124:*))'

#from
criteria = mp.generate_criteria(from_addresses=["info", "[email protected]", "[email protected]"])
assert criteria == '((OR OR FROM "[email protected]" FROM "[email protected]" FROM "info") (UID 124:*))'

#mix
criteria = mp.generate_criteria(read_status="unseen",
subjects=["Test Subject", "another test subject", "happy"],
ignore_subjects=['cat'],
ignore_from=["info", "nomreply"],
from_addresses=["@digite.com", "@nimblework.com"])

assert criteria == '((UNSEEN) (OR OR SUBJECT "Test Subject" SUBJECT "another test subject" SUBJECT "happy") NOT ((SUBJECT "cat")) (OR FROM "@digite.com" FROM "@nimblework.com") NOT ((FROM "info")) NOT ((FROM "nomreply")) (UID 124:*))'



@patch("kairon.shared.channels.mail.processor.MailProcessor.logout_imap")
@patch("kairon.shared.channels.mail.processor.MailProcessor.process_message_task")
@patch("kairon.shared.channels.mail.processor.MailBox")
Expand Down Expand Up @@ -538,4 +585,7 @@ def test_get_log_exception(self):
BotSettings.objects(user="mail_channel_test_user_acc").delete()
Bot.objects(user="mail_channel_test_user_acc").delete()
Account.objects(user="mail_channel_test_user_acc").delete()
Channels.objects(connector_type=ChannelTypes.MAIL.value).delete()
Channels.objects(connector_type=ChannelTypes.MAIL.value).delete()



53 changes: 53 additions & 0 deletions tests/unit_test/channels/mail_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,56 @@ def test_schedule_channel_mail_reading_exception(mock_mongo_client, mock_mail_pr
EventUtility.schedule_channel_mail_reading(bot)
assert str(excinfo.value) == f"Failed to schedule mail reading for bot {bot}. Error: Test Exception"

@patch('kairon.events.utility.KScheduler.delete_job')
@patch('kairon.events.utility.KScheduler.__init__', return_value=None)
@patch('kairon.shared.channels.mail.processor.MailProcessor')
@patch('pymongo.MongoClient', autospec=True)
def test_stop_channel_mail_reading(mock_mongo, mock_mail_processor, mock_kscheduler, mock_delete_job):
from kairon.events.utility import EventUtility

bot = "test_bot"
mock_mail_processor_instance = mock_mail_processor.return_value
mock_mail_processor_instance.config = {"interval": 1}
mock_mail_processor_instance.state.event_id = 'existing_event_id'
mock_mail_processor_instance.bot_settings.user = "test_user"

# # Test case when event_id is None
EventUtility.stop_channel_mail_reading(bot)
mock_delete_job.assert_called_once()

@patch('kairon.shared.utils.Utility.is_exist')
@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url')
@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request')
def test_request_stop_success(mock_execute_http_request, mock_get_event_server_url, mock_imp):
mock_get_event_server_url.return_value = "http://localhost"
mock_execute_http_request.return_value = {'success': True}
mock_imp.return_value = True
bot = "test_bot"
try:
MailScheduler.request_stop(bot)
except AppException:
pytest.fail("request_epoch() raised AppException unexpectedly!")


@patch('kairon.shared.utils.Utility.is_exist')
@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url')
@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request')
def test_request_stop_response_not_success(mock_execute_http_request, mock_get_event_server_url, mock_imp):
mock_get_event_server_url.return_value = "http://localhost"
mock_execute_http_request.return_value = {'success': False}
mock_imp.return_value = True
bot = "test_bot"
with pytest.raises(AppException):
MailScheduler.request_stop(bot)

@patch('kairon.shared.utils.Utility.is_exist')
@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url')
@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request')
def test_request_stop_no_channel_exist_exception(mock_execute_http_request, mock_get_event_server_url, mock_imp):
mock_get_event_server_url.return_value = "http://localhost"
mock_execute_http_request.return_value = {'success': True}
mock_imp.return_value = False
bot = "test_bot"
with pytest.raises(AppException):
MailScheduler.request_stop(bot)

29 changes: 28 additions & 1 deletion tests/unit_test/utility_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3305,4 +3305,31 @@ async def test_handle_first_name_member_update_email_template(self, validate_and
)
validate_and_send_mail_mock.assert_called_once_with(
email, expected_subject, expected_body
)
)


def test_comma_sep_string_to_list(self):
# Test input with empty elements
assert Utility.string_to_list("apple, , banana, , orange") == ["apple", "banana", "orange"]

# Test input with only commas
assert Utility.string_to_list(",,,") == []

# Test input with no commas (single element)
assert Utility.string_to_list("apple") == ["apple"]

# Test input with empty string
assert Utility.string_to_list("") == []

# Test input with None
assert Utility.string_to_list(None) == []

# Test input with special characters
assert Utility.string_to_list("apple,@banana, #orange$") == ["apple", "@banana", "#orange$"]

# Test input with numeric values
assert Utility.string_to_list("1, 2, 3, 4") == ["1", "2", "3", "4"]

# Test input with spaces
assert Utility.string_to_list("apple, banana, orange") == ["apple", "banana", "orange"]

0 comments on commit fc6a5e9

Please sign in to comment.