Merge pull request #121 from nationalarchives/ingest-backlog
Add S3 support to ingester to be able to ingest the backlog
dragon-dxw authored Nov 3, 2023
2 parents efd9b9a + f98986e commit daa236e
Showing 6 changed files with 163 additions and 23 deletions.
3 changes: 3 additions & 0 deletions Makefile
@@ -27,6 +27,9 @@ send-message-v1:
send-message-v2:
@awslocal sns publish --topic-arn arn:aws:sns:us-east-1:000000000000:judgments --message file://aws_examples/sns/parsed-judgment-v2.json

send-message-s3:
@awslocal s3 cp aws_examples/s3/te-editorial-out-int/test3.tar.gz s3://inbound-bucket/QX/e31b117f-ff09-49b6-a697-7952c7a67384/QX.tar.gz


delete-document:
@curl --anyauth --user admin:admin -X DELETE -i http://localhost:8000/v1/documents\?database\=Judgments\&uri\=/ewca/civ/2022/111.xml
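
The new send-message-s3 target simulates the backlog path by copying a test tarball into the inbound bucket, which fires the S3 notification wired up in setup-localstack.sh below. For reference, a minimal boto3 sketch of the same upload; the LocalStack endpoint and dummy credentials are assumptions, not part of this commit:

```python
# Sketch: upload a test tarball so the S3 ObjectCreated notification fires,
# as the send-message-s3 Make target does via awslocal. The endpoint URL and
# credentials are assumed defaults for a local LocalStack instance.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",  # assumed LocalStack endpoint
    aws_access_key_id="test",              # dummy credentials for LocalStack
    aws_secret_access_key="test",
    region_name="us-east-1",
)
s3.upload_file(
    "aws_examples/s3/te-editorial-out-int/test3.tar.gz",
    "inbound-bucket",
    "QX/e31b117f-ff09-49b6-a697-7952c7a67384/QX.tar.gz",
)
```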
Binary file added aws_examples/s3/te-editorial-out-int/test3.tar.gz
Binary file not shown.
63 changes: 51 additions & 12 deletions ds-caselaw-ingester/lambda_function.py
@@ -44,14 +44,20 @@ def from_event(cls, event):

@classmethod
def from_message(cls, message):
if "parameters" in message.keys():
if message.get("Records", [{}])[0].get("eventSource") == "aws:s3":
return S3Message(message["Records"][0])
elif "parameters" in message.keys():
return V2Message(message)
else:
return V1Message(message)

def __init__(self, message):
self.message = message

def update_consignment_reference(self, new_ref):
"""In most cases we trust we already have the correct consignment reference"""
return
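
from_message now dispatches on message shape: an S3 event record (eventSource of aws:s3) yields an S3Message wrapping the first record, a payload with a parameters key yields a V2Message, and anything else falls back to V1Message. A purely illustrative sketch, assuming lambda_function is importable and using placeholder values:

```python
# Illustrative payload shapes and the class from_message picks for each;
# all values are placeholders, not real traffic.
from lambda_function import Message

s3_shaped = {"Records": [{"eventSource": "aws:s3", "s3": {}}]}   # -> S3Message
v2_shaped = {"parameters": {"s3Bucket": "some-bucket"}}          # -> V2Message
fallback = {"consignment-reference": "TDR-2022-XYZ"}             # -> V1Message

for payload in (s3_shaped, v2_shaped, fallback):
    print(type(Message.from_message(payload)).__name__)
```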


class V1Message(Message):
def is_v1(self):
@@ -105,9 +111,7 @@ def get_consignment_reference(self):
if result:
return result

raise InvalidMessageException(
"Malformed v2 message, please supply a consignment-reference or s3-folder-url"
)
raise InvalidMessageException("Malformed v2 message, please supply a reference")

def save_s3_response(self, sqs_client, s3_client):
s3_bucket = self.message.get("parameters", {}).get("s3Bucket")
@@ -121,6 +125,35 @@ def save_s3_response(self, sqs_client, s3_client):
return filename


class S3Message(V2Message):
"""An SNS message generated directly by adding a file to an S3 bucket"""

def __init__(self, *args, **kwargs):
self._consignment = None
super().__init__(*args, **kwargs)

def get_consignment_reference(self):
        # We use the filename as a first draft of the consignment reference,
        # but later update it with the value from the tar.gz
if self._consignment:
return self._consignment
return self.message["s3"]["object"]["key"].split("/")[-1].partition(".")[0]

def update_consignment_reference(self, new_ref):
self._consignment = new_ref

def save_s3_response(self, sqs_client, s3_client):
s3_key = self.message["s3"]["object"]["key"]
s3_bucket = self.message["s3"]["bucket"]["name"]
reference = self.get_consignment_reference()
filename = os.path.join("/tmp", f"{reference}.tar.gz")
s3_client.download_file(s3_bucket, s3_key, filename)
if not os.path.exists(filename):
raise RuntimeError(f"File {filename} not created")
print(f"tar.gz saved locally as {filename}")
return filename
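
The draft reference is just the final path segment of the S3 key with everything from the first dot onwards stripped; the value from the tar.gz later replaces it via update_consignment_reference. A worked example of the derivation:

```python
# Worked example of S3Message.get_consignment_reference's first draft.
key = "QX/e31b117f-ff09-49b6-a697-7952c7a67384/FCL-12345.tar.gz"
draft = key.split("/")[-1].partition(".")[0]
assert draft == "FCL-12345"
```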


class ReportableException(Exception):
def __init__(self, *args, **kwargs):
rollbar.report_message("Something happened!", "warning", str(self))
@@ -205,7 +238,7 @@ def extract_docx_filename(metadata: dict, consignment_reference: str) -> str:
return metadata["parameters"]["TRE"]["payload"]["filename"]
except KeyError:
raise DocxFilenameNotFoundException(
f"No .docx filename was found in metadata. Consignment Ref: {consignment_reference}"
f"No .docx filename was found in metadata. Consignment Ref: {consignment_reference}, metadata: {metadata}"
)
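
The enriched error now includes the whole metadata dict alongside the consignment reference, which makes a malformed parser payload much easier to diagnose. For context, a minimal sketch of the shape extract_docx_filename traverses, mirroring the stubs in tests.py later in this diff:

```python
# Minimal metadata shape extract_docx_filename reads; matches the test
# stubs below, with all other fields elided.
metadata = {"parameters": {"TRE": {"payload": {"filename": "temp.docx"}}}}
assert metadata["parameters"]["TRE"]["payload"]["filename"] == "temp.docx"
```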


@@ -317,7 +350,9 @@ def copy_file(tarfile, input_filename, output_filename, uri, s3_client: Session.
file = tarfile.extractfile(input_filename)
store_file(file, uri, output_filename, s3_client)
except KeyError:
raise FileNotFoundException(f"File was not found: {input_filename}")
raise FileNotFoundException(
f"File was not found: {input_filename}, files were {tarfile.getnames()} "
)
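
Likewise, a missed lookup in the tarball now reports what was actually in the archive via tarfile's standard getnames(), which returns every member path. A small standard-library sketch with an illustrative archive name:

```python
# tarfile.TarFile.getnames() lists every member path in the archive;
# the archive name here is purely illustrative.
import tarfile

with tarfile.open("FCL-12345.tar.gz", mode="r") as tar:
    print(tar.getnames())
```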


def send_retry_message(
@@ -477,10 +512,8 @@ def handler(event, context):

tar = tarfile.open(filename, mode="r")
metadata = extract_metadata(tar, consignment_reference)

if not message.is_v1():
# this is just for debug purposes, it should be safely removable
store_file(open(filename, mode="rb"), "v2-debug", "debug.tar.gz", s3_client)
message.update_consignment_reference(metadata["parameters"]["TRE"]["reference"])
consignment_reference = message.get_consignment_reference()

# Extract and parse the judgment XML
xml_file_name = metadata["parameters"]["TRE"]["payload"]["xml"]
@@ -544,11 +577,17 @@ def handler(event, context):
s3_client,
)

if api_client.get_published(uri):
force_publish = (
metadata.get("parameters", {})
.get("INGESTER_OPTIONS", {})
.get("auto_publish", False)
)
if force_publish is True:
print(f"auto_publishing {consignment_reference}")
if api_client.get_published(uri) or force_publish:
update_published_documents(uri, s3_client)

tar.close()

print("Ingestion complete")

return message.message
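
The replaced debug upload gives way to two behaviours: the consignment reference is refreshed from the TRE reference inside the tarball, and a new auto-publish branch lets a backlog document publish itself when the parsed metadata opts in. A sketch of the trigger, using the key names from this diff with all other metadata fields elided:

```python
# Metadata fragment that flips force_publish to True in the handler;
# TRE/TDR fields elided for brevity.
metadata = {"parameters": {"INGESTER_OPTIONS": {"auto_publish": True}}}

force_publish = (
    metadata.get("parameters", {})
    .get("INGESTER_OPTIONS", {})
    .get("auto_publish", False)
)
assert force_publish is True
```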
90 changes: 80 additions & 10 deletions ds-caselaw-ingester/tests.py
@@ -44,7 +44,23 @@
}
"""

s3_message = {
"Records": [
{
"eventSource": "aws:s3",
"s3": {
"bucket": {
"name": "staging-tre-court-document-pack-out",
},
"object": {
"key": "QX/e31b117f-ff09-49b6-a697-7952c7a67384/FCL-12345.tar.gz",
},
},
}
]
}
v2_message = json.loads(v2_message_raw)
s3_message_raw = json.dumps(s3_message)


def create_fake_tdr_file(*args, **kwargs):
@@ -82,6 +98,8 @@ def test_handler_messages_v1(
assert "Upload Successful" in log
assert "Ingestion complete" in log

assert "auto_publish" not in log

@patch("lambda_function.api_client", autospec=True)
@patch("lambda_function.extract_metadata", autospec=True)
@patch("lambda_function.tarfile")
@@ -101,7 +119,14 @@ def test_handler_messages_v2(
)
metadata.return_value = {
"parameters": {
"TRE": {"payload": {"xml": "", "filename": "temp.docx", "images": []}},
"TRE": {
"reference": "TDR-2020-FAR",
"payload": {
"xml": "",
"filename": "temp.docx",
"images": [],
},
},
"TDR": {
"Source-Organization": "",
"Contact-Name": "",
@@ -113,15 +138,6 @@ }
}
}

# metadata.return_value = {
# "uri": "https://caselaw.nationalarchives.gov.uk/id/eat/2022/1",
# "court": "EAT",
# "cite": "[2022] EAT 1",
# "date": "2021-09-28",
# "name": "SECRETARY OF STATE FOR JUSTICE v MR ALAN JOHNSON",
# "attachments": [],
# }

message = v2_message_raw
event = {"Records": [{"Sns": {"Message": message}}]}
lambda_function.handler(event=event, context=None)
@@ -134,6 +150,60 @@
assert "Updated judgment xml" in log
assert "Upload Successful" in log
assert "Ingestion complete" in log
assert "auto_publish" not in log

@patch("lambda_function.api_client", autospec=True)
@patch("lambda_function.extract_metadata", autospec=True)
@patch("lambda_function.tarfile")
@patch("lambda_function.boto3.session.Session")
@patch("lambda_function.urllib3.PoolManager")
def test_handler_messages_s3(
self, urllib_pool, boto_session, tarfile, metadata, apiclient, capsys
):
"""Test that, with appropriate stubs, an S3 message passes through the parsing process"""
urllib_pool.return_value.request.return_value.status = 200
urllib_pool.return_value.request.return_value.data = b"data"
tarfile.open.return_value.getmembers().return_value.name.extractfile.return_value = (
b"3"
)
boto_session.return_value.client.return_value.download_file = (
create_fake_tdr_file
)
metadata.return_value = {
"parameters": {
"TRE": {
"reference": "TDR-2020-FAR",
"payload": {
"xml": "",
"filename": "temp.docx",
"images": [],
},
},
"TDR": {
"Source-Organization": "",
"Contact-Name": "",
"Contact-Email": "",
"Internal-Sender-Identifier": "",
"Consignment-Completed-Datetime": "",
},
"INGESTER_OPTIONS": {"auto_publish": True},
"PARSER": {"uri": ""},
}
}

message = s3_message_raw
event = {"Records": [{"Sns": {"Message": message}}]}
lambda_function.handler(event=event, context=None)

log = capsys.readouterr().out
assert "Ingester Start: Consignment reference FCL-12345" in log
assert "v1: False" in log
assert "tar.gz saved locally as /tmp/FCL-12345.tar.gz" in log
assert "Ingesting document" in log
assert "Updated judgment xml" in log
assert "Upload Successful" in log
assert "Ingestion complete" in log
assert "auto_publish" in log


class TestLambda:
8 changes: 8 additions & 0 deletions scripts/inbound-s3-sns.json
@@ -0,0 +1,8 @@
{
"TopicConfigurations": [
{
"TopicArn": "arn:aws:sns:us-east-1:000000000000:judgments",
"Events": ["s3:ObjectCreated:*"]
}
]
}
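
This is the configuration that setup-localstack.sh applies below with put-bucket-notification-configuration, so that every s3:ObjectCreated event on the inbound bucket is published to the configured SNS topic. A hedged boto3 equivalent; the endpoint and dummy credentials assume a default LocalStack setup:

```python
# Sketch: boto3 equivalent of the awslocal
# put-bucket-notification-configuration call in setup-localstack.sh.
import json

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",  # assumed LocalStack endpoint
    aws_access_key_id="test",              # dummy credentials for LocalStack
    aws_secret_access_key="test",
    region_name="us-east-1",
)
with open("scripts/inbound-s3-sns.json") as f:
    config = json.load(f)
s3.put_bucket_notification_configuration(
    Bucket="inbound-bucket", NotificationConfiguration=config
)
```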
22 changes: 21 additions & 1 deletion scripts/setup-localstack.sh
@@ -9,8 +9,9 @@ awslocal lambda create-function \
--zip-file fileb://dist/lambda.zip \
--handler ds-caselaw-ingester/lambda_function.handler \
--runtime python3.11 \
--environment "Variables={MARKLOGIC_HOST=$MARKLOGIC_HOST,MARKLOGIC_USER=$MARKLOGIC_USER,MARKLOGIC_PASSWORD=$MARKLOGIC_PASSWORD,AWS_BUCKET_NAME=$AWS_BUCKET_NAME,AWS_SECRET_KEY=$AWS_SECRET_KEY,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_ENDPOINT_URL=$AWS_ENDPOINT_URL,SQS_QUEUE_URL=$SQS_QUEUE_URL,ROLLBAR_TOKEN=$ROLLBAR_TOKEN,ROLLBAR_ENV=$ROLLBAR_ENV,NOTIFY_API_KEY=$NOTIFY_API_KEY,NOTIFY_EDITORIAL_ADDRESS=$NOTIFY_EDITORIAL_ADDRESS,NOTIFY_NEW_JUDGMENT_TEMPLATE_ID=$NOTIFY_NEW_JUDGMENT_TEMPLATE_ID,EDITORIAL_UI_BASE_URL=$EDITORIAL_UI_BASE_URL,PUBLIC_ASSET_BUCKET=$PUBLIC_ASSET_BUCKET}" \
--environment "Variables={MARKLOGIC_HOST=$MARKLOGIC_HOST,MARKLOGIC_USER=$MARKLOGIC_USER,MARKLOGIC_PASSWORD=$MARKLOGIC_PASSWORD,AWS_BUCKET_NAME=$AWS_BUCKET_NAME,AWS_SECRET_KEY=$AWS_SECRET_KEY,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_ENDPOINT_URL=$AWS_ENDPOINT_URL,SQS_QUEUE_URL=$SQS_QUEUE_URL,ROLLBAR_TOKEN=$ROLLBAR_TOKEN,ROLLBAR_ENV=$ROLLBAR_ENV,NOTIFY_API_KEY=$NOTIFY_API_KEY,NOTIFY_EDITORIAL_ADDRESS=$NOTIFY_EDITORIAL_ADDRESS,NOTIFY_NEW_JUDGMENT_TEMPLATE_ID=$NOTIFY_NEW_JUDGMENT_TEMPLATE_ID,EDITORIAL_UI_BASE_URL=$EDITORIAL_UI_BASE_URL,PUBLIC_ASSET_BUCKET=$PUBLIC_ASSET_BUCKET,LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT=500}" \
--role arn:aws:iam::000000000000:role/lambda-role \
--timeout 500

awslocal sns create-topic \
--name judgments \
@@ -30,6 +31,25 @@ awslocal s3api create-bucket \
awslocal s3api create-bucket \
--bucket public-asset-bucket



awslocal sns create-topic \
--name inbound-sns \
--attributes consignment-reference=string,s3-folder-url=string,consignment-type=string,number-of-retries=number

awslocal s3api create-bucket \
--bucket inbound-bucket

awslocal s3api put-bucket-notification-configuration \
--bucket inbound-bucket \
--notification-configuration file://scripts/inbound-s3-sns.json

awslocal sns subscribe --protocol lambda \
--region us-east-1 \
--topic-arn arn:aws:sns:us-east-1:000000000000:inbound-sns \
--notification-endpoint arn:aws:lambda:us-east-1:000000000000:function:te-lambda


if [ -n "$1" ]; then
awslocal s3 cp $1 s3://te-editorial-out-int
else
