Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assign documents a UUID on ingestion instead of using NCN-based URI #236

8 changes: 5 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ jobs:
SKIP: no-commit-to-branch
test:
env:
MARKLOGIC_HOST: ""
MARKLOGIC_USER: ""
MARKLOGIC_PASSWORD: ""
MARKLOGIC_HOST: ml-host
MARKLOGIC_USER: ml-user
MARKLOGIC_PASSWORD: ml-password
MARKLOGIC_USE_HTTPS: 0
AWS_BUCKET_NAME: judgments-original-versions
name: Run unit tests
runs-on: ubuntu-24.04
steps:
Expand Down
10 changes: 10 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@ repos:
rev: v0.8.4
hooks:
- id: ruff-format
- id: ruff
args:
- --fix
- --exit-non-zero-on-fix

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.13.0
hooks:
- id: mypy
files: ^ds-caselaw-ingester/
additional_dependencies:
- types-requests
- types-python-dateutil
- types-pytz
- boto3-stubs[s3,sns]
- ds-caselaw-marklogic-api-client~=29.0.0

- repo: https://github.com/pre-commit/mirrors-prettier
rev: v4.0.0-alpha.8
Expand Down
22 changes: 0 additions & 22 deletions ds-caselaw-ingester/content_sqid.py

This file was deleted.

125 changes: 58 additions & 67 deletions ds-caselaw-ingester/lambda_function.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,48 @@
import json
import logging
import os
import tarfile
import xml.etree.ElementTree as ET
from contextlib import suppress
from urllib.parse import unquote_plus
from uuid import uuid4
from xml.sax.saxutils import escape
from caselawclient.models.identifiers.neutral_citation import NeutralCitationNumber
from caselawclient.models.documents import DocumentURIString

import boto3
import boto3.s3
import rollbar
from boto3.session import Session
from botocore.exceptions import NoCredentialsError
from caselawclient.Client import (
DEFAULT_USER_AGENT,
MarklogicApiClient,
MarklogicResourceNotFoundError,
)
from caselawclient.client_helpers import VersionAnnotation, VersionType
from caselawclient.models.documents import DocumentURIString
from caselawclient.models.identifiers.neutral_citation import NeutralCitationNumber
from dotenv import load_dotenv
from mypy_boto3_s3.client import S3Client
from notifications_python_client.notifications import NotificationsAPIClient
import logging
from caselawclient.models.documents import Document

logger = logging.getLogger("ingester")
logger.setLevel(logging.DEBUG)

load_dotenv()

rollbar.init(os.getenv("ROLLBAR_TOKEN"), environment=os.getenv("ROLLBAR_ENV"))

MARKLOGIC_HOST: str = os.environ["MARKLOGIC_HOST"]
MARKLOGIC_USER: str = os.environ["MARKLOGIC_USER"]
MARKLOGIC_PASSWORD: str = os.environ["MARKLOGIC_PASSWORD"]
MARKLOGIC_USE_HTTPS: bool = bool(os.environ["MARKLOGIC_USE_HTTPS"])

AWS_BUCKET_NAME: str = os.environ["AWS_BUCKET_NAME"]

api_client = MarklogicApiClient(
host=os.getenv("MARKLOGIC_HOST", default=None),
username=os.getenv("MARKLOGIC_USER", default=None),
password=os.getenv("MARKLOGIC_PASSWORD", default=None),
use_https=os.getenv("MARKLOGIC_USE_HTTPS", default=False),
host=MARKLOGIC_HOST,
username=MARKLOGIC_USER,
password=MARKLOGIC_PASSWORD,
use_https=MARKLOGIC_USE_HTTPS,
user_agent=f"ds-caselaw-ingester/unknown {DEFAULT_USER_AGENT}",
)

Expand All @@ -44,7 +54,7 @@ def __init__(self, metadata):

@property
def is_tdr(self) -> bool:
return "TDR" in self.parameters.keys()
return "TDR" in self.parameters

@property
def force_publish(self):
Expand All @@ -63,7 +73,7 @@ def from_event(cls, event):
def from_message(cls, message: dict):
if message.get("Records", [{}])[0].get("eventSource") == "aws:s3":
return S3Message(message["Records"][0])
elif "parameters" in message.keys():
elif "parameters" in message:
return V2Message(message)
else:
raise InvalidMessageException(f"Did not recognise message type. {message}")
Expand Down Expand Up @@ -229,18 +239,6 @@ def extract_metadata(tar: tarfile.TarFile, consignment_reference: str):
return decoder.decode(te_metadata_file.read().decode("utf-8"))


def extract_uri(metadata: dict, consignment_reference: str) -> str:
uri = metadata["parameters"]["PARSER"].get("uri", "")

if uri:
uri = uri.replace("https://caselaw.nationalarchives.gov.uk/id/", "")

if not uri:
uri = f"failures/{consignment_reference}"

return uri


# called by tests
def get_consignment_reference(message):
return Message.from_message(message).get_consignment_reference()
Expand All @@ -263,10 +261,10 @@ def extract_lambda_versions(versions: list[dict[str, str]]) -> list[tuple[str, s
return version_tuples


def store_file(file, folder, filename, s3_client: Session.client):
def store_file(file, folder, filename, s3_client: S3Client):
pathname = f"{folder}/{filename}"
try:
s3_client.upload_fileobj(file, os.getenv("AWS_BUCKET_NAME"), pathname)
s3_client.upload_fileobj(file, AWS_BUCKET_NAME, pathname)
print(f"Upload Successful {pathname}")
except FileNotFoundError:
print(f"The file {pathname} was not found")
Expand All @@ -290,7 +288,7 @@ def personalise_email(uri: str, metadata: dict) -> dict:
}


def copy_file(tarfile, input_filename, output_filename, uri, s3_client: Session.client):
def copy_file(tarfile, input_filename, output_filename, uri, s3_client: S3Client):
try:
file = tarfile.extractfile(input_filename)
store_file(file, uri, output_filename, s3_client)
Expand Down Expand Up @@ -395,14 +393,14 @@ def __init__(self, message: Message):
print(f"Ingester Start: Consignment reference {self.consignment_reference}")
print(f"Received Message: {self.message.message}")
self.local_tar_filename = self.save_tar_file_in_s3()
self.tar = tarfile.open(self.local_tar_filename, mode="r")
self.metadata = extract_metadata(self.tar, self.consignment_reference)
self.message.update_consignment_reference(self.metadata["parameters"]["TRE"]["reference"])
self.consignment_reference = self.message.get_consignment_reference()
self.xml_file_name = self.metadata["parameters"]["TRE"]["payload"]["xml"]
self.uri = extract_uri(self.metadata, self.consignment_reference)
self.uri = DocumentURIString("d-" + str(uuid4()))
with tarfile.open(self.local_tar_filename, mode="r") as tar:
self.metadata = extract_metadata(tar, self.consignment_reference)
self.message.update_consignment_reference(self.metadata["parameters"]["TRE"]["reference"])
self.xml_file_name = self.metadata["parameters"]["TRE"]["payload"]["xml"]
self.xml = get_best_xml(self.uri, tar, self.xml_file_name, self.consignment_reference)
print(f"Ingesting document {self.uri}")
self.xml = get_best_xml(self.uri, self.tar, self.xml_file_name, self.consignment_reference)

def save_tar_file_in_s3(self):
"""This should be mocked out for testing -- get the tar file from S3 and
Expand Down Expand Up @@ -459,7 +457,7 @@ def set_document_identifiers(self) -> None:
doc.save_identifiers()
logger.info(f"Ingested document had NCN {ncn}")
else:
logger.info(f"Ingested document had NCN (NOT FOUND)")
logger.info("Ingested document had NCN (NOT FOUND)")

def send_updated_judgment_notification(self) -> None:
personalisation = personalise_email(self.uri, self.metadata)
Expand All @@ -476,10 +474,7 @@ def send_updated_judgment_notification(self) -> None:
print(f'Sent update notification to {os.getenv("NOTIFY_EDITORIAL_ADDRESS")} (Message ID: {response["id"]})')

def send_new_judgment_notification(self) -> None:
if "/press-summary/" in self.uri:
doctype = "Press Summary"
else:
doctype = "Judgment"
doctype = "Press Summary" if "/press-summary/" in self.uri else "Judgment"

personalisation = personalise_email(self.uri, self.metadata)
personalisation["doctype"] = doctype
Expand Down Expand Up @@ -535,48 +530,49 @@ def save_files_to_s3(self) -> None:
modified_targz_filename = (
self.local_tar_filename if docx_filename else modify_filename(self.local_tar_filename, "_nodocx")
)
store_file(
open(self.local_tar_filename, mode="rb"),
self.uri,
os.path.basename(modified_targz_filename),
s3_client,
)
with open(self.local_tar_filename, mode="rb") as local_tar:
store_file(
local_tar,
self.uri,
os.path.basename(modified_targz_filename),
s3_client,
)
print(f"saved tar.gz as {modified_targz_filename!r}")

# Store docx and rename
# The docx_filename is None for files which have been reparsed.
if docx_filename is not None:
copy_file(
self.tar,
f"{self.consignment_reference}/{docx_filename}",
f'{self.uri.replace("/", "_")}.docx',
self.uri,
s3_client,
)
with tarfile.open(self.local_tar_filename, mode="r") as tar:
copy_file(
tar,
f"{self.consignment_reference}/{docx_filename}",
f'{self.uri.replace("/", "_")}.docx',
self.uri,
s3_client,
)

# Store parser log
try:
with suppress(FileNotFoundException), tarfile.open(self.local_tar_filename, mode="r") as tar:
copy_file(
self.tar,
tar,
f"{self.consignment_reference}/parser.log",
"parser.log",
self.uri,
s3_client,
)
except FileNotFoundException:
pass

# Store images
image_list = self.metadata["parameters"]["TRE"]["payload"]["images"]
if image_list:
for image_filename in image_list:
copy_file(
self.tar,
f"{self.consignment_reference}/{image_filename}",
image_filename,
self.uri,
s3_client,
)
with tarfile.open(self.local_tar_filename, mode="r") as tar:
copy_file(
tar,
f"{self.consignment_reference}/{image_filename}",
image_filename,
self.uri,
s3_client,
)

@property
def metadata_object(self) -> Metadata:
Expand Down Expand Up @@ -611,9 +607,6 @@ def send_email(self) -> None:

raise RuntimeError(f"Didn't recognise originator {originator!r}")

def close_tar(self) -> None:
self.tar.close()

def upload_xml(self) -> None:
self.updated = self.update_document_xml()
self.inserted = False if self.updated else self.insert_document_xml()
Expand Down Expand Up @@ -641,7 +634,7 @@ def process_message(message):
ingest.send_email()

# Store metadata in Marklogic
has_TDR_data = "TDR" in ingest.metadata["parameters"].keys()
has_TDR_data = "TDR" in ingest.metadata["parameters"]
if has_TDR_data:
ingest.store_metadata()

Expand All @@ -655,8 +648,6 @@ def process_message(message):
else:
ingest.unpublish_updated_judgment()

ingest.close_tar()

print("Ingestion complete")
return message.message

Expand Down
45 changes: 0 additions & 45 deletions ds-caselaw-ingester/test_sqid.py

This file was deleted.

Loading
Loading