diff --git a/kairon/api/app/routers/bot/data.py b/kairon/api/app/routers/bot/data.py index f3075f0a8..42b79b217 100644 --- a/kairon/api/app/routers/bot/data.py +++ b/kairon/api/app/routers/bot/data.py @@ -326,3 +326,34 @@ async def download_error_csv( data=None, error_code=e.status_code ) + +@router.post("/cognition/sync", response_model=Response) +async def knowledge_vault_sync( + primary_key_col: str, + collection_name: str, + data: List[dict], + current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS), +): + """ + Validates and syncs data to the specified MongoDB collection and vector database. + """ + data = [{key.lower(): value for key, value in row.items()} for row in data] + + error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), data, current_user.get_bot()) + + if error_summary: + return Response( + success=False, + message="Validation failed", + data=error_summary, + error_code=400 + ) + + await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), data, + current_user.get_bot(), current_user.get_user()) + + return Response( + success=True, + message="Processing completed successfully", + data=None + ) \ No newline at end of file diff --git a/kairon/shared/cognition/processor.py b/kairon/shared/cognition/processor.py index 499a798da..4ac49c531 100644 --- a/kairon/shared/cognition/processor.py +++ b/kairon/shared/cognition/processor.py @@ -1,13 +1,15 @@ from datetime import datetime -from typing import Text, Dict, Any +from typing import Text, Dict, Any, List from loguru import logger from mongoengine import DoesNotExist, Q +from pydantic import constr, create_model, ValidationError from kairon import Utility from kairon.exceptions import AppException from kairon.shared.actions.data_objects import PromptAction, DatabaseAction from kairon.shared.cognition.data_objects import CognitionData, CognitionSchema, ColumnMetadata, CollectionData +from kairon.shared.data.constant import DEFAULT_LLM from kairon.shared.data.processor import MongoProcessor from kairon.shared.models import CognitionDataType, CognitionMetadataType @@ -414,3 +416,157 @@ def validate_collection_name(bot: Text, collection: Text): raise AppException(f'Cannot remove collection {collection} linked to action "{prompt_action[0].name}"!') if database_action: raise AppException(f'Cannot remove collection {collection} linked to action "{database_action[0].name}"!') + + @staticmethod + def get_pydantic_type(data_type: str): + if data_type == 'str': + return (constr(strict=True, min_length=1), ...) + elif data_type == 'int': + return (int, ...) + elif data_type == 'float': + return (float, ...) + else: + raise ValueError(f"Unsupported data type: {data_type}") + + def validate_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str) -> Dict: + """ + Validates each dictionary in the data list according to the expected schema from column_dict. + + Args: + data: List of dictionaries where each dictionary represents a row to be validated. + collection_name: The name of the collection (table name). + bot: The bot identifier. + primary_key_col: The primary key column for identifying rows. + + Returns: + Dict: Summary of validation errors, if any. + """ + if not CognitionSchema.objects(collection_name=collection_name).first(): + raise AppException(f"Collection '{collection_name}' does not exist.") + + column_dict = MongoProcessor().get_column_datatype_dict(bot, collection_name) + + error_summary = {} + + model_fields = { + column_name: self.get_pydantic_type(data_type) + for column_name, data_type in column_dict.items() + } + DynamicModel = create_model('DynamicModel', **model_fields) + + for row in data: + row_key = row.get(primary_key_col) + if not row_key: + raise AppException(f"Primary key '{primary_key_col}' must exist in each row.") + + row_errors = [] + if set(row.keys()) != set(column_dict.keys()): + row_errors.append({ + "status": "Column headers mismatch", + "expected_columns": list(column_dict.keys()), + "actual_columns": list(row.keys()) + }) + if row_errors: + error_summary[row_key] = row_errors + continue + + try: + DynamicModel(**row) + except ValidationError as e: + error_details = [] + for error in e.errors(): + column_name = error['loc'][0] + input_value = row.get(column_name) + status = "Required Field is Empty" if input_value == "" else "Invalid DataType" + error_details.append({ + "column_name": column_name, + "input": input_value, + "status": status + }) + error_summary[row_key] = error_details + + return error_summary + + async def upsert_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str, user: Text): + """ + Upserts data into the CognitionData collection. + If document with the primary key exists, it will be updated. + If not, it will be inserted. + + Args: + primary_key_col: The primary key column name to check for uniqueness. + collection_name: The collection name (table). + data: List of rows of data to upsert. + bot: The bot identifier associated with the data. + user: The user + """ + + from kairon.shared.llm.processor import LLMProcessor + llm_processor = LLMProcessor(bot, DEFAULT_LLM) + suffix = "_faq_embd" + qdrant_collection = f"{bot}_{collection_name}{suffix}" if collection_name else f"{bot}{suffix}" + + if await llm_processor.__collection_exists__(qdrant_collection) is False: + await llm_processor.__create_collection__(qdrant_collection) + + for row in data: + row = {str(key): str(value) for key, value in row.items()} + primary_key_value = row.get(primary_key_col) + + payload = { + "data": row, + "content_type": CognitionDataType.json.value, + "collection": collection_name + } + existing_document = CognitionData.objects( + Q(bot=bot) & + Q(collection=collection_name) & + Q(**{f"data__{primary_key_col}": str(primary_key_value)}) + ).first() + + if existing_document: + if not isinstance(existing_document, dict): + existing_document = existing_document.to_mongo().to_dict() + row_id = str(existing_document["_id"]) + self.update_cognition_data(row_id, payload, user, bot) + updated_document = CognitionData.objects(id=row_id).first() + if not isinstance(updated_document, dict): + updated_document = updated_document.to_mongo().to_dict() + logger.info(f"Row with {primary_key_col}: {primary_key_value} updated in MongoDB") + await self.sync_with_qdrant(llm_processor, qdrant_collection, bot, updated_document, user, + primary_key_col) + else: + row_id = self.save_cognition_data(payload, user, bot) + new_document = CognitionData.objects(id=row_id).first() + if not isinstance(new_document, dict): + new_document = new_document.to_mongo().to_dict() + logger.info(f"Row with {primary_key_col}: {primary_key_value} inserted in MongoDB") + await self.sync_with_qdrant(llm_processor, qdrant_collection, bot, new_document, user, primary_key_col) + + return {"message": "Upsert complete!"} + + async def sync_with_qdrant(self, llm_processor, collection_name, bot, document, user, primary_key_col): + """ + Syncs a document with Qdrant vector database by generating embeddings and upserting them. + + Args: + llm_processor (LLMProcessor): Instance of LLMProcessor for embedding and Qdrant operations. + collection_name (str): Name of the Qdrant collection. + bot (str): Bot identifier. + document (CognitionData): Document to sync with Qdrant. + user (Text): User performing the operation. + + Raises: + AppException: If Qdrant upsert operation fails. + """ + try: + metadata = self.find_matching_metadata(bot, document['data'], document.get('collection')) + search_payload, embedding_payload = Utility.retrieve_search_payload_and_embedding_payload( + document['data'], metadata) + embeddings = await llm_processor.get_embedding(embedding_payload, user, invocation='knowledge_vault_sync') + points = [{'id': document['vector_id'], 'vector': embeddings, 'payload': search_payload}] + await llm_processor.__collection_upsert__(collection_name, {'points': points}, + err_msg="Unable to train FAQ! Contact support") + logger.info(f"Row with {primary_key_col}: {document['data'].get(primary_key_col)} upserted in Qdrant.") + except Exception as e: + raise AppException(f"Failed to sync document with Qdrant: {str(e)}") diff --git a/kairon/shared/llm/processor.py b/kairon/shared/llm/processor.py index dbaca51cc..168e7273d 100644 --- a/kairon/shared/llm/processor.py +++ b/kairon/shared/llm/processor.py @@ -290,6 +290,21 @@ async def __collection_upsert__(self, collection_name: Text, data: Dict, err_msg if raise_err: raise AppException(err_msg) + async def __collection_exists__(self, collection_name: Text) -> bool: + """Check if a collection exists.""" + try: + response = await AioRestClient().request( + http_url=urljoin(self.db_url, f"/collections/{collection_name}"), + request_method="GET", + headers=self.headers, + return_json=True, + timeout=5 + ) + return response.get('status') == "ok" + except Exception as e: + logging.info(e) + return False + async def __collection_search__(self, collection_name: Text, vector: List, limit: int, score_threshold: float): client = AioRestClient() response = await client.request( diff --git a/tests/integration_test/services_test.py b/tests/integration_test/services_test.py index 8a55c9879..98a37fc12 100644 --- a/tests/integration_test/services_test.py +++ b/tests/integration_test/services_test.py @@ -10,6 +10,7 @@ from unittest.mock import patch from urllib.parse import urljoin from zipfile import ZipFile +import litellm import pytest import responses @@ -31,6 +32,8 @@ from kairon.shared.callback.data_objects import CallbackLog, CallbackRecordStatusType from kairon.shared.content_importer.content_processor import ContentImporterLogProcessor from kairon.shared.utils import Utility, MailUtility +from kairon.shared.llm.processor import LLMProcessor +import numpy as np Utility.load_system_metadata() @@ -1398,6 +1401,310 @@ def test_default_values(): assert sorted(actual["data"]["default_names"]) == sorted(expected_default_names) +@pytest.mark.asyncio +@responses.activate +@mock.patch.object(LLMProcessor, "__collection_exists__", autospec=True) +@mock.patch.object(LLMProcessor, "__create_collection__", autospec=True) +@mock.patch.object(LLMProcessor, "__collection_upsert__", autospec=True) +@mock.patch.object(litellm, "aembedding", autospec=True) +def test_knowledge_vault_sync(mock_embedding, mock_collection_exists, mock_create_collection, mock_collection_upsert): + bot_settings = BotSettings.objects(bot=pytest.bot).get() + bot_settings.content_importer_limit_per_day = 10 + bot_settings.cognition_collections_limit = 10 + bot_settings.llm_settings['enable_faq'] = True + bot_settings.save() + + mock_collection_exists.return_value = False + mock_create_collection.return_value = None + mock_collection_upsert.return_value = None + + embedding = list(np.random.random(LLMProcessor.__embedding__)) + mock_embedding.return_value = litellm.EmbeddingResponse(**{'data': [{'embedding': embedding}]}) + + secrets = [ + { + "llm_type": "openai", + "api_key": "common_openai_key", + "models": ["common_openai_model1", "common_openai_model2"], + "user": "123", + "timestamp": datetime.utcnow() + }, + ] + + for secret in secrets: + LLMSecret(**secret).save() + + response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/schema", + json={ + "metadata": [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ], + "collection_name": "groceries" + }, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + schema_response = response.json() + assert schema_response["message"] == "Schema saved!" + assert schema_response["error_code"] == 0 + + dummy_data = { + "id": "1", + "item": "Juice", + "price": "2.00", + "quantity": "9" + } + dummy_doc = CognitionData( + data=dummy_data, + content_type="json", + collection="groceries", + user="himanshu.gupta@digite.com", + bot=pytest.bot, + timestamp=datetime.utcnow() + ) + dummy_doc.save() + + cognition_data = CognitionData.objects(bot=pytest.bot, collection="groceries") + assert cognition_data.count() == 1 + + sync_data = [ + {"id": 1, "item": "Juice", "price": "2.50", "quantity": "10"}, + {"id": 2, "item": "Apples", "price": "1.20", "quantity": "20"} + ] + + response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries", + json=sync_data, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + actual= response.json() + print(actual) + assert actual["success"] + assert actual["message"] == "Processing completed successfully" + assert actual["error_code"] == 0 + + cognition_data = CognitionData.objects(bot=pytest.bot, collection="groceries") + assert cognition_data.count() == 2 + + expected_data = [ + {"id": "1", "item": "Juice", "price": "2.50", "quantity": "10"}, + {"id": "2", "item": "Apples", "price": "1.20", "quantity": "20"} + ] + + for index, doc in enumerate(cognition_data): + doc_data = doc.to_mongo().to_dict()["data"] + assert doc_data == expected_data[index] + + expected_calls = [ + { + "model": "text-embedding-3-small", + "input": ['{"id":1,"item":"Juice","price":2.5,"quantity":10}'], # First input + "metadata": {'user': 'integration@demo.ai', 'bot': pytest.bot, 'invocation': 'knowledge_vault_sync'}, + "api_key": "common_openai_key", + "num_retries": 3 + }, + { + "model": "text-embedding-3-small", + "input": ['{"id":2,"item":"Apples","price":1.2,"quantity":20}'], # Second input + "metadata": {'user': 'integration@demo.ai', 'bot': pytest.bot, 'invocation': 'knowledge_vault_sync'}, + "api_key": "common_openai_key", + "num_retries": 3 + } + ] + + for i, expected in enumerate(expected_calls): + actual_call = mock_embedding.call_args_list[i].kwargs + assert actual_call == expected + + CognitionData.objects(bot=pytest.bot, collection="groceries").delete() + CognitionSchema.objects(bot=pytest.bot, collection_name="groceries").delete() + LLMSecret.objects.delete() + + +@pytest.mark.asyncio +@responses.activate +@mock.patch.object(litellm, "aembedding", autospec=True) +def test_knowledge_vault_sync_missing_collection(mock_embedding): + bot_settings = BotSettings.objects(bot=pytest.bot).get() + bot_settings.content_importer_limit_per_day = 10 + bot_settings.cognition_collections_limit = 10 + bot_settings.llm_settings['enable_faq'] = True + bot_settings.save() + + embedding = list(np.random.random(LLMProcessor.__embedding__)) + mock_embedding.return_value = litellm.EmbeddingResponse(**{'data': [{'embedding': embedding}]}) + + secrets = [ + { + "llm_type": "openai", + "api_key": "common_openai_key", + "models": ["common_openai_model1", "common_openai_model2"], + "user": "123", + "timestamp": datetime.utcnow() + } + ] + for secret in secrets: + LLMSecret(**secret).save() + + sync_data = [ + {"id": 1, "item": "Juice", "price": 2.50, "quantity": 10}, + {"id": 2, "item": "Apples", "price": 1.20, "quantity": 20} + ] + + response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=nonexistent_collection", + json=sync_data, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + actual = response.json() + assert not actual["success"] + assert actual["message"] == "Collection 'nonexistent_collection' does not exist." + assert actual["error_code"] == 422 + + cognition_data = CognitionData.objects(bot=pytest.bot, collection="nonexistent_collection") + assert cognition_data.count() == 0 + + LLMSecret.objects.delete() + + +@pytest.mark.asyncio +@responses.activate +@mock.patch.object(litellm, "aembedding", autospec=True) +def test_knowledge_vault_sync_column_header_mismatch(mock_embedding): + bot_settings = BotSettings.objects(bot=pytest.bot).get() + bot_settings.content_importer_limit_per_day = 10 + bot_settings.cognition_collections_limit = 10 + bot_settings.llm_settings['enable_faq'] = True + bot_settings.save() + + embedding = list(np.random.random(LLMProcessor.__embedding__)) + mock_embedding.return_value = litellm.EmbeddingResponse(**{'data': [{'embedding': embedding}]}) + + secrets = [ + { + "llm_type": "openai", + "api_key": "common_openai_key", + "models": ["common_openai_model1", "common_openai_model2"], + "user": "123", + "timestamp": datetime.utcnow() + } + ] + for secret in secrets: + LLMSecret(**secret).save() + + schema_response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/schema", + json={ + "metadata": [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ], + "collection_name": "groceries" + }, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + assert schema_response.status_code == 200 + assert schema_response.json()["message"] == "Schema saved!" + assert schema_response.json()["error_code"] == 0 + + sync_data = [ + {"id": 1, "item": "Juice", "quantity": 10, "description": "Orange juice"}, + {"id": 2, "item": "Apples", "quantity": 20, "description": "Fresh apples"} + ] + + response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries", + json=sync_data, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + actual = response.json() + print(actual) + assert not actual["success"] + assert actual["message"] == "Validation failed" + assert actual["error_code"] == 400 + assert actual["data"] == {'1': [{'status': 'Column headers mismatch', 'expected_columns': ['id', 'item', 'price', 'quantity'], 'actual_columns': ['id', 'item', 'quantity', 'description']}], '2': [{'status': 'Column headers mismatch', 'expected_columns': ['id', 'item', 'price', 'quantity'], 'actual_columns': ['id', 'item', 'quantity', 'description']}]} + + cognition_data = CognitionData.objects(bot=pytest.bot, collection="groceries") + assert cognition_data.count() == 0 + + CognitionSchema.objects(bot=pytest.bot, collection_name="groceries").delete() + LLMSecret.objects.delete() + +@pytest.mark.asyncio +@responses.activate +@mock.patch.object(litellm, "aembedding", autospec=True) +def test_knowledge_vault_sync_missing_primary_key(mock_embedding): + bot_settings = BotSettings.objects(bot=pytest.bot).get() + bot_settings.content_importer_limit_per_day = 10 + bot_settings.cognition_collections_limit = 10 + bot_settings.llm_settings['enable_faq'] = True + bot_settings.save() + + embedding = list(np.random.random(LLMProcessor.__embedding__)) + mock_embedding.return_value = litellm.EmbeddingResponse(**{'data': [{'embedding': embedding}]}) + + secrets = [ + { + "llm_type": "openai", + "api_key": "common_openai_key", + "models": ["common_openai_model1", "common_openai_model2"], + "user": "123", + "timestamp": datetime.utcnow() + } + ] + for secret in secrets: + LLMSecret(**secret).save() + + schema_response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/schema", + json={ + "metadata": [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ], + "collection_name": "groceries" + }, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + assert schema_response.status_code == 200 + assert schema_response.json()["message"] == "Schema saved!" + assert schema_response.json()["error_code"] == 0 + + sync_data = [ + {"item": "Juice", "price": 2.50, "quantity": 10}, + {"item": "Apples", "price": 1.20, "quantity": 20} + ] + + response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries", + json=sync_data, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + actual = response.json() + print(actual) + assert not actual["success"] + assert actual["message"] == "Primary key 'id' must exist in each row." + assert actual["error_code"] == 422 + + cognition_data = CognitionData.objects(bot=pytest.bot, collection="groceries") + assert cognition_data.count() == 0 + + CognitionSchema.objects(bot=pytest.bot, collection_name="groceries").delete() + LLMSecret.objects.delete() + @responses.activate def test_upload_doc_content(): bot_settings = BotSettings.objects(bot=pytest.bot).get() diff --git a/tests/unit_test/data_processor/data_processor_test.py b/tests/unit_test/data_processor/data_processor_test.py index 6ca7075f1..d4b87eac9 100644 --- a/tests/unit_test/data_processor/data_processor_test.py +++ b/tests/unit_test/data_processor/data_processor_test.py @@ -12,14 +12,16 @@ import yaml from kairon.shared.content_importer.data_objects import ContentValidationLogs +from kairon.shared.rest_client import AioRestClient from kairon.shared.utils import Utility +from kairon.shared.llm.processor import LLMProcessor os.environ["system_file"] = "./tests/testing_data/system.yaml" Utility.load_environment() Utility.load_system_metadata() -from unittest.mock import patch +from unittest.mock import patch, ANY import numpy as np import pandas as pd import pytest @@ -30,7 +32,7 @@ from mongoengine.errors import ValidationError from mongoengine.queryset.base import BaseQuerySet from pipedrive.exceptions import UnauthorizedError -from pydantic import SecretStr +from pydantic import SecretStr, constr from rasa.core.agent import Agent from rasa.shared.constants import DEFAULT_DOMAIN_PATH, DEFAULT_DATA_PATH, DEFAULT_CONFIG_PATH, \ DEFAULT_NLU_FALLBACK_INTENT_NAME @@ -1363,6 +1365,551 @@ def test_bot_id_change(self): bot_id = Slots.objects(bot="test_load_yml", user="testUser", influence_conversation=False, name='bot').get() assert bot_id['initial_value'] == "test_load_yml" + def test_validate_data_success(self): + bot = 'test_bot' + user = 'test_user' + collection_name = 'groceries' + primary_key_col = "id" + + metadata = [ + { + "column_name": "id", + "data_type": "int", + "enable_search": True, + "create_embeddings": True + }, + { + "column_name": "item", + "data_type": "str", + "enable_search": True, + "create_embeddings": True + }, + { + "column_name": "price", + "data_type": "float", + "enable_search": True, + "create_embeddings": True + }, + { + "column_name": "quantity", + "data_type": "int", + "enable_search": True, + "create_embeddings": True + } + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + data = [ + {"id": 1, "item": "Juice", "price": 2.50, "quantity": 10}, + {"id": 2, "item": "Apples", "price": 1.20, "quantity": 20}, + {"id": 3, "item": "Bananas", "price": 0.50, "quantity": 15}, + ] + + processor = CognitionDataProcessor() + validation_summary = processor.validate_data( + primary_key_col=primary_key_col, + collection_name=collection_name, + data=data, + bot=bot + ) + + assert validation_summary == {} + CognitionSchema.objects(bot=bot, collection_name="groceries").delete() + + def test_validate_data_missing_collection(self): + bot = 'test_bot' + collection_name = 'nonexistent_collection' + primary_key_col = "id" + data = [{"id": 1, "item": "Juice", "price": 2.50, "quantity": 10}] + + processor = CognitionDataProcessor() + + with pytest.raises(AppException, match=f"Collection '{collection_name}' does not exist."): + processor.validate_data( + primary_key_col=primary_key_col, + collection_name=collection_name, + data=data, + bot=bot + ) + + def test_validate_data_missing_primary_key(self): + bot = 'test_bot' + user = 'test_user' + collection_name = 'groceries' + primary_key_col = "id" + + metadata = [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True} + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + data = [ + {"item": "Juice", "price": 2.50, "quantity": 10} + ] + + processor = CognitionDataProcessor() + + with pytest.raises(AppException, match=f"Primary key '{primary_key_col}' must exist in each row."): + processor.validate_data( + primary_key_col=primary_key_col, + collection_name=collection_name, + data=data, + bot=bot + ) + CognitionSchema.objects(bot=bot, collection_name="groceries").delete() + + def test_validate_data_column_header_mismatch(self): + bot = 'test_bot' + user = 'test_user' + collection_name = 'groceries' + primary_key_col = "id" + + metadata = [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True} + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + data = [ + {"id": "1", "item": "Juice", "quantity": 10, "discount": 0.10} + ] + + processor = CognitionDataProcessor() + validation_summary = processor.validate_data( + primary_key_col=primary_key_col, + collection_name=collection_name, + data=data, + bot=bot + ) + + assert "1" in validation_summary + assert validation_summary["1"][0]["status"] == "Column headers mismatch" + assert validation_summary["1"][0]["expected_columns"] == ["id", "item", "price", "quantity"] + assert validation_summary["1"][0]["actual_columns"] == ["id", "item", "quantity", "discount"] + CognitionSchema.objects(bot=bot, collection_name="groceries").delete() + + @pytest.mark.asyncio + @patch.object(LLMProcessor, "__collection_exists__", autospec=True) + @patch.object(LLMProcessor, "__create_collection__", autospec=True) + @patch.object(LLMProcessor, "__collection_upsert__", autospec=True) + @patch.object(litellm, "aembedding", autospec=True) + async def test_upsert_data_success(self, mock_embedding, mock_collection_upsert, mock_create_collection, + mock_collection_exists): + bot = 'test_bot' + user = 'test_user' + collection_name = 'groceries' + primary_key_col = 'id' + + metadata = [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + dummy_data = { + "id": "2", + "item": "Milk", + "price": "2.80", + "quantity": "5" + } + existing_document = CognitionData( + data=dummy_data, + content_type="json", + collection=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + existing_document.save() + + upsert_data = [ + {"id": 1, "item": "Juice", "price": "2.50", "quantity": "10"}, # New entry + {"id": 2, "item": "Milk", "price": "3.00", "quantity": "5"} # Existing entry to be updated + ] + + llm_secret = LLMSecret( + llm_type="openai", + api_key="openai_key", + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + mock_collection_exists.return_value = False + mock_create_collection.return_value = None + mock_collection_upsert.return_value = None + + embedding = list(np.random.random(1532)) + mock_embedding.return_value = {'data': [{'embedding': embedding}, {'embedding': embedding}]} + + processor = CognitionDataProcessor() + + result = await processor.upsert_data( + primary_key_col=primary_key_col, + collection_name=collection_name, + data=upsert_data, + bot=bot, + user=user + ) + + upserted_data = list(CognitionData.objects(bot=bot, collection=collection_name)) + + assert result["message"] == "Upsert complete!" + assert len(upserted_data) == 2 + + inserted_record = next((item for item in upserted_data if item.data["id"] == "1"), None) + assert inserted_record is not None + assert inserted_record.data["item"] == "Juice" + assert inserted_record.data["price"] == "2.50" + assert inserted_record.data["quantity"] == "10" + + updated_record = next((item for item in upserted_data if item.data["id"] == "2"), None) + assert updated_record is not None + assert updated_record.data["item"] == "Milk" + assert updated_record.data["price"] == "3.00" # Updated price + assert updated_record.data["quantity"] == "5" + + CognitionSchema.objects(bot=bot, collection_name="groceries").delete() + CognitionData.objects(bot=bot, collection="groceries").delete() + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @patch.object(LLMProcessor, "__collection_exists__", autospec=True) + @patch.object(LLMProcessor, "__create_collection__", autospec=True) + @patch.object(LLMProcessor, "__collection_upsert__", autospec=True) + @patch.object(litellm, "aembedding", autospec=True) + async def test_upsert_data_empty_data_list(self, mock_embedding, mock_collection_upsert, mock_create_collection, + mock_collection_exists): + bot = 'test_bot' + user = 'test_user' + collection_name = 'groceries' + primary_key_col = 'id' + + metadata = [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + dummy_data = { + "id": "2", + "item": "Milk", + "price": "2.80", + "quantity": "5" + } + existing_document = CognitionData( + data=dummy_data, + content_type="json", + collection=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + existing_document.save() + + upsert_data = [] + + llm_secret = LLMSecret( + llm_type="openai", + api_key="openai_key", + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + mock_collection_exists.return_value = False + mock_create_collection.return_value = None + mock_collection_upsert.return_value = None + + embedding = list(np.random.random(1532)) + mock_embedding.return_value = {'data': [{'embedding': embedding}, {'embedding': embedding}]} + + processor = CognitionDataProcessor() + result = await processor.upsert_data( + primary_key_col=primary_key_col, + collection_name=collection_name, + data=upsert_data, + bot=bot, + user=user + ) + + data = list(CognitionData.objects(bot=bot, collection=collection_name)) + + assert result["message"] == "Upsert complete!" + assert len(data) == 1 + + existing_record = data[0] + assert existing_record.data["id"] == "2" + assert existing_record.data["item"] == "Milk" + assert existing_record.data["price"] == "2.80" + assert existing_record.data["quantity"] == "5" + + CognitionSchema.objects(bot=bot, collection_name=collection_name).delete() + CognitionData.objects(bot=bot, collection=collection_name).delete() + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @patch.object(litellm, "aembedding", autospec=True) + @patch.object(LLMProcessor, "__collection_upsert__", autospec=True) + async def test_sync_with_qdrant_success(self, mock_collection_upsert, mock_embedding): + bot = "test_bot" + user = "test_user" + collection_name = "groceries" + primary_key_col = "id" + + metadata = [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + document_data = { + "id": "2", + "item": "Milk", + "price": "2.80", + "quantity": "5" + } + document = CognitionData( + data=document_data, + content_type="json", + collection=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + document.save() + + saved_document = None + for doc in CognitionData.objects(bot=bot, collection=collection_name): + doc_dict = doc.to_mongo().to_dict() + if doc_dict.get("data", {}).get("id") == "2": # Match based on `data.id` + saved_document = doc_dict + break + assert saved_document, "Saved CognitionData document not found" + vector_id = saved_document["vector_id"] + + if not isinstance(document, dict): + document = document.to_mongo().to_dict() + + embedding = list(np.random.random(1532)) + mock_embedding.return_value = {'data': [{'embedding': embedding}, {'embedding': embedding}]} + + mock_collection_upsert.return_value = None + + llm_secret = LLMSecret( + llm_type="openai", + api_key="openai_key", + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + processor = CognitionDataProcessor() + llm_processor = LLMProcessor(bot, DEFAULT_LLM) + await processor.sync_with_qdrant( + llm_processor=llm_processor, + collection_name=collection_name, + bot=bot, + document=document, + user=user, + primary_key_col=primary_key_col + ) + + mock_embedding.assert_called_once_with( + model="text-embedding-3-small", + input=['{"id":2,"item":"Milk","price":2.8,"quantity":5}'], + metadata={'user': user, 'bot': bot, 'invocation': 'knowledge_vault_sync'}, + api_key="openai_key", + num_retries=3 + ) + mock_collection_upsert.assert_called_once_with( + llm_processor, + collection_name, + { + "points": [ + { + "id": vector_id, + "vector": embedding, + "payload": {'id': 2, 'item': 'Milk', 'price': 2.8, 'quantity': 5} + } + ] + }, + err_msg="Unable to train FAQ! Contact support" + ) + + CognitionSchema.objects(bot=bot, collection_name="groceries").delete() + CognitionData.objects(bot=bot, collection="groceries").delete() + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @patch.object(litellm, "aembedding", autospec=True) + @patch.object(AioRestClient, "request", autospec=True) + async def test_sync_with_qdrant_upsert_failure(self, mock_request, mock_embedding): + bot = "test_bot" + user = "test_user" + collection_name = "groceries" + primary_key_col = "id" + + metadata = [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + document_data = { + "id": "2", + "item": "Milk", + "price": "2.80", + "quantity": "5" + } + document = CognitionData( + data=document_data, + content_type="json", + collection=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + document.save() + if not isinstance(document, dict): + document = document.to_mongo().to_dict() + + embedding = list(np.random.random(1532)) + mock_embedding.return_value = {'data': [{'embedding': embedding}, {'embedding': embedding}]} + + mock_request.side_effect = ConnectionError("Failed to connect to Qdrant") + + llm_secret = LLMSecret( + llm_type="openai", + api_key="openai_key", + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + processor = CognitionDataProcessor() + llm_processor = LLMProcessor(bot, DEFAULT_LLM) + + with pytest.raises(AppException, match="Failed to sync document with Qdrant: Failed to connect to Qdrant"): + await processor.sync_with_qdrant( + llm_processor=llm_processor, + collection_name=collection_name, + bot=bot, + document=document, + user=user, + primary_key_col=primary_key_col + ) + + mock_embedding.assert_called_once_with( + model="text-embedding-3-small", + input=['{"id":2,"item":"Milk","price":2.8,"quantity":5}'], + metadata={'user': user, 'bot': bot, 'invocation': 'knowledge_vault_sync'}, + api_key="openai_key", + num_retries=3 + ) + + CognitionSchema.objects(bot=bot, collection_name="groceries").delete() + CognitionData.objects(bot=bot, collection="groceries").delete() + LLMSecret.objects.delete() + + def test_get_pydantic_type_int(self): + result = CognitionDataProcessor().get_pydantic_type('int') + expected = (int, ...) + assert result == expected + + def test_get_pydantic_type_float(self): + result = CognitionDataProcessor.get_pydantic_type('float') + expected = (float, ...) + assert result == expected + + def test_get_pydantic_type_invalid(self): + with pytest.raises(ValueError, match="Unsupported data type: unknown"): + CognitionDataProcessor.get_pydantic_type('unknown') + def test_save_and_validate_success(self): bot = 'test_bot' user = 'test_user' diff --git a/tests/unit_test/llm_test.py b/tests/unit_test/llm_test.py index c754845d3..34e39e99c 100644 --- a/tests/unit_test/llm_test.py +++ b/tests/unit_test/llm_test.py @@ -9,6 +9,7 @@ from aiohttp import ClientConnectionError from mongoengine import connect +from kairon.shared.rest_client import AioRestClient from kairon.shared.utils import Utility Utility.load_system_metadata() @@ -1287,4 +1288,72 @@ async def test_gpt3_faq_embedding_predict_with_query_prompt(self, mock_embedding "input": [query], 'metadata': {'user': user, 'bot': bot, 'invocation': None}, "api_key": key, "num_retries": 3} - assert not DeepDiff(mock_embedding.call_args[1], expected, ignore_order=True) \ No newline at end of file + assert not DeepDiff(mock_embedding.call_args[1], expected, ignore_order=True) + + @pytest.mark.asyncio + @mock.patch.object(AioRestClient, "request", autospec=True) + async def test_collection_exists_success(self, mock_request): + collection_name = "test_collection" + bot = "test_collection_exists_success" + user = "test_new" + + llm_secret = LLMSecret( + llm_type="openai", + api_key="openai_key", + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + mock_request.return_value = {"status": "ok"} + + llm_processor = LLMProcessor(bot, DEFAULT_LLM) + + result = await llm_processor.__collection_exists__(collection_name) + + mock_request.assert_called_once_with( + mock.ANY, + http_url=f"{llm_processor.db_url}/collections/{collection_name}", + request_method="GET", + headers=llm_processor.headers, + return_json=True, + timeout=5 + ) + assert result is True + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @mock.patch.object(AioRestClient, "request", autospec=True) + async def test_collection_exists_failure(self, mock_request): + collection_name = "test_collection" + bot = "test_collection_exists_failure" + user = "test_new" + + llm_secret = LLMSecret( + llm_type="openai", + api_key="openai_key", + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + mock_request.side_effect = Exception("Connection error") + + llm_processor = LLMProcessor(bot, DEFAULT_LLM) + + result = await llm_processor.__collection_exists__(collection_name) + + mock_request.assert_called_once_with( + mock.ANY, + http_url=f"{llm_processor.db_url}/collections/{collection_name}", + request_method="GET", + headers=llm_processor.headers, + return_json=True, + timeout=5 + ) + assert result is False + LLMSecret.objects.delete() \ No newline at end of file