Skip to content

Commit

Permalink
Bot Specific Executor Logs (#1572)
Browse files Browse the repository at this point in the history
* Added code related to bot specific executor logs.
Added and fixed test cases related to the same.

* Added code related to bot specific executor logs.
Added and fixed test cases related to the same.

* Added code related to bot specific executor logs.
Added and fixed test cases related to the same.

* changed tests according to code rabbit suggestions.
  • Loading branch information
maheshsattala authored Oct 18, 2024
1 parent 936930c commit 8203cc5
Show file tree
Hide file tree
Showing 12 changed files with 905 additions and 44 deletions.
4 changes: 3 additions & 1 deletion kairon/actions/definitions/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from kairon.shared.actions.models import ActionType
from kairon.shared.actions.utils import ActionUtility
from kairon.shared.callback.data_objects import CallbackConfig
from kairon.shared.data.constant import TASK_TYPE


class ActionSchedule(ActionsBase):
Expand Down Expand Up @@ -136,12 +137,13 @@ async def add_schedule_job(self,
date_time: datetime,
data: Dict,
timezone: Text,
kwargs=None):
**kwargs):
func = obj_to_ref(ExecutorFactory.get_executor().execute_task)

_id = uuid7().hex
data['predefined_objects']['event'] = _id
args = (func, "scheduler_evaluator", data,)
kwargs.update({'task_type': TASK_TYPE.ACTION.value})
trigger = DateTrigger(run_date=date_time, timezone=timezone)

next_run_time = trigger.get_next_fire_time(None, datetime.now(astimezone(timezone) or get_localzone()))
Expand Down
3 changes: 2 additions & 1 deletion kairon/actions/definitions/web_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
if not ActionUtility.is_empty(latest_msg):
results = ActionUtility.perform_web_search(latest_msg,
topn=action_config.get("topn"),
website=action_config.get("website"))
website=action_config.get("website"),
bot=self.bot)
if results:
bot_response = ActionUtility.format_search_result(results)
if not ActionUtility.is_empty(action_config.get('set_slot')):
Expand Down
22 changes: 22 additions & 0 deletions kairon/api/app/routers/bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from kairon.shared.data.processor import MongoProcessor
from kairon.shared.data.training_data_generation_processor import TrainingDataGenerationProcessor
from kairon.shared.data.utils import DataUtility
from kairon.shared.events.processor import ExecutorProcessor
from kairon.shared.importer.data_objects import ValidationLogs
from kairon.shared.importer.processor import DataImporterLogProcessor
from kairon.shared.live_agent.live_agent import LiveAgentHandler
Expand Down Expand Up @@ -1704,6 +1705,27 @@ async def get_llm_logs(
return Response(data=data)


@router.get("/executor/logs", response_model=Response)
async def get_executor_logs(
start_idx: int = 0, page_size: int = 10,
event_class: str = None, task_type: str = None,
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=TESTER_ACCESS)
):
"""
Get executor logs data based on filters provided.
"""
logs = list(ExecutorProcessor.get_executor_logs(current_user.get_bot(), start_idx, page_size,
event_class=event_class, task_type=task_type))
row_cnt = ExecutorProcessor.get_row_count(current_user.get_bot(),
event_class=event_class,
task_type=task_type)
data = {
"logs": logs,
"total": row_cnt
}
return Response(data=data)


@router.get("/metadata/llm", response_model=Response)
async def get_llm_metadata(
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=TESTER_ACCESS)) -> Response:
Expand Down
4 changes: 2 additions & 2 deletions kairon/events/executors/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import abstractmethod

from typing import Any

from kairon.shared.constants import EventClass
from kairon.shared.data.constant import EVENT_STATUS, TASK_TYPE
Expand All @@ -13,7 +13,7 @@ class ExecutorBase:
def execute_task(self, event_class: EventClass, data: dict, **kwargs):
raise NotImplementedError("Provider not implemented")

def log_task(self, event_class: EventClass, task_type: TASK_TYPE, data: dict, status: EVENT_STATUS, **kwargs):
def log_task(self, event_class: EventClass, task_type: TASK_TYPE, data: Any, status: EVENT_STATUS, **kwargs):
from bson import ObjectId
from kairon.shared.cloud.utils import CloudUtility

Expand Down
2 changes: 1 addition & 1 deletion kairon/shared/actions/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ def perform_web_search(search_term: str, **kwargs):
trigger_task = Utility.environment['web_search']['trigger_task']
search_engine_url = Utility.environment['web_search']['url']
website = kwargs.get('website') if kwargs.get('website') else ''
request_body = {"text": search_term, "site": website, "topn": kwargs.get("topn")}
request_body = {"text": search_term, "site": website, "topn": kwargs.get("topn"), "bot": kwargs.get("bot")}
results = []
try:
if trigger_task:
Expand Down
45 changes: 44 additions & 1 deletion kairon/shared/cloud/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

import ujson as json
import os
import time
Expand Down Expand Up @@ -57,7 +59,7 @@ def delete_file(bucket, file):
s3.delete_object(Bucket=bucket, Key=file)

@staticmethod
def trigger_lambda(event_class: EventClass, env_data: dict, task_type: TASK_TYPE = TASK_TYPE.ACTION.value,
def trigger_lambda(event_class: EventClass, env_data: Any, task_type: TASK_TYPE = TASK_TYPE.ACTION.value,
from_executor: bool = False):
"""
Triggers lambda based on the event class.
Expand Down Expand Up @@ -103,6 +105,9 @@ def log_task(event_class: EventClass, task_type: TASK_TYPE, data: dict, status:
from kairon.shared.events.data_objects import ExecutorLogs

executor_log_id = kwargs.get("executor_log_id") if kwargs.get("executor_log_id") else ObjectId().__str__()
bot_id = CloudUtility.get_bot_id_from_env_data(event_class, data,
from_executor=kwargs.get("from_executor", False),
task_type=task_type)

try:
log = ExecutorLogs.objects(executor_log_id=executor_log_id, task_type=task_type, event_class=event_class,
Expand All @@ -116,9 +121,47 @@ def log_task(event_class: EventClass, task_type: TASK_TYPE, data: dict, status:
for key, value in kwargs.items():
if not getattr(log, key, None) and Utility.is_picklable_for_mongo({key: value}):
setattr(log, key, value)
log.bot = bot_id
log.save()
return executor_log_id

@staticmethod
def get_bot_id_from_env_data(event_class: EventClass, data: Any, **kwargs):
bot = None
from_executor = kwargs.get("from_executor")

if isinstance(data, dict) and 'bot' in data:
bot = data['bot']

elif event_class == EventClass.web_search:
bot = data.get('bot')

elif event_class == EventClass.pyscript_evaluator:
predefined_objects = data.get('predefined_objects', {})

if 'slot' in predefined_objects and 'bot' in predefined_objects['slot']:
bot = predefined_objects['slot']['bot']

task_type = kwargs.get("task_type")
if task_type == "Callback" and 'bot' in predefined_objects:
bot = predefined_objects['bot']

elif event_class == EventClass.scheduler_evaluator and isinstance(data, list):
for item in data:
if item.get('name') == 'PREDEFINED_OBJECTS':
predefined_objects = item.get('value', {})
if 'bot' in predefined_objects:
bot = predefined_objects['bot']
break

elif from_executor and isinstance(data, list):
for item in data:
if item.get('name') == 'BOT':
bot = item.get('value')
break

return bot

@staticmethod
def lambda_execution_failed(response):
return (response['StatusCode'] != 200 or
Expand Down
41 changes: 41 additions & 0 deletions kairon/shared/events/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from kairon.shared.events.data_objects import ExecutorLogs


class ExecutorProcessor:

@staticmethod
def get_executor_logs(bot: str, start_idx: int = 0, page_size: int = 10, **kwargs):
"""
Get all executor logs data .
@param bot: bot id.
@param start_idx: start index
@param page_size: page size
@return: list of logs.
"""
event_class = kwargs.get("event_class")
task_type = kwargs.get("task_type")
query = {"bot": bot}
if event_class:
query.update({"event_class": event_class})
if task_type:
query.update({"task_type": task_type})
for log in ExecutorLogs.objects(**query).order_by("-timestamp").skip(start_idx).limit(page_size):
executor_logs = log.to_mongo().to_dict()
executor_logs.pop('_id')
yield executor_logs

@staticmethod
def get_row_count(bot: str, **kwargs):
"""
Gets the count of rows in a ExecutorLogs for a particular bot.
:param bot: bot id
:return: Count of rows
"""
event_class = kwargs.get("event_class")
task_type = kwargs.get("task_type")
query = {"bot": bot}
if event_class:
query.update({"event_class": event_class})
if task_type:
query.update({"task_type": task_type})
return ExecutorLogs.objects(**query).count()
10 changes: 5 additions & 5 deletions tests/integration_test/action_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8312,7 +8312,7 @@ def test_process_web_search_action():

def _perform_web_search(*args, **kwargs):
assert args == ('What is data?',)
assert kwargs == {'topn': 1, 'website': 'https://www.w3schools.com/'}
assert kwargs == {'topn': 1, 'website': 'https://www.w3schools.com/', 'bot': bot}
return [{
'title': 'Data Science Introduction - W3Schools',
'text': "Data Science is a combination of multiple disciplines that uses statistics, data analysis, and machine learning to analyze data and to extract knowledge and insights from it. What is Data Science? Data Science is about data gathering, analysis and decision-making.",
Expand Down Expand Up @@ -8437,7 +8437,7 @@ def test_process_web_search_action_with_search_engine_url():
status=200,
match=[
responses.matchers.json_params_matcher({
"text": 'What is data science?', "site": '', "topn": 1
"text": 'What is data science?', "site": '', "topn": 1, 'bot': bot
})],
)

Expand Down Expand Up @@ -8545,7 +8545,7 @@ def test_process_web_search_action_with_kairon_user_msg_entity():

def _perform_web_search(*args, **kwargs):
assert args == ('my public search text',)
assert kwargs == {'topn': 2, 'website': None}
assert kwargs == {'topn': 2, 'website': None, 'bot': bot}
return [
{'title': 'What is Data Science? | IBM',
'text': 'Data science combines math, statistics, programming, analytics, AI, and machine learning to uncover insights from data. Learn how data science works, what it entails, and how it differs from data science and BI.',
Expand Down Expand Up @@ -8665,7 +8665,7 @@ def test_process_web_search_action_without_kairon_user_msg_entity():

def _perform_web_search(*args, **kwargs):
assert args == ('/action_public_search',)
assert kwargs == {'topn': 2, 'website': None}
assert kwargs == {'topn': 2, 'website': None, 'bot': bot}
return [
{'title': 'What is Data Science? | IBM',
'text': 'Data science combines math, statistics, programming, analytics, AI, and machine learning to uncover insights from data. Learn how data science works, what it entails, and how it differs from data science and BI.',
Expand Down Expand Up @@ -8785,7 +8785,7 @@ def test_process_web_search_action_dispatch_false():

def _perform_web_search(*args, **kwargs):
assert args == ('What is Python?',)
assert kwargs == {'topn': 1, 'website': None}
assert kwargs == {'topn': 1, 'website': None, 'bot': bot}
return [
{'title': 'Python.org - What is Python? Executive Summary',
'text': 'Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.',
Expand Down
Loading

0 comments on commit 8203cc5

Please sign in to comment.