-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mail read schedule stop mechanism, filters and their related tests #1645
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,16 @@ | ||
import asyncio | ||
import time | ||
from typing import List | ||
|
||
from loguru import logger | ||
from pydantic.schema import timedelta | ||
from pydantic.validators import datetime | ||
from imap_tools import MailBox, AND | ||
from imap_tools import MailBox, AND, OR, NOT | ||
Comment on lines
7
to
+8
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix incorrect imports of The Apply this diff to correct the imports: -from pydantic.schema import timedelta
-from pydantic.validators import datetime
+from datetime import datetime, timedelta
|
||
from kairon.exceptions import AppException | ||
from kairon.shared.account.data_objects import Bot | ||
from kairon.shared.channels.mail.constants import MailConstants | ||
from kairon.shared.channels.mail.data_objects import MailResponseLog, MailStatus, MailChannelStateData | ||
from kairon.shared.chat.data_objects import Channels | ||
from kairon.shared.chat.processor import ChatDataProcessor | ||
from kairon.shared.constants import ChannelTypes | ||
from kairon.shared.data.data_objects import BotSettings | ||
|
@@ -35,8 +37,18 @@ def update_event_id(self, event_id): | |
self.state.event_id = event_id | ||
self.state.save() | ||
|
||
|
||
@staticmethod | ||
def does_mail_channel_exist(bot:str): | ||
""" | ||
Check if mail channel exists | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use existing function Utility.is_exist() |
||
""" | ||
if Channels.objects(bot=bot, connector_type=ChannelTypes.MAIL.value).first(): | ||
return True | ||
return False | ||
|
||
@staticmethod | ||
def get_mail_channel_state_data(bot): | ||
def get_mail_channel_state_data(bot:str): | ||
""" | ||
Get mail channel state data | ||
""" | ||
|
@@ -227,6 +239,65 @@ def process_message_task(bot: str, message_batch: [dict]): | |
""" | ||
asyncio.run(MailProcessor.process_messages(bot, message_batch)) | ||
|
||
def generate_criteria(self, subjects=None, ignore_subjects=None, from_addresses=None, ignore_from=None, | ||
read_status="all"): | ||
""" | ||
Generate IMAP criteria for fetching emails. | ||
|
||
Args: | ||
subjects (list[str], optional): List of subjects to include. | ||
ignore_subjects (list[str], optional): List of subjects to exclude. | ||
from_addresses (list[str], optional): List of senders to include. | ||
ignore_from (list[str], optional): List of senders to exclude. | ||
read_status (str): Read status filter ('all', 'seen', 'unseen'). | ||
|
||
Returns: | ||
imap_tools.query.AND: IMAP criteria object. | ||
""" | ||
criteria = [] | ||
|
||
if read_status.lower() == "seen": | ||
criteria.append(AND(seen=True)) | ||
elif read_status.lower() == "unseen": | ||
criteria.append(AND(seen=False)) | ||
|
||
if subjects: | ||
criteria.append(OR(subject=subjects)) | ||
|
||
if ignore_subjects: | ||
for subject in ignore_subjects: | ||
criteria.append(NOT(AND(subject=subject))) | ||
|
||
if from_addresses: | ||
criteria.append(OR(from_=from_addresses)) | ||
|
||
if ignore_from: | ||
for sender in ignore_from: | ||
criteria.append(NOT(AND(from_=sender))) | ||
|
||
last_processed_uid = self.state.last_email_uid | ||
base_read_criteria = None | ||
if last_processed_uid == 0: | ||
time_shift = int(self.config.get('interval', 20 * 60)) | ||
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift) | ||
base_read_criteria = AND(date_gte=last_read_timestamp.date()) | ||
else: | ||
query = f'{int(last_processed_uid) + 1}:*' | ||
base_read_criteria = AND(uid=query) | ||
|
||
criteria.append(base_read_criteria) | ||
|
||
# Combine all criteria with AND | ||
return AND(*criteria) | ||
|
||
@staticmethod | ||
def comma_sep_string_to_list(comma_sep_string: str) -> List[str]: | ||
""" | ||
Convert comma separated string to list | ||
""" | ||
if not comma_sep_string: | ||
return [] | ||
return [item.strip() for item in comma_sep_string.split(",") if item.strip()] | ||
|
||
@staticmethod | ||
def read_mails(bot: str) -> ([dict], str): | ||
|
@@ -253,15 +324,17 @@ def read_mails(bot: str) -> ([dict], str): | |
try: | ||
mp.login_imap() | ||
is_logged_in = True | ||
msgs = [] | ||
if last_processed_uid == 0: | ||
time_shift = int(mp.config.get('interval', 20 * 60)) | ||
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift) | ||
msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date()), mark_seen=False) | ||
else: | ||
query = f'{int(last_processed_uid) + 1}:*' | ||
msgs = mp.mailbox.fetch(AND(uid=query), mark_seen=False) | ||
for msg in msgs: | ||
subject = mp.config.get('subjects', "") | ||
subject = MailProcessor.comma_sep_string_to_list(subject) | ||
ignore_subject = mp.config.get('ignore_subjects', "") | ||
ignore_subject = MailProcessor.comma_sep_string_to_list(ignore_subject) | ||
from_addresses = mp.config.get('from_emails', "") | ||
from_addresses = MailProcessor.comma_sep_string_to_list(from_addresses) | ||
ignore_from = mp.config.get('ignore_from_emails', "") | ||
ignore_from = MailProcessor.comma_sep_string_to_list(ignore_from) | ||
read_status = mp.config.get('seen_status', 'all') | ||
criteria = mp.generate_criteria(subject, ignore_subject, from_addresses, ignore_from, read_status) | ||
for msg in mp.mailbox.fetch(criteria, mark_seen=False): | ||
if int(msg.uid) <= last_processed_uid: | ||
continue | ||
last_processed_uid = int(msg.uid) | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,6 +1,4 @@ | ||||||||||||||||||||||||||||||||||
from urllib.parse import urljoin | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
from kairon import Utility | ||||||||||||||||||||||||||||||||||
from kairon.exceptions import AppException | ||||||||||||||||||||||||||||||||||
from kairon.shared.channels.mail.processor import MailProcessor | ||||||||||||||||||||||||||||||||||
|
@@ -28,6 +26,23 @@ def request_epoch(bot: str): | |||||||||||||||||||||||||||||||||
if not resp['success']: | ||||||||||||||||||||||||||||||||||
raise AppException("Failed to request email channel epoch") | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||||||||||
def request_stop(bot: str): | ||||||||||||||||||||||||||||||||||
event_server_url = Utility.get_event_server_url() | ||||||||||||||||||||||||||||||||||
if MailProcessor.does_mail_channel_exist(bot): | ||||||||||||||||||||||||||||||||||
resp = Utility.execute_http_request( | ||||||||||||||||||||||||||||||||||
"GET", | ||||||||||||||||||||||||||||||||||
urljoin( | ||||||||||||||||||||||||||||||||||
event_server_url, | ||||||||||||||||||||||||||||||||||
f"/api/mail/stop/{bot}", | ||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||
err_msg="Failed to request epoch", | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
Comment on lines
+33
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct the error message in The Apply this diff to correct the error message: resp = Utility.execute_http_request(
"GET",
urljoin(
event_server_url,
f"/api/mail/stop/{bot}",
),
- err_msg="Failed to request epoch",
+ err_msg="Failed to stop mail channel",
) 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||
if not resp['success']: | ||||||||||||||||||||||||||||||||||
raise AppException("Failed to stop email channel reading") | ||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||
raise AppException("Mail channel does not exist") | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,6 +70,12 @@ def delete_channel_config(bot: Text, **kwargs): | |
:return: None | ||
""" | ||
kwargs.update({"bot": bot}) | ||
if kwargs.get("connector_type") != ChannelTypes.SLACK.value: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. check not needed |
||
try: | ||
from kairon.shared.channels.mail.scheduler import MailScheduler | ||
MailScheduler.request_stop(bot) | ||
except Exception as e: | ||
logger.error(f"Error while stopping mail scheduler for bot {bot}. Error: {str(e)}") | ||
Utility.hard_delete_document([Channels], **kwargs) | ||
|
||
@staticmethod | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ | |
from unittest.mock import patch, MagicMock | ||
|
||
import pytest | ||
from imap_tools import MailMessage | ||
from imap_tools import MailMessage, AND | ||
|
||
from mongoengine import connect, disconnect | ||
from uuid6 import uuid7 | ||
|
@@ -276,6 +276,53 @@ def test_process_mail(self, mock_get_channel_config): | |
assert result == "Hello John Doe, How can I help you today?" | ||
|
||
|
||
|
||
@patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") | ||
@pytest.mark.asyncio | ||
async def test_generate_criteria(self, mock_get_channel_config): | ||
bot_id = pytest.mail_test_bot | ||
mock_get_channel_config.return_value = { | ||
'config': { | ||
'email_account': "[email protected]", | ||
'email_password': "password", | ||
'imap_server': "imap.testuser.com", | ||
} | ||
} | ||
|
||
mp = MailProcessor(bot=bot_id) | ||
mp.state.last_email_uid = 123 | ||
#seen | ||
criteria = mp.generate_criteria(read_status="seen") | ||
print(criteria) | ||
assert criteria == '((SEEN) (UID 124:*))' | ||
|
||
#unseen | ||
criteria = mp.generate_criteria(read_status="unseen") | ||
assert criteria == '((UNSEEN) (UID 124:*))' | ||
|
||
#default | ||
criteria = mp.generate_criteria() | ||
assert criteria == '((UID 124:*))' | ||
|
||
#subjects | ||
criteria = mp.generate_criteria(subjects=["Test Subject", "another test subject"]) | ||
assert criteria == '((OR SUBJECT "Test Subject" SUBJECT "another test subject") (UID 124:*))' | ||
|
||
#from | ||
criteria = mp.generate_criteria(from_addresses=["info", "[email protected]", "[email protected]"]) | ||
assert criteria == '((OR OR FROM "[email protected]" FROM "[email protected]" FROM "info") (UID 124:*))' | ||
|
||
#mix | ||
criteria = mp.generate_criteria(read_status="unseen", | ||
subjects=["Test Subject", "another test subject", "happy"], | ||
ignore_subjects=['cat'], | ||
ignore_from=["info", "nomreply"], | ||
from_addresses=["@digite.com", "@nimblework.com"]) | ||
|
||
assert criteria == '((UNSEEN) (OR OR SUBJECT "Test Subject" SUBJECT "another test subject" SUBJECT "happy") NOT ((SUBJECT "cat")) (OR FROM "@digite.com" FROM "@nimblework.com") NOT ((FROM "info")) NOT ((FROM "nomreply")) (UID 124:*))' | ||
|
||
|
||
|
||
@patch("kairon.shared.channels.mail.processor.MailProcessor.logout_imap") | ||
@patch("kairon.shared.channels.mail.processor.MailProcessor.process_message_task") | ||
@patch("kairon.shared.channels.mail.processor.MailBox") | ||
|
@@ -538,4 +585,42 @@ def test_get_log_exception(self): | |
BotSettings.objects(user="mail_channel_test_user_acc").delete() | ||
Bot.objects(user="mail_channel_test_user_acc").delete() | ||
Account.objects(user="mail_channel_test_user_acc").delete() | ||
Channels.objects(connector_type=ChannelTypes.MAIL.value).delete() | ||
Channels.objects(connector_type=ChannelTypes.MAIL.value).delete() | ||
|
||
|
||
def test_comma_sep_string_to_list(self): | ||
# Test input with empty elements | ||
assert MailProcessor.comma_sep_string_to_list("apple, , banana, , orange") == ["apple", "banana", "orange"] | ||
|
||
# Test input with only commas | ||
assert MailProcessor.comma_sep_string_to_list(",,,") == [] | ||
|
||
# Test input with no commas (single element) | ||
assert MailProcessor.comma_sep_string_to_list("apple") == ["apple"] | ||
|
||
# Test input with empty string | ||
assert MailProcessor.comma_sep_string_to_list("") == [] | ||
|
||
# Test input with None | ||
assert MailProcessor.comma_sep_string_to_list(None) == [] | ||
|
||
# Test input with special characters | ||
assert MailProcessor.comma_sep_string_to_list("apple,@banana, #orange$") == ["apple", "@banana", "#orange$"] | ||
|
||
# Test input with numeric values | ||
assert MailProcessor.comma_sep_string_to_list("1, 2, 3, 4") == ["1", "2", "3", "4"] | ||
|
||
# Test input with spaces | ||
assert MailProcessor.comma_sep_string_to_list("apple, banana, orange") == ["apple", "banana", "orange"] | ||
|
||
|
||
@patch('kairon.shared.channels.mail.processor.Channels') | ||
def test_mail_channel_exist(self, mock_channels): | ||
mock_channels.objects.return_value.first.return_value = True | ||
assert MailProcessor.does_mail_channel_exist("test_bot") is True | ||
|
||
@patch('kairon.shared.channels.mail.processor.Channels') | ||
def test_mail_channel_not_exist(self, mock_channels): | ||
mock_channels.objects.return_value.first.return_value = False | ||
assert MailProcessor.does_mail_channel_exist("test_bot") is False | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Avoid function call in parameter defaults
The
Path
function call in the parameter default could lead to runtime issues. Instead, declare the parameter type annotation separately.🧰 Tools
🪛 Ruff (0.8.2)
156-156: Do not perform function call
Path
in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable(B008)