From 155bee550b8bedaa439820b2913856e144c6fd21 Mon Sep 17 00:00:00 2001 From: Joel Balcaen Date: Tue, 23 Apr 2024 16:25:34 -0300 Subject: [PATCH] refactor --- lambdas/rich_pdf_ingestion/src/index.py | 67 +++++++++++++------ .../email_form_fill/state_machine.tf | 35 ++++++---- 2 files changed, 69 insertions(+), 33 deletions(-) diff --git a/lambdas/rich_pdf_ingestion/src/index.py b/lambdas/rich_pdf_ingestion/src/index.py index 2d9e80d..481bc73 100644 --- a/lambdas/rich_pdf_ingestion/src/index.py +++ b/lambdas/rich_pdf_ingestion/src/index.py @@ -2,6 +2,7 @@ import os import boto3 from pypdf import PdfReader +import uuid OBJECT_CREATED = "ObjectCreated" EXTRACTED_TEXT_S3_OBJECT_KEY_PREFIX = 'pdf_extraction_result' @@ -23,10 +24,29 @@ def extract_text_from_pdf(pdf_file_path): return text -def get_bucket_and_key(record): - bucket = record["s3"]["bucket"]["name"] - key = record["s3"]["object"]["key"] - return bucket, key +def parse_s3_arn(s3_arn): + # Remove the ARN prefix + s3_path = s3_arn.replace("arn:aws:s3:::", "") + + # Split the path into components + components = s3_path.split("/") + + # The first component is the bucket + bucket = components[0] + + # The rest of the components form the key + key = "/".join(components[1:]) + + # The folder is all components of the key except the last one + folder = "/".join(components[1:-1]) + filename_without_extension = os.path.splitext(os.path.basename(s3_path))[0] + + return { + "bucket": bucket, + "folder": folder, + "filename_without_extension": filename_without_extension, + "key": key + } def fetch_file(bucket, key): @@ -35,42 +55,47 @@ def fetch_file(bucket, key): return local_filename -def upload_text(extracted_text, bucket, key): - file_name = os.path.splitext( - os.path.basename(key) - )[0] + "_pdf_extracted_text.txt" +def upload_file(file_to_upload, bucket, key,): + with open(file_to_upload, "rb") as f: + s3.upload_fileobj(f, bucket, key) + + print(f"Stored file {s3_object_key} in bucket {bucket}") - local_file_path = f"{PATH_TO_WRITE_FILES}/{file_name}" - # build a new object key in an adjacent folder - parts = key.split('/') - parts.insert(-1, EXTRACTED_TEXT_S3_OBJECT_KEY_PREFIX) - s3_object_key = '/'.join(parts[:-1] + [file_name]) +def store_extracted_text_in_local_file(extracted_text): + local_file_path = f"{PATH_TO_WRITE_FILES}/{str(uuid.uuid4())}" with open(local_file_path, "w") as f: f.write(extracted_text) - with open(local_file_path, "rb") as f: - s3.upload_fileobj(f, bucket, s3_object_key) - - print(f"Stored file {s3_object_key} in bucket {bucket}") + return local_file_path def lambda_handler(event, context): print(event) - attachment_path = event['path'] - + attachment_s3_arn = event['path'] try: - bucket, key = get_bucket_and_key(attachment_path) + s3_info = parse_s3_arn(attachment_s3_arn) + bucket = s3_info["bucket"] + folder = s3_info['folder'] + key = s3_info["key"] + filename_without_extension = s3_info['filename_without_extension'] print(f"source_bucket: {bucket}, source_key: {key}") if os.path.splitext(key)[1][1:] == "pdf": local_filename = fetch_file(bucket, key) print("Extracting text from pdf") extracted_text = extract_text_from_pdf(local_filename) + extracted_text_local_file = store_extracted_text_in_local_file(extracted_text) print("Finished extracting text from pdf") - upload_text(extracted_text, bucket, key) + extracted_text_s3_key = "/".join([folder, filename_without_extension+"_extracted_pdf_content", str(uuid.uuid4())+".txt"]) + print("Uploading file to ", extracted_text_s3_key) + upload_file( + file_to_upload=extracted_text_local_file, + bucket=bucket, + key=extracted_text_s3_key + ) except Exception as e: print(e) diff --git a/state_machines/email_form_fill/state_machine.tf b/state_machines/email_form_fill/state_machine.tf index ebca9b4..8426933 100644 --- a/state_machines/email_form_fill/state_machine.tf +++ b/state_machines/email_form_fill/state_machine.tf @@ -110,7 +110,7 @@ resource "aws_sfn_state_machine" "sfn_state_machine" { "States": { "Download email attachments": { "Comment": "Extract attachments from a raw email MIME file and stores them in S3", - "Next": "Filter PDF attachments", + "Next": "Map", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "email-attachment-saver-dev", @@ -136,16 +136,30 @@ resource "aws_sfn_state_machine" "sfn_state_machine" { ], "Type": "Task" }, - "Extract Text/Tables/Images from PDF attachments": { - "End": true, + "Map": { + "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, - "StartAt": "Rich PDF Ingestion", + "StartAt": "Choice", "States": { + "Choice": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.extension", + "StringEquals": "pdf", + "Next": "Rich PDF Ingestion" + } + ], + "Default": "Pass" + }, + "Pass": { + "Type": "Pass", + "End": true + }, "Rich PDF Ingestion": { - "End": true, "OutputPath": "$.Payload", "Parameters": { "FunctionName": "arn:aws:lambda:us-east-1:446872271111:function:rich_pdf_ingestion:$LATEST", @@ -165,16 +179,13 @@ resource "aws_sfn_state_machine" "sfn_state_machine" { "MaxAttempts": 3 } ], - "Type": "Task" + "Type": "Task", + "End": true } } }, - "Type": "Map" - }, - "Filter PDF attachments": { - "InputPath": "$..attachment_arns[?(@.extension==pdf)]", - "Next": "Extract Text/Tables/Images from PDF attachments", - "Type": "Pass" + "ItemsPath": "$.attachment_arns", + "End": true } } }