From f73b0a5493c662633630cc7fc56530cb1bdb574c Mon Sep 17 00:00:00 2001 From: Anthony Hashemi Date: Wed, 11 Dec 2024 11:28:28 +0000 Subject: [PATCH 1/5] Raise error in lambda if opensearch bulk indexing call returns any errors for individual documents --- .../bulk_index_consignment.py | 16 +- .../test_bulk_index_files_in_opensearch.py | 277 +++++++++++++----- 2 files changed, 204 insertions(+), 89 deletions(-) diff --git a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py index 41cbec5b..7c1fb846 100644 --- a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py +++ b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py @@ -231,11 +231,13 @@ def bulk_index_files_in_opensearch( Returns: None """ + opensearch_index = "documents" + bulk_data = [] for doc in documents: bulk_data.append( json.dumps( - {"index": {"_index": "documents", "_id": doc["file_id"]}} + {"index": {"_index": opensearch_index, "_id": doc["file_id"]}} ) ) bulk_data.append(json.dumps(doc["document"])) @@ -251,8 +253,6 @@ def bulk_index_files_in_opensearch( connection_class=RequestsHttpConnection, ) - opensearch_index = "documents" - try: response = open_search.bulk( index=opensearch_index, @@ -267,11 +267,11 @@ def bulk_index_files_in_opensearch( logger.info(response) if response["errors"]: - logger.error("Errors occurred during bulk indexing") + logger.info("Opensearch bulk indexing completed with errors") + error_message = "Opensearch bulk indexing errors:" for item in response["items"]: if "error" in item.get("index", {}): - logger.error( - f"Error for document ID {item['index']['_id']}: {item['index']['error']}" - ) + error_message += f"\nError for document ID {item['index']['_id']}: {item['index']['error']}" + raise Exception(error_message) else: - logger.info("Bulk indexing completed successfully") + logger.info("Opensearch bulk indexing completed successfully") diff --git a/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py b/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py index 81e78be4..4c670db7 100644 --- a/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py +++ b/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py @@ -1,5 +1,7 @@ +import re from unittest import mock +import pytest from opensearch_indexer.index_consignment.bulk_index_consignment import ( bulk_index_files_in_opensearch, ) @@ -9,20 +11,23 @@ @mock.patch( "opensearch_indexer.index_consignment.bulk_index_consignment.OpenSearch" ) -def test_index_file_content_and_metadata_in_opensearch(mock_open_search): +def test_index_file_content_and_metadata_in_opensearch( + mock_open_search, caplog +): """ + Test the `bulk_index_files_in_opensearch` function for successful indexing. + Given: - - A file stream representing a text file. - - An SQLite database mimicking the file data. - - OpenSearch connection details. + - A list of document dictionaries containing file IDs and content. + - Mocked OpenSearch connection details. When: - - The index_file_content_and_metadata_in_opensearch function is invoked. + - `bulk_index_files_in_opensearch` is invoked. Then: - - The relevant file data is fetched from the database. - - The file's text content is extracted using real extract_text. - - The file is indexed in OpenSearch with the extracted text. + - Documents are indexed in OpenSearch successfully. + - OpenSearch client is initialized with the correct parameters. + - Logs confirm the completion of the bulk indexing operation without errors. """ open_search_host_url = "test_open_search_host_url" open_search_http_auth = mock.Mock() @@ -31,92 +36,69 @@ def test_index_file_content_and_metadata_in_opensearch(mock_open_search): { "file_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f", "document": { - "file_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f", - "file_name": "path0", - "file_reference": "ZD8MCK", - "file_path": "data/E2E_tests/original/path0", - "citeable_reference": "MOCK1 123/ZD8MCK", - "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", - "series_name": "MOCK1 123", - "transferring_body": "Mock 1 Department", - "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", - "transferring_body_description": "Mock 1 Department", - "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", - "consignment_reference": "TDR-2024-KKX4", - "file_type": "File", - "file_size": "1024", - "rights_copyright": "Crown Copyright", - "legal_status": "Public Record(s)", - "held_by": "The National Archives, Kew", - "date_last_modified": "2024-03-05T15:05:31", - "closure_type": "Open", - "title_closed": "false", - "description_closed": "false", - "language": "English", - "content": "", - "text_extraction_status": "n/a", + "a": "foo1", + "b": "bar1", }, }, { "file_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f", "document": { - "file_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f", - "file_name": "path2", - "file_reference": "ZD8MCN", - "file_path": "data/E2E_tests/original/path2", - "citeable_reference": "MOCK1 123/ZD8MCN", - "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", - "series_name": "MOCK1 123", - "transferring_body": "Mock 1 Department", - "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", - "transferring_body_description": "Mock 1 Department", - "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", - "consignment_reference": "TDR-2024-KKX4", - "file_type": "File", - "file_size": "1024", - "rights_copyright": "Crown Copyright", - "legal_status": "Public Record(s)", - "held_by": "The National Archives, Kew", - "date_last_modified": "2024-03-05T15:05:31", - "closure_type": "Open", - "title_closed": "false", - "description_closed": "false", - "language": "English", - "content": "", - "text_extraction_status": "n/a", + "c": "foo2", + "d": "bar2", }, }, { "file_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270", "document": { - "file_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270", - "file_name": "path1", - "file_reference": "ZD8MCL", - "file_path": "data/E2E_tests/original/path1", - "citeable_reference": "MOCK1 123/ZD8MCL", - "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", - "series_name": "MOCK1 123", - "transferring_body": "Mock 1 Department", - "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", - "transferring_body_description": "Mock 1 Department", - "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", - "consignment_reference": "TDR-2024-KKX4", - "file_type": "File", - "file_size": "1024", - "rights_copyright": "Crown Copyright", - "legal_status": "Public Record(s)", - "held_by": "The National Archives, Kew", - "date_last_modified": "2024-03-05T15:05:31", - "closure_type": "Open", - "title_closed": "false", - "description_closed": "false", - "language": "English", - "content": "", - "text_extraction_status": "n/a", + "e": "foo3", + "f": "bar3", }, }, ] + mock_opensearch_response = { + "took": 203, + "errors": False, + "items": [ + { + "index": { + "_index": "documents", + "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f", + "_version": 26, + "result": "updated", + "_shards": {"total": 2, "successful": 1, "failed": 0}, + "_seq_no": 420, + "status": 200, + } + }, + { + "index": { + "_index": "documents", + "_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f", + "_version": 26, + "result": "updated", + "_shards": {"total": 2, "successful": 1, "failed": 0}, + "_seq_no": 421, + "_primary_term": 8, + "status": 200, + } + }, + { + "index": { + "_index": "documents", + "_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270", + "_version": 26, + "result": "updated", + "_shards": {"total": 2, "successful": 1, "failed": 0}, + "_seq_no": 422, + "_primary_term": 8, + "status": 200, + } + }, + ], + } + mock_open_search.return_value.bulk.return_value = mock_opensearch_response + bulk_index_files_in_opensearch( documents, open_search_host_url, @@ -133,6 +115,139 @@ def test_index_file_content_and_metadata_in_opensearch(mock_open_search): ) mock_open_search.return_value.bulk.assert_called_once_with( index="documents", - body='{"index": {"_index": "documents", "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f"}}\n{"file_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f", "file_name": "path0", "file_reference": "ZD8MCK", "file_path": "data/E2E_tests/original/path0", "citeable_reference": "MOCK1 123/ZD8MCK", "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", "series_name": "MOCK1 123", "transferring_body": "Mock 1 Department", "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", "transferring_body_description": "Mock 1 Department", "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", "consignment_reference": "TDR-2024-KKX4", "file_type": "File", "file_size": "1024", "rights_copyright": "Crown Copyright", "legal_status": "Public Record(s)", "held_by": "The National Archives, Kew", "date_last_modified": "2024-03-05T15:05:31", "closure_type": "Open", "title_closed": "false", "description_closed": "false", "language": "English", "content": "", "text_extraction_status": "n/a"}\n{"index": {"_index": "documents", "_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f"}}\n{"file_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f", "file_name": "path2", "file_reference": "ZD8MCN", "file_path": "data/E2E_tests/original/path2", "citeable_reference": "MOCK1 123/ZD8MCN", "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", "series_name": "MOCK1 123", "transferring_body": "Mock 1 Department", "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", "transferring_body_description": "Mock 1 Department", "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", "consignment_reference": "TDR-2024-KKX4", "file_type": "File", "file_size": "1024", "rights_copyright": "Crown Copyright", "legal_status": "Public Record(s)", "held_by": "The National Archives, Kew", "date_last_modified": "2024-03-05T15:05:31", "closure_type": "Open", "title_closed": "false", "description_closed": "false", "language": "English", "content": "", "text_extraction_status": "n/a"}\n{"index": {"_index": "documents", "_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270"}}\n{"file_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270", "file_name": "path1", "file_reference": "ZD8MCL", "file_path": "data/E2E_tests/original/path1", "citeable_reference": "MOCK1 123/ZD8MCL", "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", "series_name": "MOCK1 123", "transferring_body": "Mock 1 Department", "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", "transferring_body_description": "Mock 1 Department", "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", "consignment_reference": "TDR-2024-KKX4", "file_type": "File", "file_size": "1024", "rights_copyright": "Crown Copyright", "legal_status": "Public Record(s)", "held_by": "The National Archives, Kew", "date_last_modified": "2024-03-05T15:05:31", "closure_type": "Open", "title_closed": "false", "description_closed": "false", "language": "English", "content": "", "text_extraction_status": "n/a"}\n', # noqa: E501 + body='{"index": {"_index": "documents", "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f"}}\n{"a": "foo1", "b": "bar1"}\n{"index": {"_index": "documents", "_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f"}}\n{"c": "foo2", "d": "bar2"}\n{"index": {"_index": "documents", "_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270"}}\n{"e": "foo3", "f": "bar3"}\n', # noqa: E501 timeout=60, ) + + assert [rec.message for rec in caplog.records] == [ + "Opensearch bulk indexing call completed with response", + str(mock_opensearch_response), + "Opensearch bulk indexing completed successfully", + ] + + +@mock.patch( + "opensearch_indexer.index_consignment.bulk_index_consignment.OpenSearch" +) +def test_index_file_content_and_metadata_in_opensearch_with_document_indexing_errors( + mock_open_search, caplog +): + """ + Test the `bulk_index_files_in_opensearch` function for error handling during indexing. + + Given: + - A list of document dictionaries containing file IDs and content. + - Mocked OpenSearch connection details. + + When: + - `bulk_index_files_in_opensearch` is invoked, and OpenSearch returns errors for some documents. + + Then: + - An exception is raised for the failed documents. + - OpenSearch client is initialized with the correct parameters. + - Logs indicate the presence of errors in the indexing operation. + """ + open_search_host_url = "test_open_search_host_url" + open_search_http_auth = mock.Mock() + + documents = [ + { + "file_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f", + "document": { + "a": "foo1", + "b": "bar1", + }, + }, + { + "file_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f", + "document": { + "c": "foo2", + "d": "bar2", + }, + }, + { + "file_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270", + "document": { + "e": "foo3", + "f": "bar3", + }, + }, + ] + + mock_opensearch_response = { + "took": 203, + "errors": True, + "items": [ + { + "index": { + "_index": "documents", + "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f", + "_version": 26, + "result": "updated", + "_shards": {"total": 2, "successful": 1, "failed": 1}, + "_seq_no": 420, + "status": 200, + } + }, + { + "index": { + "_index": "documents", + "_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f", + "_version": 26, + "result": "updated", + "_shards": {"total": 2, "successful": 1, "failed": 0}, + "_seq_no": 421, + "_primary_term": 8, + "status": 200, + } + }, + { + "index": { + "_index": "documents", + "_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270", + "error": { + "type": "document_missing_exception", + "reason": "[_doc][tt0816711]: document missing", + "index": "documents", + "shard": "0", + "index_uuid": "yhizhusbSWmP0G7OJnmcLg", + }, + "status": 404, + } + }, + ], + } + + mock_open_search.return_value.bulk.return_value = mock_opensearch_response + + with pytest.raises( + Exception, + match=re.escape( + "Opensearch bulk indexing errors:\nError for document ID 47526ba9-88e5-4cc8-8bc1-d682a10fa270: {'type': 'document_missing_exception', 'reason': '[_doc][tt0816711]: document missing', 'index': 'documents', 'shard': '0', 'index_uuid': 'yhizhusbSWmP0G7OJnmcLg'}" # noqa: E501 + ), + ): + bulk_index_files_in_opensearch( + documents, + open_search_host_url, + open_search_http_auth, + ) + + mock_open_search.assert_called_once_with( + open_search_host_url, + http_auth=open_search_http_auth, + use_ssl=True, + verify_certs=True, + ca_certs=None, + connection_class=RequestsHttpConnection, + ) + mock_open_search.return_value.bulk.assert_called_once_with( + index="documents", + body='{"index": {"_index": "documents", "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f"}}\n{"a": "foo1", "b": "bar1"}\n{"index": {"_index": "documents", "_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f"}}\n{"c": "foo2", "d": "bar2"}\n{"index": {"_index": "documents", "_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270"}}\n{"e": "foo3", "f": "bar3"}\n', # noqa: E501 + timeout=60, + ) + + assert [rec.message for rec in caplog.records] == [ + "Opensearch bulk indexing call completed with response", + str(mock_opensearch_response), + "Opensearch bulk indexing completed with errors", + ] From c119e20735e7a16b83952142171e55091286ecf1 Mon Sep 17 00:00:00 2001 From: Anthony Hashemi Date: Wed, 11 Dec 2024 14:22:17 +0000 Subject: [PATCH 2/5] Add test_index_file_content_and_metadata_in_opensearch_with_bulk_api_exception --- .../test_bulk_index_files_in_opensearch.py | 90 ++++++++++++++++++- 1 file changed, 87 insertions(+), 3 deletions(-) diff --git a/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py b/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py index 4c670db7..ef8efa9b 100644 --- a/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py +++ b/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py @@ -115,7 +115,14 @@ def test_index_file_content_and_metadata_in_opensearch( ) mock_open_search.return_value.bulk.assert_called_once_with( index="documents", - body='{"index": {"_index": "documents", "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f"}}\n{"a": "foo1", "b": "bar1"}\n{"index": {"_index": "documents", "_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f"}}\n{"c": "foo2", "d": "bar2"}\n{"index": {"_index": "documents", "_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270"}}\n{"e": "foo3", "f": "bar3"}\n', # noqa: E501 + body=( + '{"index": {"_index": "documents", "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f"}}\n' + '{"a": "foo1", "b": "bar1"}\n' + '{"index": {"_index": "documents", "_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f"}}\n' + '{"c": "foo2", "d": "bar2"}\n' + '{"index": {"_index": "documents", "_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270"}}\n' + '{"e": "foo3", "f": "bar3"}\n' + ), timeout=60, ) @@ -223,7 +230,12 @@ def test_index_file_content_and_metadata_in_opensearch_with_document_indexing_er with pytest.raises( Exception, match=re.escape( - "Opensearch bulk indexing errors:\nError for document ID 47526ba9-88e5-4cc8-8bc1-d682a10fa270: {'type': 'document_missing_exception', 'reason': '[_doc][tt0816711]: document missing', 'index': 'documents', 'shard': '0', 'index_uuid': 'yhizhusbSWmP0G7OJnmcLg'}" # noqa: E501 + ( + "Opensearch bulk indexing errors:\n" + "Error for document ID 47526ba9-88e5-4cc8-8bc1-d682a10fa270: " + "{'type': 'document_missing_exception', 'reason': '[_doc][tt0816711]: document missing', " + "'index': 'documents', 'shard': '0', 'index_uuid': 'yhizhusbSWmP0G7OJnmcLg'}" + ) ), ): bulk_index_files_in_opensearch( @@ -242,7 +254,14 @@ def test_index_file_content_and_metadata_in_opensearch_with_document_indexing_er ) mock_open_search.return_value.bulk.assert_called_once_with( index="documents", - body='{"index": {"_index": "documents", "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f"}}\n{"a": "foo1", "b": "bar1"}\n{"index": {"_index": "documents", "_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f"}}\n{"c": "foo2", "d": "bar2"}\n{"index": {"_index": "documents", "_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270"}}\n{"e": "foo3", "f": "bar3"}\n', # noqa: E501 + body=( + '{"index": {"_index": "documents", "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f"}}\n' + '{"a": "foo1", "b": "bar1"}\n' + '{"index": {"_index": "documents", "_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f"}}\n' + '{"c": "foo2", "d": "bar2"}\n' + '{"index": {"_index": "documents", "_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270"}}\n' + '{"e": "foo3", "f": "bar3"}\n' + ), timeout=60, ) @@ -251,3 +270,68 @@ def test_index_file_content_and_metadata_in_opensearch_with_document_indexing_er str(mock_opensearch_response), "Opensearch bulk indexing completed with errors", ] + + +@mock.patch( + "opensearch_indexer.index_consignment.bulk_index_consignment.OpenSearch" +) +def test_index_file_content_and_metadata_in_opensearch_with_bulk_api_exception( + mock_open_search, caplog +): + """ + Test the `bulk_index_files_in_opensearch` function for handling exceptions raised by the OpenSearch bulk API. + + Given: + - A list of document dictionaries containing file IDs and content. + - Mocked OpenSearch connection details. + + When: + - `bulk_index_files_in_opensearch` is invoked, and the OpenSearch bulk API raises an exception. + + Then: + - The exception is propagated as expected. + - Logs capture the error details. + """ + open_search_host_url = "test_open_search_host_url" + open_search_http_auth = mock.Mock() + + documents = [ + { + "file_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f", + "document": {"a": "foo1", "b": "bar1"}, + } + ] + + mock_open_search.return_value.bulk.side_effect = Exception( + "Simulated OpenSearch bulk API failure" + ) + + with pytest.raises( + Exception, match="Simulated OpenSearch bulk API failure" + ): + bulk_index_files_in_opensearch( + documents, + open_search_host_url, + open_search_http_auth, + ) + + mock_open_search.assert_called_once_with( + open_search_host_url, + http_auth=open_search_http_auth, + use_ssl=True, + verify_certs=True, + ca_certs=None, + connection_class=RequestsHttpConnection, + ) + mock_open_search.return_value.bulk.assert_called_once_with( + index="documents", + body=( + '{"index": {"_index": "documents", "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f"}}\n' + '{"a": "foo1", "b": "bar1"}\n' + ), + timeout=60, + ) + + assert [rec.message for rec in caplog.records] == [ + "Opensearch bulk indexing call failed: Simulated OpenSearch bulk API failure" + ] From 1dd0acac06405fc1fddc2579d35af029cc701a65 Mon Sep 17 00:00:00 2001 From: Anthony Hashemi Date: Wed, 11 Dec 2024 14:25:08 +0000 Subject: [PATCH 3/5] Reraise file retrieval exception in consignment indexing lambda --- .../index_consignment/bulk_index_consignment.py | 1 + 1 file changed, 1 insertion(+) diff --git a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py index 7c1fb846..67f7a342 100644 --- a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py +++ b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py @@ -111,6 +111,7 @@ def _construct_documents(files: List[Dict], bucket_name: str) -> List[Dict]: file_stream = get_s3_file(bucket_name, object_key) except Exception as e: logger.error(f"Failed to obtain file {object_key}: {e}") + raise e document = add_text_content(file, file_stream) From 8ce10b1d03b985232b8e0269197cd364a6961258 Mon Sep 17 00:00:00 2001 From: Anthony Hashemi Date: Wed, 11 Dec 2024 14:36:00 +0000 Subject: [PATCH 4/5] Error handling around query to fetch file metadata for consignment in consignment indexing lambda --- .../bulk_index_consignment.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py index 67f7a342..0f66deac 100644 --- a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py +++ b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py @@ -2,6 +2,7 @@ import logging from typing import Dict, List, Optional, Tuple, Union +import pg8000 from opensearchpy import OpenSearch, RequestsHttpConnection from requests_aws4auth import AWS4Auth from sqlalchemy import create_engine, text @@ -170,18 +171,17 @@ def _fetch_files_in_consignment( c."ConsignmentReference" = :consignment_reference AND f."FileType" = 'File'; """ - # try: - # result = session.execute( - # text(query), {"consignment_reference": consignment_reference} - # ).fetchall() - # except pg8000.Error as e: - # raise Exception(f"Database query failed: {e}") - # finally: - # session.close() - - result = session.execute( - text(query), {"consignment_reference": consignment_reference} - ).fetchall() + try: + result = session.execute( + text(query), {"consignment_reference": consignment_reference} + ).fetchall() + except pg8000.Error as e: + logger.error( + f"Failed to retrieve file metadata from database for consignment reference: {consignment_reference}" + ) + session.close() + raise e + session.close() # Process query results From 69efe0aa4651b8f7f16123aa7bd45b4774e0c32f Mon Sep 17 00:00:00 2001 From: Anthony Hashemi Date: Wed, 11 Dec 2024 15:44:04 +0000 Subject: [PATCH 5/5] Extract _prepare_bulk_index_payload from bulk_index_files_in_opensearch --- .../bulk_index_consignment.py | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py index 0f66deac..0e5655e9 100644 --- a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py +++ b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py @@ -234,16 +234,7 @@ def bulk_index_files_in_opensearch( """ opensearch_index = "documents" - bulk_data = [] - for doc in documents: - bulk_data.append( - json.dumps( - {"index": {"_index": opensearch_index, "_id": doc["file_id"]}} - ) - ) - bulk_data.append(json.dumps(doc["document"])) - - bulk_payload = "\n".join(bulk_data) + "\n" + bulk_payload = _prepare_bulk_index_payload(documents, opensearch_index) open_search = OpenSearch( open_search_host_url, @@ -276,3 +267,19 @@ def bulk_index_files_in_opensearch( raise Exception(error_message) else: logger.info("Opensearch bulk indexing completed successfully") + + +def _prepare_bulk_index_payload( + documents: List[Dict[str, Union[str, Dict]]], opensearch_index: str +) -> str: + bulk_data = [] + for doc in documents: + bulk_data.append( + json.dumps( + {"index": {"_index": opensearch_index, "_id": doc["file_id"]}} + ) + ) + bulk_data.append(json.dumps(doc["document"])) + + bulk_payload = "\n".join(bulk_data) + "\n" + return bulk_payload