Commit

Merge pull request #236 from nationalarchives/FCL-485-pdf-only-mvp-modify-ingester-to-understand-pdf-only-imports

Assign documents a UUID on ingestion instead of using NCN-based URI
jacksonj04 authored Dec 19, 2024
2 parents 9e4f3ec + 10b0577 commit a142a87
Showing 8 changed files with 142 additions and 211 deletions.
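
For context: the key change in lambda_function.py is that an ingested document's URI is no longer derived from its neutral citation number (the extract_uri helper is removed). Each document now gets a random "d-<uuid4>" URI at ingestion, and the NCN, where one is found, is saved afterwards as an identifier. Below is a minimal sketch of the new URI scheme, reusing the same imports that appear in the diff; the surrounding usage is illustrative only, not part of the ingester.

```python
# Sketch of the UUID-based document URI introduced by this commit.
# DocumentURIString and uuid4 are the imports used in lambda_function.py;
# the rest of this snippet is illustrative, not part of the ingester.
from uuid import uuid4

from caselawclient.models.documents import DocumentURIString

uri = DocumentURIString("d-" + str(uuid4()))
print(uri)  # e.g. d-3f2504e0-4f89-41d3-9a0c-0305e82c3301
```
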
8 changes: 5 additions & 3 deletions .github/workflows/ci.yml
@@ -24,9 +24,11 @@ jobs:
SKIP: no-commit-to-branch
test:
env:
MARKLOGIC_HOST: ""
MARKLOGIC_USER: ""
MARKLOGIC_PASSWORD: ""
MARKLOGIC_HOST: ml-host
MARKLOGIC_USER: ml-user
MARKLOGIC_PASSWORD: ml-password
MARKLOGIC_USE_HTTPS: 0
AWS_BUCKET_NAME: judgments-original-versions
name: Run unit tests
runs-on: ubuntu-24.04
steps:
10 changes: 10 additions & 0 deletions .pre-commit-config.yaml
@@ -19,12 +19,22 @@ repos:
rev: v0.8.4
hooks:
- id: ruff-format
- id: ruff
args:
- --fix
- --exit-non-zero-on-fix

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.13.0
hooks:
- id: mypy
files: ^ds-caselaw-ingester/
additional_dependencies:
- types-requests
- types-python-dateutil
- types-pytz
- boto3-stubs[s3,sns]
- ds-caselaw-marklogic-api-client~=29.0.0

- repo: https://github.com/pre-commit/mirrors-prettier
rev: v4.0.0-alpha.8
22 changes: 0 additions & 22 deletions ds-caselaw-ingester/content_sqid.py

This file was deleted.

125 changes: 58 additions & 67 deletions ds-caselaw-ingester/lambda_function.py
@@ -1,38 +1,48 @@
import json
import logging
import os
import tarfile
import xml.etree.ElementTree as ET
from contextlib import suppress
from urllib.parse import unquote_plus
from uuid import uuid4
from xml.sax.saxutils import escape
from caselawclient.models.identifiers.neutral_citation import NeutralCitationNumber
from caselawclient.models.documents import DocumentURIString

import boto3
import boto3.s3
import rollbar
from boto3.session import Session
from botocore.exceptions import NoCredentialsError
from caselawclient.Client import (
DEFAULT_USER_AGENT,
MarklogicApiClient,
MarklogicResourceNotFoundError,
)
from caselawclient.client_helpers import VersionAnnotation, VersionType
from caselawclient.models.documents import DocumentURIString
from caselawclient.models.identifiers.neutral_citation import NeutralCitationNumber
from dotenv import load_dotenv
from mypy_boto3_s3.client import S3Client
from notifications_python_client.notifications import NotificationsAPIClient
import logging
from caselawclient.models.documents import Document

logger = logging.getLogger("ingester")
logger.setLevel(logging.DEBUG)

load_dotenv()

rollbar.init(os.getenv("ROLLBAR_TOKEN"), environment=os.getenv("ROLLBAR_ENV"))

MARKLOGIC_HOST: str = os.environ["MARKLOGIC_HOST"]
MARKLOGIC_USER: str = os.environ["MARKLOGIC_USER"]
MARKLOGIC_PASSWORD: str = os.environ["MARKLOGIC_PASSWORD"]
MARKLOGIC_USE_HTTPS: bool = bool(os.environ["MARKLOGIC_USE_HTTPS"])

AWS_BUCKET_NAME: str = os.environ["AWS_BUCKET_NAME"]

api_client = MarklogicApiClient(
host=os.getenv("MARKLOGIC_HOST", default=None),
username=os.getenv("MARKLOGIC_USER", default=None),
password=os.getenv("MARKLOGIC_PASSWORD", default=None),
use_https=os.getenv("MARKLOGIC_USE_HTTPS", default=False),
host=MARKLOGIC_HOST,
username=MARKLOGIC_USER,
password=MARKLOGIC_PASSWORD,
use_https=MARKLOGIC_USE_HTTPS,
user_agent=f"ds-caselaw-ingester/unknown {DEFAULT_USER_AGENT}",
)

@@ -44,7 +54,7 @@ def __init__(self, metadata):

@property
def is_tdr(self) -> bool:
return "TDR" in self.parameters.keys()
return "TDR" in self.parameters

@property
def force_publish(self):
@@ -63,7 +73,7 @@ def from_event(cls, event):
def from_message(cls, message: dict):
if message.get("Records", [{}])[0].get("eventSource") == "aws:s3":
return S3Message(message["Records"][0])
elif "parameters" in message.keys():
elif "parameters" in message:
return V2Message(message)
else:
raise InvalidMessageException(f"Did not recognise message type. {message}")
@@ -229,18 +239,6 @@ def extract_metadata(tar: tarfile.TarFile, consignment_reference: str):
return decoder.decode(te_metadata_file.read().decode("utf-8"))


def extract_uri(metadata: dict, consignment_reference: str) -> str:
uri = metadata["parameters"]["PARSER"].get("uri", "")

if uri:
uri = uri.replace("https://caselaw.nationalarchives.gov.uk/id/", "")

if not uri:
uri = f"failures/{consignment_reference}"

return uri


# called by tests
def get_consignment_reference(message):
return Message.from_message(message).get_consignment_reference()
@@ -263,10 +261,10 @@ def extract_lambda_versions(versions: list[dict[str, str]]) -> list[tuple[str, s
return version_tuples


def store_file(file, folder, filename, s3_client: Session.client):
def store_file(file, folder, filename, s3_client: S3Client):
pathname = f"{folder}/{filename}"
try:
s3_client.upload_fileobj(file, os.getenv("AWS_BUCKET_NAME"), pathname)
s3_client.upload_fileobj(file, AWS_BUCKET_NAME, pathname)
print(f"Upload Successful {pathname}")
except FileNotFoundError:
print(f"The file {pathname} was not found")
@@ -290,7 +288,7 @@ def personalise_email(uri: str, metadata: dict) -> dict:
}


def copy_file(tarfile, input_filename, output_filename, uri, s3_client: Session.client):
def copy_file(tarfile, input_filename, output_filename, uri, s3_client: S3Client):
try:
file = tarfile.extractfile(input_filename)
store_file(file, uri, output_filename, s3_client)
@@ -395,14 +393,14 @@ def __init__(self, message: Message):
print(f"Ingester Start: Consignment reference {self.consignment_reference}")
print(f"Received Message: {self.message.message}")
self.local_tar_filename = self.save_tar_file_in_s3()
self.tar = tarfile.open(self.local_tar_filename, mode="r")
self.metadata = extract_metadata(self.tar, self.consignment_reference)
self.message.update_consignment_reference(self.metadata["parameters"]["TRE"]["reference"])
self.consignment_reference = self.message.get_consignment_reference()
self.xml_file_name = self.metadata["parameters"]["TRE"]["payload"]["xml"]
self.uri = extract_uri(self.metadata, self.consignment_reference)
self.uri = DocumentURIString("d-" + str(uuid4()))
with tarfile.open(self.local_tar_filename, mode="r") as tar:
self.metadata = extract_metadata(tar, self.consignment_reference)
self.message.update_consignment_reference(self.metadata["parameters"]["TRE"]["reference"])
self.xml_file_name = self.metadata["parameters"]["TRE"]["payload"]["xml"]
self.xml = get_best_xml(self.uri, tar, self.xml_file_name, self.consignment_reference)
print(f"Ingesting document {self.uri}")
self.xml = get_best_xml(self.uri, self.tar, self.xml_file_name, self.consignment_reference)

def save_tar_file_in_s3(self):
"""This should be mocked out for testing -- get the tar file from S3 and
@@ -459,7 +457,7 @@ def set_document_identifiers(self) -> None:
doc.save_identifiers()
logger.info(f"Ingested document had NCN {ncn}")
else:
logger.info(f"Ingested document had NCN (NOT FOUND)")
logger.info("Ingested document had NCN (NOT FOUND)")

def send_updated_judgment_notification(self) -> None:
personalisation = personalise_email(self.uri, self.metadata)
@@ -476,10 +474,7 @@ def send_updated_judgment_notification(self) -> None:
print(f'Sent update notification to {os.getenv("NOTIFY_EDITORIAL_ADDRESS")} (Message ID: {response["id"]})')

def send_new_judgment_notification(self) -> None:
if "/press-summary/" in self.uri:
doctype = "Press Summary"
else:
doctype = "Judgment"
doctype = "Press Summary" if "/press-summary/" in self.uri else "Judgment"

personalisation = personalise_email(self.uri, self.metadata)
personalisation["doctype"] = doctype
@@ -535,48 +530,49 @@ def save_files_to_s3(self) -> None:
modified_targz_filename = (
self.local_tar_filename if docx_filename else modify_filename(self.local_tar_filename, "_nodocx")
)
store_file(
open(self.local_tar_filename, mode="rb"),
self.uri,
os.path.basename(modified_targz_filename),
s3_client,
)
with open(self.local_tar_filename, mode="rb") as local_tar:
store_file(
local_tar,
self.uri,
os.path.basename(modified_targz_filename),
s3_client,
)
print(f"saved tar.gz as {modified_targz_filename!r}")

# Store docx and rename
# The docx_filename is None for files which have been reparsed.
if docx_filename is not None:
copy_file(
self.tar,
f"{self.consignment_reference}/{docx_filename}",
f'{self.uri.replace("/", "_")}.docx',
self.uri,
s3_client,
)
with tarfile.open(self.local_tar_filename, mode="r") as tar:
copy_file(
tar,
f"{self.consignment_reference}/{docx_filename}",
f'{self.uri.replace("/", "_")}.docx',
self.uri,
s3_client,
)

# Store parser log
try:
with suppress(FileNotFoundException), tarfile.open(self.local_tar_filename, mode="r") as tar:
copy_file(
self.tar,
tar,
f"{self.consignment_reference}/parser.log",
"parser.log",
self.uri,
s3_client,
)
except FileNotFoundException:
pass

# Store images
image_list = self.metadata["parameters"]["TRE"]["payload"]["images"]
if image_list:
for image_filename in image_list:
copy_file(
self.tar,
f"{self.consignment_reference}/{image_filename}",
image_filename,
self.uri,
s3_client,
)
with tarfile.open(self.local_tar_filename, mode="r") as tar:
copy_file(
tar,
f"{self.consignment_reference}/{image_filename}",
image_filename,
self.uri,
s3_client,
)

@property
def metadata_object(self) -> Metadata:
@@ -611,9 +607,6 @@ def send_email(self) -> None:

raise RuntimeError(f"Didn't recognise originator {originator!r}")

def close_tar(self) -> None:
self.tar.close()

def upload_xml(self) -> None:
self.updated = self.update_document_xml()
self.inserted = False if self.updated else self.insert_document_xml()
@@ -641,7 +634,7 @@ def process_message(message):
ingest.send_email()

# Store metadata in Marklogic
has_TDR_data = "TDR" in ingest.metadata["parameters"].keys()
has_TDR_data = "TDR" in ingest.metadata["parameters"]
if has_TDR_data:
ingest.store_metadata()

@@ -655,8 +648,6 @@ def process_message(message):
else:
ingest.unpublish_updated_judgment()

ingest.close_tar()

print("Ingestion complete")
return message.message

45 changes: 0 additions & 45 deletions ds-caselaw-ingester/test_sqid.py

This file was deleted.

(Diffs for the remaining 3 changed files are not shown.)
