From fc51661b383a50da8f15acf285cf19d778d7fee6 Mon Sep 17 00:00:00 2001 From: Mahesh Date: Fri, 10 Nov 2023 16:09:42 +0530 Subject: [PATCH] Modified is_event_in_progress of every event. --- kairon/api/app/routers/bot/bot.py | 2 +- kairon/shared/data/history_log_processor.py | 3 +- kairon/shared/data/model_processor.py | 42 ++---------------- kairon/shared/data/processor.py | 44 ++++++++++++++++++- .../training_data_generation_processor.py | 2 +- kairon/shared/importer/processor.py | 3 +- kairon/shared/multilingual/processor.py | 3 +- kairon/shared/test/processor.py | 3 +- .../data_processor/data_processor_test.py | 35 ++++++++++++--- .../unit_test/data_processor/history_test.py | 6 +++ 10 files changed, 89 insertions(+), 54 deletions(-) diff --git a/kairon/api/app/routers/bot/bot.py b/kairon/api/app/routers/bot/bot.py index 54b6ec5b1..32bc848a4 100644 --- a/kairon/api/app/routers/bot/bot.py +++ b/kairon/api/app/routers/bot/bot.py @@ -519,7 +519,7 @@ async def abort_event( """ Aborts the event """ - ModelProcessor.abort_current_event(current_user.get_bot(), event_type) + mongo_processor.abort_current_event(current_user.get_bot(), event_type) return {"message": f"{event_type} aborted."} diff --git a/kairon/shared/data/history_log_processor.py b/kairon/shared/data/history_log_processor.py index 45736c9df..2166d1b4a 100644 --- a/kairon/shared/data/history_log_processor.py +++ b/kairon/shared/data/history_log_processor.py @@ -47,7 +47,8 @@ def is_event_in_progress(bot: str, raise_exception=True): try: ConversationsHistoryDeleteLogs.objects(bot=bot).filter( Q(status__ne=EVENT_STATUS.COMPLETED.value) & - Q(status__ne=EVENT_STATUS.FAIL.value)).get() + Q(status__ne=EVENT_STATUS.FAIL.value) & + Q(status__ne=EVENT_STATUS.ABORTED.value)).get() if raise_exception: raise AppException("Event already in progress! Check logs.") diff --git a/kairon/shared/data/model_processor.py b/kairon/shared/data/model_processor.py index 6d352fd3e..3ab3e9c97 100644 --- a/kairon/shared/data/model_processor.py +++ b/kairon/shared/data/model_processor.py @@ -5,12 +5,7 @@ from mongoengine.errors import DoesNotExist from kairon.exceptions import AppException from .constant import EVENT_STATUS -from .data_objects import ModelTraining, BotSettings, ConversationsHistoryDeleteLogs, TrainingDataGenerator -from ..chat.broadcast.data_objects import MessageBroadcastLogs -from ..constants import EventClass -from ..importer.data_objects import ValidationLogs -from ..multilingual.data_objects import BotReplicationLogs -from ..test.data_objects import ModelTestingLogs +from .data_objects import ModelTraining, BotSettings class ModelProcessor: @@ -58,38 +53,6 @@ def set_training_status( doc.exception = exception doc.save() - @staticmethod - def abort_current_event(bot: Text, event_type: EventClass): - """ - sets event status to aborted if there is any event in progress or enqueued - - :param bot: bot id - :param event_type: type of the event - :return: None - :raises: AppException - """ - events_dict = { - EventClass.model_training: ModelTraining, - EventClass.model_testing: ModelTestingLogs, - EventClass.delete_history: ConversationsHistoryDeleteLogs, - EventClass.data_importer: ValidationLogs, - EventClass.multilingual: BotReplicationLogs, - EventClass.data_generator: TrainingDataGenerator, - EventClass.faq_importer: ValidationLogs, - EventClass.message_broadcast: MessageBroadcastLogs - } - event_data_object = events_dict.get(event_type) - if event_data_object: - try: - event_object = event_data_object.objects.get( - bot=bot, - status__in=[EVENT_STATUS.INPROGRESS.value, EVENT_STATUS.ENQUEUED.value] - ) - event_object.status = EVENT_STATUS.ABORTED.value - event_object.save() - except DoesNotExist: - raise AppException(f"No Enqueued {event_type} present for this bot.") - @staticmethod def is_training_inprogress(bot: Text, raise_exception=True): """ @@ -102,7 +65,8 @@ def is_training_inprogress(bot: Text, raise_exception=True): """ if ModelTraining.objects(bot=bot).filter( Q(status__ne=EVENT_STATUS.DONE.value) & - Q(status__ne=EVENT_STATUS.FAIL.value)).count(): + Q(status__ne=EVENT_STATUS.FAIL.value) & + Q(status__ne=EVENT_STATUS.ABORTED.value)).count(): if raise_exception: raise AppException("Previous model training in progress.") else: diff --git a/kairon/shared/data/processor.py b/kairon/shared/data/processor.py index 7a53bdec6..8aa753ed1 100644 --- a/kairon/shared/data/processor.py +++ b/kairon/shared/data/processor.py @@ -90,11 +90,15 @@ Rules, Utterances, BotSettings, ChatClientConfig, SlotMapping, KeyVault, EventConfig, TrainingDataGenerator, MultiflowStories, MultiflowStoryEvents, MultiFlowStoryMetadata, - Synonyms, Lookup, Analytics + Synonyms, Lookup, Analytics, ModelTraining, ConversationsHistoryDeleteLogs ) from .utils import DataUtility -from ..constants import KaironSystemSlots, PluginTypes +from ..chat.broadcast.data_objects import MessageBroadcastLogs +from ..constants import KaironSystemSlots, PluginTypes, EventClass from ..custom_widgets.data_objects import CustomWidgets +from ..importer.data_objects import ValidationLogs +from ..multilingual.data_objects import BotReplicationLogs +from ..test.data_objects import ModelTestingLogs class MongoProcessor: @@ -3537,6 +3541,42 @@ def delete_slot( "Slot does not exist." ) + @staticmethod + def abort_current_event(bot: Text, event_type: EventClass): + """ + sets event status to aborted if there is any event in progress or enqueued + + :param bot: bot id + :param event_type: type of the event + :return: None + :raises: AppException + """ + events_dict = { + EventClass.model_training: ModelTraining, + EventClass.model_testing: ModelTestingLogs, + EventClass.delete_history: ConversationsHistoryDeleteLogs, + EventClass.data_importer: ValidationLogs, + EventClass.multilingual: BotReplicationLogs, + EventClass.data_generator: TrainingDataGenerator, + EventClass.faq_importer: ValidationLogs, + EventClass.message_broadcast: MessageBroadcastLogs + } + status_field = "status" if event_type in {EventClass.model_training, + EventClass.delete_history, + EventClass.data_generator} else "event_status" + event_data_object = events_dict.get(event_type) + if event_data_object: + try: + filter_params = {'bot': bot, + f'{status_field}__in': [EVENT_STATUS.INPROGRESS.value, EVENT_STATUS.ENQUEUED.value]} + + event_object = event_data_object.objects.get(**filter_params) + update_params = {f'set__{status_field}': EVENT_STATUS.ABORTED.value} + event_object.update(**update_params) + event_object.save() + except DoesNotExist: + raise AppException(f"No Enqueued {event_type} present for this bot.") + @staticmethod def get_row_count(document: Document, bot: str, **kwargs): """ diff --git a/kairon/shared/data/training_data_generation_processor.py b/kairon/shared/data/training_data_generation_processor.py index 426d37ba5..e3a590cce 100644 --- a/kairon/shared/data/training_data_generation_processor.py +++ b/kairon/shared/data/training_data_generation_processor.py @@ -136,7 +136,7 @@ def is_in_progress(bot: Text, raise_exception=True): """ if TrainingDataGenerator.objects(__raw__={ "bot": bot, - "status": {"$nin": [EVENT_STATUS.FAIL.value, EVENT_STATUS.COMPLETED.value]} + "status": {"$nin": [EVENT_STATUS.FAIL.value, EVENT_STATUS.COMPLETED.value, EVENT_STATUS.ABORTED.value]} }).count(): if raise_exception: raise AppException("Event already in progress! Check logs.") diff --git a/kairon/shared/importer/processor.py b/kairon/shared/importer/processor.py index 0c197d203..b43ccbb81 100644 --- a/kairon/shared/importer/processor.py +++ b/kairon/shared/importer/processor.py @@ -116,7 +116,8 @@ def is_event_in_progress(bot: str, raise_exception=True): try: ValidationLogs.objects(bot=bot).filter( Q(event_status__ne=EVENT_STATUS.COMPLETED.value) & - Q(event_status__ne=EVENT_STATUS.FAIL.value)).get() + Q(event_status__ne=EVENT_STATUS.FAIL.value) & + Q(event_status__ne=EVENT_STATUS.ABORTED.value)).get() if raise_exception: raise AppException("Event already in progress! Check logs.") diff --git a/kairon/shared/multilingual/processor.py b/kairon/shared/multilingual/processor.py index b5a4b4d15..3870537f3 100644 --- a/kairon/shared/multilingual/processor.py +++ b/kairon/shared/multilingual/processor.py @@ -106,7 +106,8 @@ def is_event_in_progress(source_bot: str, raise_exception=True): try: BotReplicationLogs.objects(bot=source_bot).filter( Q(event_status__ne=EVENT_STATUS.COMPLETED.value) & - Q(event_status__ne=EVENT_STATUS.FAIL.value)).get() + Q(event_status__ne=EVENT_STATUS.FAIL.value) & + Q(event_status__ne=EVENT_STATUS.ABORTED.value)).get() if raise_exception: raise AppException("Event already in progress! Check logs.") diff --git a/kairon/shared/test/processor.py b/kairon/shared/test/processor.py index c869359ae..1709eaf25 100644 --- a/kairon/shared/test/processor.py +++ b/kairon/shared/test/processor.py @@ -80,7 +80,8 @@ def is_event_in_progress(bot: str, raise_exception=True): try: ModelTestingLogs.objects(bot=bot, type='common').filter( Q(event_status__ne=EVENT_STATUS.COMPLETED.value) & - Q(event_status__ne=EVENT_STATUS.FAIL.value)).get() + Q(event_status__ne=EVENT_STATUS.FAIL.value) & + Q(event_status__ne=EVENT_STATUS.ABORTED.value)).get() if raise_exception: raise AppException("Event already in progress! Check logs.") diff --git a/tests/unit_test/data_processor/data_processor_test.py b/tests/unit_test/data_processor/data_processor_test.py index ba8d71c8b..6a48ff294 100644 --- a/tests/unit_test/data_processor/data_processor_test.py +++ b/tests/unit_test/data_processor/data_processor_test.py @@ -51,7 +51,7 @@ from kairon.shared.chat.data_objects import Channels from kairon.shared.cognition.data_objects import CognitionData, CognitionSchema from kairon.shared.cognition.processor import CognitionDataProcessor -from kairon.shared.constants import SLOT_SET_TYPE +from kairon.shared.constants import SLOT_SET_TYPE, EventClass from kairon.shared.data.audit.data_objects import AuditLogData from kairon.shared.data.constant import ENDPOINT_TYPE from kairon.shared.data.constant import UTTERANCE_TYPE, EVENT_STATUS, STORY_EVENT, ALLOWED_DOMAIN_FORMATS, \ @@ -963,38 +963,53 @@ def test_edit_bot_settings(self): assert updated_settings.llm_settings.to_mongo().to_dict() == {'enable_faq': False, 'provider': 'openai'} def test_abort_current_event_with_no_model_training_event(self): + mongo_processor = MongoProcessor() bot = "test_bot" with pytest.raises(AppException, match="No Enqueued model_training present for this bot."): - ModelProcessor.abort_current_event(bot=bot, event_type="model_training") + mongo_processor.abort_current_event(bot=bot, event_type=EventClass.model_training) def test_abort_current_event_with_no_model_testing_event(self): + mongo_processor = MongoProcessor() bot = "test_bot" with pytest.raises(AppException, match="No Enqueued model_testing present for this bot."): - ModelProcessor.abort_current_event(bot=bot, event_type="model_testing") + mongo_processor.abort_current_event(bot=bot, event_type=EventClass.model_testing) def test_abort_current_event_with_no_delete_history_event(self): + mongo_processor = MongoProcessor() bot = "test_bot" with pytest.raises(AppException, match="No Enqueued delete_history present for this bot."): - ModelProcessor.abort_current_event(bot=bot, event_type="delete_history") + mongo_processor.abort_current_event(bot=bot, event_type=EventClass.delete_history) def test_abort_current_event_with_no_data_importer_event(self): + mongo_processor = MongoProcessor() bot = "test_bot" with pytest.raises(AppException, match="No Enqueued data_importer present for this bot."): - ModelProcessor.abort_current_event(bot=bot, event_type="data_importer") + mongo_processor.abort_current_event(bot=bot, event_type=EventClass.data_importer) def test_abort_current_event_with_model_training(self): + mongo_processor = MongoProcessor() bot = "test_bot" user = "test_user" ModelProcessor.set_training_status(bot=bot, user=user, status=EVENT_STATUS.ENQUEUED.value) - ModelProcessor.abort_current_event(bot=bot, event_type="model_training") + mongo_processor.abort_current_event(bot=bot, event_type=EventClass.model_training) model_training_object = ModelTraining.objects(bot=bot).get() assert model_training_object.status == EVENT_STATUS.ABORTED.value + def test_abort_current_event_with_model_testing(self): + mongo_processor = MongoProcessor() + bot = "test_bot" + user = "test_user" + ModelTestingLogProcessor.log_test_result(bot=bot, user=user, event_status=EVENT_STATUS.ENQUEUED.value) + mongo_processor.abort_current_event(bot=bot, event_type=EventClass.model_testing) + model_testing_object = ModelTestingLogs.objects(bot=bot).get() + assert model_testing_object.event_status == EVENT_STATUS.ABORTED.value + def test_abort_current_event_with_data_generator(self): + mongo_processor = MongoProcessor() bot = "test_bot" user = "test_user" TrainingDataGenerationProcessor.set_status(bot=bot, user=user, status=EVENT_STATUS.ENQUEUED.value) - ModelProcessor.abort_current_event(bot=bot, event_type="data_generator") + mongo_processor.abort_current_event(bot=bot, event_type=EventClass.data_generator) data_generator_object = TrainingDataGenerator.objects(bot=bot).get() assert data_generator_object.status == EVENT_STATUS.ABORTED.value @@ -15205,6 +15220,12 @@ def test_is_training_inprogress_False(self): actual_response = ModelProcessor.is_training_inprogress("tests") assert actual_response is False + def test_is_training_inprogress_with_aborted(self): + ModelProcessor.set_training_status("testbot", "testuser", "Aborted") + model_training = ModelTraining.objects(bot="testbot", status="Aborted") + actual_response = ModelProcessor.is_training_inprogress("tests", False) + assert actual_response is False + def test_is_training_inprogress_True(self, test_set_training_status_inprogress): assert test_set_training_status_inprogress.__len__() == 1 assert test_set_training_status_inprogress.first().bot == "tests" diff --git a/tests/unit_test/data_processor/history_test.py b/tests/unit_test/data_processor/history_test.py index b30065cbf..5e9679a18 100644 --- a/tests/unit_test/data_processor/history_test.py +++ b/tests/unit_test/data_processor/history_test.py @@ -107,6 +107,12 @@ def test_delete_bot_history(self, mock_client): def test_is_event_in_progress(self, get_connection_delete_history): assert not HistoryDeletionLogProcessor.is_event_in_progress('5f1928bda7c0280ca4869da3') + def test_is_event_in_progress_with_aborted(self, get_connection_delete_history): + till_date = datetime.utcnow().date() + HistoryDeletionLogProcessor.add_log('5f1928bda7c0280ca4869da3', 'test_user', + till_date, status='Aborted') + assert not HistoryDeletionLogProcessor.is_event_in_progress('5f1928bda7c0280ca4869da3', False) + def test_is_event_in_progress_failure(self, get_connection_delete_history): till_date = datetime.utcnow().date() HistoryDeletionLogProcessor.add_log('5f1928bda7c0280ca4869da3', 'test_user',