From 6c6911dcd9da987add337817c292bd0f5508a656 Mon Sep 17 00:00:00 2001 From: Anthony Hashemi Date: Mon, 9 Oct 2023 12:12:09 +0100 Subject: [PATCH] Filter out non-courtdoc messages from eventbus --- aws_examples/sns/parsed-judgment-v2.json | 2 +- ds-caselaw-ingester/lambda_function.py | 19 +++++++++++++++++++ ds-caselaw-ingester/tests.py | 2 +- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/aws_examples/sns/parsed-judgment-v2.json b/aws_examples/sns/parsed-judgment-v2.json index 47d3db2..4099e77 100644 --- a/aws_examples/sns/parsed-judgment-v2.json +++ b/aws_examples/sns/parsed-judgment-v2.json @@ -1,6 +1,6 @@ { "properties": { - "messageType": "uk.gov.nationalarchives.tre.messages.courtdocumentpackage.available.CourtDocumentPackageAvailable", + "messageType": "uk.gov.nationalarchives.da.messages.courtdocumentpackage.available.CourtDocumentPackageAvailable", "timestamp": "2023-06-28T09:09:25.670688Z", "function": "staging-tre-court-document-pack-lambda", "producer": "TRE", diff --git a/ds-caselaw-ingester/lambda_function.py b/ds-caselaw-ingester/lambda_function.py index 0d5ab49..d8949c4 100644 --- a/ds-caselaw-ingester/lambda_function.py +++ b/ds-caselaw-ingester/lambda_function.py @@ -95,6 +95,9 @@ def save_s3_response(self, sqs_client, s3_client): return filename + def is_valid_ingester_message(self): + return True + class V2Message(Message): def is_v1(self): @@ -120,6 +123,14 @@ def save_s3_response(self, sqs_client, s3_client): print(f"tar.gz saved locally as {filename}") return filename + def is_valid_ingester_message(self): + message_type = self.message["properties"]["messageType"] + print(f"Message type: {message_type}") + return ( + message_type + == "uk.gov.nationalarchives.da.messages.courtdocumentpackage.available.CourtDocumentPackageAvailable" + ) + class ReportableException(Exception): def __init__(self, *args, **kwargs): @@ -127,6 +138,10 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) +class WrongMessageTypeError(ReportableException): + pass + + class S3HTTPError(ReportableException): pass @@ -413,6 +428,10 @@ def unpublish_updated_judgment(uri): def handler(event, context): message = Message.from_event(event) + if not message.is_valid_ingester_message(): + print("Message type is not valid for ingestion. Skipping.") + return + consignment_reference = message.get_consignment_reference() print(f"Ingester Start: Consignment reference {consignment_reference}") print(f"Received Message: {message.message}") diff --git a/ds-caselaw-ingester/tests.py b/ds-caselaw-ingester/tests.py index a0161bb..e98a199 100644 --- a/ds-caselaw-ingester/tests.py +++ b/ds-caselaw-ingester/tests.py @@ -26,7 +26,7 @@ { "properties": { "messageType": - "uk.gov.nationalarchives.tre.messages.judgmentpackage.available.JudgmentPackageAvailable", + "uk.gov.nationalarchives.da.messages.courtdocumentpackage.available.CourtDocumentPackageAvailable", "timestamp": "2023-05-15T09:14:53.791409Z", "function": "staging-tre-judgment-packer-lambda", "producer": "TRE",