Added API to update knowledge vault in real time #1599

Conversation

himanshugt16 (Contributor) commented Nov 15, 2024

…ledge_vault and corresponding test cases

Summary by CodeRabbit

  • New Features

    • Introduced a new endpoint for synchronizing data with MongoDB and a vector database via POST request at /cognition/sync.
    • Enhanced data validation and upserting capabilities within the data processing workflow, including robust error handling.
    • Added functionality to check for the existence of collections in the database.
  • Tests

    • Added comprehensive test cases for the knowledge vault synchronization feature, covering successful operations and various error scenarios.
    • Introduced new tests for checking the existence of collections in the LLMProcessor class.

coderabbitai bot commented Nov 15, 2024

Walkthrough

The changes introduce a new endpoint for data synchronization with a MongoDB collection and a vector database in the API. The knowledge_vault_sync method processes incoming data, validating it against specified parameters and upserting it into the database. Enhancements to the CognitionDataProcessor class include methods for data validation and upserting, as well as synchronization with a Qdrant vector database. A new method in the LLMProcessor class checks for the existence of collections. The test suite is updated to cover the new synchronization feature and various error scenarios.
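
For reference, a client call to the new endpoint might look like the sketch below (not part of the diff). It assumes primary_key_col and collection_name are sent as query parameters (FastAPI's default for scalar parameters) and the rows as the JSON body; the host, bot id, and token are placeholders.

import requests  # hypothetical client-side sketch; host, bot id and token are placeholders

BOT_ID = "<bot_id>"
TOKEN = "<access_token>"
rows = [
    {"id": 1, "item": "Juice", "price": 2.50, "quantity": 10},
    {"id": 2, "item": "Milk", "price": 3.00, "quantity": 5},
]

response = requests.post(
    f"https://<kairon-host>/api/bot/{BOT_ID}/data/cognition/sync",
    params={"primary_key_col": "id", "collection_name": "groceries"},
    json=rows,
    headers={"Authorization": f"Bearer {TOKEN}"},
)
print(response.json())  # on success the handler responds with "Processing completed successfully"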

Changes

File Path | Change Summary
kairon/api/app/routers/bot/data.py | Added knowledge_vault_sync method for data synchronization via POST request at /cognition/sync.
kairon/shared/cognition/processor.py | Added methods get_pydantic_type, validate_data, upsert_data, and sync_with_qdrant to enhance data handling.
kairon/shared/llm/processor.py | Added __collection_exists__ method to check for the existence of a collection in the database.
tests/integration_test/services_test.py | Added test cases for knowledge_vault_sync, covering successful sync and various error scenarios.
tests/unit_test/llm_test.py | Added tests for LLMProcessor to check collection existence with success and failure scenarios.

Possibly related PRs

Suggested reviewers

  • hiteshghuge
  • sfahad1414

🐰 In the garden, data flows,
Syncing here, as knowledge grows.
With collections checked and keys in line,
Our vaults of wisdom, truly divine!
Errors caught, tests run bright,
Hooray for changes, what a sight! 🌼


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Outside diff range and nitpick comments (12)
kairon/api/app/routers/bot/data.py (2)

342-350: Enhance validation error handling and logging

The validation logic could be improved by:

  1. Adding type hints for error_summary
  2. Providing structured error messages
  3. Adding logging for validation failures

Apply this diff:

+from typing import Optional, Dict, Any
+import logging
+
+logger = logging.getLogger(__name__)
+
     error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), data, current_user.get_bot())
 
     if error_summary:
+        logger.error(f"Validation failed for collection {collection_name}: {error_summary}")
         return Response(
             success=False,
-            message="Validation failed",
+            message=f"Validation failed for collection {collection_name}",
             data=error_summary,
             error_code=400
         )

355-359: Enhance response with processing metrics

The success response should include:

  1. Summary of processed records
  2. Processing time information
  3. Detailed success metrics

Apply this diff:

+from time import time
+
+    start_time = time()
+    processed_count = len(data)
+    
+    # ... processing logic ...
+    
+    processing_time = time() - start_time
     return Response(
         success=True,
         message="Processing completed successfully",
-        data=None
+        data={
+            "processed_records": processed_count,
+            "processing_time_seconds": round(processing_time, 2)
+        }
     )
kairon/shared/llm/processor.py (2)

293-307: Enhance error handling and documentation

While the implementation is functionally correct, consider these improvements:

  1. Enhance error handling to differentiate between connection errors and non-existent collections
  2. Add cleanup for the AioRestClient instance
  3. Expand the docstring with parameters, return value, and examples
 async def __collection_exists__(self, collection_name: Text) -> bool:
-    """Check if a collection exists."""
+    """
+    Check if a collection exists in the vector database.
+    
+    Args:
+        collection_name: Name of the collection to check
+        
+    Returns:
+        bool: True if collection exists and is accessible, False otherwise
+        
+    Example:
+        exists = await processor.__collection_exists__("my_collection")
+    """
     try:
+        client = AioRestClient()
-        response = await AioRestClient().request(
+        response = await client.request(
             http_url=urljoin(self.db_url, f"/collections/{collection_name}"),
             request_method="GET",
             headers=self.headers,
             return_json=True,
             timeout=5
         )
         return response.get('status') == "ok"
+    except ConnectionError as e:
+        logging.error(f"Failed to connect to vector DB: {str(e)}")
+        return False
     except Exception as e:
-        logging.info(e)
+        logging.error(f"Failed to check collection existence: {str(e)}")
         return False
+    finally:
+        await client.cleanup()

293-307: Standardize database operation patterns

The class has multiple methods interacting with the vector database. Consider standardizing:

  1. Client lifecycle management (creation and cleanup)
  2. Error handling and logging patterns
  3. Response processing

Consider creating a base method to handle common vector DB operations:

async def __execute_db_operation__(self, operation_name: str, **kwargs):
    """
    Base method to handle vector DB operations with consistent patterns.
    """
    client = AioRestClient()
    try:
        response = await client.request(**kwargs)
        return self.__process_db_response(response)
    except Exception as e:
        logging.error(f"Failed to execute {operation_name}: {str(e)}")
        raise
    finally:
        await client.cleanup()

This would help maintain consistency across all database operations in the class.

tests/unit_test/data_processor/data_processor_test.py (2)

1367-1521: LGTM with suggestions for improvement!

The test cases thoroughly cover data validation scenarios including success case, missing collection, missing primary key, and column header mismatch. The assertions are clear and appropriate.

To improve maintainability:

  1. Consider moving the test data (metadata, schema setup) to pytest fixtures
  2. Extract common setup code (e.g., CognitionSchema creation) to reduce duplication

Example fixture implementation:

@pytest.fixture
def groceries_metadata():
    return [
        {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True},
        {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True},
        {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True},
        {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}
    ]

@pytest.fixture
def cognition_schema(groceries_metadata):
    schema = CognitionSchema(
        metadata=[ColumnMetadata(**item) for item in groceries_metadata],
        collection_name='groceries',
        user='test_user',
        bot='test_bot',
        timestamp=datetime.utcnow()
    )
    schema.validate(clean=True)
    schema.save()
    yield schema
    CognitionSchema.objects(bot='test_bot', collection_name="groceries").delete()

1586-1587: Document the hardcoded embedding dimension

The embedding dimension (1532) appears to be hardcoded without explanation. Consider adding a comment explaining this specific value or making it a configurable constant.

-        embedding = list(np.random.random(1532))
+        EMBEDDING_DIM = 1532  # Dimension size required by the model
+        embedding = list(np.random.random(EMBEDDING_DIM))
kairon/shared/cognition/processor.py (1)

423-423: Remove commented-out code for cleaner codebase

The commented line # return (str, ...) can be removed to maintain code cleanliness and readability.

Apply this diff to remove the commented code:

        return (constr(strict=True, min_length=1), ...)
-        # return (str, ...)
tests/integration_test/services_test.py (5)

1696-1701: Improve error message assertion for clarity

In test_knowledge_vault_sync_missing_primary_key, the error message assertion can be made more precise by checking the exact content.

Consider updating the assertion:

assert not actual["success"]
-assert actual["message"] == "Primary key 'id' must exist in each row."
+assert actual["message"] == "Primary key 'id' must exist in each data row."
assert actual["error_code"] == 422

Ensure that the error message matches exactly what the API returns.


1438-1449: Validate the status code of the schema creation response

After creating the schema, it's good practice to assert the HTTP status code to ensure the request was successful.

response = client.post(
    url=f"/api/bot/{pytest.bot}/data/cognition/schema",
    json={...},
    headers={"Authorization": pytest.token_type + " " + pytest.access_token}
)
+assert response.status_code == 200
schema_response = response.json()
assert schema_response["message"] == "Schema saved!"

1448-1451: Avoid redundant variable assignment

The variable schema_response is assigned after calling response.json(), but there is no need to assign it if it's not used elsewhere.

schema_response = response.json()
-assert schema_response["message"] == "Schema saved!"
-assert schema_response["error_code"] == 0
+assert response.json()["message"] == "Schema saved!"
+assert response.json()["error_code"] == 0

Alternatively, if schema_response is used later, ensure consistency.


1484-1487: Remove debug print statements

The print(actual) statement can clutter test output. It's better to remove it unless necessary for debugging.

actual = response.json()
-print(actual)
assert actual["success"]

1519-1522: Enhance assertion messages for better test failure understanding

When comparing expected and actual calls, include informative messages to aid debugging if the test fails.

for i, expected in enumerate(expected_calls):
    actual_call = mock_embedding.call_args_list[i].kwargs
    assert actual_call == expected, f"Embedding call {i} does not match expected arguments"
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 38d1819 and 5cdc792.

📒 Files selected for processing (5)
  • kairon/api/app/routers/bot/data.py (1 hunks)
  • kairon/shared/cognition/processor.py (2 hunks)
  • kairon/shared/llm/processor.py (1 hunks)
  • tests/integration_test/services_test.py (3 hunks)
  • tests/unit_test/data_processor/data_processor_test.py (2 hunks)
🔥 Files not summarized due to errors (1)
  • tests/unit_test/data_processor/data_processor_test.py: Error: Server error: no LLM provider could handle the message
🧰 Additional context used
🪛 Ruff
kairon/api/app/routers/bot/data.py

335-335: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

kairon/shared/cognition/processor.py

573-573: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🔇 Additional comments (9)
kairon/api/app/routers/bot/data.py (1)

340-340: Verify case-sensitivity requirements and add data structure validation

The current implementation converts all keys to lowercase without:

  1. Validating if case sensitivity is important for the data
  2. Checking data structure before transformation
  3. Handling nested dictionaries

Run this script to check if any existing collections have case-sensitive keys:

kairon/shared/llm/processor.py (1)

293-307: Verify the usage of collection existence check

The new method appears to be added for the knowledge vault sync feature. Let's verify its integration with the codebase.

✅ Verification successful

Private method is properly integrated and used

The __collection_exists__ method is well integrated into the codebase:

  • Used in cognition/processor.py to check collection existence before creation
  • Properly tested in both unit tests and integration tests
  • Used specifically in knowledge vault sync feature as intended
  • Follows consistent pattern with other private methods in the LLMProcessor class
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find usages of the new collection existence check method
# Expected: Should find calls from knowledge vault sync related code

# Search for method calls
rg -p "__collection_exists__" --type python

# Search for knowledge vault sync related files that might need this check
rg -l "knowledge.*vault.*sync" --type python

Length of output: 152


Script:

#!/bin/bash
# Description: Find usages of the new collection existence check method and knowledge vault sync related code
# Using ripgrep without file type filtering since python extension is not recognized

# Search for method calls
rg -p "__collection_exists__"

# Search for knowledge vault sync related files
rg -l "knowledge.*vault.*sync"

# Also search for the class containing this method to understand context
rg -p "class.*:.*\n.*def __collection_exists__" -A 5

Length of output: 982


Script:

#!/bin/bash
# Let's examine the actual usage context of this method

# Look at the usage in cognition processor
rg -A 10 -B 5 "__collection_exists__" "kairon/shared/cognition/processor.py"

# Look at the test cases to understand expected behavior
rg -A 10 -B 5 "__collection_exists__" "tests/unit_test/data_processor/data_processor_test.py"

# Look at the integration test
rg -A 10 -B 5 "__collection_exists__" "tests/integration_test/services_test.py"

Length of output: 3401

tests/unit_test/data_processor/data_processor_test.py (1)

16-16: LGTM!

The import of LLMProcessor is correctly placed and necessary for the test cases.

tests/integration_test/services_test.py (6)

13-13: Verify the inclusion of litellm in the test dependencies

The import of litellm is necessary for mocking but ensure that litellm is included in the test requirements to prevent import errors during test execution.


35-36: Ensure numpy is listed in the test dependencies

The addition of import numpy as np indicates usage within the tests. Verify that numpy is included in the project's test dependencies.


1528-1531: Correct the order of mock arguments in test_knowledge_vault_sync_missing_collection

The order of the mock arguments does not align with the decorators. Update the function signature:

@mock.patch.object(litellm, "aembedding", autospec=True)
def test_knowledge_vault_sync_missing_collection(
-    mock_embedding
+    mock_embedding
):
    ...

Note: Since there is only one @mock.patch.object, the argument order is correct here. Ensure consistency across all test functions.


1642-1645: Correct the order of mock arguments in test_knowledge_vault_sync_missing_primary_key

Adjust the function signature to match the order of the decorators:

@mock.patch.object(litellm, "aembedding", autospec=True)
def test_knowledge_vault_sync_missing_primary_key(
-    mock_embedding
+    mock_embedding
):
    ...

Similar to previous functions, ensure the argument order matches the decorator order.


1706-1707: Ensure cleanup code is executed

After tests, ensure that any created objects are properly cleaned up to prevent side effects on other tests.

CognitionSchema.objects(bot=pytest.bot, collection_name="groceries").delete()
LLMSecret.objects.delete()

This cleanup is already present; just verify that it's correctly placed and will always execute, even if exceptions occur.
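
One way to make that guarantee explicit (a sketch, not part of the PR) is to move the teardown into a pytest fixture whose code after yield runs even when the test body raises. The import paths below are assumptions about where these documents live in kairon.

import pytest
# Assumed import paths; adjust to wherever these documents are defined in kairon.
from kairon.shared.cognition.data_objects import CognitionData, CognitionSchema
from kairon.shared.admin.data_objects import LLMSecret


@pytest.fixture
def groceries_cleanup():
    # Teardown after yield executes regardless of assertion failures in the test body.
    yield
    CognitionSchema.objects(bot=pytest.bot, collection_name="groceries").delete()
    CognitionData.objects(bot=pytest.bot, collection="groceries").delete()
    LLMSecret.objects.delete()

A test that accepts groceries_cleanup as an argument could then drop its trailing delete calls.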


1404-1410: ⚠️ Potential issue

Correct the order of mock arguments in the test function

The order of the mock arguments in test_knowledge_vault_sync does not match the order of the @mock.patch.object decorators, which can lead to incorrect mocking and test failures.

The arguments should be ordered in the reverse of the decorators:

@mock.patch.object(litellm, "aembedding", autospec=True)
@mock.patch.object(LLMProcessor, "__collection_upsert__", autospec=True)
@mock.patch.object(LLMProcessor, "__create_collection__", autospec=True)
@mock.patch.object(LLMProcessor, "__collection_exists__", autospec=True)
def test_knowledge_vault_sync(
-    mock_embedding, mock_collection_exists, mock_create_collection, mock_collection_upsert
+    mock_collection_exists, mock_create_collection, mock_collection_upsert, mock_embedding
):

Likely invalid or redundant comment.

Comment on lines +352 to +353
    await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), data,
                                          current_user.get_bot(), current_user.get_user())

⚠️ Potential issue

Add error handling and transaction management for database operations

The upsert operation needs:

  1. Try-catch block for database operations
  2. Transaction management for data consistency
  3. Progress tracking for large datasets

Apply this diff:

-    await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), data,
-                                    current_user.get_bot(), current_user.get_user())
+    try:
+        total_records = len(data)
+        logger.info(f"Starting upsert operation for {total_records} records")
+        
+        await cognition_processor.upsert_data(
+            primary_key_col.lower(),
+            collection_name.lower(),
+            data,
+            current_user.get_bot(),
+            current_user.get_user()
+        )
+        
+        logger.info(f"Successfully processed {total_records} records")
+    except Exception as e:
+        logger.error(f"Error during upsert operation: {str(e)}")
+        return Response(
+            success=False,
+            message="Failed to process data",
+            data={"error": str(e)},
+            error_code=500
+        )

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +330 to +336
@router.post("/cognition/sync", response_model=Response)
async def knowledge_vault_sync(
    primary_key_col: str,
    collection_name: str,
    data: List[dict],
    current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS),
):

🛠️ Refactor suggestion

Enhance endpoint definition with input validation and documentation

  1. Move Security decorator from parameter default to function body to avoid potential issues with argument defaults
  2. Add input validation for required parameters
  3. Add parameter documentation in docstring

Apply this diff:

 @router.post("/cognition/sync", response_model=Response)
 async def knowledge_vault_sync(
     primary_key_col: str,
     collection_name: str,
     data: List[dict],
-    current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS),
+    current_user: User = None,
 ):
     """
     Validates and syncs data to the specified MongoDB collection and vector database.
+    
+    Args:
+        primary_key_col: Primary key column name for the collection
+        collection_name: Name of the target collection
+        data: List of dictionaries containing the data to sync
+        current_user: Current authenticated user
+    
+    Returns:
+        Response object with sync status
     """
+    current_user = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
+    if not primary_key_col or not collection_name:
+        raise HTTPException(status_code=400, detail="primary_key_col and collection_name are required")
+    if not data:
+        raise HTTPException(status_code=400, detail="data cannot be empty")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@router.post("/cognition/sync", response_model=Response)
async def knowledge_vault_sync(
    primary_key_col: str,
    collection_name: str,
    data: List[dict],
    current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS),
):

@router.post("/cognition/sync", response_model=Response)
async def knowledge_vault_sync(
    primary_key_col: str,
    collection_name: str,
    data: List[dict],
    current_user: User = None,
):
    """
    Validates and syncs data to the specified MongoDB collection and vector database.

    Args:
        primary_key_col: Primary key column name for the collection
        collection_name: Name of the target collection
        data: List of dictionaries containing the data to sync
        current_user: Current authenticated user

    Returns:
        Response object with sync status
    """
    current_user = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
    if not primary_key_col or not collection_name:
        raise HTTPException(status_code=400, detail="primary_key_col and collection_name are required")
    if not data:
        raise HTTPException(status_code=400, detail="data cannot be empty")
🧰 Tools
🪛 Ruff

335-335: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

Comment on lines +1522 to +1707
assert inserted_record.data["quantity"] == "10"

updated_record = next((item for item in upserted_data if item.data["id"] == "2"), None)
assert updated_record is not None
assert updated_record.data["item"] == "Milk"
assert updated_record.data["price"] == "3.00" # Updated price
assert updated_record.data["quantity"] == "5"

CognitionSchema.objects(bot=bot, collection_name="groceries").delete()
CognitionData.objects(bot=bot, collection="groceries").delete()
LLMSecret.objects.delete()

@pytest.mark.asyncio
@patch.object(LLMProcessor, "__collection_exists__", autospec=True)
@patch.object(LLMProcessor, "__create_collection__", autospec=True)
@patch.object(LLMProcessor, "__collection_upsert__", autospec=True)
@patch.object(litellm, "aembedding", autospec=True)
async def test_upsert_data_empty_data_list(self, mock_embedding, mock_collection_upsert, mock_create_collection,
mock_collection_exists):
bot = 'test_bot'
user = 'test_user'
collection_name = 'groceries'
primary_key_col = 'id'

metadata = [
{"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True},
{"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True},
{"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True},
{"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True},
]

cognition_schema = CognitionSchema(
metadata=[ColumnMetadata(**item) for item in metadata],
collection_name=collection_name,
user=user,
bot=bot,
timestamp=datetime.utcnow()
)
cognition_schema.validate(clean=True)
cognition_schema.save()

dummy_data = {
"id": "2",
"item": "Milk",
"price": "2.80",
"quantity": "5"
}
existing_document = CognitionData(
data=dummy_data,
content_type="json",
collection=collection_name,
user=user,
bot=bot,
timestamp=datetime.utcnow()
)
existing_document.save()

upsert_data = []

llm_secret = LLMSecret(
llm_type="openai",
api_key="openai_key",
models=["model1", "model2"],
api_base_url="https://api.example.com",
bot=bot,
user=user
)
llm_secret.save()

mock_collection_exists.return_value = False
mock_create_collection.return_value = None
mock_collection_upsert.return_value = None

embedding = list(np.random.random(1532))
mock_embedding.return_value = {'data': [{'embedding': embedding}, {'embedding': embedding}]}

processor = CognitionDataProcessor()
result = await processor.upsert_data(
primary_key_col=primary_key_col,
collection_name=collection_name,
data=upsert_data,
bot=bot,
user=user
)

data = list(CognitionData.objects(bot=bot, collection=collection_name))

assert result["message"] == "Upsert complete!"
assert len(data) == 1

existing_record = data[0]
assert existing_record.data["id"] == "2"
assert existing_record.data["item"] == "Milk"
assert existing_record.data["price"] == "2.80"
assert existing_record.data["quantity"] == "5"

CognitionSchema.objects(bot=bot, collection_name=collection_name).delete()
CognitionData.objects(bot=bot, collection=collection_name).delete()
LLMSecret.objects.delete()


🛠️ Refactor suggestion

Consider adding error scenario test cases

The current test suite covers successful upsertion and empty data list scenarios. Consider adding test cases for:

  1. Network errors during embedding generation
  2. Vector database upsertion failures
  3. Invalid data types in the input
  4. Concurrent upsertion attempts

Also, the mock setup code is duplicated. Consider moving it to a fixture.

Example implementation for error scenarios:

@pytest.fixture
def mock_llm_dependencies(mocker):
    mock_exists = mocker.patch.object(LLMProcessor, "__collection_exists__", return_value=False)
    mock_create = mocker.patch.object(LLMProcessor, "__create_collection__", return_value=None)
    mock_upsert = mocker.patch.object(LLMProcessor, "__collection_upsert__", return_value=None)
    mock_embed = mocker.patch.object(litellm, "aembedding")
    return mock_exists, mock_create, mock_upsert, mock_embed

@pytest.mark.asyncio
async def test_upsert_data_embedding_failure(mock_llm_dependencies, cognition_schema):
    _, _, _, mock_embed = mock_llm_dependencies
    mock_embed.side_effect = Exception("Failed to generate embeddings")
    
    processor = CognitionDataProcessor()
    with pytest.raises(Exception, match="Failed to generate embeddings"):
        await processor.upsert_data(
            primary_key_col='id',
            collection_name='groceries',
            data=[{"id": 1, "item": "Juice", "price": "2.50", "quantity": "10"}],
            bot='test_bot',
            user='test_user'
        )

                                                      err_msg="Unable to train FAQ! Contact support")
            logger.info(f"Row with {primary_key_col}: {document['data'].get(primary_key_col)} upserted in Qdrant.")
        except Exception as e:
            raise AppException(f"Failed to sync document with Qdrant: {str(e)}")

🛠️ Refactor suggestion

Use raise ... from e to preserve exception context

When re-raising exceptions, use raise ... from e to maintain the original traceback and exception context, which aids in debugging.

Apply this diff to modify the exception raising:

-        raise AppException(f"Failed to sync document with Qdrant: {str(e)}")
+        raise AppException(f"Failed to sync document with Qdrant: {str(e)}") from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
            raise AppException(f"Failed to sync document with Qdrant: {str(e)}")
            raise AppException(f"Failed to sync document with Qdrant: {str(e)}") from e
🧰 Tools
🪛 Ruff

573-573: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

Comment on lines +513 to +515
        for row in data:
            row = {str(key): str(value) for key, value in row.items()}
            primary_key_value = row.get(primary_key_col)

⚠️ Potential issue

Handle None values appropriately when converting data to strings

Converting None values to the string 'None' may lead to unintended behavior. Ensure that None values are handled correctly during the conversion.

Apply this diff to handle None values:

-                row = {str(key): str(value) for key, value in row.items()}
+                row = {str(key): str(value) if value is not None else '' for key, value in row.items()}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
        for row in data:
            row = {str(key): str(value) for key, value in row.items()}
            primary_key_value = row.get(primary_key_col)

        for row in data:
            row = {str(key): str(value) if value is not None else '' for key, value in row.items()}
            primary_key_value = row.get(primary_key_col)

Comment on lines +1575 to +1578
@pytest.mark.asyncio
@responses.activate
@mock.patch.object(litellm, "aembedding", autospec=True)
def test_knowledge_vault_sync_column_header_mismatch(mock_embedding):

⚠️ Potential issue

Add missing @pytest.mark.asyncio decorator to test_knowledge_vault_sync_column_header_mismatch

The test function test_knowledge_vault_sync_column_header_mismatch should be marked as an asynchronous test using @pytest.mark.asyncio.

+@pytest.mark.asyncio
@responses.activate
@mock.patch.object(litellm, "aembedding", autospec=True)
def test_knowledge_vault_sync_column_header_mismatch(mock_embedding):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@pytest.mark.asyncio
@responses.activate
@mock.patch.object(litellm, "aembedding", autospec=True)
def test_knowledge_vault_sync_column_header_mismatch(mock_embedding):
@pytest.mark.asyncio
@responses.activate
@mock.patch.object(litellm, "aembedding", autospec=True)
def test_knowledge_vault_sync_column_header_mismatch(mock_embedding):

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
tests/unit_test/data_processor/data_processor_test.py (2)

19-19: Use uppercase for environment variable names.

Follow Python conventions by using uppercase for environment variable names.

-os.environ["system_file"] = "./tests/testing_data/system.yaml"
+os.environ["SYSTEM_FILE"] = "./tests/testing_data/system.yaml"
🧰 Tools
🪛 Ruff

19-19: Use capitalized environment variable SYSTEM_FILE instead of system_file

Replace system_file with SYSTEM_FILE

(SIM112)


24-24: Remove unused imports.

The following imports are not used in the code:

  • unittest.mock.ANY
  • pydantic.constr

Also applies to: 35-35

🧰 Tools
🪛 Ruff

24-24: unittest.mock.ANY imported but unused

Remove unused import: unittest.mock.ANY

(F401)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 5cdc792 and 2c6f4b7.

📒 Files selected for processing (1)
  • tests/unit_test/data_processor/data_processor_test.py (3 hunks)
🔥 Files not summarized due to errors (1)
  • tests/unit_test/data_processor/data_processor_test.py: Error: Server error: no LLM provider could handle the message
🧰 Additional context used
🪛 Ruff
tests/unit_test/data_processor/data_processor_test.py

19-19: Use capitalized environment variable SYSTEM_FILE instead of system_file

Replace system_file with SYSTEM_FILE

(SIM112)


24-24: unittest.mock.ANY imported but unused

Remove unused import: unittest.mock.ANY

(F401)


35-35: pydantic.constr imported but unused

Remove unused import: pydantic.constr

(F401)

🔇 Additional comments (4)
tests/unit_test/data_processor/data_processor_test.py (4)

1368-1522: LGTM: Comprehensive data validation test cases.

The test cases thoroughly cover:

  • Successful data validation
  • Missing collection scenarios
  • Missing primary key scenarios
  • Column header mismatch scenarios

1523-1708: Consider adding error scenario test cases for upsert operations.

While the current test cases cover successful upsert and empty data scenarios, consider adding test cases for:

  1. Network errors during embedding generation
  2. Invalid data types in the input
  3. Concurrent upsert attempts

Example implementation:

@pytest.mark.asyncio
async def test_upsert_data_embedding_failure(self, mock_llm_dependencies):
    mock_embedding = mock_llm_dependencies[-1]
    mock_embedding.side_effect = Exception("Failed to generate embeddings")
    
    processor = CognitionDataProcessor()
    with pytest.raises(Exception, match="Failed to generate embeddings"):
        await processor.upsert_data(
            primary_key_col='id',
            collection_name='groceries',
            data=[{"id": 1, "item": "Juice", "price": "2.50", "quantity": "10"}],
            bot='test_bot',
            user='test_user'
        )

1709-1888: LGTM: Comprehensive Qdrant synchronization test cases.

The test cases effectively cover:

  • Successful synchronization with Qdrant
  • Connection error handling
  • Proper cleanup of test data

1889-1902: LGTM: Pydantic type helper test cases.

The test cases effectively cover:

  • Integer type conversion
  • Float type conversion
  • Invalid type handling

coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (3)
kairon/shared/cognition/processor.py (1)

420-429: Consider enhancing type support and using enums

The type mapping implementation could be more robust:

  1. Add support for common types like boolean and datetime
  2. Use an enum to define supported types instead of string literals
+from enum import Enum
+
+class SupportedDataType(Enum):
+    STRING = 'str'
+    INTEGER = 'int'
+    FLOAT = 'float'
+    BOOLEAN = 'bool'
+    DATETIME = 'datetime'

    @staticmethod
    def get_pydantic_type(data_type: str):
-        if data_type == 'str':
+        if data_type == SupportedDataType.STRING.value:
            return (constr(strict=True, min_length=1), ...)
-        elif data_type == 'int':
+        elif data_type == SupportedDataType.INTEGER.value:
            return (int, ...)
-        elif data_type == 'float':
+        elif data_type == SupportedDataType.FLOAT.value:
            return (float, ...)
+        elif data_type == SupportedDataType.BOOLEAN.value:
+            return (bool, ...)
+        elif data_type == SupportedDataType.DATETIME.value:
+            return (datetime, ...)
        else:
            raise ValueError(f"Unsupported data type: {data_type}")
tests/unit_test/data_processor/data_processor_test.py (2)

19-19: Use uppercase for environment variable names.

Follow Python conventions by using uppercase for environment variable names.

-os.environ["system_file"] = "./tests/testing_data/system.yaml"
+os.environ["SYSTEM_FILE"] = "./tests/testing_data/system.yaml"
🧰 Tools
🪛 Ruff

19-19: Use capitalized environment variable SYSTEM_FILE instead of system_file

Replace system_file with SYSTEM_FILE

(SIM112)


24-24: Remove unused imports.

The following imports are not used in the code:

  • unittest.mock.ANY
  • pydantic.constr

Also applies to: 35-35

🧰 Tools
🪛 Ruff

24-24: unittest.mock.ANY imported but unused

Remove unused import: unittest.mock.ANY

(F401)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 2c6f4b7 and ee81b1f.

📒 Files selected for processing (2)
  • kairon/shared/cognition/processor.py (2 hunks)
  • tests/unit_test/data_processor/data_processor_test.py (3 hunks)
🔥 Files not summarized due to errors (1)
  • tests/unit_test/data_processor/data_processor_test.py: Error: Server error: no LLM provider could handle the message
🧰 Additional context used
🪛 Ruff
kairon/shared/cognition/processor.py

572-572: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

tests/unit_test/data_processor/data_processor_test.py

19-19: Use capitalized environment variable SYSTEM_FILE instead of system_file

Replace system_file with SYSTEM_FILE

(SIM112)


24-24: unittest.mock.ANY imported but unused

Remove unused import: unittest.mock.ANY

(F401)


35-35: pydantic.constr imported but unused

Remove unused import: pydantic.constr

(F401)

🔇 Additional comments (5)
kairon/shared/cognition/processor.py (1)

512-514: ⚠️ Potential issue

Avoid indiscriminate type conversion to strings

Converting all values to strings can lead to data type loss and potential issues with numeric operations or comparisons.

-            row = {str(key): str(value) for key, value in row.items()}
+            row = {str(key): str(value) if value is not None else '' for key, value in row.items()}

Likely invalid or redundant comment.

tests/unit_test/data_processor/data_processor_test.py (4)

1368-1522: LGTM: Comprehensive data validation test cases.

The test cases cover various validation scenarios:

  • Success case with valid data
  • Missing collection error
  • Missing primary key error
  • Column header mismatch error

The test setup, assertions, and cleanup are thorough.


1523-1708: Consider adding error scenario test cases for data upsert.

While the current test cases cover successful upsert and empty data list scenarios, there are opportunities to enhance test coverage.

Consider adding test cases for:

  1. Network errors during embedding generation
  2. Vector database upsertion failures
  3. Invalid data types in the input
  4. Concurrent upsertion attempts

Example implementation:

@pytest.mark.asyncio
async def test_upsert_data_embedding_failure(self, mock_llm_dependencies):
    mock_embedding = mock_llm_dependencies[3]
    mock_embedding.side_effect = Exception("Failed to generate embeddings")
    
    processor = CognitionDataProcessor()
    with pytest.raises(Exception, match="Failed to generate embeddings"):
        await processor.upsert_data(
            primary_key_col='id',
            collection_name='groceries',
            data=[{"id": 1, "item": "Juice", "price": "2.50", "quantity": "10"}],
            bot='test_bot',
            user='test_user'
        )

1709-1898: LGTM: Comprehensive Qdrant synchronization test cases.

The test cases effectively cover:

  • Successful synchronization with vector database
  • Error handling for connection failures
  • Proper cleanup of test data

The mock setup and assertions are thorough.


1899-1912: LGTM: Clear and focused utility method tests.

The test cases for get_pydantic_type method cover:

  • Integer type conversion
  • Float type conversion
  • Invalid type handling

The assertions are precise and error cases are properly tested.

Comment on lines +490 to +547
    async def upsert_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str, user: Text):
        """
        Upserts data into the CognitionData collection.
        If document with the primary key exists, it will be updated.
        If not, it will be inserted.

        Args:
            primary_key_col: The primary key column name to check for uniqueness.
            collection_name: The collection name (table).
            data: List of rows of data to upsert.
            bot: The bot identifier associated with the data.
            user: The user
        """

        from kairon.shared.llm.processor import LLMProcessor
        llm_processor = LLMProcessor(bot, DEFAULT_LLM)
        suffix = "_faq_embd"
        qdrant_collection = f"{bot}_{collection_name}{suffix}" if collection_name else f"{bot}{suffix}"

        if await llm_processor.__collection_exists__(qdrant_collection) is False:
            await llm_processor.__create_collection__(qdrant_collection)

        for row in data:
            row = {str(key): str(value) for key, value in row.items()}
            primary_key_value = row.get(primary_key_col)

            payload = {
                "data": row,
                "content_type": CognitionDataType.json.value,
                "collection": collection_name
            }
            existing_document = CognitionData.objects(
                Q(bot=bot) &
                Q(collection=collection_name) &
                Q(**{f"data__{primary_key_col}": str(primary_key_value)})
            ).first()

            if existing_document:
                if not isinstance(existing_document, dict):
                    existing_document = existing_document.to_mongo().to_dict()
                row_id = str(existing_document["_id"])
                self.update_cognition_data(row_id, payload, user, bot)
                updated_document = CognitionData.objects(id=row_id).first()
                if not isinstance(updated_document, dict):
                    updated_document = updated_document.to_mongo().to_dict()
                logger.info(f"Row with {primary_key_col}: {primary_key_value} updated in MongoDB")
                await self.sync_with_qdrant(llm_processor, qdrant_collection, bot, updated_document, user,
                                            primary_key_col)
            else:
                row_id = self.save_cognition_data(payload, user, bot)
                new_document = CognitionData.objects(id=row_id).first()
                if not isinstance(new_document, dict):
                    new_document = new_document.to_mongo().to_dict()
                logger.info(f"Row with {primary_key_col}: {primary_key_value} inserted in MongoDB")
                await self.sync_with_qdrant(llm_processor, qdrant_collection, bot, new_document, user, primary_key_col)

        return {"message": "Upsert complete!"}


🛠️ Refactor suggestion

Consider implementing batch processing for better performance

The current implementation processes one row at a time, which could be inefficient for large datasets. Consider implementing batch processing for both MongoDB and Qdrant operations.

Additionally, there's a potential race condition in the check-then-act pattern between checking for an existing document and updating it. Consider using MongoDB's atomic operations.
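
For illustration only, an atomic and batched variant of the MongoDB side could use pymongo's bulk_write with per-row upserts, which removes the check-then-act window. The backing collection name and direct pymongo access below are assumptions, not the PR's mongoengine helpers.

from pymongo import MongoClient, UpdateOne

def batch_upsert(mongo_uri: str, db_name: str, bot: str, collection_name: str,
                 primary_key_col: str, rows: list) -> None:
    # Assumes the mongoengine CognitionData documents live in a "cognition_data"
    # collection; adjust to the real backing collection.
    coll = MongoClient(mongo_uri)[db_name]["cognition_data"]
    ops = [
        UpdateOne(
            {"bot": bot, "collection": collection_name,
             f"data.{primary_key_col}": str(row[primary_key_col])},
            {"$set": {"data": {str(k): str(v) for k, v in row.items()},
                      "content_type": "json", "collection": collection_name}},
            upsert=True,  # insert when no matching primary key exists
        )
        for row in rows
    ]
    if ops:
        coll.bulk_write(ops, ordered=False)  # unordered: independent upserts sent in one batch

The Qdrant side would still need its own batching of embedding and upsert calls, but each MongoDB row then becomes a single atomic operation.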

Comment on lines +548 to +572
    async def sync_with_qdrant(self, llm_processor, collection_name, bot, document, user, primary_key_col):
        """
        Syncs a document with Qdrant vector database by generating embeddings and upserting them.

        Args:
            llm_processor (LLMProcessor): Instance of LLMProcessor for embedding and Qdrant operations.
            collection_name (str): Name of the Qdrant collection.
            bot (str): Bot identifier.
            document (CognitionData): Document to sync with Qdrant.
            user (Text): User performing the operation.

        Raises:
            AppException: If Qdrant upsert operation fails.
        """
        try:
            metadata = self.find_matching_metadata(bot, document['data'], document.get('collection'))
            search_payload, embedding_payload = Utility.retrieve_search_payload_and_embedding_payload(
                document['data'], metadata)
            embeddings = await llm_processor.get_embedding(embedding_payload, user, invocation='knowledge_vault_sync')
            points = [{'id': document['vector_id'], 'vector': embeddings, 'payload': search_payload}]
            await llm_processor.__collection_upsert__(collection_name, {'points': points},
                                                      err_msg="Unable to train FAQ! Contact support")
            logger.info(f"Row with {primary_key_col}: {document['data'].get(primary_key_col)} upserted in Qdrant.")
        except Exception as e:
            raise AppException(f"Failed to sync document with Qdrant: {str(e)}")

⚠️ Potential issue

Improve error handling and add retry mechanism

  1. Use raise ... from e to preserve the exception context
  2. Consider adding a retry mechanism for transient failures in vector operations
+from tenacity import retry, stop_after_attempt, wait_exponential

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def sync_with_qdrant(self, llm_processor, collection_name, bot, document, user, primary_key_col):
        try:
            metadata = self.find_matching_metadata(bot, document['data'], document.get('collection'))
            search_payload, embedding_payload = Utility.retrieve_search_payload_and_embedding_payload(
                document['data'], metadata)
            embeddings = await llm_processor.get_embedding(embedding_payload, user, invocation='knowledge_vault_sync')
            points = [{'id': document['vector_id'], 'vector': embeddings, 'payload': search_payload}]
            await llm_processor.__collection_upsert__(collection_name, {'points': points},
                                                      err_msg="Unable to train FAQ! Contact support")
            logger.info(f"Row with {primary_key_col}: {document['data'].get(primary_key_col)} upserted in Qdrant.")
        except Exception as e:
-            raise AppException(f"Failed to sync document with Qdrant: {str(e)}")
+            raise AppException(f"Failed to sync document with Qdrant: {str(e)}") from e

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff

572-572: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

Comment on lines +431 to +489
    def validate_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str) -> Dict:
        """
        Validates each dictionary in the data list according to the expected schema from column_dict.

        Args:
            data: List of dictionaries where each dictionary represents a row to be validated.
            collection_name: The name of the collection (table name).
            bot: The bot identifier.
            primary_key_col: The primary key column for identifying rows.

        Returns:
            Dict: Summary of validation errors, if any.
        """
        if not CognitionSchema.objects(collection_name=collection_name).first():
            raise AppException(f"Collection '{collection_name}' does not exist.")

        column_dict = MongoProcessor().get_column_datatype_dict(bot, collection_name)

        error_summary = {}

        model_fields = {
            column_name: self.get_pydantic_type(data_type)
            for column_name, data_type in column_dict.items()
        }
        DynamicModel = create_model('DynamicModel', **model_fields)

        for row in data:
            row_key = row.get(primary_key_col)
            if not row_key:
                raise AppException(f"Primary key '{primary_key_col}' must exist in each row.")

            row_errors = []
            if set(row.keys()) != set(column_dict.keys()):
                row_errors.append({
                    "status": "Column headers mismatch",
                    "expected_columns": list(column_dict.keys()),
                    "actual_columns": list(row.keys())
                })
            if row_errors:
                error_summary[row_key] = row_errors
                continue

            try:
                DynamicModel(**row)
            except ValidationError as e:
                error_details = []
                for error in e.errors():
                    column_name = error['loc'][0]
                    input_value = row.get(column_name)
                    status = "Required Field is Empty" if input_value == "" else "Invalid DataType"
                    error_details.append({
                        "column_name": column_name,
                        "input": input_value,
                        "status": status
                    })
                error_summary[row_key] = error_details

        return error_summary


🛠️ Refactor suggestion

Optimize validation performance and enhance error messages

The validation logic could be improved for better performance and user experience:

  1. Move DynamicModel creation outside the row loop
  2. Enhance error messages with expected data types
    def validate_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str) -> Dict:
        if not CognitionSchema.objects(collection_name=collection_name).first():
            raise AppException(f"Collection '{collection_name}' does not exist.")

        column_dict = MongoProcessor().get_column_datatype_dict(bot, collection_name)
        error_summary = {}

        model_fields = {
            column_name: self.get_pydantic_type(data_type)
            for column_name, data_type in column_dict.items()
        }
        DynamicModel = create_model('DynamicModel', **model_fields)

        for row in data:
            row_key = row.get(primary_key_col)
            if not row_key:
                raise AppException(f"Primary key '{primary_key_col}' must exist in each row.")

            row_errors = []
            if set(row.keys()) != set(column_dict.keys()):
                row_errors.append({
                    "status": "Column headers mismatch",
                    "expected_columns": list(column_dict.keys()),
                    "actual_columns": list(row.keys())
                })
            if row_errors:
                error_summary[row_key] = row_errors
                continue

            try:
                DynamicModel(**row)
            except ValidationError as e:
                error_details = []
                for error in e.errors():
                    column_name = error['loc'][0]
                    input_value = row.get(column_name)
-                    status = "Required Field is Empty" if input_value == "" else "Invalid DataType"
+                    if input_value == "":
+                        status = "Required Field is Empty"
+                    else:
+                        status = f"Invalid DataType - Expected {column_dict[column_name]}, got {type(input_value).__name__}"
                    error_details.append({
                        "column_name": column_name,
                        "input": input_value,
                        "status": status
                    })
                error_summary[row_key] = error_details

        return error_summary
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def validate_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str) -> Dict:
"""
Validates each dictionary in the data list according to the expected schema from column_dict.
Args:
data: List of dictionaries where each dictionary represents a row to be validated.
collection_name: The name of the collection (table name).
bot: The bot identifier.
primary_key_col: The primary key column for identifying rows.
Returns:
Dict: Summary of validation errors, if any.
"""
if not CognitionSchema.objects(collection_name=collection_name).first():
raise AppException(f"Collection '{collection_name}' does not exist.")
column_dict = MongoProcessor().get_column_datatype_dict(bot, collection_name)
error_summary = {}
model_fields = {
column_name: self.get_pydantic_type(data_type)
for column_name, data_type in column_dict.items()
}
DynamicModel = create_model('DynamicModel', **model_fields)
for row in data:
row_key = row.get(primary_key_col)
if not row_key:
raise AppException(f"Primary key '{primary_key_col}' must exist in each row.")
row_errors = []
if set(row.keys()) != set(column_dict.keys()):
row_errors.append({
"status": "Column headers mismatch",
"expected_columns": list(column_dict.keys()),
"actual_columns": list(row.keys())
})
if row_errors:
error_summary[row_key] = row_errors
continue
try:
DynamicModel(**row)
except ValidationError as e:
error_details = []
for error in e.errors():
column_name = error['loc'][0]
input_value = row.get(column_name)
status = "Required Field is Empty" if input_value == "" else "Invalid DataType"
error_details.append({
"column_name": column_name,
"input": input_value,
"status": status
})
error_summary[row_key] = error_details
return error_summary
def validate_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str) -> Dict:
"""
Validates each dictionary in the data list according to the expected schema from column_dict.
Args:
data: List of dictionaries where each dictionary represents a row to be validated.
collection_name: The name of the collection (table name).
bot: The bot identifier.
primary_key_col: The primary key column for identifying rows.
Returns:
Dict: Summary of validation errors, if any.
"""
if not CognitionSchema.objects(collection_name=collection_name).first():
raise AppException(f"Collection '{collection_name}' does not exist.")
column_dict = MongoProcessor().get_column_datatype_dict(bot, collection_name)
error_summary = {}
model_fields = {
column_name: self.get_pydantic_type(data_type)
for column_name, data_type in column_dict.items()
}
DynamicModel = create_model('DynamicModel', **model_fields)
for row in data:
row_key = row.get(primary_key_col)
if not row_key:
raise AppException(f"Primary key '{primary_key_col}' must exist in each row.")
row_errors = []
if set(row.keys()) != set(column_dict.keys()):
row_errors.append({
"status": "Column headers mismatch",
"expected_columns": list(column_dict.keys()),
"actual_columns": list(row.keys())
})
if row_errors:
error_summary[row_key] = row_errors
continue
try:
DynamicModel(**row)
except ValidationError as e:
error_details = []
for error in e.errors():
column_name = error['loc'][0]
input_value = row.get(column_name)
if input_value == "":
status = "Required Field is Empty"
else:
status = f"Invalid DataType - Expected {column_dict[column_name]}, got {type(input_value).__name__}"
error_details.append({
"column_name": column_name,
"input": input_value,
"status": status
})
error_summary[row_key] = error_details
return error_summary

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (3)
tests/unit_test/llm_test.py (3)

1293-1359: Consider refactoring test setup and adding more test cases.

The test methods have good coverage for success and failure scenarios, but there are some improvements that could be made:

  1. Extract common setup code:
+    @pytest.fixture
+    def setup_llm_secret(self):
+        def _setup_llm_secret(bot, user):
+            llm_secret = LLMSecret(
+                llm_type="openai",
+                api_key="openai_key",
+                models=["model1", "model2"],
+                api_base_url="https://api.example.com",
+                bot=bot,
+                user=user
+            )
+            llm_secret.save()
+            return llm_secret
+        return _setup_llm_secret
  1. Add test case for non-200 response:
@pytest.mark.asyncio
@mock.patch.object(AioRestClient, "request", autospec=True)
async def test_collection_exists_not_found(self, mock_request):
    collection_name = "test_collection"
    mock_request.return_value = {"status": "not_found"}
    
    llm_processor = LLMProcessor("test_bot", DEFAULT_LLM)
    result = await llm_processor.__collection_exists__(collection_name)
    
    assert result is False

1314-1314: Consider using a more descriptive variable name.

The variable name result could be more descriptive to indicate it's a boolean representing collection existence.

-        result = await llm_processor.__collection_exists__(collection_name)
+        collection_exists = await llm_processor.__collection_exists__(collection_name)

Also applies to: 1348-1348


1325-1325: Move cleanup to fixture.

The LLMSecret.objects.delete() cleanup could be moved to a fixture to ensure it runs even if the test fails.

+    @pytest.fixture(autouse=True)
+    def cleanup(self):
+        yield
+        LLMSecret.objects.delete()

Also applies to: 1359-1359

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between ee81b1f and 354dae9.

📒 Files selected for processing (1)
  • tests/unit_test/llm_test.py (2 hunks)

sushantpatade (Collaborator) left a comment

Looks good. Further improvements are noted and will be done in the next sprint.
