diff --git a/kairon/actions/definitions/prompt.py b/kairon/actions/definitions/prompt.py index f6524748d..e12ab7d46 100644 --- a/kairon/actions/definitions/prompt.py +++ b/kairon/actions/definitions/prompt.py @@ -14,6 +14,9 @@ from kairon.shared.data.constant import DEFAULT_NLU_FALLBACK_RESPONSE from kairon.shared.models import LlmPromptType, LlmPromptSource from kairon.shared.llm.processor import LLMProcessor +LLMProcessor.load_sparse_embedding_model() +LLMProcessor.load_rerank_embedding_model() + class ActionPrompt(ActionsBase): diff --git a/kairon/shared/cognition/processor.py b/kairon/shared/cognition/processor.py index 0885fe607..3e8e4c037 100644 --- a/kairon/shared/cognition/processor.py +++ b/kairon/shared/cognition/processor.py @@ -585,6 +585,8 @@ async def upsert_data(self, primary_key_col: str, collection_name: str, event_ty """ from kairon.shared.llm.processor import LLMProcessor + LLMProcessor.load_sparse_embedding_model() + LLMProcessor.load_rerank_embedding_model() llm_processor = LLMProcessor(bot, DEFAULT_LLM) suffix = "_faq_embd" qdrant_collection = f"{bot}_{collection_name}{suffix}" if collection_name else f"{bot}{suffix}" diff --git a/kairon/shared/llm/processor.py b/kairon/shared/llm/processor.py index 26cc541d0..770e2d165 100644 --- a/kairon/shared/llm/processor.py +++ b/kairon/shared/llm/processor.py @@ -1,3 +1,4 @@ +import os import time import urllib.parse from secrets import randbelow, choice @@ -5,6 +6,7 @@ from urllib.parse import urljoin import litellm +from fastembed import SparseTextEmbedding, LateInteractionTextEmbedding from loguru import logger as logging from mongoengine.base import BaseList from tiktoken import get_encoding @@ -25,7 +27,6 @@ from kairon.shared.rest_client import AioRestClient from kairon.shared.utils import Utility from http import HTTPStatus - litellm.callbacks = [LiteLLMLogger()] @@ -44,14 +45,11 @@ def __init__(self, bot: Text, llm_type: str): self.llm_type = llm_type self.vectors_config = {} 
self.sparse_vectors_config = {} - - self.llm_secret = Sysadmin.get_llm_secret(llm_type, bot) if llm_type != DEFAULT_LLM: self.llm_secret_embedding = Sysadmin.get_llm_secret(DEFAULT_LLM, bot) else: self.llm_secret_embedding = self.llm_secret - self.tokenizer = get_encoding("cl100k_base") self.EMBEDDING_CTX_LENGTH = 8191 self.__logs = [] @@ -144,29 +142,48 @@ async def predict(self, query: Text, user, *args, **kwargs) -> Tuple: elapsed_time = end_time - start_time return response, elapsed_time + def truncate_text(self, texts: List[Text]) -> List[Text]: + """ + Truncate multiple texts to 8191 tokens for openai + """ + truncated_texts = [] - async def get_embedding(self, texts: Union[Text, List[Text]], user: Text, **kwargs): + for text in texts: + tokens = self.tokenizer.encode(text)[:self.EMBEDDING_CTX_LENGTH] + truncated_texts.append(self.tokenizer.decode(tokens)) + + return truncated_texts + + async def get_embedding(self, texts: Union[Text, List[Text]], user, **kwargs): """ - Get embeddings for a batch of texts by making an API call. + Get embeddings for a batch of texts. 
""" - body = { - 'texts': texts, - 'user': user, - 'invocation': kwargs.get("invocation") - } + try: + is_single_text = isinstance(texts, str) + if is_single_text: + texts = [texts] - timeout = Utility.environment['llm'].get('request_timeout', 30) - http_response, status_code, _, _ = await ActionUtility.execute_request_async( - http_url=f"{Utility.environment['llm']['url']}/{urllib.parse.quote(self.bot)}/embedding/{self.llm_type}", - request_method="POST", - request_body=body, - timeout=timeout) + truncated_texts = self.truncate_text(texts) + + embeddings = {} + + result = await litellm.aembedding( + model="text-embedding-3-small", + input=truncated_texts, + metadata={'user': user, 'bot': self.bot, 'invocation': kwargs.get("invocation")}, + api_key=self.llm_secret_embedding.get('api_key'), + num_retries=3 + ) + embeddings["dense"] = [embedding["embedding"] for embedding in result["data"]] + embeddings["sparse"] = self.get_sparse_embedding(truncated_texts) + embeddings["rerank"] = self.get_rerank_embedding(truncated_texts) + + if is_single_text: + return {model: embedding[0] for model, embedding in embeddings.items()} - if status_code == 200: - embeddings = http_response.get('embedding', {}) return embeddings - else: - raise Exception(f"Failed to fetch embeddings: {http_response.get('message', 'Unknown error')}") + except Exception as e: + raise Exception(f"Failed to fetch embeddings: {str(e)}") async def __parse_completion_response(self, response, **kwargs): if kwargs.get("stream"): @@ -467,4 +484,62 @@ async def initialize_vector_configs(self): self.vectors_config = response_data.get('vectors_config', {}) self.sparse_vectors_config = response_data.get('sparse_vectors_config', {}) else: - raise Exception(f"Failed to fetch vector configs: {http_response.get('message', 'Unknown error')}") \ No newline at end of file + raise Exception(f"Failed to fetch vector configs: {http_response.get('message', 'Unknown error')}") + + @classmethod + def 
load_sparse_embedding_model(cls): + hf_cache_dir = os.path.expanduser("~/.cache/huggingface/hub") + kairon_cache_dir = "./kairon/pre-trained-models/" + + cache_dir = hf_cache_dir if os.path.exists(hf_cache_dir) else kairon_cache_dir + + if cls._sparse_embedding is None: + cls._sparse_embedding = SparseTextEmbedding("Qdrant/bm25", cache_dir=cache_dir) + logging.info("SPARSE MODEL LOADED") + + @classmethod + def load_rerank_embedding_model(cls): + hf_cache_dir = os.path.expanduser("~/.cache/huggingface/hub") + kairon_cache_dir = "./kairon/pre-trained-models/" + + cache_dir = hf_cache_dir if os.path.exists(hf_cache_dir) else kairon_cache_dir + + if cls._rerank_embedding is None: + cls._rerank_embedding = LateInteractionTextEmbedding("colbert-ir/colbertv2.0", cache_dir=cache_dir) + logging.info("RERANK MODEL LOADED") + + def get_sparse_embedding(self, sentences): + """ + Generate sparse embeddings for a list of sentences. + + Args: + sentences (list): A list of sentences to be encoded + + Returns: + list: A list of embeddings. + """ + try: + embeddings = list(self._sparse_embedding.passage_embed(sentences)) + + return [ + {"values": emb.values.tolist(), "indices": emb.indices.tolist()} + for emb in embeddings + ] + except Exception as e: + raise Exception(f"Error processing sparse embeddings: {str(e)}") + + def get_rerank_embedding(self, sentences): + """ + Generate embeddings for a list of sentences. + + Args: + sentences (list): A list of sentences to be encoded. + + Returns: + list: A list of embedding vectors. 
+ """ + try: + embeddings = list(self._rerank_embedding.passage_embed(sentences)) + return [emb.tolist() for emb in embeddings] + except Exception as e: + raise Exception(f"Error processing rerank embeddings: {str(e)}") \ No newline at end of file diff --git a/kairon/shared/vector_embeddings/db/qdrant.py b/kairon/shared/vector_embeddings/db/qdrant.py index 6f05f1032..c40fe657a 100644 --- a/kairon/shared/vector_embeddings/db/qdrant.py +++ b/kairon/shared/vector_embeddings/db/qdrant.py @@ -4,11 +4,13 @@ from kairon import Utility from kairon.shared.actions.utils import ActionUtility -from kairon.shared.llm.processor import LLMProcessor from kairon.shared.data.constant import DEFAULT_LLM from kairon.shared.vector_embeddings.db.base import DatabaseBase from kairon.shared.actions.models import DbActionOperationType from kairon.shared.actions.exception import ActionFailure +from kairon.shared.llm.processor import LLMProcessor +LLMProcessor.load_sparse_embedding_model() +LLMProcessor.load_rerank_embedding_model() class Qdrant(DatabaseBase, ABC): diff --git a/kairon/train.py b/kairon/train.py index 285bd7aa7..831b79277 100644 --- a/kairon/train.py +++ b/kairon/train.py @@ -17,6 +17,8 @@ from kairon.shared.metering.metering_processor import MeteringProcessor from kairon.shared.utils import Utility from kairon.shared.llm.processor import LLMProcessor +LLMProcessor.load_sparse_embedding_model() +LLMProcessor.load_rerank_embedding_model() def train_model_for_bot(bot: str): diff --git a/requirements/prod.txt b/requirements/prod.txt index de1aaf950..f519d89b0 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -69,4 +69,5 @@ huggingface-hub==0.25.2 imap-tools==1.7.4 more-itertools python-multipart>=0.0.18 -nltk \ No newline at end of file +nltk +fastembed==0.5.1 \ No newline at end of file diff --git a/tests/integration_test/action_service_test.py b/tests/integration_test/action_service_test.py index cd36234b4..12e28dcbb 100644 --- 
a/tests/integration_test/action_service_test.py +++ b/tests/integration_test/action_service_test.py @@ -3805,6 +3805,9 @@ def test_http_action_doesnotexist(): @responses.activate def test_vectordb_action_execution_payload_search_from_slot(): responses.add_passthru("https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken") + responses.add_passthru("https://huggingface.co/Qdrant/bm25") + responses.add_passthru("https://huggingface.co/Qdrant/bm25/tree/main") + responses.add_passthru("https://huggingface.co/colbert-ir/colbertv2.0") action_name = "test_vectordb_action_execution_payload_search_from_slot" bot = '5f50md0a56b698ca10d35e2e' Actions(name=action_name, type=ActionType.database_action.value, bot=bot, @@ -3895,6 +3898,9 @@ def test_vectordb_action_execution_payload_search_from_slot(): @responses.activate def test_vectordb_action_execution_payload_search_from_user_message(): responses.add_passthru("https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken") + responses.add_passthru("https://huggingface.co/Qdrant/bm25") + responses.add_passthru("https://huggingface.co/Qdrant/bm25/tree/main") + responses.add_passthru("https://huggingface.co/colbert-ir/colbertv2.0") action_name = "test_vectordb_action_execution_payload_search_from_user_message" Actions(name=action_name, type=ActionType.database_action.value, bot="5f50md0a56b698ca10d35d2e", user="user").save() @@ -3984,6 +3990,9 @@ def test_vectordb_action_execution_payload_search_from_user_message(): @responses.activate def test_vectordb_action_execution_payload_search_from_user_message_in_slot(): responses.add_passthru("https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken") + responses.add_passthru("https://huggingface.co/Qdrant/bm25") + responses.add_passthru("https://huggingface.co/Qdrant/bm25/tree/main") + responses.add_passthru("https://huggingface.co/colbert-ir/colbertv2.0") action_name = 
"test_vectordb_action_execution_payload_search_from_user_message_in_slot" Actions(name=action_name, type=ActionType.database_action.value, bot="5f50md0a56b698ca10d35d2f", user="user").save() @@ -4076,6 +4085,9 @@ def test_vectordb_action_execution_payload_search_from_user_message_in_slot(): @mock.patch.object(LLMProcessor, "get_embedding", autospec=True) def test_vectordb_action_execution_embedding_search_from_value(mock_get_embedding): responses.add_passthru("https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken") + responses.add_passthru("https://huggingface.co/Qdrant/bm25") + responses.add_passthru("https://huggingface.co/Qdrant/bm25/tree/main") + responses.add_passthru("https://huggingface.co/colbert-ir/colbertv2.0") action_name = "test_vectordb_action_execution" Actions(name=action_name, type=ActionType.database_action.value, bot="5f50fd0a56b698ca10d75d2e", user="user").save() @@ -4210,6 +4222,9 @@ def test_vectordb_action_execution_embedding_search_from_value(mock_get_embeddin @responses.activate def test_vectordb_action_execution_payload_search_from_value(): responses.add_passthru("https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken") + responses.add_passthru("https://huggingface.co/Qdrant/bm25") + responses.add_passthru("https://huggingface.co/Qdrant/bm25/tree/main") + responses.add_passthru("https://huggingface.co/colbert-ir/colbertv2.0") action_name = "test_vectordb_action_execution" Actions(name=action_name, type=ActionType.database_action.value, bot="5f50md0a56b698ca10d35d2z", user="user").save() @@ -4298,6 +4313,9 @@ def test_vectordb_action_execution_payload_search_from_value(): @responses.activate def test_vectordb_action_execution_payload_search_from_value_json_decode_error(): responses.add_passthru("https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken") + responses.add_passthru("https://huggingface.co/Qdrant/bm25") + responses.add_passthru("https://huggingface.co/Qdrant/bm25/tree/main") 
+ responses.add_passthru("https://huggingface.co/colbert-ir/colbertv2.0") action_name = "test_vectordb_action_execution_payload_search_from_value_json_decode_error" Actions(name=action_name, type=ActionType.database_action.value, bot="5f50md0a56b698ca10d35d2e", user="user").save() @@ -4358,6 +4376,9 @@ def test_vectordb_action_execution_payload_search_from_value_json_decode_error() @mock.patch.object(LLMProcessor, "get_embedding", autospec=True) def test_vectordb_action_execution_embedding_search_from_slot(mock_get_embedding): responses.add_passthru("https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken") + responses.add_passthru("https://huggingface.co/Qdrant/bm25") + responses.add_passthru("https://huggingface.co/Qdrant/bm25/tree/main") + responses.add_passthru("https://huggingface.co/colbert-ir/colbertv2.0") action_name = "test_vectordb_action_execution" Actions(name=action_name, type=ActionType.database_action.value, bot="5f50fx0a56b698ca10d35d2e", user="user").save() @@ -4495,6 +4516,9 @@ def test_vectordb_action_execution_embedding_search_from_slot(mock_get_embedding @mock.patch.object(LLMProcessor, "get_embedding", autospec=True) def test_vectordb_action_execution_embedding_search_no_response_dispatch(mock_get_embedding): responses.add_passthru("https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken") + responses.add_passthru("https://huggingface.co/Qdrant/bm25") + responses.add_passthru("https://huggingface.co/Qdrant/bm25/tree/main") + responses.add_passthru("https://huggingface.co/colbert-ir/colbertv2.0") action_name = "test_vectordb_action_execution_no_response_dispatch" Actions(name=action_name, type=ActionType.database_action.value, bot="5f50fd0a56v098ca10d75d2e", user="user").save() diff --git a/tests/unit_test/llm_test.py b/tests/unit_test/llm_test.py index 7909c551b..203f7ad09 100644 --- a/tests/unit_test/llm_test.py +++ b/tests/unit_test/llm_test.py @@ -1,12 +1,16 @@ import os import urllib from unittest import 
mock -from unittest.mock import patch, AsyncMock +from unittest.mock import patch, AsyncMock, MagicMock from urllib.parse import urljoin +from loguru import logger +from io import StringIO + import numpy as np import pytest from aiohttp import ClientConnectionError +from fastembed import SparseTextEmbedding, LateInteractionTextEmbedding from mongoengine import connect from kairon.shared.rest_client import AioRestClient @@ -14,10 +18,12 @@ from kairon.exceptions import AppException +from kairon.shared.admin.constants import BotSecretType from kairon.shared.admin.data_objects import BotSecrets, LLMSecret from kairon.shared.cognition.data_objects import CognitionData, CognitionSchema from kairon.shared.data.constant import DEFAULT_SYSTEM_PROMPT, DEFAULT_LLM from kairon.shared.llm.processor import LLMProcessor +import litellm from deepdiff import DeepDiff @@ -1942,14 +1948,16 @@ async def test_initialize_vector_configs_empty_response(self, mock_execute_reque LLMSecret.objects.delete() @pytest.mark.asyncio - @patch("kairon.shared.actions.utils.ActionUtility.execute_request_async", new_callable=AsyncMock) - async def test_get_embedding_success_single_text(self, mock_execute_request_async): + @patch("kairon.shared.rest_client.AioRestClient.request", new_callable=AsyncMock) + async def test_collection_hybrid_query_success(self, mock_request): bot = "test_bot" llm_type = "openai" key = "test" user = "test" - texts = "Hello how are you??" 
- invocation = "test_invocation" + collection_name = "test_collection" + limit = 5 + score_threshold = 0.7 + llm_secret = LLMSecret( llm_type=llm_type, api_key=key, @@ -1960,8 +1968,7 @@ async def test_get_embedding_success_single_text(self, mock_execute_request_asyn ) llm_secret.save() - mock_response = { - "embedding": { + embeddings = { "dense": [ 0.01926439255475998, -0.0645047277212143 @@ -1982,46 +1989,72 @@ async def test_get_embedding_success_single_text(self, mock_execute_request_asyn [ 0.046593569219112396, -0.023154577240347862 - ], - [ - 0.0206329133361578, - -0.07174995541572571 - ], - [ - -0.007417665328830481, - 0.09697738289833069 ] ] } + + mock_response = { + "result":{ + "points":[ + { + "id":2, + "version":0, + "score":1.5, + "payload":{ + "content":"Great Wall of China is a historic fortification stretching thousands of miles, built to protect Chinese states from invasions." + } + }, + { + "id":1, + "version":0, + "score":1.0, + "payload":{ + "content":"Taj Mahal is a white marble mausoleum in India, built by Mughal Emperor Shah Jahan in memory of his wife Mumtaz Mahal." 
+ } + } + ] + }, + "status":"ok", + "time":0.003191196 } - mock_execute_request_async.return_value = (mock_response, 200, None, None) + mock_request.return_value = mock_response processor = LLMProcessor(bot, llm_type) - embeddings = await processor.get_embedding(texts, user, invocation=invocation) + response = await processor.__collection_hybrid_query__(collection_name, embeddings, limit, score_threshold) - assert embeddings == mock_response["embedding"] - mock_execute_request_async.assert_called_once_with( - http_url=f"{Utility.environment['llm']['url']}/{urllib.parse.quote(bot)}/embedding/{llm_type}", + assert response == mock_response + mock_request.assert_called_once_with( + http_url=f"{Utility.environment['vector']['db']}/collections/{collection_name}/points/query", request_method="POST", + headers={}, request_body={ - 'texts': texts, - 'user': user, - 'invocation': invocation + "prefetch": [ + {"query": embeddings.get("dense", []), "using": "dense", "limit": limit * 2}, + {"query": embeddings.get("rerank", []), "using": "rerank", "limit": limit * 2}, + {"query": embeddings.get("sparse", {}), "using": "sparse", "limit": limit * 2} + ], + "query": {"fusion": "rrf"}, + "with_payload": True, + "score_threshold": score_threshold, + "limit": limit }, - timeout=Utility.environment['llm'].get('request_timeout', 30) + return_json=True, + timeout=5 ) LLMSecret.objects.delete() @pytest.mark.asyncio - @patch("kairon.shared.actions.utils.ActionUtility.execute_request_async", new_callable=AsyncMock) - async def test_get_embedding_success_multiple_texts(self, mock_execute_request_async): + @patch("kairon.shared.rest_client.AioRestClient.request", new_callable=AsyncMock) + async def test_collection_hybrid_query_request_failure(self, mock_request): bot = "test_bot" llm_type = "openai" key = "test" user = "test" - texts = ["Hello how are you?","I am Python"] - invocation = "test_invocation" + collection_name = "test_collection" + limit = 5 + score_threshold = 0.7 + llm_secret 
= LLMSecret( llm_type=llm_type, api_key=key, @@ -2032,105 +2065,31 @@ async def test_get_embedding_success_multiple_texts(self, mock_execute_request_a ) llm_secret.save() - mock_response = { - "embedding": { - "dense": [ - [ - -0.014715306460857391, - -0.022890476509928703 - ], - [ - -0.019074518233537674, - -0.0060106911696493626 - ] - ], - "sparse": [ - { - "values": [ - 1.6877434821696136 - ], - "indices": [ - 613153351 - ] - }, - { - "values": [ - 1.6877434821696136 - ], - "indices": [ - 948991206 - ] - } - ], - "rerank": [ - [ - [ - -0.17043153941631317, - -0.05260511487722397 - ], - [ - -0.0009218898485414684, - 0.028302231803536415 - ], - [ - 0.006710350513458252, - 0.06639177352190018 - ], - [ - -0.1451372504234314, - -0.0822567492723465 - ] - ], - [ - [ - -0.09881757199764252, - -0.05606473982334137 - ], - [ - -0.06487230211496353, - -0.042552437633275986 - ], - [ - -0.09635651856660843, - -0.06676826626062393 - ], - [ - -0.09975136816501617, - -0.07088008522987366 - ] - ] - ] - } + embeddings = { + "dense": [0.01926439255475998, -0.0645047277212143], + "sparse": {"values": [1.6877434821696136], "indices": [613153351]}, + "rerank": [[0.03842781484127045, 0.10881761461496353], [0.046593569219112396, -0.023154577240347862]] } - mock_execute_request_async.return_value = (mock_response, 200, None, None) + mock_request.side_effect = Exception("Request failed") processor = LLMProcessor(bot, llm_type) - embeddings = await processor.get_embedding(texts, user, invocation=invocation) + with pytest.raises(Exception, match="Request failed"): + await processor.__collection_hybrid_query__(collection_name, embeddings, limit, score_threshold) - assert embeddings == mock_response["embedding"] - mock_execute_request_async.assert_called_once_with( - http_url=f"{Utility.environment['llm']['url']}/{urllib.parse.quote(bot)}/embedding/{llm_type}", - request_method="POST", - request_body={ - 'texts': texts, - 'user': user, - 'invocation': invocation - }, - 
timeout=Utility.environment['llm'].get('request_timeout', 30) - ) + mock_request.assert_called_once() LLMSecret.objects.delete() - @pytest.mark.asyncio - @patch("kairon.shared.actions.utils.ActionUtility.execute_request_async", new_callable=AsyncMock) - async def test_get_embedding_api_error(self, mock_execute_request_async): + @patch("kairon.shared.rest_client.AioRestClient.request", new_callable=AsyncMock) + async def test_collection_hybrid_query_empty_response(self, mock_request): bot = "test_bot" llm_type = "openai" key = "test" user = "test" - texts = ["Hello how are you?", "I am Python"] - invocation = "test_invocation" + collection_name = "test_collection" + limit = 5 + score_threshold = 0.7 llm_secret = LLMSecret( llm_type=llm_type, @@ -2142,34 +2101,28 @@ async def test_get_embedding_api_error(self, mock_execute_request_async): ) llm_secret.save() - mock_execute_request_async.return_value = ({"message": "Internal Server Error"}, 500, None, None) + embeddings = { + "dense": [0.01926439255475998, -0.0645047277212143], + "sparse": {"values": [1.6877434821696136], "indices": [613153351]}, + "rerank": [[0.03842781484127045, 0.10881761461496353], [0.046593569219112396, -0.023154577240347862]] + } + + mock_request.return_value = {} processor = LLMProcessor(bot, llm_type) + response = await processor.__collection_hybrid_query__(collection_name, embeddings, limit, score_threshold) - with pytest.raises(Exception, match="Failed to fetch embeddings: Internal Server Error"): - await processor.get_embedding(texts, user, invocation=invocation) - - mock_execute_request_async.assert_called_once_with( - http_url=f"{Utility.environment['llm']['url']}/{urllib.parse.quote(bot)}/embedding/{llm_type}", - request_method="POST", - request_body={ - 'texts': texts, - 'user': user, - 'invocation': invocation - }, - timeout=Utility.environment['llm'].get('request_timeout', 30) - ) + assert response == {} + mock_request.assert_called_once() LLMSecret.objects.delete() 
@pytest.mark.asyncio - @patch("kairon.shared.actions.utils.ActionUtility.execute_request_async", new_callable=AsyncMock) - async def test_get_embedding_empty_response(self, mock_execute_request_async): + @patch("kairon.shared.llm.processor.LLMProcessor._sparse_embedding") + def test_sparse_embedding_single_sentence(self, mock_sparse): bot = "test_bot" llm_type = "openai" key = "test" user = "test" - texts = ["Hello how are you?", "I am Python"] - invocation = "test_invocation" llm_secret = LLMSecret( llm_type=llm_type, @@ -2181,35 +2134,24 @@ async def test_get_embedding_empty_response(self, mock_execute_request_async): ) llm_secret.save() - mock_execute_request_async.return_value = ({}, 200, None, None) - processor = LLMProcessor(bot, llm_type) - embeddings = await processor.get_embedding(texts, user, invocation=invocation) + mock_sparse.passage_embed.return_value = iter([ + MagicMock(values=np.array([1.62, 1.87]), indices=np.array([101, 202])) + ]) - assert embeddings == {} + result = processor.get_sparse_embedding(["Hello"]) - mock_execute_request_async.assert_called_once_with( - http_url=f"{Utility.environment['llm']['url']}/{urllib.parse.quote(bot)}/embedding/{llm_type}", - request_method="POST", - request_body={ - 'texts': texts, - 'user': user, - 'invocation': invocation - }, - timeout=Utility.environment['llm'].get('request_timeout', 30) - ) + assert result == [{"values": [1.62, 1.87], "indices": [101, 202]}] + mock_sparse.passage_embed.assert_called_once_with(["Hello"]) LLMSecret.objects.delete() @pytest.mark.asyncio - @patch("kairon.shared.rest_client.AioRestClient.request", new_callable=AsyncMock) - async def test_collection_hybrid_query_success(self, mock_request): + @patch("kairon.shared.llm.processor.LLMProcessor._sparse_embedding") + async def test_sparse_embedding_multiple_sentences(self, mock_sparse): bot = "test_bot" llm_type = "openai" key = "test" user = "test" - collection_name = "test_collection" - limit = 5 - score_threshold = 0.7 llm_secret 
= LLMSecret( llm_type=llm_type, @@ -2221,92 +2163,55 @@ async def test_collection_hybrid_query_success(self, mock_request): ) llm_secret.save() - embeddings = { - "dense": [ - 0.01926439255475998, - -0.0645047277212143 - ], - "sparse": { - "values": [ - 1.6877434821696136 - ], - "indices": [ - 613153351 - ] - }, - "rerank": [ - [ - 0.03842781484127045, - 0.10881761461496353, - ], - [ - 0.046593569219112396, - -0.023154577240347862 - ] - ] - } + processor = LLMProcessor(bot, llm_type) + mock_sparse.passage_embed.return_value = iter([ + MagicMock(values=np.array([1.62, 1.87]), indices=np.array([101, 202])), + MagicMock(values=np.array([2.71, 3.14]), indices=np.array([303, 404])) + ]) - mock_response = { - "result":{ - "points":[ - { - "id":2, - "version":0, - "score":1.5, - "payload":{ - "content":"Great Wall of China is a historic fortification stretching thousands of miles, built to protect Chinese states from invasions." - } - }, - { - "id":1, - "version":0, - "score":1.0, - "payload":{ - "content":"Taj Mahal is a white marble mausoleum in India, built by Mughal Emperor Shah Jahan in memory of his wife Mumtaz Mahal." 
- } - } - ] - }, - "status":"ok", - "time":0.003191196 - } + result = processor.get_sparse_embedding(["Hello", "World"]) - mock_request.return_value = mock_response + assert result == [ + {"values": [1.62, 1.87], "indices": [101, 202]}, + {"values": [2.71, 3.14], "indices": [303, 404]} + ] + mock_sparse.passage_embed.assert_called_once_with(["Hello", "World"]) + LLMSecret.objects.delete() - processor = LLMProcessor(bot, llm_type) - response = await processor.__collection_hybrid_query__(collection_name, embeddings, limit, score_threshold) + @pytest.mark.asyncio + @patch("kairon.shared.llm.processor.LLMProcessor._sparse_embedding") + async def test_sparse_embedding_empty_list(self, mock_sparse): + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" - assert response == mock_response - mock_request.assert_called_once_with( - http_url=f"{Utility.environment['vector']['db']}/collections/{collection_name}/points/query", - request_method="POST", - headers={}, - request_body={ - "prefetch": [ - {"query": embeddings.get("dense", []), "using": "dense", "limit": limit * 2}, - {"query": embeddings.get("rerank", []), "using": "rerank", "limit": limit * 2}, - {"query": embeddings.get("sparse", {}), "using": "sparse", "limit": limit * 2} - ], - "query": {"fusion": "rrf"}, - "with_payload": True, - "score_threshold": score_threshold, - "limit": limit - }, - return_json=True, - timeout=5 + llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user ) + llm_secret.save() + + processor = LLMProcessor(bot, llm_type) + mock_sparse.passage_embed.return_value = iter([]) + + result = processor.get_sparse_embedding([]) + + assert result == [] + mock_sparse.passage_embed.assert_called_once_with([]) LLMSecret.objects.delete() @pytest.mark.asyncio - @patch("kairon.shared.rest_client.AioRestClient.request", new_callable=AsyncMock) - async def 
test_collection_hybrid_query_request_failure(self, mock_request): + @patch("kairon.shared.llm.processor.LLMProcessor._sparse_embedding") + async def test_sparse_embedding_raises_exception(self, mock_sparse): bot = "test_bot" llm_type = "openai" key = "test" user = "test" - collection_name = "test_collection" - limit = 5 - score_threshold = 0.7 llm_secret = LLMSecret( llm_type=llm_type, @@ -2318,31 +2223,23 @@ async def test_collection_hybrid_query_request_failure(self, mock_request): ) llm_secret.save() - embeddings = { - "dense": [0.01926439255475998, -0.0645047277212143], - "sparse": {"values": [1.6877434821696136], "indices": [613153351]}, - "rerank": [[0.03842781484127045, 0.10881761461496353], [0.046593569219112396, -0.023154577240347862]] - } + processor = LLMProcessor(bot, llm_type) - mock_request.side_effect = Exception("Request failed") + mock_sparse.passage_embed.side_effect = Exception("Not Found") - processor = LLMProcessor(bot, llm_type) - with pytest.raises(Exception, match="Request failed"): - await processor.__collection_hybrid_query__(collection_name, embeddings, limit, score_threshold) + with pytest.raises(Exception, match="Error processing sparse embeddings: Not Found"): + processor.get_sparse_embedding(["Text of error case"]) - mock_request.assert_called_once() + mock_sparse.passage_embed.assert_called_once_with(["Text of error case"]) LLMSecret.objects.delete() @pytest.mark.asyncio - @patch("kairon.shared.rest_client.AioRestClient.request", new_callable=AsyncMock) - async def test_collection_hybrid_query_empty_response(self, mock_request): + @patch("kairon.shared.llm.processor.LLMProcessor._rerank_embedding") + def test_rerank_embedding_single_sentence(self, mock_rerank): bot = "test_bot" llm_type = "openai" key = "test" user = "test" - collection_name = "test_collection" - limit = 5 - score_threshold = 0.7 llm_secret = LLMSecret( llm_type=llm_type, @@ -2354,17 +2251,567 @@ async def test_collection_hybrid_query_empty_response(self, 
mock_request): ) llm_secret.save() - embeddings = { - "dense": [0.01926439255475998, -0.0645047277212143], - "sparse": {"values": [1.6877434821696136], "indices": [613153351]}, - "rerank": [[0.03842781484127045, 0.10881761461496353], [0.046593569219112396, -0.023154577240347862]] - } + processor = LLMProcessor(bot, llm_type) - mock_request.return_value = {} + mock_rerank.passage_embed.return_value = iter([ + np.array([[0.11, 0.22, 0.33], [0.44, 0.55, 0.66]]) + ]) + + result = processor.get_rerank_embedding(["Hello"]) + + assert result == [ + [[0.11, 0.22, 0.33], [0.44, 0.55, 0.66]] + ] + + mock_rerank.passage_embed.assert_called_once_with(["Hello"]) + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @patch("kairon.shared.llm.processor.LLMProcessor._rerank_embedding") + def test_rerank_embedding_multiple_sentences(self, mock_rerank): + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" + + llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() processor = LLMProcessor(bot, llm_type) - response = await processor.__collection_hybrid_query__(collection_name, embeddings, limit, score_threshold) - assert response == {} - mock_request.assert_called_once() + mock_rerank.passage_embed.return_value = iter([ + np.array([[0.11, 0.22, 0.33], [0.44, 0.55, 0.66]]), + np.array([[0.77, 0.88, 0.99], [1.11, 1.22, 1.33]]) + ]) + + result = processor.get_rerank_embedding(["Hello", "World"]) + + assert result == [ + [[0.11, 0.22, 0.33], [0.44, 0.55, 0.66]], + [[0.77, 0.88, 0.99], [1.11, 1.22, 1.33]] + ] + + mock_rerank.passage_embed.assert_called_once_with(["Hello", "World"]) + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @patch("kairon.shared.llm.processor.LLMProcessor._rerank_embedding") + def test_rerank_embedding_empty_sentences(self, mock_rerank): + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" + + 
llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + processor = LLMProcessor(bot, llm_type) + + mock_rerank.passage_embed.return_value = iter([]) + + result = processor.get_rerank_embedding([]) + + assert result == [] + + mock_rerank.passage_embed.assert_called_once_with([]) + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @patch("kairon.shared.llm.processor.LLMProcessor._rerank_embedding") + def test_rerank_embedding_raises_exception(self, mock_rerank): + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" + + llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + processor = LLMProcessor(bot, llm_type) + + mock_rerank.passage_embed.side_effect = Exception("Not Found") + + with pytest.raises(Exception, match="Error processing rerank embeddings: Not Found"): + processor.get_rerank_embedding(["Text of error case"]) + + mock_rerank.passage_embed.assert_called_once_with(["Text of error case"]) + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @mock.patch.object(litellm, "aembedding", autospec=True) + @patch("kairon.shared.llm.processor.LLMProcessor.get_sparse_embedding") + @patch("kairon.shared.llm.processor.LLMProcessor.get_rerank_embedding") + async def test_get_embedding_single_text(self, mock_rerank, mock_sparse, mock_dense): + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" + + llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + processor = LLMProcessor(bot, llm_type) + text = "Hello" + + embedding = np.random.random(1536).tolist() + mock_dense.return_value = litellm.EmbeddingResponse( + **{'data': [{'embedding': embedding}]} 
+ ) + + bm25_embedding = [{"indices": [1850593538, 11711171], "values": [1.66, 1.66]}] + mock_sparse.return_value = bm25_embedding + + colbertv2_0_embedding = [[np.random.random(128).tolist()]] + mock_rerank.return_value = colbertv2_0_embedding + + result = await processor.get_embedding(text, user) + + assert result == { + "dense": embedding, + "sparse": bm25_embedding[0], + "rerank": colbertv2_0_embedding[0] + } + + mock_dense.assert_called_once_with( + model="text-embedding-3-small", + input=[text], + metadata={'user': user, 'bot': bot, 'invocation': None}, + api_key=key, + num_retries=3 + ) + + mock_sparse.assert_called_once_with([text]) + mock_rerank.assert_called_once_with([text]) + + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @mock.patch.object(litellm, "aembedding", autospec=True) + @patch("kairon.shared.llm.processor.LLMProcessor.get_sparse_embedding") + @patch("kairon.shared.llm.processor.LLMProcessor.get_rerank_embedding") + async def test_get_embedding_multiple_texts( + self, mock_rerank, mock_sparse, mock_dense + ): + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" + + llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + processor = LLMProcessor(bot, llm_type) + + texts = ["Hello", "World"] + + embedding = np.random.random(1536).tolist() + mock_dense.return_value = litellm.EmbeddingResponse( + **{'data': [{'embedding': embedding}, {'embedding': embedding}]} + ) + + bm25_embeddings = [ + {"indices": [1850593538, 11711171], "values": [1.66, 1.66]}, + {"indices": [1850593538, 11711171], "values": [1.66, 1.66]} + ] + mock_sparse.return_value = bm25_embeddings + + colbertv2_0_embeddings = [ + [np.random.random(128).tolist()], + [np.random.random(128).tolist()] + ] + mock_rerank.return_value = colbertv2_0_embeddings + + result = await processor.get_embedding(texts, user) + + assert result == { + 
"dense": [embedding, embedding], + "sparse": bm25_embeddings, + "rerank": colbertv2_0_embeddings + } + + mock_dense.assert_called_once_with( + model="text-embedding-3-small", + input=texts, + metadata={'user': user, 'bot': bot, 'invocation': None}, + api_key=key, + num_retries=3 + ) + + mock_sparse.assert_called_once_with(texts) + mock_rerank.assert_called_once_with(texts) + + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @mock.patch.object(litellm, "aembedding", autospec=True) + @patch("kairon.shared.llm.processor.LLMProcessor.get_sparse_embedding") + @patch("kairon.shared.llm.processor.LLMProcessor.get_rerank_embedding") + async def test_get_embedding_dense_failure(self, mock_rerank, mock_sparse, mock_dense): + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" + + llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + processor = LLMProcessor(bot, llm_type) + texts = ["Hello", "World"] + + mock_dense.side_effect = Exception("Dense embedding failed") + + bm25_embeddings = [ + {"indices": [1850593538, 11711171], "values": [1.66, 1.66]}, + {"indices": [1850593538, 11711171], "values": [1.66, 1.66]} + ] + mock_sparse.return_value = bm25_embeddings + + colbertv2_0_embeddings = [ + [np.random.random(128).tolist()], + [np.random.random(128).tolist()] + ] + mock_rerank.return_value = colbertv2_0_embeddings + + with pytest.raises(Exception, match="Failed to fetch embeddings: Dense embedding failed"): + await processor.get_embedding(texts, user) + + mock_dense.assert_called_once_with( + model="text-embedding-3-small", + input=texts, + metadata={'user': user, 'bot': bot, 'invocation': None}, + api_key=key, + num_retries=3 + ) + + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @mock.patch.object(litellm, "aembedding", autospec=True) + @patch("kairon.shared.llm.processor.LLMProcessor.get_sparse_embedding") + 
@patch("kairon.shared.llm.processor.LLMProcessor.get_rerank_embedding") + async def test_get_embedding_sparse_failure(self, mock_rerank, mock_sparse, mock_dense): + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" + + llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + processor = LLMProcessor(bot, llm_type) + texts = ["Hello", "World"] + + embedding = np.random.random(1536).tolist() + mock_dense.return_value = litellm.EmbeddingResponse( + **{'data': [{'embedding': embedding}, {'embedding': embedding}]} + ) + + mock_sparse.side_effect = Exception("Sparse embedding failed") + + colbertv2_0_embeddings = [ + [np.random.random(128).tolist()], + [np.random.random(128).tolist()] + ] + mock_rerank.return_value = colbertv2_0_embeddings + + with pytest.raises(Exception, match="Failed to fetch embeddings: Sparse embedding failed"): + await processor.get_embedding(texts, user) + + mock_dense.assert_called_once_with( + model="text-embedding-3-small", + input=texts, + metadata={'user': user, 'bot': bot, 'invocation': None}, + api_key=key, + num_retries=3 + ) + + LLMSecret.objects.delete() + + @pytest.mark.asyncio + @mock.patch.object(litellm, "aembedding", autospec=True) + @patch("kairon.shared.llm.processor.LLMProcessor.get_sparse_embedding") + @patch("kairon.shared.llm.processor.LLMProcessor.get_rerank_embedding") + async def test_get_embedding_rerank_failure(self, mock_rerank, mock_sparse, mock_dense): + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" + + llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + processor = LLMProcessor(bot, llm_type) + texts = ["Hello", "World"] + + embedding = np.random.random(1536).tolist() + mock_dense.return_value = 
litellm.EmbeddingResponse( + **{'data': [{'embedding': embedding}, {'embedding': embedding}]} + ) + + bm25_embeddings = [ + {"indices": [1850593538, 11711171], "values": [1.66, 1.66]}, + {"indices": [1850593538, 11711171], "values": [1.66, 1.66]} + ] + mock_sparse.return_value = bm25_embeddings + + mock_rerank.side_effect = Exception("Failed to fetch embeddings: Rerank embedding failed") + + with pytest.raises(Exception, match="Rerank embedding failed"): + await processor.get_embedding(texts, user) + + mock_dense.assert_called_once_with( + model="text-embedding-3-small", + input=texts, + metadata={'user': user, 'bot': bot, 'invocation': None}, + api_key=key, + num_retries=3 + ) + + LLMSecret.objects.delete() + + def test_load_sparse_embedding_model_already_initialized(self): + """Test that the sparse embedding model loads correctly when LLMProcessor is initialized.""" + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" + + llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + LLMProcessor._sparse_embedding = SparseTextEmbedding("Qdrant/bm25") + + log_output = StringIO() + logger.add(log_output, format="{message}") + + processor = LLMProcessor(bot="test_bot", llm_type="openai") + + logger.remove() + log_contents = log_output.getvalue() + + assert isinstance(LLMProcessor._sparse_embedding, SparseTextEmbedding) + assert "SPARSE MODEL LOADED" not in log_contents + + LLMSecret.objects.delete() + + def test_load_sparse_embedding_model_not_initialized(self): + """Test that the sparse embedding model loads correctly when LLMProcessor is not initialized.""" + bot = "test_bot" + llm_type = "openai" + key = "test" + user = "test" + + llm_secret = LLMSecret( + llm_type=llm_type, + api_key=key, + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + log_output = 
StringIO()
+        logger.add(log_output, format="{message}")
+
+        LLMProcessor._sparse_embedding = None
+        LLMProcessor.load_sparse_embedding_model()
+
+        logger.remove()
+        log_contents = log_output.getvalue()
+
+        assert isinstance(LLMProcessor._sparse_embedding, SparseTextEmbedding)
+        assert "SPARSE MODEL LOADED" in log_contents
+        LLMSecret.objects.delete()
+
+    def test_load_sparse_embedding_model_fallback_cache(self):
+        """Test that the sparse embedding model falls back to the Kairon cache if Hugging Face cache is missing."""
+        bot = "test_bot"
+        llm_type = "openai"
+        key = "test"
+        user = "test"
+
+        llm_secret = LLMSecret(
+            llm_type=llm_type,
+            api_key=key,
+            models=["model1", "model2"],
+            api_base_url="https://api.example.com",
+            bot=bot,
+            user=user
+        )
+        llm_secret.save()
+
+        with patch("os.path.exists", return_value=False):
+            LLMProcessor._sparse_embedding = None
+
+            log_output = StringIO()
+            logger.add(log_output, format="{message}")
+
+            LLMProcessor.load_sparse_embedding_model()
+
+            logger.remove()
+            log_contents = log_output.getvalue()
+
+            assert isinstance(LLMProcessor._sparse_embedding, SparseTextEmbedding)
+            assert "SPARSE MODEL LOADED" in log_contents
+
+        LLMSecret.objects.delete()
+
+    def test_load_sparse_embedding_model_hf_cache(self):
+        """Test that the sparse embedding model loads from the Hugging Face cache when it is present."""
+        bot = "test_bot"
+        llm_type = "openai"
+        key = "test"
+        user = "test"
+
+        llm_secret = LLMSecret(
+            llm_type=llm_type,
+            api_key=key,
+            models=["model1", "model2"],
+            api_base_url="https://api.example.com",
+            bot=bot,
+            user=user
+        )
+        llm_secret.save()
+
+        with patch("os.path.exists", return_value=True):
+            LLMProcessor._sparse_embedding = None
+
+            log_output = StringIO()
+            logger.add(log_output, format="{message}")
+
+            LLMProcessor.load_sparse_embedding_model()
+
+            logger.remove()
+            log_contents = log_output.getvalue()
+
+            assert isinstance(LLMProcessor._sparse_embedding, SparseTextEmbedding)
+            assert "SPARSE 
MODEL LOADED" in log_contents
+
+        LLMSecret.objects.delete()
+
+    def test_load_rerank_embedding_model_not_initialized(self):
+        """Test that the rerank embedding model loads correctly when LLMProcessor is not initialized."""
+        bot = "test_bot"
+        llm_type = "openai"
+        key = "test"
+        user = "test"
+
+        llm_secret = LLMSecret(
+            llm_type=llm_type,
+            api_key=key,
+            models=["model1", "model2"],
+            api_base_url="https://api.example.com",
+            bot=bot,
+            user=user
+        )
+        llm_secret.save()
+
+        log_output = StringIO()
+        logger.add(log_output, format="{message}")
+
+        LLMProcessor._rerank_embedding = None
+        LLMProcessor.load_rerank_embedding_model()
+
+        logger.remove()
+        log_contents = log_output.getvalue()
+
+        assert isinstance(LLMProcessor._rerank_embedding, LateInteractionTextEmbedding)
+        assert "RERANK MODEL LOADED" in log_contents
+        LLMSecret.objects.delete()
+
+    def test_load_rerank_embedding_model_hf_cache(self):
+        """Test that the rerank embedding model loads from the Hugging Face cache when it is present."""
+        bot = "test_bot"
+        llm_type = "openai"
+        key = "test"
+        user = "test"
+
+        llm_secret = LLMSecret(
+            llm_type=llm_type,
+            api_key=key,
+            models=["model1", "model2"],
+            api_base_url="https://api.example.com",
+            bot=bot,
+            user=user
+        )
+        llm_secret.save()
+
+        with patch("os.path.exists", return_value=True):
+            LLMProcessor._rerank_embedding = None
+
+            log_output = StringIO()
+            logger.add(log_output, format="{message}")
+
+            LLMProcessor.load_rerank_embedding_model()
+
+            logger.remove()
+            log_contents = log_output.getvalue()
+
+            assert isinstance(LLMProcessor._rerank_embedding, LateInteractionTextEmbedding)
+            assert "RERANK MODEL LOADED" in log_contents
+        LLMSecret.objects.delete()
\ No newline at end of file