generated from nationalarchives/da-template
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create consignment indexing lambda for opensearch
- Loading branch information
1 parent
e0a5f5e
commit 5ebf3ad
Showing
10 changed files
with
586 additions
and
74 deletions.
There are no files selected for viewing
49 changes: 49 additions & 0 deletions
49
data_management/opensearch_indexer/opensearch_indexer/aws_helpers.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import json | ||
import logging | ||
from typing import Any, Dict | ||
from urllib.parse import quote_plus | ||
|
||
import boto3 | ||
from requests_aws4auth import AWS4Auth | ||
|
||
logger = logging.getLogger() | ||
logger.setLevel(logging.INFO) | ||
|
||
|
||
def get_s3_file(bucket_name: str, object_key: str) -> bytes: | ||
s3 = boto3.client("s3") | ||
s3_file_object = s3.get_object(Bucket=bucket_name, Key=object_key) | ||
return s3_file_object["Body"].read() | ||
|
||
|
||
def get_secret_data(secret_id: str) -> Dict[str, Any]: | ||
sm = boto3.client("secretsmanager") | ||
secret_response = sm.get_secret_value(SecretId=secret_id) | ||
return json.loads(secret_response["SecretString"]) | ||
|
||
|
||
def _build_db_url(secret_string: Dict[str, Any]) -> str: | ||
return ( | ||
"postgresql+pg8000://" | ||
f'{secret_string["DB_USER"]}:{quote_plus(secret_string["DB_PASSWORD"])}' | ||
f'@{secret_string["DB_HOST"]}:{secret_string["DB_PORT"]}/{secret_string["DB_NAME"]}' | ||
) | ||
|
||
|
||
def _get_opensearch_auth(secret_string: Dict[str, Any]) -> AWS4Auth: | ||
sts_client = boto3.client("sts") | ||
assumed_role = sts_client.assume_role( | ||
RoleArn=secret_string["OPEN_SEARCH_MASTER_ROLE_ARN"], | ||
RoleSessionName="LambdaOpenSearchSession", | ||
) | ||
logger.info("Extract temporary credentials to access OpenSearch") | ||
credentials = assumed_role["Credentials"] | ||
open_search_http_auth = AWS4Auth( | ||
credentials["AccessKeyId"], | ||
credentials["SecretAccessKey"], | ||
secret_string["AWS_REGION"], | ||
"es", | ||
session_token=credentials["SessionToken"], | ||
) | ||
|
||
return open_search_http_auth |
Empty file.
259 changes: 259 additions & 0 deletions
259
...agement/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,259 @@ | ||
import json | ||
import logging | ||
from typing import Dict, List, Optional, Tuple, Union | ||
|
||
import pg8000 | ||
from opensearchpy import OpenSearch, RequestsHttpConnection | ||
from requests_aws4auth import AWS4Auth | ||
from sqlalchemy import create_engine, text | ||
from sqlalchemy.orm import declarative_base, sessionmaker | ||
|
||
from ..aws_helpers import ( | ||
_build_db_url, | ||
_get_opensearch_auth, | ||
get_s3_file, | ||
get_secret_data, | ||
) | ||
from ..text_extraction import add_text_content | ||
|
||
logger = logging.getLogger() | ||
logger.setLevel(logging.INFO) | ||
|
||
|
||
def bulk_index_consignment_from_aws( | ||
consignment_reference: str, bucket_name: str, secret_id: str | ||
) -> None: | ||
""" | ||
Retrieve credentials and host information from AWS Secrets Manager, fetch consignment data, | ||
and index it in OpenSearch. | ||
Args: | ||
consignment_reference (str): The reference identifier for the consignment. | ||
bucket_name (str): The name of the S3 bucket containing file records. | ||
secret_id (str): The ID of the AWS secret storing database and OpenSearch credentials. | ||
Returns: | ||
None | ||
""" | ||
secret_string = get_secret_data(secret_id) | ||
database_url = _build_db_url(secret_string) | ||
open_search_host_url = secret_string["OPEN_SEARCH_HOST"] | ||
open_search_http_auth = _get_opensearch_auth(secret_string) | ||
|
||
bulk_index_consignment( | ||
consignment_reference, | ||
bucket_name, | ||
database_url, | ||
open_search_host_url, | ||
open_search_http_auth, | ||
) | ||
|
||
|
||
def bulk_index_consignment( | ||
consignment_reference: str, | ||
bucket_name: str, | ||
database_url: str, | ||
open_search_host_url: str, | ||
open_search_http_auth: Union[AWS4Auth, Tuple[str, str]], | ||
open_search_ca_certs: Optional[str] = None, | ||
) -> None: | ||
""" | ||
Fetch files associated with a consignment and index them in OpenSearch. | ||
Args: | ||
consignment_reference (str): The consignment reference identifier. | ||
bucket_name (str): The S3 bucket name containing files. | ||
database_url (str): The connection string for the PostgreSQL database. | ||
open_search_host_url (str): The host URL of the OpenSearch cluster. | ||
open_search_http_auth (Union[AWS4Auth, Tuple[str, str]]): The authentication credentials for OpenSearch. | ||
open_search_ca_certs (Optional[str]): Path to CA certificates for SSL verification. | ||
Returns: | ||
None | ||
""" | ||
files = _fetch_files_in_consignment(consignment_reference, database_url) | ||
documents_to_index = _construct_documents(files, bucket_name) | ||
bulk_index_files_in_opensearch( | ||
documents_to_index, | ||
open_search_host_url, | ||
open_search_http_auth, | ||
open_search_ca_certs, | ||
) | ||
|
||
|
||
def _construct_documents(files: List[Dict], bucket_name: str) -> List[Dict]: | ||
""" | ||
Construct a list of documents to be indexed in OpenSearch from file metadata. | ||
Args: | ||
files (List[Dict]): The list of file metadata dictionaries. | ||
bucket_name (str): The S3 bucket name where the files are stored. | ||
Returns: | ||
List[Dict]: A list of documents ready for indexing. | ||
""" | ||
documents_to_index = [] | ||
for file in files: | ||
object_key = file["consignment_reference"] + "/" + str(file["file_id"]) | ||
|
||
logger.info(f"Processing file: {object_key}") | ||
|
||
file_stream = None | ||
document = file | ||
|
||
try: | ||
file_stream = get_s3_file(bucket_name, object_key) | ||
except Exception as e: | ||
logger.error(f"Failed to obtain file {object_key}: {e}") | ||
|
||
document = add_text_content(file, file_stream) | ||
|
||
documents_to_index.append( | ||
{"file_id": file["file_id"], "document": document} | ||
) | ||
return documents_to_index | ||
|
||
|
||
def _fetch_files_in_consignment( | ||
consignment_reference: str, database_url: str | ||
) -> List[Dict]: | ||
""" | ||
Fetch file metadata associated with the given consignment reference. | ||
Args: | ||
consignment_reference (str): The consignment reference identifier. | ||
database_url (str): The connection string for the PostgreSQL database. | ||
Returns: | ||
List[Dict]: A list of file metadata dictionaries. | ||
""" | ||
engine = create_engine(database_url) | ||
Base = declarative_base() | ||
Base.metadata.reflect(bind=engine) | ||
Session = sessionmaker(bind=engine) | ||
session = Session() | ||
|
||
query = """ | ||
SELECT | ||
f."FileId" AS file_id, | ||
f."FileName" AS file_name, | ||
f."FileReference" AS file_reference, | ||
f."FilePath" AS file_path, | ||
f."CiteableReference" AS citeable_reference, | ||
s."SeriesId" AS series_id, | ||
s."Name" AS series_name, | ||
b."Name" AS transferring_body, | ||
b."BodyId" AS transferring_body_id, | ||
b."Description" AS transferring_body_description, | ||
c."ConsignmentId" AS consignment_id, | ||
c."ConsignmentReference" AS consignment_reference, | ||
fm."PropertyName", | ||
fm."Value" | ||
FROM | ||
"File" f | ||
JOIN | ||
"Consignment" c ON f."ConsignmentId" = c."ConsignmentId" | ||
JOIN | ||
"Series" s ON c."SeriesId" = s."SeriesId" | ||
JOIN | ||
"Body" b ON s."BodyId" = b."BodyId" | ||
LEFT JOIN | ||
"FileMetadata" fm ON f."FileId" = fm."FileId" | ||
WHERE | ||
c."ConsignmentReference" = :consignment_reference | ||
AND f."FileType" = 'File'; | ||
""" | ||
try: | ||
result = session.execute( | ||
text(query), {"consignment_reference": consignment_reference} | ||
).fetchall() | ||
except pg8000.Error as e: | ||
raise Exception(f"Database query failed: {e}") | ||
finally: | ||
session.close() | ||
|
||
# Process query results | ||
files_data = {} | ||
|
||
for row in result: | ||
file_id = str(row.file_id) | ||
if file_id not in files_data: | ||
files_data[file_id] = { | ||
"file_id": str(row.file_id), | ||
"file_name": str(row.file_name), | ||
"file_reference": str(row.file_reference), | ||
"file_path": str(row.file_path), | ||
"citeable_reference": str(row.citeable_reference), | ||
"series_id": str(row.series_id), | ||
"series_name": str(row.series_name), | ||
"transferring_body": str(row.transferring_body), | ||
"transferring_body_id": str(row.transferring_body_id), | ||
"transferring_body_description": str( | ||
row.transferring_body_description | ||
), | ||
"consignment_id": str(row.consignment_id), | ||
"consignment_reference": str(row.consignment_reference), | ||
} | ||
|
||
if row.PropertyName: | ||
files_data[file_id][row.PropertyName] = str(row.Value) | ||
|
||
return list(files_data.values()) | ||
|
||
|
||
def bulk_index_files_in_opensearch( | ||
documents: List[Dict[str, Union[str, Dict]]], | ||
open_search_host_url: str, | ||
open_search_http_auth: Union[AWS4Auth, Tuple[str, str]], | ||
open_search_ca_certs: Optional[str] = None, | ||
) -> None: | ||
""" | ||
Perform bulk indexing of documents in OpenSearch. | ||
Args: | ||
documents (List[Dict[str, Union[str, Dict]]]): The documents to index. | ||
open_search_host_url (str): The OpenSearch cluster URL. | ||
open_search_http_auth (Union[AWS4Auth, Tuple[str, str]]): The authentication credentials. | ||
open_search_ca_certs (Optional[str]): Path to CA certificates for SSL verification. | ||
Returns: | ||
None | ||
""" | ||
bulk_data = [] | ||
for doc in documents: | ||
bulk_data.append( | ||
json.dumps( | ||
{"index": {"_index": "documents", "_id": doc["file_id"]}} | ||
) | ||
) | ||
bulk_data.append(json.dumps(doc["document"])) | ||
|
||
bulk_payload = "\n".join(bulk_data) + "\n" | ||
|
||
open_search = OpenSearch( | ||
open_search_host_url, | ||
http_auth=open_search_http_auth, | ||
use_ssl=True, | ||
verify_certs=True, | ||
ca_certs=open_search_ca_certs, | ||
connection_class=RequestsHttpConnection, | ||
) | ||
|
||
opensearch_index = "documents" | ||
|
||
try: | ||
response = open_search.bulk(index=opensearch_index, body=bulk_payload) | ||
logger.info("Opensearch bulk command executed") | ||
logger.info(response) | ||
|
||
if response["errors"]: | ||
logger.error("Errors occurred during bulk indexing") | ||
for item in response["items"]: | ||
if "error" in item.get("index", {}): | ||
logger.error( | ||
f"Error for document ID {item['index']['_id']}: {item['index']['error']}" | ||
) | ||
else: | ||
logger.info("Bulk indexing completed successfully") | ||
except Exception as e: | ||
logger.error(f"Bulk indexing failed: {e}") |
53 changes: 53 additions & 0 deletions
53
data_management/opensearch_indexer/opensearch_indexer/index_consignment/lambda_function.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import logging | ||
import os | ||
from typing import Any, Dict | ||
|
||
from .bulk_index_consignment import bulk_index_files_in_opensearch_from_aws | ||
|
||
logger = logging.getLogger() | ||
logger.setLevel(logging.INFO) | ||
|
||
|
||
def lambda_handler(event: Dict[str, Any], context: Any) -> None: | ||
""" | ||
AWS Lambda handler function to trigger the indexing of consignment files into OpenSearch. | ||
This function is invoked by an AWS event containing details of a consignment. It retrieves | ||
the necessary parameters from the event and environment variables, then calls the indexing | ||
function to process and index the files into OpenSearch. | ||
Args: | ||
event (Dict[str, Any]): The event data triggering the Lambda function. Expected to contain: | ||
- `detail` (Dict[str, Any]): A dictionary with `parameters` that includes: | ||
- `reference` (str): The consignment reference identifier. | ||
context (Any): AWS Lambda context object (not used in this function). | ||
Environment Variables: | ||
BUCKET_NAME (str): The name of the S3 bucket where the files are stored. | ||
SECRET_ID (str): The identifier for the AWS Secrets Manager secret containing database | ||
and OpenSearch credentials. | ||
Raises: | ||
Exception: If `consignment_reference`, `BUCKET_NAME`, or `SECRET_ID` are missing. | ||
""" | ||
logger.info("Lambda started") | ||
logger.info("Event received: %s", event) | ||
|
||
# Extract parameters from the event and environment variables | ||
detail = event.get("detail", {}) | ||
consignment_reference = detail.get("parameters", {}).get("reference") | ||
bucket_name = os.getenv("BUCKET_NAME") | ||
secret_id = os.getenv("SECRET_ID") | ||
|
||
# Validate required parameters | ||
if not consignment_reference or not bucket_name or not secret_id: | ||
error_message = "Missing consignment_reference, BUCKET_NAME, or SECRET_ID required for indexing" | ||
logger.error(error_message) | ||
raise Exception(error_message) | ||
|
||
# Log and process the consignment reference | ||
logger.info(f"Processing consignment reference: {consignment_reference}") | ||
bulk_index_files_in_opensearch_from_aws( | ||
consignment_reference, bucket_name, secret_id | ||
) | ||
logger.info("Lambda completed") |
Oops, something went wrong.