From 6d0def425e70a8df9875c8d3784121dc77ac52a7 Mon Sep 17 00:00:00 2001 From: Anthony Hashemi Date: Wed, 4 Dec 2024 11:55:08 +0000 Subject: [PATCH] Create consignment indexing lambda for opensearch --- data_management/opensearch_indexer/Dockerfile | 2 +- .../opensearch_indexer/aws_helpers.py | 49 ++++ .../index_consignment/__init__.py | 0 .../bulk_index_consignment.py | 259 ++++++++++++++++++ .../index_consignment/lambda_function.py | 53 ++++ ...file_content_and_metadata_in_opensearch.py | 26 +- ...ent_and_metadata_in_opensearch_from_aws.py | 14 +- .../opensearch_indexer/text_extraction.py | 42 +++ .../test_bulk_index_files_in_opensearch.py | 137 +++++++++ .../tests/test_process_and_index_file.py | 40 +-- .../tests/test_text_extraction.py | 40 +++ 11 files changed, 587 insertions(+), 75 deletions(-) create mode 100644 data_management/opensearch_indexer/opensearch_indexer/aws_helpers.py create mode 100644 data_management/opensearch_indexer/opensearch_indexer/index_consignment/__init__.py create mode 100644 data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py create mode 100644 data_management/opensearch_indexer/opensearch_indexer/index_consignment/lambda_function.py create mode 100644 data_management/opensearch_indexer/opensearch_indexer/text_extraction.py create mode 100644 data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py create mode 100644 data_management/opensearch_indexer/tests/test_text_extraction.py diff --git a/data_management/opensearch_indexer/Dockerfile b/data_management/opensearch_indexer/Dockerfile index d2faaa47..9c2bbc41 100644 --- a/data_management/opensearch_indexer/Dockerfile +++ b/data_management/opensearch_indexer/Dockerfile @@ -10,4 +10,4 @@ RUN pip install -r requirements.txt COPY opensearch_indexer/ ${LAMBDA_TASK_ROOT}/opensearch_indexer -CMD [ "opensearch_indexer.lambda_function.lambda_handler" ] +CMD [ "opensearch_indexer.index_consignment.lambda_function.lambda_handler" ] diff --git a/data_management/opensearch_indexer/opensearch_indexer/aws_helpers.py b/data_management/opensearch_indexer/opensearch_indexer/aws_helpers.py new file mode 100644 index 00000000..8e286b44 --- /dev/null +++ b/data_management/opensearch_indexer/opensearch_indexer/aws_helpers.py @@ -0,0 +1,49 @@ +import json +import logging +from typing import Any, Dict +from urllib.parse import quote_plus + +import boto3 +from requests_aws4auth import AWS4Auth + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def get_s3_file(bucket_name: str, object_key: str) -> bytes: + s3 = boto3.client("s3") + s3_file_object = s3.get_object(Bucket=bucket_name, Key=object_key) + return s3_file_object["Body"].read() + + +def get_secret_data(secret_id: str) -> Dict[str, Any]: + sm = boto3.client("secretsmanager") + secret_response = sm.get_secret_value(SecretId=secret_id) + return json.loads(secret_response["SecretString"]) + + +def _build_db_url(secret_string: Dict[str, Any]) -> str: + return ( + "postgresql+pg8000://" + f'{secret_string["DB_USER"]}:{quote_plus(secret_string["DB_PASSWORD"])}' + f'@{secret_string["DB_HOST"]}:{secret_string["DB_PORT"]}/{secret_string["DB_NAME"]}' + ) + + +def _get_opensearch_auth(secret_string: Dict[str, Any]) -> AWS4Auth: + sts_client = boto3.client("sts") + assumed_role = sts_client.assume_role( + RoleArn=secret_string["OPEN_SEARCH_MASTER_ROLE_ARN"], + RoleSessionName="LambdaOpenSearchSession", + ) + logger.info("Extract temporary credentials to access OpenSearch") + credentials = 
assumed_role["Credentials"] + open_search_http_auth = AWS4Auth( + credentials["AccessKeyId"], + credentials["SecretAccessKey"], + secret_string["AWS_REGION"], + "es", + session_token=credentials["SessionToken"], + ) + + return open_search_http_auth diff --git a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/__init__.py b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py new file mode 100644 index 00000000..85138cb7 --- /dev/null +++ b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py @@ -0,0 +1,259 @@ +import json +import logging +from typing import Dict, List, Optional, Tuple, Union + +import pg8000 +from opensearchpy import OpenSearch, RequestsHttpConnection +from requests_aws4auth import AWS4Auth +from sqlalchemy import create_engine, text +from sqlalchemy.orm import declarative_base, sessionmaker + +from ..aws_helpers import ( + _build_db_url, + _get_opensearch_auth, + get_s3_file, + get_secret_data, +) +from ..text_extraction import add_text_content + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def bulk_index_consignment_from_aws( + consignment_reference: str, bucket_name: str, secret_id: str +) -> None: + """ + Retrieve credentials and host information from AWS Secrets Manager, fetch consignment data, + and index it in OpenSearch. + + Args: + consignment_reference (str): The reference identifier for the consignment. + bucket_name (str): The name of the S3 bucket containing file records. + secret_id (str): The ID of the AWS secret storing database and OpenSearch credentials. + + Returns: + None + """ + secret_string = get_secret_data(secret_id) + database_url = _build_db_url(secret_string) + open_search_host_url = secret_string["OPEN_SEARCH_HOST"] + open_search_http_auth = _get_opensearch_auth(secret_string) + + bulk_index_consignment( + consignment_reference, + bucket_name, + database_url, + open_search_host_url, + open_search_http_auth, + ) + + +def bulk_index_consignment( + consignment_reference: str, + bucket_name: str, + database_url: str, + open_search_host_url: str, + open_search_http_auth: Union[AWS4Auth, Tuple[str, str]], + open_search_ca_certs: Optional[str] = None, +) -> None: + """ + Fetch files associated with a consignment and index them in OpenSearch. + + Args: + consignment_reference (str): The consignment reference identifier. + bucket_name (str): The S3 bucket name containing files. + database_url (str): The connection string for the PostgreSQL database. + open_search_host_url (str): The host URL of the OpenSearch cluster. + open_search_http_auth (Union[AWS4Auth, Tuple[str, str]]): The authentication credentials for OpenSearch. + open_search_ca_certs (Optional[str]): Path to CA certificates for SSL verification. + + Returns: + None + """ + files = _fetch_files_in_consignment(consignment_reference, database_url) + documents_to_index = _construct_documents(files, bucket_name) + bulk_index_files_in_opensearch( + documents_to_index, + open_search_host_url, + open_search_http_auth, + open_search_ca_certs, + ) + + +def _construct_documents(files: List[Dict], bucket_name: str) -> List[Dict]: + """ + Construct a list of documents to be indexed in OpenSearch from file metadata. 
+ + Args: + files (List[Dict]): The list of file metadata dictionaries. + bucket_name (str): The S3 bucket name where the files are stored. + + Returns: + List[Dict]: A list of documents ready for indexing. + """ + documents_to_index = [] + for file in files: + object_key = file["consignment_reference"] + "/" + str(file["file_id"]) + + logger.info(f"Processing file: {object_key}") + + file_stream = None + document = file + + try: + file_stream = get_s3_file(bucket_name, object_key) + except Exception as e: + logger.error(f"Failed to obtain file {object_key}: {e}") + + document = add_text_content(file, file_stream) + + documents_to_index.append( + {"file_id": file["file_id"], "document": document} + ) + return documents_to_index + + +def _fetch_files_in_consignment( + consignment_reference: str, database_url: str +) -> List[Dict]: + """ + Fetch file metadata associated with the given consignment reference. + + Args: + consignment_reference (str): The consignment reference identifier. + database_url (str): The connection string for the PostgreSQL database. + + Returns: + List[Dict]: A list of file metadata dictionaries. + """ + engine = create_engine(database_url) + Base = declarative_base() + Base.metadata.reflect(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + + query = """ + SELECT + f."FileId" AS file_id, + f."FileName" AS file_name, + f."FileReference" AS file_reference, + f."FilePath" AS file_path, + f."CiteableReference" AS citeable_reference, + s."SeriesId" AS series_id, + s."Name" AS series_name, + b."Name" AS transferring_body, + b."BodyId" AS transferring_body_id, + b."Description" AS transferring_body_description, + c."ConsignmentId" AS consignment_id, + c."ConsignmentReference" AS consignment_reference, + fm."PropertyName", + fm."Value" + FROM + "File" f + JOIN + "Consignment" c ON f."ConsignmentId" = c."ConsignmentId" + JOIN + "Series" s ON c."SeriesId" = s."SeriesId" + JOIN + "Body" b ON s."BodyId" = b."BodyId" + LEFT JOIN + "FileMetadata" fm ON f."FileId" = fm."FileId" + WHERE + c."ConsignmentReference" = :consignment_reference + AND f."FileType" = 'File'; + """ + try: + result = session.execute( + text(query), {"consignment_reference": consignment_reference} + ).fetchall() + except pg8000.Error as e: + raise Exception(f"Database query failed: {e}") + finally: + session.close() + + # Process query results + files_data = {} + + for row in result: + file_id = str(row.file_id) + if file_id not in files_data: + files_data[file_id] = { + "file_id": str(row.file_id), + "file_name": str(row.file_name), + "file_reference": str(row.file_reference), + "file_path": str(row.file_path), + "citeable_reference": str(row.citeable_reference), + "series_id": str(row.series_id), + "series_name": str(row.series_name), + "transferring_body": str(row.transferring_body), + "transferring_body_id": str(row.transferring_body_id), + "transferring_body_description": str( + row.transferring_body_description + ), + "consignment_id": str(row.consignment_id), + "consignment_reference": str(row.consignment_reference), + } + + if row.PropertyName: + files_data[file_id][row.PropertyName] = str(row.Value) + + return list(files_data.values()) + + +def bulk_index_files_in_opensearch( + documents: List[Dict[str, Union[str, Dict]]], + open_search_host_url: str, + open_search_http_auth: Union[AWS4Auth, Tuple[str, str]], + open_search_ca_certs: Optional[str] = None, +) -> None: + """ + Perform bulk indexing of documents in OpenSearch. 
+
+    Args:
+        documents (List[Dict[str, Union[str, Dict]]]): The documents to index.
+        open_search_host_url (str): The OpenSearch cluster URL.
+        open_search_http_auth (Union[AWS4Auth, Tuple[str, str]]): The authentication credentials.
+        open_search_ca_certs (Optional[str]): Path to CA certificates for SSL verification.
+
+    Returns:
+        None
+    """
+    bulk_data = []
+    for doc in documents:
+        bulk_data.append(
+            json.dumps(
+                {"index": {"_index": "documents", "_id": doc["file_id"]}}
+            )
+        )
+        bulk_data.append(json.dumps(doc["document"]))
+
+    bulk_payload = "\n".join(bulk_data) + "\n"
+
+    open_search = OpenSearch(
+        open_search_host_url,
+        http_auth=open_search_http_auth,
+        use_ssl=True,
+        verify_certs=True,
+        ca_certs=open_search_ca_certs,
+        connection_class=RequestsHttpConnection,
+    )
+
+    opensearch_index = "documents"
+
+    try:
+        response = open_search.bulk(index=opensearch_index, body=bulk_payload)
+        logger.info("OpenSearch bulk command executed")
+        logger.info(response)
+
+        if response["errors"]:
+            logger.error("Errors occurred during bulk indexing")
+            for item in response["items"]:
+                if "error" in item.get("index", {}):
+                    logger.error(
+                        f"Error for document ID {item['index']['_id']}: {item['index']['error']}"
+                    )
+        else:
+            logger.info("Bulk indexing completed successfully")
+    except Exception as e:
+        logger.error(f"Bulk indexing failed: {e}")
diff --git a/data_management/opensearch_indexer/opensearch_indexer/index_consignment/lambda_function.py b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/lambda_function.py
new file mode 100644
index 00000000..d7ca6b9c
--- /dev/null
+++ b/data_management/opensearch_indexer/opensearch_indexer/index_consignment/lambda_function.py
@@ -0,0 +1,53 @@
+import logging
+import os
+from typing import Any, Dict
+
+from .bulk_index_consignment import bulk_index_consignment_from_aws
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+
+def lambda_handler(event: Dict[str, Any], context: Any) -> None:
+    """
+    AWS Lambda handler function to trigger the indexing of consignment files into OpenSearch.
+
+    This function is invoked by an AWS event containing details of a consignment. It retrieves
+    the necessary parameters from the event and environment variables, then calls the indexing
+    function to process and index the files into OpenSearch.
+
+    Args:
+        event (Dict[str, Any]): The event data triggering the Lambda function. Expected to contain:
+            - `detail` (Dict[str, Any]): A dictionary with `parameters` that includes:
+                - `reference` (str): The consignment reference identifier.
+        context (Any): AWS Lambda context object (not used in this function).
+
+    Environment Variables:
+        BUCKET_NAME (str): The name of the S3 bucket where the files are stored.
+        SECRET_ID (str): The identifier for the AWS Secrets Manager secret containing database
+            and OpenSearch credentials.
+
+    Raises:
+        Exception: If `consignment_reference`, `BUCKET_NAME`, or `SECRET_ID` are missing.
+ """ + logger.info("Lambda started") + logger.info("Event received: %s", event) + + # Extract parameters from the event and environment variables + detail = event.get("detail", {}) + consignment_reference = detail.get("parameters", {}).get("reference") + bucket_name = os.getenv("BUCKET_NAME") + secret_id = os.getenv("SECRET_ID") + + # Validate required parameters + if not consignment_reference or not bucket_name or not secret_id: + error_message = "Missing consignment_reference, BUCKET_NAME, or SECRET_ID required for indexing" + logger.error(error_message) + raise Exception(error_message) + + # Log and process the consignment reference + logger.info(f"Processing consignment reference: {consignment_reference}") + bulk_index_files_in_opensearch_from_aws( + consignment_reference, bucket_name, secret_id + ) + logger.info("Lambda completed") diff --git a/data_management/opensearch_indexer/opensearch_indexer/index_file_content_and_metadata_in_opensearch.py b/data_management/opensearch_indexer/opensearch_indexer/index_file_content_and_metadata_in_opensearch.py index 60892c31..3cd1bebd 100644 --- a/data_management/opensearch_indexer/opensearch_indexer/index_file_content_and_metadata_in_opensearch.py +++ b/data_management/opensearch_indexer/opensearch_indexer/index_file_content_and_metadata_in_opensearch.py @@ -1,13 +1,13 @@ import logging -import tempfile from typing import Any, Dict, Optional, Tuple, Union -import textract from opensearchpy import OpenSearch, RequestsHttpConnection from requests_aws4auth import AWS4Auth from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker +from .text_extraction import add_text_content + logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -24,7 +24,7 @@ def index_file_content_and_metadata_in_opensearch( Extracts file metadata from the database, adds the file content, and indexes it in OpenSearch. 
""" file_data = _fetch_file_data(file_id, database_url) - file_data_with_text_content = _add_text_content(file_data, file_stream) + file_data_with_text_content = add_text_content(file_data, file_stream) _index_in_opensearch( file_id, file_data_with_text_content, @@ -34,16 +34,6 @@ def index_file_content_and_metadata_in_opensearch( ) -def _add_text_content( - file_data: Dict[str, Any], file_stream: bytes -) -> Dict[str, Any]: - file_type = file_data["file_name"].split(".")[-1].lower() - new_file_data = file_data - if file_type in ["txt", "docx", "pdf"]: - new_file_data["content"] = extract_text(file_stream, file_type) - return new_file_data - - def _fetch_file_data( file_id: str, database_url: str ) -> Dict[str, Union[str, Dict[str, str]]]: @@ -111,16 +101,6 @@ def _fetch_file_data( return file_data -def extract_text(file_stream: bytes, file_extension: str) -> str: - with tempfile.NamedTemporaryFile( - suffix=f".{file_extension}", delete=True - ) as temp: - temp.write(file_stream) - temp.flush() - context = textract.process(temp.name) - return context.decode("utf-8") - - def _index_in_opensearch( file_id: str, document: Dict[str, Any], diff --git a/data_management/opensearch_indexer/opensearch_indexer/index_file_content_and_metadata_in_opensearch_from_aws.py b/data_management/opensearch_indexer/opensearch_indexer/index_file_content_and_metadata_in_opensearch_from_aws.py index 2a3193b3..5168bbeb 100644 --- a/data_management/opensearch_indexer/opensearch_indexer/index_file_content_and_metadata_in_opensearch_from_aws.py +++ b/data_management/opensearch_indexer/opensearch_indexer/index_file_content_and_metadata_in_opensearch_from_aws.py @@ -1,11 +1,11 @@ import json import logging from typing import Any, Dict -from urllib.parse import quote_plus import boto3 from requests_aws4auth import AWS4Auth +from .aws_helpers import _build_db_url, get_secret_data from .index_file_content_and_metadata_in_opensearch import ( index_file_content_and_metadata_in_opensearch, ) @@ -17,7 +17,7 @@ def index_file_content_and_metadata_in_opensearch_from_aws( bucket_name, object_key, secret_id ): - secret_string = _get_secret_data(secret_id) + secret_string = get_secret_data(secret_id) file_id = object_key.split("/")[-1] @@ -36,8 +36,6 @@ def index_file_content_and_metadata_in_opensearch_from_aws( open_search_http_auth, ) - -def _get_secret_data(secret_id: str) -> Dict[str, Any]: sm = boto3.client("secretsmanager") secret_response = sm.get_secret_value(SecretId=secret_id) return json.loads(secret_response["SecretString"]) @@ -66,11 +64,3 @@ def _get_opensearch_auth(secret_string: Dict[str, Any]) -> AWS4Auth: ) return open_search_http_auth - - -def _build_db_url(secret_string: Dict[str, Any]) -> str: - return ( - "postgresql+pg8000://" - f'{secret_string["DB_USER"]}:{quote_plus(secret_string["DB_PASSWORD"])}' - f'@{secret_string["DB_HOST"]}:{secret_string["DB_PORT"]}/{secret_string["DB_NAME"]}' - ) diff --git a/data_management/opensearch_indexer/opensearch_indexer/text_extraction.py b/data_management/opensearch_indexer/opensearch_indexer/text_extraction.py new file mode 100644 index 00000000..ea7effc7 --- /dev/null +++ b/data_management/opensearch_indexer/opensearch_indexer/text_extraction.py @@ -0,0 +1,42 @@ +import logging +import tempfile +from typing import Dict + +import textract + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def add_text_content(file: Dict, file_stream: bytes) -> Dict: + file_type = file["file_name"].split(".")[-1].lower() + + if file_type not in ["txt", "docx", "pdf"]: + 
logger.info(
+            f"Text extraction skipped for unsupported file type: {file_type}"
+        )
+        file["content"] = ""
+        file["text_extraction_status"] = "n/a"
+    else:
+        try:
+            file["content"] = extract_text(file_stream, file_type)
+            logger.info(f"Text extraction succeeded for file {file['file_id']}")
+            file["text_extraction_status"] = "success"
+        except Exception as e:
+            logger.error(
+                f"Text extraction failed for file {file['file_id']}: {e}"
+            )
+            file["content"] = ""
+            file["text_extraction_status"] = "failed"
+
+    return file
+
+
+def extract_text(file_stream: bytes, file_extension: str) -> str:
+    with tempfile.NamedTemporaryFile(
+        suffix=f".{file_extension}", delete=True
+    ) as temp:
+        temp.write(file_stream)
+        temp.flush()
+        content = textract.process(temp.name)
+        return content.decode("utf-8")
diff --git a/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py b/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py
new file mode 100644
index 00000000..40ca4709
--- /dev/null
+++ b/data_management/opensearch_indexer/tests/test_bulk_index_files_in_opensearch.py
@@ -0,0 +1,137 @@
+from unittest import mock
+
+from opensearch_indexer.index_consignment.bulk_index_consignment import (
+    bulk_index_files_in_opensearch,
+)
+from opensearchpy import RequestsHttpConnection
+
+
+@mock.patch(
+    "opensearch_indexer.index_consignment.bulk_index_consignment.OpenSearch"
+)
+def test_bulk_index_files_in_opensearch(mock_open_search):
+    """
+    Given:
+        - A list of documents already enriched with file metadata and text content.
+        - OpenSearch connection details.
+        - A mocked OpenSearch client.
+
+    When:
+        - The bulk_index_files_in_opensearch function is invoked.
+
+    Then:
+        - An OpenSearch client is created with the expected connection settings.
+        - A single bulk request is sent to the "documents" index.
+        - The bulk payload contains an action line and a document line for each file.
+ """ + open_search_host_url = "test_open_search_host_url" + open_search_http_auth = mock.Mock() + + documents = [ + { + "file_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f", + "document": { + "file_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f", + "file_name": "path0", + "file_reference": "ZD8MCK", + "file_path": "data/E2E_tests/original/path0", + "citeable_reference": "MOCK1 123/ZD8MCK", + "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", + "series_name": "MOCK1 123", + "transferring_body": "Mock 1 Department", + "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", + "transferring_body_description": "Mock 1 Department", + "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", + "consignment_reference": "TDR-2024-KKX4", + "file_type": "File", + "file_size": "1024", + "rights_copyright": "Crown Copyright", + "legal_status": "Public Record(s)", + "held_by": "The National Archives, Kew", + "date_last_modified": "2024-03-05T15:05:31", + "closure_type": "Open", + "title_closed": "false", + "description_closed": "false", + "language": "English", + "content": "", + "text_extraction_status": "n/a", + }, + }, + { + "file_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f", + "document": { + "file_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f", + "file_name": "path2", + "file_reference": "ZD8MCN", + "file_path": "data/E2E_tests/original/path2", + "citeable_reference": "MOCK1 123/ZD8MCN", + "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", + "series_name": "MOCK1 123", + "transferring_body": "Mock 1 Department", + "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", + "transferring_body_description": "Mock 1 Department", + "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", + "consignment_reference": "TDR-2024-KKX4", + "file_type": "File", + "file_size": "1024", + "rights_copyright": "Crown Copyright", + "legal_status": "Public Record(s)", + "held_by": "The National Archives, Kew", + "date_last_modified": "2024-03-05T15:05:31", + "closure_type": "Open", + "title_closed": "false", + "description_closed": "false", + "language": "English", + "content": "", + "text_extraction_status": "n/a", + }, + }, + { + "file_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270", + "document": { + "file_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270", + "file_name": "path1", + "file_reference": "ZD8MCL", + "file_path": "data/E2E_tests/original/path1", + "citeable_reference": "MOCK1 123/ZD8MCL", + "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", + "series_name": "MOCK1 123", + "transferring_body": "Mock 1 Department", + "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", + "transferring_body_description": "Mock 1 Department", + "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", + "consignment_reference": "TDR-2024-KKX4", + "file_type": "File", + "file_size": "1024", + "rights_copyright": "Crown Copyright", + "legal_status": "Public Record(s)", + "held_by": "The National Archives, Kew", + "date_last_modified": "2024-03-05T15:05:31", + "closure_type": "Open", + "title_closed": "false", + "description_closed": "false", + "language": "English", + "content": "", + "text_extraction_status": "n/a", + }, + }, + ] + + bulk_index_files_in_opensearch( + documents, + open_search_host_url, + open_search_http_auth, + ) + + mock_open_search.assert_called_once_with( + open_search_host_url, + http_auth=open_search_http_auth, + use_ssl=True, + verify_certs=True, + ca_certs=None, + connection_class=RequestsHttpConnection, + ) + mock_open_search.return_value.bulk.assert_called_once_with( + index="documents", 
+ body='{"index": {"_index": "documents", "_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f"}}\n{"file_id": "8ffacc5a-443a-4568-a5c9-c9741955b40f", "file_name": "path0", "file_reference": "ZD8MCK", "file_path": "data/E2E_tests/original/path0", "citeable_reference": "MOCK1 123/ZD8MCK", "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", "series_name": "MOCK1 123", "transferring_body": "Mock 1 Department", "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", "transferring_body_description": "Mock 1 Department", "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", "consignment_reference": "TDR-2024-KKX4", "file_type": "File", "file_size": "1024", "rights_copyright": "Crown Copyright", "legal_status": "Public Record(s)", "held_by": "The National Archives, Kew", "date_last_modified": "2024-03-05T15:05:31", "closure_type": "Open", "title_closed": "false", "description_closed": "false", "language": "English", "content": "", "text_extraction_status": "n/a"}\n{"index": {"_index": "documents", "_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f"}}\n{"file_id": "a948a34f-6ba0-4ff2-bef6-a290aec31d3f", "file_name": "path2", "file_reference": "ZD8MCN", "file_path": "data/E2E_tests/original/path2", "citeable_reference": "MOCK1 123/ZD8MCN", "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", "series_name": "MOCK1 123", "transferring_body": "Mock 1 Department", "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", "transferring_body_description": "Mock 1 Department", "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", "consignment_reference": "TDR-2024-KKX4", "file_type": "File", "file_size": "1024", "rights_copyright": "Crown Copyright", "legal_status": "Public Record(s)", "held_by": "The National Archives, Kew", "date_last_modified": "2024-03-05T15:05:31", "closure_type": "Open", "title_closed": "false", "description_closed": "false", "language": "English", "content": "", "text_extraction_status": "n/a"}\n{"index": {"_index": "documents", "_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270"}}\n{"file_id": "47526ba9-88e5-4cc8-8bc1-d682a10fa270", "file_name": "path1", "file_reference": "ZD8MCL", "file_path": "data/E2E_tests/original/path1", "citeable_reference": "MOCK1 123/ZD8MCL", "series_id": "8bd7ad22-90d1-4c7f-ae00-645dfd1987cc", "series_name": "MOCK1 123", "transferring_body": "Mock 1 Department", "transferring_body_id": "8ccc8cd1-c0ee-431d-afad-70cf404ba337", "transferring_body_description": "Mock 1 Department", "consignment_id": "2fd4e03e-5913-4c04-b4f2-5a823fafd430", "consignment_reference": "TDR-2024-KKX4", "file_type": "File", "file_size": "1024", "rights_copyright": "Crown Copyright", "legal_status": "Public Record(s)", "held_by": "The National Archives, Kew", "date_last_modified": "2024-03-05T15:05:31", "closure_type": "Open", "title_closed": "false", "description_closed": "false", "language": "English", "content": "", "text_extraction_status": "n/a"}\n', # noqa: E501 + ) diff --git a/data_management/opensearch_indexer/tests/test_process_and_index_file.py b/data_management/opensearch_indexer/tests/test_process_and_index_file.py index 57852c92..f835afbe 100644 --- a/data_management/opensearch_indexer/tests/test_process_and_index_file.py +++ b/data_management/opensearch_indexer/tests/test_process_and_index_file.py @@ -1,11 +1,9 @@ import tempfile -from pathlib import Path from unittest import mock from uuid import uuid4 import pytest from opensearch_indexer.index_file_content_and_metadata_in_opensearch import ( - extract_text, index_file_content_and_metadata_in_opensearch, ) from 
opensearchpy import RequestsHttpConnection @@ -197,42 +195,6 @@ def test_index_file_content_and_metadata_in_opensearch( "Key1": "Value1", "Key2": "Value2", "content": "Text stream", + "text_extraction_status": "success", }, ) - - -class TestExtractText: - def test_txt_file(self): - pdf_path = Path(__file__).parent / "multiline.txt" - with open(pdf_path, "rb") as file: - file_stream = file.read() - file_type = "txt" - assert ( - extract_text(file_stream, file_type) - == "This is line 1\nThis is line 2\nThis is line 3\nThis is line 4, the final line.\n" - ) - - def test_docx_file(self): - path = Path(__file__).parent / "multiline.docx" - with open(path, "rb") as file: - file_stream = file.read() - file_type = "docx" - - assert extract_text(file_stream, file_type) == ( - "This is line 1\n\n\t\t\t\t\t\t\t\t\t\t\t\t\t" - "This is line 2\n\n\t\t\t\t\t\t\t\t\t\t\t\t\t" - "This is line 3\n\n" - "This is line 4, the final line." - ) - - def test_pdf_file(self): - path = Path(__file__).parent / "multiline.pdf" - with open(path, "rb") as file: - file_stream = file.read() - - file_type = "pdf" - - assert ( - extract_text(file_stream, file_type) - == "This is line 1\nThis is line 2\nThis is line 3\nThis is line 4, the final line.\n\n\x0c" - ) diff --git a/data_management/opensearch_indexer/tests/test_text_extraction.py b/data_management/opensearch_indexer/tests/test_text_extraction.py new file mode 100644 index 00000000..be9df8de --- /dev/null +++ b/data_management/opensearch_indexer/tests/test_text_extraction.py @@ -0,0 +1,40 @@ +from pathlib import Path + +from opensearch_indexer.text_extraction import extract_text + + +class TestExtractText: + def test_txt_file(self): + pdf_path = Path(__file__).parent / "multiline.txt" + with open(pdf_path, "rb") as file: + file_stream = file.read() + file_type = "txt" + assert ( + extract_text(file_stream, file_type) + == "This is line 1\nThis is line 2\nThis is line 3\nThis is line 4, the final line.\n" + ) + + def test_docx_file(self): + path = Path(__file__).parent / "multiline.docx" + with open(path, "rb") as file: + file_stream = file.read() + file_type = "docx" + + assert extract_text(file_stream, file_type) == ( + "This is line 1\n\n\t\t\t\t\t\t\t\t\t\t\t\t\t" + "This is line 2\n\n\t\t\t\t\t\t\t\t\t\t\t\t\t" + "This is line 3\n\n" + "This is line 4, the final line." + ) + + def test_pdf_file(self): + path = Path(__file__).parent / "multiline.pdf" + with open(path, "rb") as file: + file_stream = file.read() + + file_type = "pdf" + + assert ( + extract_text(file_stream, file_type) + == "This is line 1\nThis is line 2\nThis is line 3\nThis is line 4, the final line.\n\n\x0c" + )
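
Example invocation: a minimal sketch of how the new handler can be driven, assuming an EventBridge-style event that carries the consignment reference under detail.parameters.reference; the bucket name and secret ID below are placeholder values, not real resources, and the call only succeeds against a live database, S3 bucket, and OpenSearch cluster.

    import os

    from opensearch_indexer.index_consignment.lambda_function import lambda_handler

    # Placeholder configuration; real deployments set these on the Lambda environment.
    os.environ["BUCKET_NAME"] = "example-consignment-bucket"
    os.environ["SECRET_ID"] = "example/opensearch-indexer-secret"

    # Consignment reference as supplied by the triggering event
    # (TDR-2024-KKX4 matches the reference used in the new test fixtures).
    event = {"detail": {"parameters": {"reference": "TDR-2024-KKX4"}}}

    lambda_handler(event, None)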