Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mail channel first iteration #1620

Merged
merged 27 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion kairon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@


def create_argument_parser():
from kairon.cli import importer, training, testing, conversations_deletion, translator, delete_logs, message_broadcast,content_importer
from kairon.cli import importer, training, testing, conversations_deletion, translator, delete_logs,\
message_broadcast,content_importer, mail_channel_process, mail_channel_read

parser = ArgumentParser(
prog="kairon",
Expand All @@ -62,6 +63,8 @@ def create_argument_parser():
delete_logs.add_subparser(subparsers, parents=parent_parsers)
message_broadcast.add_subparser(subparsers, parents=parent_parsers)
content_importer.add_subparser(subparsers, parents=parent_parsers)
mail_channel_process.add_subparser(subparsers, parents=parent_parsers)
mail_channel_read.add_subparser(subparsers, parents=parent_parsers)
return parser


Expand Down
12 changes: 12 additions & 0 deletions kairon/api/app/routers/bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from kairon.shared.account.activity_log import UserActivityLogger
from kairon.shared.actions.data_objects import ActionServerLogs
from kairon.shared.auth import Authentication
from kairon.shared.channels.mail.processor import MailProcessor
from kairon.shared.constants import TESTER_ACCESS, DESIGNER_ACCESS, CHAT_ACCESS, UserActivityType, ADMIN_ACCESS, \
EventClass, AGENT_ACCESS
from kairon.shared.content_importer.content_processor import ContentImporterLogProcessor
Expand Down Expand Up @@ -1658,3 +1659,14 @@ async def get_slot_actions(
llm_models = MongoProcessor.get_slot_mapped_actions(current_user.get_bot(), slot_name)
return Response(data=llm_models)



@router.get("/mail_channel/logs", response_model=Response)
async def get_mail_channel_logs(start_idx: int = 0, page_size: int = 10,
current_user: User = Security(Authentication.get_current_user_and_bot,
scopes=TESTER_ACCESS)):
"""
Retrieves mail channel related logs for the bot.
"""
data = MailProcessor.get_log(current_user.get_bot(), start_idx, page_size)
return Response(data=data)
19 changes: 19 additions & 0 deletions kairon/chat/agent_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,22 @@ async def handle_channel_message(bot: Text, userdata: UserMessage):
if not is_live_agent_enabled:
return await AgentProcessor.get_agent(bot).handle_message(userdata)
return await LiveAgentHandler.process_live_agent(bot, userdata)

@staticmethod
def get_agent_without_cache(bot: str, use_store: bool = True) -> Agent:
endpoint = AgentProcessor.mongo_processor.get_endpoints(
bot, raise_exception=False
)
action_endpoint = Utility.get_action_url(endpoint)
model_path = Utility.get_latest_model(bot)
domain = AgentProcessor.mongo_processor.load_domain(bot)
if use_store:
mongo_store = Utility.get_local_mongo_store(bot, domain)
lock_store_endpoint = Utility.get_lock_store(bot)
agent = KaironAgent.load(model_path, action_endpoint=action_endpoint, tracker_store=mongo_store,
lock_store=lock_store_endpoint)
else:
agent = KaironAgent.load(model_path, action_endpoint=action_endpoint)

agent.model_ver = model_path.split("/")[-1]
return agent
Comment on lines +82 to +99
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Add error handling and documentation to get_agent_without_cache method

  1. The method needs proper error handling similar to reload():
 @staticmethod
 def get_agent_without_cache(bot: str, use_store: bool = True) -> Agent:
+    """
+    Load a bot agent without using cache
+
+    Args:
+        bot: Bot ID
+        use_store: Whether to use local MongoDB store and lock store
+
+    Returns:
+        Agent: Loaded bot agent
+
+    Raises:
+        AppException: If bot has not been trained yet
+    """
+    try:
         endpoint = AgentProcessor.mongo_processor.get_endpoints(
             bot, raise_exception=False
         )
         action_endpoint = Utility.get_action_url(endpoint)
         model_path = Utility.get_latest_model(bot)
         domain = AgentProcessor.mongo_processor.load_domain(bot)
         if use_store:
             mongo_store = Utility.get_local_mongo_store(bot, domain)
             lock_store_endpoint = Utility.get_lock_store(bot)
             agent = KaironAgent.load(model_path, action_endpoint=action_endpoint, tracker_store=mongo_store,
                                      lock_store=lock_store_endpoint)
         else:
             agent = KaironAgent.load(model_path, action_endpoint=action_endpoint)

         agent.model_ver = model_path.split("/")[-1]
         return agent
+    except Exception as e:
+        logging.exception(e)
+        raise AppException("Bot has not been trained yet!")
  1. Consider refactoring common code between reload() and this method to reduce duplication.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@staticmethod
def get_agent_without_cache(bot: str, use_store: bool = True) -> Agent:
endpoint = AgentProcessor.mongo_processor.get_endpoints(
bot, raise_exception=False
)
action_endpoint = Utility.get_action_url(endpoint)
model_path = Utility.get_latest_model(bot)
domain = AgentProcessor.mongo_processor.load_domain(bot)
if use_store:
mongo_store = Utility.get_local_mongo_store(bot, domain)
lock_store_endpoint = Utility.get_lock_store(bot)
agent = KaironAgent.load(model_path, action_endpoint=action_endpoint, tracker_store=mongo_store,
lock_store=lock_store_endpoint)
else:
agent = KaironAgent.load(model_path, action_endpoint=action_endpoint)
agent.model_ver = model_path.split("/")[-1]
return agent
@staticmethod
def get_agent_without_cache(bot: str, use_store: bool = True) -> Agent:
"""
Load a bot agent without using cache
Args:
bot: Bot ID
use_store: Whether to use local MongoDB store and lock store
Returns:
Agent: Loaded bot agent
Raises:
AppException: If bot has not been trained yet
"""
try:
endpoint = AgentProcessor.mongo_processor.get_endpoints(
bot, raise_exception=False
)
action_endpoint = Utility.get_action_url(endpoint)
model_path = Utility.get_latest_model(bot)
domain = AgentProcessor.mongo_processor.load_domain(bot)
if use_store:
mongo_store = Utility.get_local_mongo_store(bot, domain)
lock_store_endpoint = Utility.get_lock_store(bot)
agent = KaironAgent.load(model_path, action_endpoint=action_endpoint, tracker_store=mongo_store,
lock_store=lock_store_endpoint)
else:
agent = KaironAgent.load(model_path, action_endpoint=action_endpoint)
agent.model_ver = model_path.split("/")[-1]
return agent
except Exception as e:
logging.exception(e)
raise AppException("Bot has not been trained yet!")

31 changes: 31 additions & 0 deletions kairon/chat/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,37 @@ async def chat(
)
return chat_response

@staticmethod
async def process_messages_via_bot(
messages: [str],
account: int,
bot: str,
user: str,
is_integration_user: bool = False,
metadata: Dict = None,
):
"""Process a list of messages through the bot.
Args:
messages: List of messages to process
account: Account ID
bot: Bot ID
user: User ID
is_integration_user: Flag indicating if user is integration user
metadata: Additional metadata

Returns:
List of responses from the bot
"""
responses = []
uncached_model = AgentProcessor.get_agent_without_cache(bot, False)
metadata = ChatUtils.get_metadata(account, bot, is_integration_user, metadata)
for message in messages:
msg = UserMessage(message, sender_id=user, metadata=metadata)
chat_response = await uncached_model.handle_message(msg)
responses.append(chat_response)
return responses
Comment on lines +67 to +74
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling and input validation.

Consider adding:

  1. Input validation for empty messages list
  2. Error handling for failed message processing
  3. Type validation for message strings
@staticmethod
async def process_messages_via_bot(
        messages: [str],
        account: int,
        bot: str,
        user: str,
        is_integration_user: bool = False,
        metadata: Dict = None,
):
    """Process a list of messages through the bot.
     Args:
         messages: List of messages to process
         account: Account ID
         bot: Bot ID
         user: User ID
         is_integration_user: Flag indicating if user is integration user
         metadata: Additional metadata

     Returns:
         List of responses from the bot
    """
+   if not messages:
+       raise ValueError("Messages list cannot be empty")
+   if not all(isinstance(msg, str) for msg in messages):
+       raise ValueError("All messages must be strings")
    
    responses = []
    uncached_model = AgentProcessor.get_agent_without_cache(bot, False)
    metadata = ChatUtils.get_metadata(account, bot, is_integration_user, metadata)
    for message in messages:
+       try:
            msg = UserMessage(message, sender_id=user, metadata=metadata)
            chat_response = await uncached_model.handle_message(msg)
            responses.append(chat_response)
+       except Exception as e:
+           logger.error(f"Error processing message '{message}': {str(e)}")
+           responses.append({"error": f"Failed to process message: {str(e)}"})
    return responses
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
responses = []
uncached_model = AgentProcessor.get_agent_without_cache(bot, False)
metadata = ChatUtils.get_metadata(account, bot, is_integration_user, metadata)
for message in messages:
msg = UserMessage(message, sender_id=user, metadata=metadata)
chat_response = await uncached_model.handle_message(msg)
responses.append(chat_response)
return responses
if not messages:
raise ValueError("Messages list cannot be empty")
if not all(isinstance(msg, str) for msg in messages):
raise ValueError("All messages must be strings")
responses = []
uncached_model = AgentProcessor.get_agent_without_cache(bot, False)
metadata = ChatUtils.get_metadata(account, bot, is_integration_user, metadata)
for message in messages:
try:
msg = UserMessage(message, sender_id=user, metadata=metadata)
chat_response = await uncached_model.handle_message(msg)
responses.append(chat_response)
except Exception as e:
logger.error(f"Error processing message '{message}': {str(e)}")
responses.append({"error": f"Failed to process message: {str(e)}"})
return responses



@staticmethod
def reload(bot: Text, user: Text):
exc = None
Expand Down
36 changes: 36 additions & 0 deletions kairon/cli/mail_channel_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import json
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from typing import List
from rasa.cli import SubParsersAction

from kairon.events.definitions.mail_channel import MailProcessEvent


def process_channel_mails(args):
mails = json.loads(args.mails)
if not isinstance(mails, list):
raise ValueError("Mails should be a list")
MailProcessEvent(args.bot, args.user).execute(mails=mails)
Comment on lines +9 to +13
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add input validation and error handling

The JSON parsing and validation needs additional safety measures:

  1. Add size limit validation for the input JSON
  2. Add proper error handling for JSON parsing
  3. Validate the structure of mail objects in the list

Consider applying this improvement:

 def process_channel_mails(args):
+    if len(args.mails) > MAX_MAIL_JSON_SIZE:
+        raise ValueError(f"Mail JSON exceeds maximum size of {MAX_MAIL_JSON_SIZE} bytes")
+    try:
         mails = json.loads(args.mails)
         if not isinstance(mails, list):
             raise ValueError("Mails should be a list")
+        for mail in mails:
+            if not isinstance(mail, dict) or not all(k in mail for k in REQUIRED_MAIL_FIELDS):
+                raise ValueError("Invalid mail format in list")
+    except json.JSONDecodeError as e:
+        raise ValueError(f"Invalid JSON format: {str(e)}")

Committable suggestion skipped: line range outside the PR's diff.



def add_subparser(subparsers: SubParsersAction, parents: List[ArgumentParser]):
mail_parser = subparsers.add_parser(
"mail-channel-process",
conflict_handler="resolve",
formatter_class=ArgumentDefaultsHelpFormatter,
parents=parents,
help="Mail channel process mails"
)
mail_parser.add_argument('bot',
type=str,
help="Bot id for which command is executed", action='store')

mail_parser.add_argument('user',
type=str,
help="Kairon user who is initiating the command", action='store')

mail_parser.add_argument('mails',
type=str,
help="json representing List of mails to be processed", action='store')
Comment on lines +24 to +34
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add argument validation in CLI parser

The CLI arguments need format validation to ensure they meet expected patterns:

  1. Bot ID format validation
  2. User ID format validation
  3. Help text should specify the expected JSON structure

Consider enhancing the argument definitions:

     mail_parser.add_argument('bot',
                             type=str,
+                            pattern=r'^[a-f0-9]{24}$',  # Assuming MongoDB ObjectId format
                             help="Bot id for which command is executed", action='store')

     mail_parser.add_argument('user',
                             type=str,
+                            pattern=r'^[a-f0-9]{24}$',  # Assuming MongoDB ObjectId format
                             help="Kairon user who is initiating the command", action='store')

     mail_parser.add_argument('mails',
                             type=str,
-                            help="json representing List of mails to be processed",
+                            help="JSON array of mail objects. Each object must contain: subject, body, from_email",
                             action='store')

Committable suggestion skipped: line range outside the PR's diff.


mail_parser.set_defaults(func=process_channel_mails)
28 changes: 28 additions & 0 deletions kairon/cli/mail_channel_read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from typing import List
from rasa.cli import SubParsersAction

from kairon.events.definitions.mail_channel import MailReadEvent


def read_channel_mails(args):
MailReadEvent(args.bot, args.user).execute()
Comment on lines +8 to +9
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding error handling and logging

The read_channel_mails function directly executes the MailReadEvent without any error handling. Consider wrapping the execution in a try-catch block and adding logging for better observability.

 def read_channel_mails(args):
+    try:
         MailReadEvent(args.bot, args.user).execute()
+        logging.info(f"Successfully executed mail read for bot: {args.bot}")
+    except Exception as e:
+        logging.error(f"Failed to read mail for bot {args.bot}: {str(e)}")
+        raise

Committable suggestion skipped: line range outside the PR's diff.



def add_subparser(subparsers: SubParsersAction, parents: List[ArgumentParser]):
mail_parser = subparsers.add_parser(
"mail-channel-read",
conflict_handler="resolve",
formatter_class=ArgumentDefaultsHelpFormatter,
parents=parents,
help="Mail channel initiate reading"
)
mail_parser.add_argument('bot',
type=str,
help="Bot id for which command is executed", action='store')

mail_parser.add_argument('user',
type=str,
help="Kairon user who is initiating the command", action='store')

mail_parser.set_defaults(func=read_channel_mails)
5 changes: 4 additions & 1 deletion kairon/events/definitions/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from kairon.events.definitions.data_importer import TrainingDataImporterEvent
from kairon.events.definitions.faq_importer import FaqDataImporterEvent
from kairon.events.definitions.history_delete import DeleteHistoryEvent
from kairon.events.definitions.mail_channel import MailProcessEvent, MailReadEvent
from kairon.events.definitions.message_broadcast import MessageBroadcastEvent
from kairon.events.definitions.model_testing import ModelTestingEvent
from kairon.events.definitions.model_training import ModelTrainingEvent
Expand All @@ -20,7 +21,9 @@ class EventFactory:
EventClass.multilingual: MultilingualEvent,
EventClass.faq_importer: FaqDataImporterEvent,
EventClass.message_broadcast: MessageBroadcastEvent,
EventClass.content_importer: DocContentImporterEvent
EventClass.content_importer: DocContentImporterEvent,
EventClass.mail_channel_read_mails: MailReadEvent,
EventClass.mail_channel_process_mails: MailProcessEvent
}

@staticmethod
Expand Down
98 changes: 98 additions & 0 deletions kairon/events/definitions/mail_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from typing import Text
from loguru import logger
from kairon import Utility
from kairon.events.definitions.base import EventsBase
from kairon.exceptions import AppException
from kairon.shared.channels.mail.processor import MailProcessor
from kairon.shared.constants import EventClass


class MailProcessEvent(EventsBase):
"""
Event to start mail channel scheduler if not already running.
"""

def __init__(self, bot: Text, user: Text, **kwargs):
"""
Initialise event.
"""
self.bot = bot
self.user = user

def validate(self):
"""
validate mail channel exists
"""
return MailProcessor.validate_smtp_connection(self.bot)


def enqueue(self, **kwargs):
"""
Send event to event server.
"""
try:
mails: list = kwargs.get('mails', [])
payload = {'bot': self.bot, 'user': self.user, 'mails': mails}
Utility.request_event_server(EventClass.mail_channel_process_mails, payload)
except Exception as e:
logger.error(str(e))
raise AppException(e)
Comment on lines +37 to +39
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use exception chaining when re-raising exceptions

When re-raising an exception inside an except block, it's recommended to use exception chaining with raise ... from e to preserve the original traceback and provide better context.

Apply this diff to fix the issue:

37     except Exception as e:
38         logger.error(str(e))
-39         raise AppException(e)
+39         raise AppException(e) from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
logger.error(str(e))
raise AppException(e)
except Exception as e:
logger.error(str(e))
raise AppException(e) from e
🧰 Tools
🪛 Ruff (0.8.0)

39-39: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


def execute(self, **kwargs):
"""
Execute the event.
"""
try:
mails = kwargs.get('mails')
if mails:
MailProcessor.process_message_task(self.bot, mails)
except Exception as e:
logger.error(str(e))
raise AppException(e)
Comment on lines +49 to +51
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use exception chaining when re-raising exceptions

When re-raising an exception inside an except block, use exception chaining with raise ... from e to preserve the original traceback and provide better context.

Apply this diff to fix the issue:

49     except Exception as e:
50         logger.error(str(e))
-51         raise AppException(e)
+51         raise AppException(e) from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
logger.error(str(e))
raise AppException(e)
except Exception as e:
logger.error(str(e))
raise AppException(e) from e
🧰 Tools
🪛 Ruff (0.8.0)

51-51: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)




class MailReadEvent(EventsBase):
"""
Event to read mails from mail channel and create events for each mail tp process them via bot
"""

def __init__(self, bot: Text, user: Text, **kwargs):
"""
Initialise event.
"""
self.bot = bot
self.user = user

def validate(self):
"""
validate mail channel exists
"""
return MailProcessor.validate_imap_connection(self.bot)

def enqueue(self, **kwargs):
"""
Send event to event server.
"""
try:
payload = {'bot': self.bot, 'user': self.user}
Utility.request_event_server(EventClass.mail_channel_read_mails, payload)
except Exception as e:
logger.error(str(e))
raise AppException(e)

Comment on lines +81 to +83
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use exception chaining when re-raising exceptions

When re-raising an exception inside an except block, use exception chaining with raise ... from e to preserve the original traceback and provide better context.

Apply this diff to fix the issue:

81     except Exception as e:
82         logger.error(str(e))
-83         raise AppException(e)
+83         raise AppException(e) from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.error(str(e))
raise AppException(e)
logger.error(str(e))
raise AppException(e) from e
🧰 Tools
🪛 Ruff (0.8.0)

82-82: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

def execute(self, **kwargs):
"""
Execute the event.
"""
try:
vals = MailProcessor.read_mails(self.bot)
print(vals)
emails, _, _ = vals
for email in emails:
ev = MailProcessEvent(self.bot, self.user)
ev.validate()
ev.enqueue(mails=[email])

except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {self.bot}. Error: {str(e)}")
Comment on lines +97 to +98
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use exception chaining when re-raising exceptions

When re-raising an exception inside an except block, use exception chaining with raise ... from e to preserve the original traceback and provide better context.

Apply this diff to fix the issue:

97     except Exception as e:
-98         raise AppException(f"Failed to schedule mail reading for bot {self.bot}. Error: {str(e)}")
+98         raise AppException(f"Failed to schedule mail reading for bot {self.bot}. Error: {str(e)}") from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {self.bot}. Error: {str(e)}")
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {self.bot}. Error: {str(e)}") from e
🧰 Tools
🪛 Ruff (0.8.0)

98-98: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

7 changes: 7 additions & 0 deletions kairon/events/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async def lifespan(app: FastAPI):
""" MongoDB is connected on the bot trainer startup """
config: dict = Utility.mongoengine_connection(Utility.environment['database']["url"])
connect(**config)
EventUtility.reschedule_all_bots_channel_mail_reading()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for mail rescheduling during startup

The mail rescheduling call during startup could fail silently. Consider wrapping it in a try-catch block to log any failures without preventing server startup.

-    EventUtility.reschedule_all_bots_channel_mail_reading()
+    try:
+        EventUtility.reschedule_all_bots_channel_mail_reading()
+    except Exception as e:
+        logger.error(f"Failed to reschedule mail reading during startup: {str(e)}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
EventUtility.reschedule_all_bots_channel_mail_reading()
try:
EventUtility.reschedule_all_bots_channel_mail_reading()
except Exception as e:
logger.error(f"Failed to reschedule mail reading during startup: {str(e)}")

yield
disconnect()

Expand Down Expand Up @@ -144,3 +145,9 @@ def delete_scheduled_event(event_id: Text = Path(description="Event id")):
def dispatch_scheduled_event(event_id: Text = Path(description="Event id")):
KScheduler().dispatch_event(event_id)
return {"data": None, "message": "Scheduled event dispatch!"}


@app.get('/api/mail/schedule/{bot}', response_model=Response)
def request_epoch(bot: Text = Path(description="Bot id")):
EventUtility.schedule_channel_mail_reading(bot)
return {"data": None, "message": "Mail scheduler epoch request!"}
Comment on lines +150 to +153
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add input validation for bot parameter

The endpoint should validate the bot ID parameter to ensure it's not empty or malformed.

 @app.get('/api/mail/schedule/{bot}', response_model=Response)
-def request_epoch(bot: Text = Path(description="Bot id")):
+def request_epoch(bot: Text = Path(description="Bot id", min_length=1)):
     EventUtility.schedule_channel_mail_reading(bot)
     return {"data": None, "message": "Mail scheduler epoch request!"}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@app.get('/api/mail/schedule/{bot}', response_model=Response)
def request_epoch(bot: Text = Path(description="Bot id")):
EventUtility.schedule_channel_mail_reading(bot)
return {"data": None, "message": "Mail scheduler epoch request!"}
@app.get('/api/mail/schedule/{bot}', response_model=Response)
def request_epoch(bot: Text = Path(description="Bot id", min_length=1)):
EventUtility.schedule_channel_mail_reading(bot)
return {"data": None, "message": "Mail scheduler epoch request!"}
🧰 Tools
🪛 Ruff (0.8.0)

151-151: Do not perform function call Path in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

38 changes: 38 additions & 0 deletions kairon/events/utility.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from typing import Dict, Text

from uuid6 import uuid7

from kairon.events.executors.factory import ExecutorFactory
from kairon.events.scheduler.kscheduler import KScheduler
from kairon.exceptions import AppException
from kairon.shared.chat.data_objects import Channels
from kairon.shared.constants import EventClass, ChannelTypes
from kairon.shared.data.constant import TASK_TYPE
from loguru import logger


class EventUtility:
Expand Down Expand Up @@ -32,3 +37,36 @@ def update_job(event_type: Text, request_data: Dict, is_scheduled: bool):
KScheduler().update_job(event_class=event_type, event_id=event_id, task_type=TASK_TYPE.EVENT.value,
**request_data)
return None, 'Scheduled event updated!'

@staticmethod
def schedule_channel_mail_reading(bot: str):
from kairon.shared.channels.mail.processor import MailProcessor

try:
mail_processor = MailProcessor(bot)
interval = mail_processor.config.get("interval", 60)
event_id = mail_processor.state.event_id
if event_id:
KScheduler().update_job(event_id,
TASK_TYPE.EVENT,
f"*/{interval} * * * *",
EventClass.mail_channel_read_mails, {"bot": bot, "user": mail_processor.bot_settings.user})
else:
event_id = uuid7().hex
mail_processor.update_event_id(event_id)
KScheduler().add_job(event_id,
TASK_TYPE.EVENT,
f"*/{interval} * * * *",
EventClass.mail_channel_read_mails, {"bot": bot, "user": mail_processor.bot_settings.user})
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}")
Comment on lines +61 to +62
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use exception chaining when re-raising exceptions

When re-raising an exception inside an except block, it's recommended to use exception chaining with raise ... from e to provide better context and preserve the original traceback.

Apply this diff to fix the issue:

61     except Exception as e:
-62         raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}")
+62         raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}") from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}")
except Exception as e:
raise AppException(f"Failed to schedule mail reading for bot {bot}. Error: {str(e)}") from e
🧰 Tools
🪛 Ruff (0.8.0)

62-62: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


@staticmethod
def reschedule_all_bots_channel_mail_reading():
try:
bots = list(Channels.objects(connector_type= ChannelTypes.MAIL.value).distinct("bot"))
for bot in bots:
logger.info(f"Rescheduling mail reading for bot {bot}")
EventUtility.schedule_channel_mail_reading(bot)
except Exception as e:
raise AppException(f"Failed to reschedule mail reading events. Error: {str(e)}")
Comment on lines +71 to +72
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use exception chaining when re-raising exceptions

When re-raising an exception inside an except block, use exception chaining with raise ... from e to provide better context and preserve the original traceback.

Apply this diff to fix the issue:

71     except Exception as e:
-72         raise AppException(f"Failed to reschedule mail reading events. Error: {str(e)}")
+72         raise AppException(f"Failed to reschedule mail reading events. Error: {str(e)}") from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
raise AppException(f"Failed to reschedule mail reading events. Error: {str(e)}")
except Exception as e:
raise AppException(f"Failed to reschedule mail reading events. Error: {str(e)}") from e
🧰 Tools
🪛 Ruff (0.8.0)

72-72: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

Empty file.
11 changes: 11 additions & 0 deletions kairon/shared/channels/mail/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@


class MailConstants:
DEFAULT_SMTP_SERVER = 'smtp.gmail.com'
DEFAULT_IMAP_SERVER = 'imap.gmail.com'
DEFAULT_SMTP_PORT = 587

DEFAULT_TEMPLATE = "<p>{bot_response}</p> <br/><br/><span style='color:#999;'> Generated by kAIron AI.</span>\n"

PROCESS_MESSAGE_BATCH_SIZE = 4

Loading
Loading