Skip to content

Commit

Permalink
Merge pull request #35 from nationalarchives/reduce-retries
Browse files Browse the repository at this point in the history
Tighten up retry mechanism
  • Loading branch information
Floppy authored May 12, 2022
2 parents 0eaf017 + 8f99a76 commit ec6c8a9
Showing 1 changed file with 70 additions and 72 deletions.
142 changes: 70 additions & 72 deletions ds-caselaw-ingester/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,80 +176,78 @@ def handler(event, context):
session = boto3.session.Session()
sqs_client = session.client('sqs')
s3_client = session.client('s3')

# Retrieve tar file from S3
http = urllib3.PoolManager()
try:
# Retrieve tar file from S3
http = urllib3.PoolManager()
file = http.request('GET', message["s3-folder-url"])

# Store it in the /tmp directory
filename = os.path.join("/tmp", f'{consignment_reference}.tar.gz')
with open(filename, 'wb') as out:
out.write(file.data)
out.close()

# Extract the judgment XML
tar = tarfile.open(filename, mode='r')
te_metadata_file = tar.extractfile(f'{consignment_reference}/TRE-{consignment_reference}-metadata.json')
metadata = decoder.decode(te_metadata_file.read().decode('utf-8'))

xml_file_name = metadata["parameters"]["TRE"]["payload"]["xml"]
try:
xml_file = tar.extractfile(f'{consignment_reference}/{xml_file_name}')
except KeyError:
xml_file = None

uri = extract_uri(metadata, consignment_reference)

if xml_file:
contents = xml_file.read()
elif 'failures' in uri:
contents = create_error_xml_contents(tar, consignment_reference)
else:
raise XmlFileNotFoundException(f'No XML file was found. Consignment Ref: {consignment_reference}')

ET.register_namespace("", "http://docs.oasis-open.org/legaldocml/ns/akn/3.0")
ET.register_namespace("uk", "https://caselaw.nationalarchives.gov.uk/akn")
xml = ET.XML(contents)

try:
api_client.get_judgment_xml(uri, show_unpublished=True)
api_client.save_judgment_xml(uri, xml)

# Notify editors that a document has been updated
send_updated_judgment_notification(uri, metadata)
print(f'Updated judgment {uri}')
except MarklogicResourceNotFoundError:
api_client.insert_judgment_xml(uri, xml)
# Notify editors that a new document is ready
send_new_judgment_notification(uri, metadata)
print(f'Inserted judgment {uri}')

# Store metadata
store_metadata(uri, metadata)

# Store docx and rename
docx_filename = extract_docx_filename(metadata)
if not filename:
raise DocxFilenameNotFoundException(f'No .docx filename was found in meta. Consignment Ref: {consignment_reference}')
copy_file(tar, f'{consignment_reference}/{docx_filename}', f'{uri.replace("/", "_")}.docx', uri, s3_client)

# Store parser log
copy_file(tar, f'{consignment_reference}/parser.log', 'parser.log', uri, s3_client)

# Store images
for image_filename in metadata["parameters"]["TRE"]["payload"]["images"]:
copy_file(tar, f'{consignment_reference}/{image_filename}', image_filename, uri, s3_client)

# Copy original tarfile
store_file(open(filename, mode='rb'), uri, os.path.basename(filename), s3_client)

if api_client.get_published(uri):
update_published_documents(uri, s3_client)

except (urllib3.exceptions.ProtocolError, tarfile.ReadError):
# Send retry message to sqs
except:
# Send retry message to sqs if the GET fails
send_retry_message(message, sqs_client)
except BaseException:
raise

# Store it in the /tmp directory
filename = os.path.join("/tmp", f'{consignment_reference}.tar.gz')
with open(filename, 'wb') as out:
out.write(file.data)
out.close()

# Extract the judgment XML
tar = tarfile.open(filename, mode='r')
te_metadata_file = tar.extractfile(f'{consignment_reference}/TRE-{consignment_reference}-metadata.json')
metadata = decoder.decode(te_metadata_file.read().decode('utf-8'))

xml_file_name = metadata["parameters"]["TRE"]["payload"]["xml"]
try:
xml_file = tar.extractfile(f'{consignment_reference}/{xml_file_name}')
except KeyError:
xml_file = None

uri = extract_uri(metadata, consignment_reference)

if xml_file:
contents = xml_file.read()
elif 'failures' in uri:
contents = create_error_xml_contents(tar, consignment_reference)
else:
raise XmlFileNotFoundException(f'No XML file was found. Consignment Ref: {consignment_reference}')

ET.register_namespace("", "http://docs.oasis-open.org/legaldocml/ns/akn/3.0")
ET.register_namespace("uk", "https://caselaw.nationalarchives.gov.uk/akn")
xml = ET.XML(contents)

try:
api_client.get_judgment_xml(uri, show_unpublished=True)
api_client.save_judgment_xml(uri, xml)

# Notify editors that a document has been updated
send_updated_judgment_notification(uri, metadata)
print(f'Updated judgment {uri}')
except MarklogicResourceNotFoundError:
api_client.insert_judgment_xml(uri, xml)
# Notify editors that a new document is ready
send_new_judgment_notification(uri, metadata)
print(f'Inserted judgment {uri}')

# Store metadata
store_metadata(uri, metadata)

# Store docx and rename
docx_filename = extract_docx_filename(metadata)
if not filename:
raise DocxFilenameNotFoundException(f'No .docx filename was found in meta. Consignment Ref: {consignment_reference}')
copy_file(tar, f'{consignment_reference}/{docx_filename}', f'{uri.replace("/", "_")}.docx', uri, s3_client)

# Store parser log
copy_file(tar, f'{consignment_reference}/parser.log', 'parser.log', uri, s3_client)

# Store images
for image_filename in metadata["parameters"]["TRE"]["payload"]["images"]:
copy_file(tar, f'{consignment_reference}/{image_filename}', image_filename, uri, s3_client)

# Copy original tarfile
store_file(open(filename, mode='rb'), uri, os.path.basename(filename), s3_client)

if api_client.get_published(uri):
update_published_documents(uri, s3_client)

return message

0 comments on commit ec6c8a9

Please sign in to comment.