From fc6a5e97e94782e42776cbac5b34798ba5c5ee93 Mon Sep 17 00:00:00 2001
From: Spandan Mondal <chocoboyxp@gmail.com>
Date: Fri, 13 Dec 2024 00:52:29 +0530
Subject: [PATCH] Mail channel filter (#1651)

* mail read schedule stop mechanism, filters and their related tests

* mail read schedule stop mechanism, filters and their related tests

* mail read schedule stop mechanism, filters and their related tests

* mail read schedule stop mechanism, filters and their related tests

---------

Co-authored-by: spandan_mondal <spandan.mondal@nimblework.com>
---
 kairon/events/server.py                       |  8 +-
 kairon/events/utility.py                      | 16 +++-
 kairon/shared/channels/mail/processor.py      | 78 ++++++++++++++++---
 kairon/shared/channels/mail/scheduler.py      | 21 ++++-
 kairon/shared/chat/processor.py               |  5 ++
 kairon/shared/utils.py                        | 12 ++-
 metadata/integrations.yml                     |  5 ++
 tests/unit_test/channels/mail_channel_test.py | 54 ++++++++++++-
 .../unit_test/channels/mail_scheduler_test.py | 53 +++++++++++++
 tests/unit_test/utility_test.py               | 29 ++++++-
 10 files changed, 261 insertions(+), 20 deletions(-)

diff --git a/kairon/events/server.py b/kairon/events/server.py
index 3afa312b4..bbcb89f1f 100644
--- a/kairon/events/server.py
+++ b/kairon/events/server.py
@@ -149,4 +149,10 @@ def dispatch_scheduled_event(event_id: Text = Path(description="Event id")):
 @app.get('/api/mail/schedule/{bot}', response_model=Response)
 def request_epoch(bot: Text = Path(description="Bot id")):
     EventUtility.schedule_channel_mail_reading(bot)
-    return {"data": None, "message": "Mail scheduler epoch request!"}
\ No newline at end of file
+    return {"data": None, "message": "Mail scheduler epoch request!"}
+
+
+@app.get('/api/mail/stop/{bot}', response_model=Response)
+def stop_mail_reading(bot: Text = Path(description="Bot id")):
+    EventUtility.stop_channel_mail_reading(bot)
+    return {"data": None, "message": "Mail scheduler stopped!"}
\ No newline at end of file
diff --git a/kairon/events/utility.py b/kairon/events/utility.py
index 85fe35493..f201b2613 100644
--- a/kairon/events/utility.py
+++ b/kairon/events/utility.py
@@ -5,8 +5,7 @@
 from kairon.events.executors.factory import ExecutorFactory
 from kairon.events.scheduler.kscheduler import KScheduler
 from kairon.exceptions import AppException
-from kairon.shared.chat.data_objects import Channels
-from kairon.shared.constants import EventClass, ChannelTypes
+from kairon.shared.constants import EventClass
 from kairon.shared.data.constant import TASK_TYPE
 from loguru import logger
 
@@ -60,3 +59,16 @@ def schedule_channel_mail_reading(bot: str):
                                      EventClass.mail_channel_read_mails, {"bot": bot, "user": mail_processor.bot_settings.user})
         except Exception as e:
             raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}")
+
+    @staticmethod
+    def stop_channel_mail_reading(bot: str):
+        from kairon.shared.channels.mail.processor import MailProcessor
+
+        try:
+            mail_processor = MailProcessor(bot)
+            event_id = mail_processor.state.event_id
+            if event_id:
+                mail_processor.update_event_id(None)
+                KScheduler().delete_job(event_id)
+        except Exception as e:
+            raise AppException(f"Failed to stop mail reading for bot {bot}. Error: {str(e)}")
diff --git a/kairon/shared/channels/mail/processor.py b/kairon/shared/channels/mail/processor.py
index e19533258..739c33c3c 100644
--- a/kairon/shared/channels/mail/processor.py
+++ b/kairon/shared/channels/mail/processor.py
@@ -4,7 +4,9 @@
 from loguru import logger
 from pydantic.schema import timedelta
 from pydantic.validators import datetime
-from imap_tools import MailBox, AND
+from imap_tools import MailBox, AND, OR, NOT
+
+from kairon import Utility
 from kairon.exceptions import AppException
 from kairon.shared.account.data_objects import Bot
 from kairon.shared.channels.mail.constants import MailConstants
@@ -35,8 +37,9 @@ def update_event_id(self, event_id):
         self.state.event_id = event_id
         self.state.save()
 
+
     @staticmethod
-    def get_mail_channel_state_data(bot):
+    def get_mail_channel_state_data(bot:str):
         """
         Get mail channel state data
         """
@@ -227,6 +230,57 @@ def process_message_task(bot: str, message_batch: [dict]):
         """
         asyncio.run(MailProcessor.process_messages(bot, message_batch))
 
+    def generate_criteria(self, subjects=None, ignore_subjects=None, from_addresses=None, ignore_from=None,
+                          read_status="all"):
+        """
+        Generate IMAP criteria for fetching emails.
+
+        Args:
+            subjects (list[str], optional): List of subjects to include.
+            ignore_subjects (list[str], optional): List of subjects to exclude.
+            from_addresses (list[str], optional): List of senders to include.
+            ignore_from (list[str], optional): List of senders to exclude.
+            read_status (str): Read status filter ('all', 'seen', 'unseen').
+
+        Returns:
+            imap_tools.query.AND: IMAP criteria object.
+        """
+        criteria = []
+
+        if read_status.lower() == "seen":
+            criteria.append(AND(seen=True))
+        elif read_status.lower() == "unseen":
+            criteria.append(AND(seen=False))
+
+        if subjects:
+            criteria.append(OR(subject=subjects))
+
+        if ignore_subjects:
+            for subject in ignore_subjects:
+                criteria.append(NOT(AND(subject=subject)))
+
+        if from_addresses:
+            criteria.append(OR(from_=from_addresses))
+
+        if ignore_from:
+            for sender in ignore_from:
+                criteria.append(NOT(AND(from_=sender)))
+
+        last_processed_uid = self.state.last_email_uid
+        base_read_criteria = None
+        if last_processed_uid == 0:
+            time_shift = int(self.config.get('interval', 20 * 60))
+            last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
+            base_read_criteria = AND(date_gte=last_read_timestamp.date())
+        else:
+            query = f'{int(last_processed_uid) + 1}:*'
+            base_read_criteria = AND(uid=query)
+
+        criteria.append(base_read_criteria)
+
+        # Combine all criteria with AND
+        return AND(*criteria)
+
 
     @staticmethod
     def read_mails(bot: str) -> ([dict], str):
@@ -253,15 +307,17 @@ def read_mails(bot: str) -> ([dict], str):
         try:
             mp.login_imap()
             is_logged_in = True
-            msgs = []
-            if last_processed_uid == 0:
-                time_shift = int(mp.config.get('interval', 20 * 60))
-                last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
-                msgs = mp.mailbox.fetch(AND(date_gte=last_read_timestamp.date()), mark_seen=False)
-            else:
-                query = f'{int(last_processed_uid) + 1}:*'
-                msgs = mp.mailbox.fetch(AND(uid=query), mark_seen=False)
-            for msg in msgs:
+            subject = mp.config.get('subjects', "")
+            subject = Utility.string_to_list(subject)
+            ignore_subject = mp.config.get('ignore_subjects', "")
+            ignore_subject = Utility.string_to_list(ignore_subject)
+            from_addresses = mp.config.get('from_emails', "")
+            from_addresses = Utility.string_to_list(from_addresses)
+            ignore_from = mp.config.get('ignore_from_emails', "")
+            ignore_from = Utility.string_to_list(ignore_from)
+            read_status = mp.config.get('seen_status', 'all')
+            criteria = mp.generate_criteria(subject, ignore_subject, from_addresses, ignore_from, read_status)
+            for msg in mp.mailbox.fetch(criteria, mark_seen=False):
                 if int(msg.uid) <= last_processed_uid:
                     continue
                 last_processed_uid = int(msg.uid)
diff --git a/kairon/shared/channels/mail/scheduler.py b/kairon/shared/channels/mail/scheduler.py
index adff58319..8e5e246db 100644
--- a/kairon/shared/channels/mail/scheduler.py
+++ b/kairon/shared/channels/mail/scheduler.py
@@ -1,9 +1,9 @@
 from urllib.parse import urljoin
-
-
 from kairon import Utility
 from kairon.exceptions import AppException
 from kairon.shared.channels.mail.processor import MailProcessor
+from kairon.shared.chat.data_objects import Channels
+from kairon.shared.constants import ChannelTypes
 
 
 class MailScheduler:
@@ -28,6 +28,23 @@ def request_epoch(bot: str):
         if not resp['success']:
             raise AppException("Failed to request email channel epoch")
 
+    @staticmethod
+    def request_stop(bot: str):
+        event_server_url = Utility.get_event_server_url()
+        if Utility.is_exist(Channels, raise_error=False, bot=bot, connector_type=ChannelTypes.MAIL.value):
+            resp = Utility.execute_http_request(
+                "GET",
+                urljoin(
+                    event_server_url,
+                    f"/api/mail/stop/{bot}",
+                ),
+                err_msg="Failed to request epoch",
+            )
+            if not resp['success']:
+                raise AppException("Failed to stop email channel reading")
+        else:
+            raise AppException("Mail channel does not exist")
+
 
 
 
diff --git a/kairon/shared/chat/processor.py b/kairon/shared/chat/processor.py
index 77fc10500..c6c9693dc 100644
--- a/kairon/shared/chat/processor.py
+++ b/kairon/shared/chat/processor.py
@@ -70,6 +70,11 @@ def delete_channel_config(bot: Text, **kwargs):
         :return: None
         """
         kwargs.update({"bot": bot})
+        try:
+            from kairon.shared.channels.mail.scheduler import MailScheduler
+            MailScheduler.request_stop(bot)
+        except Exception as e:
+            logger.error(f"Error while stopping mail scheduler for bot {bot}. Error: {str(e)}")
         Utility.hard_delete_document([Channels], **kwargs)
 
     @staticmethod
diff --git a/kairon/shared/utils.py b/kairon/shared/utils.py
index 0407c1248..3479952e6 100644
--- a/kairon/shared/utils.py
+++ b/kairon/shared/utils.py
@@ -2223,6 +2223,15 @@ def delete_documents(document: Document, user: str = "System"):
             document["user"] = user
             document.delete()
 
+    @staticmethod
+    def string_to_list(comma_sep_string: str, delimilter: str = ",") -> List[str]:
+        """
+        Convert comma separated string to list
+        """
+        if not comma_sep_string:
+            return []
+        return [item.strip() for item in comma_sep_string.split(delimilter) if item.strip()]
+
 
 class StoryValidator:
     @staticmethod
@@ -2755,4 +2764,5 @@ def __handle_member_left_bot(**kwargs):
         body = body.replace("MEMBER_NAME", user_name)
         body = body.replace("BOT_NAME", bot_name)
         subject = f"Notification: {user_name} has left the {bot_name} bot"
-        return body, subject
\ No newline at end of file
+        return body, subject
+
diff --git a/metadata/integrations.yml b/metadata/integrations.yml
index d433d18f4..5b8b5755d 100644
--- a/metadata/integrations.yml
+++ b/metadata/integrations.yml
@@ -85,6 +85,11 @@ channels:
     optional_fields:
       - interval
       - intent
+      - subjects
+      - ignore_subjects
+      - from_emails
+      - ignore_from_emails
+      - seen_status
 
 actions:
   pipedrive:
diff --git a/tests/unit_test/channels/mail_channel_test.py b/tests/unit_test/channels/mail_channel_test.py
index 7a6e7f103..c748d5a57 100644
--- a/tests/unit_test/channels/mail_channel_test.py
+++ b/tests/unit_test/channels/mail_channel_test.py
@@ -3,7 +3,7 @@
 from unittest.mock import patch, MagicMock
 
 import pytest
-from imap_tools import MailMessage
+from imap_tools import MailMessage, AND
 
 from mongoengine import connect, disconnect
 from uuid6 import uuid7
@@ -276,6 +276,53 @@ def test_process_mail(self,  mock_get_channel_config):
         assert result == "Hello John Doe, How can I help you today?"
 
 
+
+    @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config")
+    @pytest.mark.asyncio
+    async def test_generate_criteria(self, mock_get_channel_config):
+        bot_id = pytest.mail_test_bot
+        mock_get_channel_config.return_value = {
+            'config': {
+                'email_account': "mail_channel_test_user_acc@testuser.com",
+                'email_password': "password",
+                'imap_server': "imap.testuser.com",
+            }
+        }
+
+        mp = MailProcessor(bot=bot_id)
+        mp.state.last_email_uid = 123
+        #seen
+        criteria = mp.generate_criteria(read_status="seen")
+        print(criteria)
+        assert criteria == '((SEEN) (UID 124:*))'
+
+        #unseen
+        criteria = mp.generate_criteria(read_status="unseen")
+        assert criteria == '((UNSEEN) (UID 124:*))'
+
+        #default
+        criteria = mp.generate_criteria()
+        assert criteria == '((UID 124:*))'
+
+        #subjects
+        criteria = mp.generate_criteria(subjects=["Test Subject", "another test subject"])
+        assert criteria == '((OR SUBJECT "Test Subject" SUBJECT "another test subject") (UID 124:*))'
+
+        #from
+        criteria = mp.generate_criteria(from_addresses=["info", "important1@gmail.com", "anotherparrtern@gmail.com"])
+        assert criteria == '((OR OR FROM "anotherparrtern@gmail.com" FROM "important1@gmail.com" FROM "info") (UID 124:*))'
+
+        #mix
+        criteria = mp.generate_criteria(read_status="unseen",
+                                        subjects=["Test Subject", "another test subject", "happy"],
+                                        ignore_subjects=['cat'],
+                                        ignore_from=["info", "nomreply"],
+                                        from_addresses=["@digite.com", "@nimblework.com"])
+
+        assert criteria == '((UNSEEN) (OR OR SUBJECT "Test Subject" SUBJECT "another test subject" SUBJECT "happy") NOT ((SUBJECT "cat")) (OR FROM "@digite.com" FROM "@nimblework.com") NOT ((FROM "info")) NOT ((FROM "nomreply")) (UID 124:*))'
+
+
+
     @patch("kairon.shared.channels.mail.processor.MailProcessor.logout_imap")
     @patch("kairon.shared.channels.mail.processor.MailProcessor.process_message_task")
     @patch("kairon.shared.channels.mail.processor.MailBox")
@@ -538,4 +585,7 @@ def test_get_log_exception(self):
         BotSettings.objects(user="mail_channel_test_user_acc").delete()
         Bot.objects(user="mail_channel_test_user_acc").delete()
         Account.objects(user="mail_channel_test_user_acc").delete()
-        Channels.objects(connector_type=ChannelTypes.MAIL.value).delete()
\ No newline at end of file
+        Channels.objects(connector_type=ChannelTypes.MAIL.value).delete()
+
+
+
diff --git a/tests/unit_test/channels/mail_scheduler_test.py b/tests/unit_test/channels/mail_scheduler_test.py
index f7073edc9..bd89b266d 100644
--- a/tests/unit_test/channels/mail_scheduler_test.py
+++ b/tests/unit_test/channels/mail_scheduler_test.py
@@ -108,3 +108,56 @@ def test_schedule_channel_mail_reading_exception(mock_mongo_client, mock_mail_pr
         EventUtility.schedule_channel_mail_reading(bot)
     assert str(excinfo.value) == f"Failed to schedule mail reading for bot {bot}. Error: Test Exception"
 
+@patch('kairon.events.utility.KScheduler.delete_job')
+@patch('kairon.events.utility.KScheduler.__init__', return_value=None)
+@patch('kairon.shared.channels.mail.processor.MailProcessor')
+@patch('pymongo.MongoClient', autospec=True)
+def test_stop_channel_mail_reading(mock_mongo, mock_mail_processor, mock_kscheduler, mock_delete_job):
+    from kairon.events.utility import EventUtility
+
+    bot = "test_bot"
+    mock_mail_processor_instance = mock_mail_processor.return_value
+    mock_mail_processor_instance.config = {"interval": 1}
+    mock_mail_processor_instance.state.event_id = 'existing_event_id'
+    mock_mail_processor_instance.bot_settings.user = "test_user"
+
+#     # Test case when event_id is None
+    EventUtility.stop_channel_mail_reading(bot)
+    mock_delete_job.assert_called_once()
+
+@patch('kairon.shared.utils.Utility.is_exist')
+@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url')
+@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request')
+def test_request_stop_success(mock_execute_http_request, mock_get_event_server_url, mock_imp):
+    mock_get_event_server_url.return_value = "http://localhost"
+    mock_execute_http_request.return_value = {'success': True}
+    mock_imp.return_value = True
+    bot = "test_bot"
+    try:
+        MailScheduler.request_stop(bot)
+    except AppException:
+        pytest.fail("request_epoch() raised AppException unexpectedly!")
+
+
+@patch('kairon.shared.utils.Utility.is_exist')
+@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url')
+@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request')
+def test_request_stop_response_not_success(mock_execute_http_request, mock_get_event_server_url, mock_imp):
+    mock_get_event_server_url.return_value = "http://localhost"
+    mock_execute_http_request.return_value = {'success': False}
+    mock_imp.return_value = True
+    bot = "test_bot"
+    with pytest.raises(AppException):
+        MailScheduler.request_stop(bot)
+
+@patch('kairon.shared.utils.Utility.is_exist')
+@patch('kairon.shared.channels.mail.scheduler.Utility.get_event_server_url')
+@patch('kairon.shared.channels.mail.scheduler.Utility.execute_http_request')
+def test_request_stop_no_channel_exist_exception(mock_execute_http_request, mock_get_event_server_url, mock_imp):
+    mock_get_event_server_url.return_value = "http://localhost"
+    mock_execute_http_request.return_value = {'success': True}
+    mock_imp.return_value = False
+    bot = "test_bot"
+    with pytest.raises(AppException):
+        MailScheduler.request_stop(bot)
+
diff --git a/tests/unit_test/utility_test.py b/tests/unit_test/utility_test.py
index 5427328aa..e9ca44ee1 100644
--- a/tests/unit_test/utility_test.py
+++ b/tests/unit_test/utility_test.py
@@ -3305,4 +3305,31 @@ async def test_handle_first_name_member_update_email_template(self, validate_and
         )
         validate_and_send_mail_mock.assert_called_once_with(
             email, expected_subject, expected_body
-        )
\ No newline at end of file
+        )
+
+
+    def test_comma_sep_string_to_list(self):
+        # Test input with empty elements
+        assert Utility.string_to_list("apple, , banana, , orange") == ["apple", "banana", "orange"]
+
+        # Test input with only commas
+        assert Utility.string_to_list(",,,") == []
+
+        # Test input with no commas (single element)
+        assert Utility.string_to_list("apple") == ["apple"]
+
+        # Test input with empty string
+        assert Utility.string_to_list("") == []
+
+        # Test input with None
+        assert Utility.string_to_list(None) == []
+
+        # Test input with special characters
+        assert Utility.string_to_list("apple,@banana, #orange$") == ["apple", "@banana", "#orange$"]
+
+        # Test input with numeric values
+        assert Utility.string_to_list("1, 2, 3, 4") == ["1", "2", "3", "4"]
+
+        # Test input with spaces
+        assert Utility.string_to_list("apple, banana, orange") == ["apple", "banana", "orange"]
+