Make bulk indexing lambda raise errors #702

Merged · 5 commits · Dec 11, 2024
@@ -2,6 +2,7 @@
 import logging
 from typing import Dict, List, Optional, Tuple, Union
 
+import pg8000
 from opensearchpy import OpenSearch, RequestsHttpConnection
 from requests_aws4auth import AWS4Auth
 from sqlalchemy import create_engine, text
@@ -111,6 +112,7 @@ def _construct_documents(files: List[Dict], bucket_name: str) -> List[Dict]:
             file_stream = get_s3_file(bucket_name, object_key)
         except Exception as e:
             logger.error(f"Failed to obtain file {object_key}: {e}")
+            raise e
 
         document = add_text_content(file, file_stream)
 
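A small note on the added `raise e`: inside an `except` block, a bare `raise` is the more idiomatic form, since it re-raises the active exception with its original traceback intact. A minimal sketch of the same log-then-propagate pattern (`get_s3_file` is the helper from this module; the wrapper name is hypothetical):

```python
import logging

logger = logging.getLogger(__name__)


def read_file_or_fail(bucket_name: str, object_key: str) -> bytes:
    """Hypothetical wrapper: log the failure, then let the exception propagate."""
    try:
        return get_s3_file(bucket_name, object_key)  # assumed helper from this module
    except Exception as e:
        logger.error(f"Failed to obtain file {object_key}: {e}")
        raise  # bare raise keeps the original traceback
```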
@@ -169,18 +171,17 @@ def _fetch_files_in_consignment(
         c."ConsignmentReference" = :consignment_reference
         AND f."FileType" = 'File';
     """
-    # try:
-    #     result = session.execute(
-    #         text(query), {"consignment_reference": consignment_reference}
-    #     ).fetchall()
-    # except pg8000.Error as e:
-    #     raise Exception(f"Database query failed: {e}")
-    # finally:
-    #     session.close()
-
-    result = session.execute(
-        text(query), {"consignment_reference": consignment_reference}
-    ).fetchall()
+    try:
+        result = session.execute(
+            text(query), {"consignment_reference": consignment_reference}
+        ).fetchall()
+    except pg8000.Error as e:
+        logger.error(
+            f"Failed to retrieve file metadata from database for consignment reference: {consignment_reference}"
+        )
+        session.close()
+        raise e
 
     session.close()
 
     # Process query results
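One observation on the new error path: `session.close()` is called once in the `except` branch and again after the `try` block. An equivalent structure that guarantees cleanup on every path is `try/except/finally`, which is also what the deleted commented-out code used. A minimal sketch, assuming the same SQLAlchemy session and query parameters as above:

```python
import logging

import pg8000
from sqlalchemy import text

logger = logging.getLogger(__name__)


def run_query(session, query: str, consignment_reference: str):
    # Sketch only: the finally clause closes the session on every path,
    # so no separate close is needed in the except branch.
    try:
        return session.execute(
            text(query), {"consignment_reference": consignment_reference}
        ).fetchall()
    except pg8000.Error:
        logger.error(
            "Failed to retrieve file metadata from database "
            f"for consignment reference: {consignment_reference}"
        )
        raise  # propagate so the lambda invocation fails
    finally:
        session.close()
```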
@@ -231,16 +232,9 @@ def bulk_index_files_in_opensearch(
     Returns:
         None
     """
-    bulk_data = []
-    for doc in documents:
-        bulk_data.append(
-            json.dumps(
-                {"index": {"_index": "documents", "_id": doc["file_id"]}}
-            )
-        )
-        bulk_data.append(json.dumps(doc["document"]))
+    opensearch_index = "documents"
 
-    bulk_payload = "\n".join(bulk_data) + "\n"
+    bulk_payload = _prepare_bulk_index_payload(documents, opensearch_index)
 
     open_search = OpenSearch(
         open_search_host_url,
@@ -251,8 +245,6 @@
         connection_class=RequestsHttpConnection,
     )
 
-    opensearch_index = "documents"
-
     try:
         response = open_search.bulk(
             index=opensearch_index,
@@ -267,11 +259,27 @@
     logger.info(response)
 
     if response["errors"]:
-        logger.error("Errors occurred during bulk indexing")
+        logger.info("Opensearch bulk indexing completed with errors")
+        error_message = "Opensearch bulk indexing errors:"
         for item in response["items"]:
             if "error" in item.get("index", {}):
                 logger.error(
                     f"Error for document ID {item['index']['_id']}: {item['index']['error']}"
                 )
+                error_message += f"\nError for document ID {item['index']['_id']}: {item['index']['error']}"
+        raise Exception(error_message)
     else:
-        logger.info("Bulk indexing completed successfully")
+        logger.info("Opensearch bulk indexing completed successfully")
+
+
+def _prepare_bulk_index_payload(
+    documents: List[Dict[str, Union[str, Dict]]], opensearch_index: str
+) -> str:
+    bulk_data = []
+    for doc in documents:
+        bulk_data.append(
+            json.dumps(
+                {"index": {"_index": opensearch_index, "_id": doc["file_id"]}}
+            )
+        )
+        bulk_data.append(json.dumps(doc["document"]))
+
+    bulk_payload = "\n".join(bulk_data) + "\n"
+    return bulk_payload
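For reference, the string built by the new `_prepare_bulk_index_payload` helper is OpenSearch's newline-delimited bulk format: one action line followed by one source line per document, ending with a trailing newline. A small illustration, assuming the helper as defined above and made-up document data (the `file_name`/`content` fields are hypothetical):

```python
# Hypothetical input, shaped like the documents built by _construct_documents.
documents = [
    {"file_id": "abc-123", "document": {"file_name": "letter.txt", "content": "..."}},
    {"file_id": "def-456", "document": {"file_name": "memo.pdf", "content": "..."}},
]

payload = _prepare_bulk_index_payload(documents, "documents")
print(payload)
# {"index": {"_index": "documents", "_id": "abc-123"}}
# {"file_name": "letter.txt", "content": "..."}
# {"index": {"_index": "documents", "_id": "def-456"}}
# {"file_name": "memo.pdf", "content": "..."}
```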