-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mail channel filter and stop #1649
Changes from all commits
6c99982
4c999fc
34821e5
96c9311
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,9 @@ | |
from loguru import logger | ||
from pydantic.schema import timedelta | ||
from pydantic.validators import datetime | ||
from imap_tools import MailBox, AND | ||
from imap_tools import MailBox, AND, OR, NOT | ||
|
||
from kairon import Utility | ||
from kairon.exceptions import AppException | ||
from kairon.shared.account.data_objects import Bot | ||
from kairon.shared.channels.mail.constants import MailConstants | ||
|
@@ -35,8 +37,9 @@ def update_event_id(self, event_id): | |
self.state.event_id = event_id | ||
self.state.save() | ||
|
||
|
||
@staticmethod | ||
def get_mail_channel_state_data(bot): | ||
def get_mail_channel_state_data(bot:str): | ||
""" | ||
Get mail channel state data | ||
""" | ||
|
@@ -227,6 +230,57 @@ def process_message_task(bot: str, message_batch: [dict]): | |
""" | ||
asyncio.run(MailProcessor.process_messages(bot, message_batch)) | ||
|
||
def generate_criteria(self, subjects=None, ignore_subjects=None, from_addresses=None, ignore_from=None, | ||
read_status="all"): | ||
""" | ||
Generate IMAP criteria for fetching emails. | ||
|
||
Args: | ||
subjects (list[str], optional): List of subjects to include. | ||
ignore_subjects (list[str], optional): List of subjects to exclude. | ||
from_addresses (list[str], optional): List of senders to include. | ||
ignore_from (list[str], optional): List of senders to exclude. | ||
read_status (str): Read status filter ('all', 'seen', 'unseen'). | ||
|
||
Returns: | ||
imap_tools.query.AND: IMAP criteria object. | ||
""" | ||
criteria = [] | ||
|
||
if read_status.lower() == "seen": | ||
criteria.append(AND(seen=True)) | ||
elif read_status.lower() == "unseen": | ||
criteria.append(AND(seen=False)) | ||
|
||
if subjects: | ||
criteria.append(OR(subject=subjects)) | ||
|
||
if ignore_subjects: | ||
for subject in ignore_subjects: | ||
criteria.append(NOT(AND(subject=subject))) | ||
|
||
if from_addresses: | ||
criteria.append(OR(from_=from_addresses)) | ||
|
||
if ignore_from: | ||
for sender in ignore_from: | ||
criteria.append(NOT(AND(from_=sender))) | ||
|
||
last_processed_uid = self.state.last_email_uid | ||
base_read_criteria = None | ||
if last_processed_uid == 0: | ||
time_shift = int(self.config.get('interval', 20 * 60)) | ||
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift) | ||
base_read_criteria = AND(date_gte=last_read_timestamp.date()) | ||
else: | ||
query = f'{int(last_processed_uid) + 1}:*' | ||
base_read_criteria = AND(uid=query) | ||
|
||
criteria.append(base_read_criteria) | ||
|
||
# Combine all criteria with AND | ||
return AND(*criteria) | ||
|
||
|
||
@staticmethod | ||
def read_mails(bot: str) -> ([dict], str): | ||
|
@@ -253,15 +307,17 @@ def read_mails(bot: str) -> ([dict], str): | |
try: | ||
mp.login_imap() | ||
is_logged_in = True | ||
msgs = [] | ||
if last_processed_uid == 0: | ||
time_shift = int(mp.config.get('interval', 20 * 60)) | ||
last_read_timestamp = datetime.now() - timedelta(seconds=time_shift) | ||
msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date()), mark_seen=False) | ||
else: | ||
query = f'{int(last_processed_uid) + 1}:*' | ||
msgs = mp.mailbox.fetch(AND(uid=query), mark_seen=False) | ||
for msg in msgs: | ||
subject = mp.config.get('subjects', "") | ||
subject = Utility.string_to_list(subject) | ||
ignore_subject = mp.config.get('ignore_subjects', "") | ||
ignore_subject = Utility.string_to_list(ignore_subject) | ||
from_addresses = mp.config.get('from_emails', "") | ||
from_addresses = Utility.string_to_list(from_addresses) | ||
ignore_from = mp.config.get('ignore_from_emails', "") | ||
ignore_from = Utility.string_to_list(ignore_from) | ||
read_status = mp.config.get('seen_status', 'all') | ||
criteria = mp.generate_criteria(subject, ignore_subject, from_addresses, ignore_from, read_status) | ||
Comment on lines
+310
to
+319
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add error handling for malformed configuration values The configuration retrieval could fail if the values are malformed. Consider adding proper error handling. Add error handling for configuration parsing: - subject = mp.config.get('subjects', "")
- subject = Utility.string_to_list(subject)
+ try:
+ subject = mp.config.get('subjects', "")
+ subject = Utility.string_to_list(subject)
+ except Exception as e:
+ logger.error(f"Error parsing subjects configuration: {str(e)}")
+ subject = [] Apply similar error handling for other configuration values.
|
||
for msg in mp.mailbox.fetch(criteria, mark_seen=False): | ||
if int(msg.uid) <= last_processed_uid: | ||
continue | ||
last_processed_uid = int(msg.uid) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
from urllib.parse import urljoin | ||
|
||
|
||
from kairon import Utility | ||
from kairon.exceptions import AppException | ||
from kairon.shared.channels.mail.processor import MailProcessor | ||
from kairon.shared.chat.data_objects import Channels | ||
from kairon.shared.constants import ChannelTypes | ||
|
||
|
||
class MailScheduler: | ||
|
@@ -28,6 +28,23 @@ def request_epoch(bot: str): | |
if not resp['success']: | ||
raise AppException("Failed to request email channel epoch") | ||
|
||
@staticmethod | ||
def request_stop(bot: str): | ||
event_server_url = Utility.get_event_server_url() | ||
if Utility.is_exist(Channels, raise_error=False, bot=bot, connector_type=ChannelTypes.MAIL.value): | ||
resp = Utility.execute_http_request( | ||
"GET", | ||
urljoin( | ||
event_server_url, | ||
f"/api/mail/stop/{bot}", | ||
), | ||
err_msg="Failed to request epoch", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct the error message in 'err_msg' parameter In line 41, the Apply this diff to correct the error message: err_msg="Failed to request epoch",
+ err_msg="Failed to stop email channel reading",
|
||
) | ||
if not resp['success']: | ||
raise AppException("Failed to stop email channel reading") | ||
else: | ||
raise AppException("Mail channel does not exist") | ||
|
||
|
||
|
||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -85,6 +85,11 @@ channels: | |||||||||||||||||||||
optional_fields: | ||||||||||||||||||||||
- interval | ||||||||||||||||||||||
- intent | ||||||||||||||||||||||
- subjects | ||||||||||||||||||||||
- ignore_subjects | ||||||||||||||||||||||
- from_emails | ||||||||||||||||||||||
- ignore_from_emails | ||||||||||||||||||||||
- seen_status | ||||||||||||||||||||||
Comment on lines
+88
to
+92
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Ensure consistent naming of 'from' address fields In the Optionally, apply this diff to update the field names: optional_fields:
- interval
- intent
- subjects
- ignore_subjects
- - from_emails
- - ignore_from_emails
+ - from_addresses
+ - ignore_from
- seen_status Ensure that the code and documentation are updated accordingly to reflect these changes. 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||
|
||||||||||||||||||||||
actions: | ||||||||||||||||||||||
pipedrive: | ||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -108,3 +108,56 @@ def test_schedule_channel_mail_reading_exception(mock_mongo_client, mock_mail_pr | |||||||||||||||||
EventUtility.schedule_channel_mail_reading(bot) | ||||||||||||||||||
assert str(excinfo.value) == f"Failed to schedule mail reading for bot {bot}. Error: Test Exception" | ||||||||||||||||||
|
||||||||||||||||||
@patch('kairon.events.utility.KScheduler.delete_job') | ||||||||||||||||||
@patch('kairon.events.utility.KScheduler.__init__', return_value=None) | ||||||||||||||||||
@patch('kairon.shared.channels.mail.processor.MailProcessor') | ||||||||||||||||||
@patch('pymongo.MongoClient', autospec=True) | ||||||||||||||||||
def test_stop_channel_mail_reading(mock_mongo, mock_mail_processor, mock_kscheduler, mock_delete_job): | ||||||||||||||||||
from kairon.events.utility import EventUtility | ||||||||||||||||||
|
||||||||||||||||||
bot = "test_bot" | ||||||||||||||||||
mock_mail_processor_instance = mock_mail_processor.return_value | ||||||||||||||||||
mock_mail_processor_instance.config = {"interval": 1} | ||||||||||||||||||
mock_mail_processor_instance.state.event_id = 'existing_event_id' | ||||||||||||||||||
mock_mail_processor_instance.bot_settings.user = "test_user" | ||||||||||||||||||
|
||||||||||||||||||
# # Test case when event_id is None | ||||||||||||||||||
EventUtility.stop_channel_mail_reading(bot) | ||||||||||||||||||
mock_delete_job.assert_called_once() | ||||||||||||||||||
|
||||||||||||||||||
@patch('kairon.shared.utils.Utility.is_exist') | ||||||||||||||||||
@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url') | ||||||||||||||||||
@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request') | ||||||||||||||||||
def test_request_stop_success(mock_execute_http_request, mock_get_event_server_url, mock_imp): | ||||||||||||||||||
mock_get_event_server_url.return_value = "http://localhost" | ||||||||||||||||||
mock_execute_http_request.return_value = {'success': True} | ||||||||||||||||||
mock_imp.return_value = True | ||||||||||||||||||
bot = "test_bot" | ||||||||||||||||||
try: | ||||||||||||||||||
MailScheduler.request_stop(bot) | ||||||||||||||||||
except AppException: | ||||||||||||||||||
pytest.fail("request_epoch() raised AppException unexpectedly!") | ||||||||||||||||||
Comment on lines
+136
to
+139
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix incorrect error message in test assertion. The error message references try:
MailScheduler.request_stop(bot)
except AppException:
- pytest.fail("request_epoch() raised AppException unexpectedly!")
+ pytest.fail("request_stop() raised AppException unexpectedly!") 📝 Committable suggestion
Suggested change
|
||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
@patch('kairon.shared.utils.Utility.is_exist') | ||||||||||||||||||
@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url') | ||||||||||||||||||
@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request') | ||||||||||||||||||
def test_request_stop_response_not_success(mock_execute_http_request, mock_get_event_server_url, mock_imp): | ||||||||||||||||||
mock_get_event_server_url.return_value = "http://localhost" | ||||||||||||||||||
mock_execute_http_request.return_value = {'success': False} | ||||||||||||||||||
mock_imp.return_value = True | ||||||||||||||||||
bot = "test_bot" | ||||||||||||||||||
with pytest.raises(AppException): | ||||||||||||||||||
MailScheduler.request_stop(bot) | ||||||||||||||||||
|
||||||||||||||||||
@patch('kairon.shared.utils.Utility.is_exist') | ||||||||||||||||||
@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url') | ||||||||||||||||||
@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request') | ||||||||||||||||||
def test_request_stop_no_channel_exist_exception(mock_execute_http_request, mock_get_event_server_url, mock_imp): | ||||||||||||||||||
mock_get_event_server_url.return_value = "http://localhost" | ||||||||||||||||||
mock_execute_http_request.return_value = {'success': True} | ||||||||||||||||||
mock_imp.return_value = False | ||||||||||||||||||
bot = "test_bot" | ||||||||||||||||||
with pytest.raises(AppException): | ||||||||||||||||||
MailScheduler.request_stop(bot) | ||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Use 'raise ... from e' to preserve exception context
In line 74, within the
except
block ofstop_channel_mail_reading
, re-raising an exception without specifyingfrom e
loses the original exception's context. Includingfrom e
preserves the exception chain and aids in debugging.Apply this diff to include the original exception context:
📝 Committable suggestion
🧰 Tools
🪛 Ruff (0.8.2)
74-74: Within an
except
clause, raise exceptions withraise ... from err
orraise ... from None
to distinguish them from errors in exception handling(B904)