diff --git a/Makefile b/Makefile
index 8945341..d43a8b6 100644
--- a/Makefile
+++ b/Makefile
@@ -27,6 +27,9 @@ send-message-v1:
 send-message-v2:
 	@awslocal sns publish --topic-arn arn:aws:sns:us-east-1:000000000000:judgments --message file://aws_examples/sns/parsed-judgment-v2.json
 
+send-message-s3:
+	@awslocal s3 cp aws_examples/s3/te-editorial-out-int/test3.tar.gz s3://inbound-bucket/QX/e31b117f-ff09-49b6-a697-7952c7a67384/QX.tar.gz
+
 delete-document:
 	@curl --anyauth --user admin:admin -X DELETE -i http://localhost:8000/v1/documents\?database\=Judgments\&uri\=/ewca/civ/2022/111.xml
diff --git a/aws_examples/s3/te-editorial-out-int/test3.tar.gz b/aws_examples/s3/te-editorial-out-int/test3.tar.gz
new file mode 100644
index 0000000..1499c03
Binary files /dev/null and b/aws_examples/s3/te-editorial-out-int/test3.tar.gz differ
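
For orientation: the object created by `send-message-s3` lands in `inbound-bucket`, and once the notification configuration added later in this diff is in place, S3 emits an event record of the shape below. This is a sketch based on the `s3_message` fixture added to tests.py further down; the bucket name and key here are the ones the Makefile target uses, not values taken from the patch itself.

# Illustrative S3 notification record; Message.from_message() dispatches on
# Records[0]["eventSource"] == "aws:s3".
s3_event = {
    "Records": [
        {
            "eventSource": "aws:s3",
            "s3": {
                "bucket": {"name": "inbound-bucket"},
                "object": {"key": "QX/e31b117f-ff09-49b6-a697-7952c7a67384/QX.tar.gz"},
            },
        }
    ]
}
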
diff --git a/ds-caselaw-ingester/lambda_function.py b/ds-caselaw-ingester/lambda_function.py
index d5c8639..51c84a1 100644
--- a/ds-caselaw-ingester/lambda_function.py
+++ b/ds-caselaw-ingester/lambda_function.py
@@ -44,7 +44,9 @@ def from_event(cls, event):
 
     @classmethod
     def from_message(cls, message):
-        if "parameters" in message.keys():
+        if message.get("Records", [{}])[0].get("eventSource") == "aws:s3":
+            return S3Message(message["Records"][0])
+        elif "parameters" in message.keys():
             return V2Message(message)
         else:
             return V1Message(message)
@@ -52,6 +54,10 @@ def from_message(cls, message):
     def __init__(self, message):
         self.message = message
 
+    def update_consignment_reference(self, new_ref):
+        """In most cases we trust we already have the correct consignment reference"""
+        return
+
 
 class V1Message(Message):
     def is_v1(self):
@@ -105,9 +111,7 @@ def get_consignment_reference(self):
         if result:
             return result
 
-        raise InvalidMessageException(
-            "Malformed v2 message, please supply a consignment-reference or s3-folder-url"
-        )
+        raise InvalidMessageException("Malformed v2 message, please supply a reference")
 
     def save_s3_response(self, sqs_client, s3_client):
         s3_bucket = self.message.get("parameters", {}).get("s3Bucket")
@@ -121,6 +125,35 @@ def save_s3_response(self, sqs_client, s3_client):
         return filename
 
 
+class S3Message(V2Message):
+    """An SNS message generated directly by adding a file to an S3 bucket"""
+
+    def __init__(self, *args, **kwargs):
+        self._consignment = None
+        super().__init__(*args, **kwargs)
+
+    def get_consignment_reference(self):
+        # We use the filename as a first draft of the consignment reference,
+        # but later update it with the value from the tar.gz
+        if self._consignment:
+            return self._consignment
+        return self.message["s3"]["object"]["key"].split("/")[-1].partition(".")[0]
+
+    def update_consignment_reference(self, new_ref):
+        self._consignment = new_ref
+
+    def save_s3_response(self, sqs_client, s3_client):
+        s3_key = self.message["s3"]["object"]["key"]
+        s3_bucket = self.message["s3"]["bucket"]["name"]
+        reference = self.get_consignment_reference()
+        filename = os.path.join("/tmp", f"{reference}.tar.gz")
+        s3_client.download_file(s3_bucket, s3_key, filename)
+        if not os.path.exists(filename):
+            raise RuntimeError(f"File {filename} not created")
+        print(f"tar.gz saved locally as {filename}")
+        return filename
+
+
 class ReportableException(Exception):
     def __init__(self, *args, **kwargs):
         rollbar.report_message("Something happened!", "warning", str(self))
@@ -205,7 +238,7 @@ def extract_docx_filename(metadata: dict, consignment_reference: str) -> str:
         return metadata["parameters"]["TRE"]["payload"]["filename"]
     except KeyError:
         raise DocxFilenameNotFoundException(
-            f"No .docx filename was found in metadata. Consignment Ref: {consignment_reference}"
+            f"No .docx filename was found in metadata. Consignment Ref: {consignment_reference}, metadata: {metadata}"
         )
@@ -317,7 +350,9 @@ def copy_file(tarfile, input_filename, output_filename, uri, s3_client: Session.
         file = tarfile.extractfile(input_filename)
         store_file(file, uri, output_filename, s3_client)
     except KeyError:
-        raise FileNotFoundException(f"File was not found: {input_filename}")
+        raise FileNotFoundException(
+            f"File was not found: {input_filename}, files were {tarfile.getnames()} "
+        )
 
 
 def send_retry_message(
@@ -477,10 +512,8 @@ def handler(event, context):
 
     tar = tarfile.open(filename, mode="r")
     metadata = extract_metadata(tar, consignment_reference)
-
-    if not message.is_v1():
-        # this is just for debug purposes, it should be safely removable
-        store_file(open(filename, mode="rb"), "v2-debug", "debug.tar.gz", s3_client)
+    message.update_consignment_reference(metadata["parameters"]["TRE"]["reference"])
+    consignment_reference = message.get_consignment_reference()
 
     # Extract and parse the judgment XML
     xml_file_name = metadata["parameters"]["TRE"]["payload"]["xml"]
@@ -544,11 +577,17 @@ def handler(event, context):
             s3_client,
         )
 
-    if api_client.get_published(uri):
+    force_publish = (
+        metadata.get("parameters", {})
+        .get("INGESTER_OPTIONS", {})
+        .get("auto_publish", False)
+    )
+    if force_publish is True:
+        print(f"auto_publishing {consignment_reference}")
+    if api_client.get_published(uri) or force_publish:
         update_published_documents(uri, s3_client)
 
     tar.close()
 
     print("Ingestion complete")
-
     return message.message
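
To make the new dispatch and reference logic concrete, here is a hedged sketch (not part of the patch) of an S3 record passing through the classes above, reusing the illustrative `s3_event` from earlier; the `TDR-...` value is hypothetical:

# from_message() sees eventSource == "aws:s3" and wraps the single record
# in an S3Message rather than a V1Message or V2Message.
message = Message.from_message(s3_event)

# First draft of the consignment reference: the key's basename up to the first dot.
# "QX/e31b117f-.../QX.tar.gz".split("/")[-1].partition(".")[0] == "QX"
assert message.get_consignment_reference() == "QX"

# Later, handler() overwrites it with the TRE reference from the tar.gz metadata.
message.update_consignment_reference("TDR-2099-EXAMPLE")  # hypothetical reference
assert message.get_consignment_reference() == "TDR-2099-EXAMPLE"
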
@patch("lambda_function.api_client", autospec=True) + @patch("lambda_function.extract_metadata", autospec=True) + @patch("lambda_function.tarfile") + @patch("lambda_function.boto3.session.Session") + @patch("lambda_function.urllib3.PoolManager") + def test_handler_messages_s3( + self, urllib_pool, boto_session, tarfile, metadata, apiclient, capsys + ): + """Test that, with appropriate stubs, an S3 message passes through the parsing process""" + urllib_pool.return_value.request.return_value.status = 200 + urllib_pool.return_value.request.return_value.data = b"data" + tarfile.open.return_value.getmembers().return_value.name.extractfile.return_value = ( + b"3" + ) + boto_session.return_value.client.return_value.download_file = ( + create_fake_tdr_file + ) + metadata.return_value = { + "parameters": { + "TRE": { + "reference": "TDR-2020-FAR", + "payload": { + "xml": "", + "filename": "temp.docx", + "images": [], + }, + }, + "TDR": { + "Source-Organization": "", + "Contact-Name": "", + "Contact-Email": "", + "Internal-Sender-Identifier": "", + "Consignment-Completed-Datetime": "", + }, + "INGESTER_OPTIONS": {"auto_publish": True}, + "PARSER": {"uri": ""}, + } + } + + message = s3_message_raw + event = {"Records": [{"Sns": {"Message": message}}]} + lambda_function.handler(event=event, context=None) + + log = capsys.readouterr().out + assert "Ingester Start: Consignment reference FCL-12345" in log + assert "v1: False" in log + assert "tar.gz saved locally as /tmp/FCL-12345.tar.gz" in log + assert "Ingesting document" in log + assert "Updated judgment xml" in log + assert "Upload Successful" in log + assert "Ingestion complete" in log + assert "auto_publish" in log class TestLambda: diff --git a/scripts/inbound-s3-sns.json b/scripts/inbound-s3-sns.json new file mode 100644 index 0000000..b72b7fd --- /dev/null +++ b/scripts/inbound-s3-sns.json @@ -0,0 +1,8 @@ +{ + "TopicConfigurations": [ + { + "TopicArn": "arn:aws:sns:us-east-1:000000000000:judgments", + "Events": ["s3:ObjectCreated:*"] + } + ] +} diff --git a/scripts/setup-localstack.sh b/scripts/setup-localstack.sh index 0dd0322..5ba1e99 100755 --- a/scripts/setup-localstack.sh +++ b/scripts/setup-localstack.sh @@ -9,8 +9,9 @@ awslocal lambda create-function \ --zip-file fileb://dist/lambda.zip \ --handler ds-caselaw-ingester/lambda_function.handler \ --runtime python3.11 \ - --environment "Variables={MARKLOGIC_HOST=$MARKLOGIC_HOST,MARKLOGIC_USER=$MARKLOGIC_USER,MARKLOGIC_PASSWORD=$MARKLOGIC_PASSWORD,AWS_BUCKET_NAME=$AWS_BUCKET_NAME,AWS_SECRET_KEY=$AWS_SECRET_KEY,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_ENDPOINT_URL=$AWS_ENDPOINT_URL,SQS_QUEUE_URL=$SQS_QUEUE_URL,ROLLBAR_TOKEN=$ROLLBAR_TOKEN,ROLLBAR_ENV=$ROLLBAR_ENV,NOTIFY_API_KEY=$NOTIFY_API_KEY,NOTIFY_EDITORIAL_ADDRESS=$NOTIFY_EDITORIAL_ADDRESS,NOTIFY_NEW_JUDGMENT_TEMPLATE_ID=$NOTIFY_NEW_JUDGMENT_TEMPLATE_ID,EDITORIAL_UI_BASE_URL=$EDITORIAL_UI_BASE_URL,PUBLIC_ASSET_BUCKET=$PUBLIC_ASSET_BUCKET}" \ + --environment 
"Variables={MARKLOGIC_HOST=$MARKLOGIC_HOST,MARKLOGIC_USER=$MARKLOGIC_USER,MARKLOGIC_PASSWORD=$MARKLOGIC_PASSWORD,AWS_BUCKET_NAME=$AWS_BUCKET_NAME,AWS_SECRET_KEY=$AWS_SECRET_KEY,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_ENDPOINT_URL=$AWS_ENDPOINT_URL,SQS_QUEUE_URL=$SQS_QUEUE_URL,ROLLBAR_TOKEN=$ROLLBAR_TOKEN,ROLLBAR_ENV=$ROLLBAR_ENV,NOTIFY_API_KEY=$NOTIFY_API_KEY,NOTIFY_EDITORIAL_ADDRESS=$NOTIFY_EDITORIAL_ADDRESS,NOTIFY_NEW_JUDGMENT_TEMPLATE_ID=$NOTIFY_NEW_JUDGMENT_TEMPLATE_ID,EDITORIAL_UI_BASE_URL=$EDITORIAL_UI_BASE_URL,PUBLIC_ASSET_BUCKET=$PUBLIC_ASSET_BUCKET,LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT=500}" \ --role arn:aws:iam::000000000000:role/lambda-role \ + --timeout 500 awslocal sns create-topic \ --name judgments \ @@ -30,6 +31,25 @@ awslocal s3api create-bucket \ awslocal s3api create-bucket \ --bucket public-asset-bucket + + +awslocal sns create-topic \ + --name inbound-sns \ + --attributes consignment-reference=string,s3-folder-url=string,consignment-type=string,number-of-retries=number + +awslocal s3api create-bucket \ + --bucket inbound-bucket + +awslocal s3api put-bucket-notification-configuration \ + --bucket inbound-bucket \ + --notification-configuration file://scripts/inbound-s3-sns.json + +awslocal sns subscribe --protocol lambda \ +--region us-east-1 \ +--topic-arn arn:aws:sns:us-east-1:000000000000:inbound-sns \ +--notification-endpoint arn:aws:lambda:us-east-1:000000000000:function:te-lambda + + if [ -n "$1" ]; then awslocal s3 cp $1 s3://te-editorial-out-int else