
Added API to update knowledge vault in real time #1599

Merged
31 changes: 31 additions & 0 deletions kairon/api/app/routers/bot/data.py
@@ -326,3 +326,34 @@ async def download_error_csv(
            data=None,
            error_code=e.status_code
        )

@router.post("/cognition/sync", response_model=Response)
async def knowledge_vault_sync(
    primary_key_col: str,
    collection_name: str,
    data: List[dict],
    current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS),
):
Comment on lines +330 to +336

🛠️ Refactor suggestion

Enhance endpoint definition with input validation and documentation

  1. Move Security decorator from parameter default to function body to avoid potential issues with argument defaults
  2. Add input validation for required parameters
  3. Add parameter documentation in docstring

Apply this diff:

 @router.post("/cognition/sync", response_model=Response)
 async def knowledge_vault_sync(
     primary_key_col: str,
     collection_name: str,
     data: List[dict],
-    current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS),
+    current_user: User = None,
 ):
     """
     Validates and syncs data to the specified MongoDB collection and vector database.
+    
+    Args:
+        primary_key_col: Primary key column name for the collection
+        collection_name: Name of the target collection
+        data: List of dictionaries containing the data to sync
+        current_user: Current authenticated user
+    
+    Returns:
+        Response object with sync status
     """
+    current_user = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
+    if not primary_key_col or not collection_name:
+        raise HTTPException(status_code=400, detail="primary_key_col and collection_name are required")
+    if not data:
+        raise HTTPException(status_code=400, detail="data cannot be empty")
🧰 Tools
🪛 Ruff

335-335: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)
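Note: assigning the Security(...) result inside the function body, as the diff above does, would bypass FastAPI's dependency injection entirely. A minimal sketch of the module-level-singleton fix that Ruff actually recommends, which silences B008 while keeping injection intact (CURRENT_USER_AND_BOT is a name introduced here, not part of the PR):

from fastapi import Security

# Evaluated once at import time rather than as a per-definition argument
# default (resolves Ruff B008); FastAPI still injects the dependency
# because the singleton appears as the parameter default.
CURRENT_USER_AND_BOT = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)

@router.post("/cognition/sync", response_model=Response)
async def knowledge_vault_sync(
    primary_key_col: str,
    collection_name: str,
    data: List[dict],
    current_user: User = CURRENT_USER_AND_BOT,
):
    ...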

"""
Validates and syncs data to the specified MongoDB collection and vector database.
"""
data = [{key.lower(): value for key, value in row.items()} for row in data]

error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), data, current_user.get_bot())

if error_summary:
return Response(
success=False,
message="Validation failed",
data=error_summary,
error_code=400
)

await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), data,
current_user.get_bot(), current_user.get_user())
Comment on lines +352 to +353

⚠️ Potential issue

Add error handling and transaction management for database operations

The upsert operation needs:

  1. Try-catch block for database operations
  2. Transaction management for data consistency (a sketch follows the diff below)
  3. Progress tracking for large datasets

Apply this diff:

-    await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), data,
-                                    current_user.get_bot(), current_user.get_user())
+    try:
+        total_records = len(data)
+        logger.info(f"Starting upsert operation for {total_records} records")
+        
+        await cognition_processor.upsert_data(
+            primary_key_col.lower(),
+            collection_name.lower(),
+            data,
+            current_user.get_bot(),
+            current_user.get_user()
+        )
+        
+        logger.info(f"Successfully processed {total_records} records")
+    except Exception as e:
+        logger.error(f"Error during upsert operation: {str(e)}")
+        return Response(
+            success=False,
+            message="Failed to process data",
+            data={"error": str(e)},
+            error_code=500
+        )

Committable suggestion skipped: line range outside the PR's diff.
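The diff above covers points 1 and 3; for point 2 (transaction management), a rough sketch with pymongo sessions, assuming direct client access and a replica-set deployment (MongoDB transactions require one). The connection string and db/collection names are hypothetical:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # assumed connection string
coll = client["kairon"]["cognition_data"]          # hypothetical db/collection names

with client.start_session() as session:
    with session.start_transaction():
        # Every row commits or none does; bot, data, primary_key_col and
        # collection_name are taken from the endpoint's scope.
        for row in data:
            coll.update_one(
                {"bot": bot, f"data.{primary_key_col}": str(row[primary_key_col])},
                {"$set": {"data": row, "collection": collection_name}},
                upsert=True,
                session=session,
            )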


    return Response(
        success=True,
        message="Processing completed successfully",
        data=None
    )
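For reference, a hedged usage sketch of the new endpoint. Under FastAPI's defaults the scalar parameters travel as query strings and the list as the JSON body; the host, route prefix, and token below are assumptions, not taken from the PR:

import requests

# Hypothetical deployment URL and bearer token; check your kairon
# installation for the real bot-router prefix.
resp = requests.post(
    "https://kairon.example.com/api/bot/<bot_id>/data/cognition/sync",
    params={"primary_key_col": "id", "collection_name": "groceries"},
    json=[{"id": "1", "item": "milk", "price": 2.5}],
    headers={"Authorization": "Bearer <token>"},
    timeout=30,
)
print(resp.json())  # {"success": true, "message": "Processing completed successfully", ...}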
159 changes: 158 additions & 1 deletion kairon/shared/cognition/processor.py
@@ -1,13 +1,15 @@
 from datetime import datetime
-from typing import Text, Dict, Any
+from typing import Text, Dict, Any, List

 from loguru import logger
 from mongoengine import DoesNotExist, Q
+from pydantic import constr, create_model, ValidationError

 from kairon import Utility
 from kairon.exceptions import AppException
 from kairon.shared.actions.data_objects import PromptAction, DatabaseAction
 from kairon.shared.cognition.data_objects import CognitionData, CognitionSchema, ColumnMetadata, CollectionData
+from kairon.shared.data.constant import DEFAULT_LLM
 from kairon.shared.data.processor import MongoProcessor
 from kairon.shared.models import CognitionDataType, CognitionMetadataType
@@ -414,3 +416,158 @@ def validate_collection_name(bot: Text, collection: Text):
            raise AppException(f'Cannot remove collection {collection} linked to action "{prompt_action[0].name}"!')
        if database_action:
            raise AppException(f'Cannot remove collection {collection} linked to action "{database_action[0].name}"!')

    @staticmethod
    def get_pydantic_type(data_type: str):
        if data_type == 'str':
            return (constr(strict=True, min_length=1), ...)
            # return (str, ...)
        elif data_type == 'int':
            return (int, ...)
        elif data_type == 'float':
            return (float, ...)
        else:
            raise ValueError(f"Unsupported data type: {data_type}")

    def validate_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str) -> Dict:
        """
        Validates each dictionary in the data list according to the expected schema from column_dict.

        Args:
            data: List of dictionaries where each dictionary represents a row to be validated.
            collection_name: The name of the collection (table name).
            bot: The bot identifier.
            primary_key_col: The primary key column for identifying rows.

        Returns:
            Dict: Summary of validation errors, if any.
        """
        if not CognitionSchema.objects(collection_name=collection_name).first():
            raise AppException(f"Collection '{collection_name}' does not exist.")

        column_dict = MongoProcessor().get_column_datatype_dict(bot, collection_name)

        error_summary = {}

        model_fields = {
            column_name: self.get_pydantic_type(data_type)
            for column_name, data_type in column_dict.items()
        }
        DynamicModel = create_model('DynamicModel', **model_fields)

        for row in data:
            row_key = row.get(primary_key_col)
            if not row_key:
                raise AppException(f"Primary key '{primary_key_col}' must exist in each row.")

            row_errors = []
            if set(row.keys()) != set(column_dict.keys()):
                row_errors.append({
                    "status": "Column headers mismatch",
                    "expected_columns": list(column_dict.keys()),
                    "actual_columns": list(row.keys())
                })
            if row_errors:
                error_summary[row_key] = row_errors
                continue

            try:
                DynamicModel(**row)
            except ValidationError as e:
                error_details = []
                for error in e.errors():
                    column_name = error['loc'][0]
                    input_value = row.get(column_name)
                    status = "Required Field is Empty" if input_value == "" else "Invalid DataType"
                    error_details.append({
                        "column_name": column_name,
                        "input": input_value,
                        "status": status
                    })
                error_summary[row_key] = error_details

        return error_summary
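As a standalone illustration of the dynamic-model mechanism above, a minimal sketch assuming pydantic v1 (matching the constr/create_model imports in this file):

from pydantic import ValidationError, constr, create_model

# Build a model the same way validate_data does, from a column -> type map.
fields = {
    "item": (constr(strict=True, min_length=1), ...),  # a 'str' column
    "price": (float, ...),                             # a 'float' column
}
DynamicModel = create_model("DynamicModel", **fields)

try:
    DynamicModel(item="", price="cheap")
except ValidationError as e:
    for err in e.errors():
        print(err["loc"][0], "->", err["msg"])
# item -> ensure this value has at least 1 characters
# price -> value is not a valid float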

Comment on lines +431 to +489

🛠️ Refactor suggestion

Optimize validation performance and enhance error messages

The validation logic could be improved for better performance and user experience:

  1. Move DynamicModel creation outside the row loop
  2. Enhance error messages with expected data types
    def validate_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str) -> Dict:
        if not CognitionSchema.objects(collection_name=collection_name).first():
            raise AppException(f"Collection '{collection_name}' does not exist.")

        column_dict = MongoProcessor().get_column_datatype_dict(bot, collection_name)
        error_summary = {}

        model_fields = {
            column_name: self.get_pydantic_type(data_type)
            for column_name, data_type in column_dict.items()
        }
        DynamicModel = create_model('DynamicModel', **model_fields)

        for row in data:
            row_key = row.get(primary_key_col)
            if not row_key:
                raise AppException(f"Primary key '{primary_key_col}' must exist in each row.")

            row_errors = []
            if set(row.keys()) != set(column_dict.keys()):
                row_errors.append({
                    "status": "Column headers mismatch",
                    "expected_columns": list(column_dict.keys()),
                    "actual_columns": list(row.keys())
                })
            if row_errors:
                error_summary[row_key] = row_errors
                continue

            try:
                DynamicModel(**row)
            except ValidationError as e:
                error_details = []
                for error in e.errors():
                    column_name = error['loc'][0]
                    input_value = row.get(column_name)
-                    status = "Required Field is Empty" if input_value == "" else "Invalid DataType"
+                    if input_value == "":
+                        status = "Required Field is Empty"
+                    else:
+                        status = f"Invalid DataType - Expected {column_dict[column_name]}, got {type(input_value).__name__}"
                    error_details.append({
                        "column_name": column_name,
                        "input": input_value,
                        "status": status
                    })
                error_summary[row_key] = error_details

        return error_summary

    async def upsert_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str, user: Text):
        """
        Upserts data into the CognitionData collection.
        If a document with the primary key exists, it will be updated.
        If not, it will be inserted.

        Args:
            primary_key_col: The primary key column name to check for uniqueness.
            collection_name: The collection name (table).
            data: List of rows of data to upsert.
            bot: The bot identifier associated with the data.
            user: The user performing the operation.
        """
        from kairon.shared.llm.processor import LLMProcessor
        llm_processor = LLMProcessor(bot, DEFAULT_LLM)
        suffix = "_faq_embd"
        qdrant_collection = f"{bot}_{collection_name}{suffix}" if collection_name else f"{bot}{suffix}"

        if await llm_processor.__collection_exists__(qdrant_collection) is False:
            await llm_processor.__create_collection__(qdrant_collection)

        for row in data:
            row = {str(key): str(value) for key, value in row.items()}
            primary_key_value = row.get(primary_key_col)
Comment on lines +512 to +514

⚠️ Potential issue

Handle None values appropriately when converting data to strings

Converting None values to the string 'None' may lead to unintended behavior. Ensure that None values are handled correctly during the conversion.

Apply this diff to handle None values:

-                row = {str(key): str(value) for key, value in row.items()}
+                row = {str(key): str(value) if value is not None else '' for key, value in row.items()}


            payload = {
                "data": row,
                "content_type": CognitionDataType.json.value,
                "collection": collection_name
            }
            existing_document = CognitionData.objects(
                Q(bot=bot) &
                Q(collection=collection_name) &
                Q(**{f"data__{primary_key_col}": str(primary_key_value)})
            ).first()

            if existing_document:
                if not isinstance(existing_document, dict):
                    existing_document = existing_document.to_mongo().to_dict()
                row_id = str(existing_document["_id"])
                self.update_cognition_data(row_id, payload, user, bot)
                updated_document = CognitionData.objects(id=row_id).first()
                if not isinstance(updated_document, dict):
                    updated_document = updated_document.to_mongo().to_dict()
                logger.info(f"Row with {primary_key_col}: {primary_key_value} updated in MongoDB")
                await self.sync_with_qdrant(llm_processor, qdrant_collection, bot, updated_document, user,
                                            primary_key_col)
            else:
                row_id = self.save_cognition_data(payload, user, bot)
                new_document = CognitionData.objects(id=row_id).first()
                if not isinstance(new_document, dict):
                    new_document = new_document.to_mongo().to_dict()
                logger.info(f"Row with {primary_key_col}: {primary_key_value} inserted in MongoDB")
                await self.sync_with_qdrant(llm_processor, qdrant_collection, bot, new_document, user, primary_key_col)

        return {"message": "Upsert complete!"}

Comment on lines +490 to +547

🛠️ Refactor suggestion

Consider implementing batch processing for better performance

The current implementation processes one row at a time, which could be inefficient for large datasets. Consider implementing batch processing for both MongoDB and Qdrant operations.

Additionally, there's a potential race condition in the check-then-act pattern between checking for an existing document and updating it. Consider using MongoDB's atomic operations; a sketch of one approach follows below.
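A minimal sketch of that atomic alternative using mongoengine's modify() with upsert=True, so the existence check and the write happen in one server-side operation (the set__ field names mirror the payload below and are assumptions about the CognitionData schema):

from mongoengine import Q

# One find-and-modify round trip: no gap between "does it exist?" and
# "update it" for a concurrent writer to slip through.
doc = CognitionData.objects(
    Q(bot=bot) &
    Q(collection=collection_name) &
    Q(**{f"data__{primary_key_col}": str(primary_key_value)})
).modify(
    upsert=True,
    new=True,                       # return the post-update document
    set__data=row,
    set__collection=collection_name,
    set__user=user,
)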

    async def sync_with_qdrant(self, llm_processor, collection_name, bot, document, user, primary_key_col):
        """
        Syncs a document with the Qdrant vector database by generating embeddings and upserting them.

        Args:
            llm_processor (LLMProcessor): Instance of LLMProcessor for embedding and Qdrant operations.
            collection_name (str): Name of the Qdrant collection.
            bot (str): Bot identifier.
            document (CognitionData): Document to sync with Qdrant.
            user (Text): User performing the operation.
            primary_key_col (str): Primary key column, used for logging.

        Raises:
            AppException: If the Qdrant upsert operation fails.
        """
        try:
            metadata = self.find_matching_metadata(bot, document['data'], document.get('collection'))
            search_payload, embedding_payload = Utility.retrieve_search_payload_and_embedding_payload(
                document['data'], metadata)
            embeddings = await llm_processor.get_embedding(embedding_payload, user, invocation='knowledge_vault_sync')
            points = [{'id': document['vector_id'], 'vector': embeddings, 'payload': search_payload}]
            await llm_processor.__collection_upsert__(collection_name, {'points': points},
                                                      err_msg="Unable to train FAQ! Contact support")
            logger.info(f"Row with {primary_key_col}: {document['data'].get(primary_key_col)} upserted in Qdrant.")
        except Exception as e:
            raise AppException(f"Failed to sync document with Qdrant: {str(e)}")
🛠️ Refactor suggestion

Use raise ... from e to preserve exception context

When re-raising exceptions, use raise ... from e to maintain the original traceback and exception context, which aids in debugging.

Apply this diff to modify the exception raising:

-        raise AppException(f"Failed to sync document with Qdrant: {str(e)}")
+        raise AppException(f"Failed to sync document with Qdrant: {str(e)}") from e
🧰 Tools
🪛 Ruff

573-573: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

Comment on lines +548 to +572

⚠️ Potential issue

Improve error handling and add retry mechanism

  1. Use raise ... from e to preserve the exception context
  2. Consider adding a retry mechanism for transient failures in vector operations
+from tenacity import retry, stop_after_attempt, wait_exponential

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def sync_with_qdrant(self, llm_processor, collection_name, bot, document, user, primary_key_col):
        try:
            metadata = self.find_matching_metadata(bot, document['data'], document.get('collection'))
            search_payload, embedding_payload = Utility.retrieve_search_payload_and_embedding_payload(
                document['data'], metadata)
            embeddings = await llm_processor.get_embedding(embedding_payload, user, invocation='knowledge_vault_sync')
            points = [{'id': document['vector_id'], 'vector': embeddings, 'payload': search_payload}]
            await llm_processor.__collection_upsert__(collection_name, {'points': points},
                                                      err_msg="Unable to train FAQ! Contact support")
            logger.info(f"Row with {primary_key_col}: {document['data'].get(primary_key_col)} upserted in Qdrant.")
        except Exception as e:
-            raise AppException(f"Failed to sync document with Qdrant: {str(e)}")
+            raise AppException(f"Failed to sync document with Qdrant: {str(e)}") from e

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff

572-572: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

15 changes: 15 additions & 0 deletions kairon/shared/llm/processor.py
@@ -290,6 +290,21 @@ async def __collection_upsert__(self, collection_name: Text, data: Dict, err_msg
            if raise_err:
                raise AppException(err_msg)

    async def __collection_exists__(self, collection_name: Text) -> bool:
        """Check if a collection exists."""
        try:
            response = await AioRestClient().request(
                http_url=urljoin(self.db_url, f"/collections/{collection_name}"),
                request_method="GET",
                headers=self.headers,
                return_json=True,
                timeout=5
            )
            return response.get('status') == "ok"
        except Exception as e:
            logging.info(e)
            return False

    async def __collection_search__(self, collection_name: Text, vector: List, limit: int, score_threshold: float):
        client = AioRestClient()
        response = await client.request(
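For reference, a standalone sketch of the Qdrant REST call that __collection_exists__ wraps, assuming a local Qdrant on its default port (GET /collections/{name} returns HTTP 200 with status "ok" when the collection exists, and 404 otherwise):

import requests

def collection_exists(base_url: str, name: str) -> bool:
    # Synchronous, outside-kairon mirror of __collection_exists__ above.
    try:
        r = requests.get(f"{base_url}/collections/{name}", timeout=5)
        return r.ok and r.json().get("status") == "ok"
    except requests.RequestException:
        return False

print(collection_exists("http://localhost:6333", "mybot_groceries_faq_embd"))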