From 4146535a9612a07d2d4622a65e782b296c6f2ad1 Mon Sep 17 00:00:00 2001
From: Micah Halter
Date: Fri, 8 Nov 2024 14:34:18 -0500
Subject: [PATCH] refactor: move to the new `capepy` library

---
 main.py          | 99 +++++-------------------------------------------
 requirements.txt |  3 +-
 2 files changed, 12 insertions(+), 90 deletions(-)

diff --git a/main.py b/main.py
index a1f2007..6f0049e 100644
--- a/main.py
+++ b/main.py
@@ -1,79 +1,22 @@
 """ETL script for raw Epi/HAI sequencing report pdf."""

 import io
-import sys
 from datetime import datetime
-from pathlib import Path

-import boto3 as boto3
-from awsglue.context import GlueContext
-from awsglue.utils import getResolvedOptions
+from capepy.aws.glue import EtlJob
 from pypdf import PdfReader
-from pyspark.sql import SparkSession
 from tabula.io import read_pdf

-# for our purposes here, the spark and glue context are only (currently) needed
-# to get the logger.
-spark_ctx = SparkSession.builder.getOrCreate()  # pyright: ignore
-glue_ctx = GlueContext(spark_ctx)
-logger = glue_ctx.get_logger()
-
-# TODO:
-#   - add error handling for the format of the document being incorrect
-#   - figure out how we want to name and namespace clean files (e.g. will we
-#     take the object key we're given, strip the extension and replace it with
-#     one for the new format, or will we do something else)
-#   - see what we can extract out of here to be useful for other ETLs. imagine
-#     we'd have a few different things that could be made into a reusable
-#     package
-
-parameters = getResolvedOptions(
-    sys.argv,
-    [
-        "RAW_BUCKET_NAME",
-        "ALERT_OBJ_KEY",
-        "CLEAN_BUCKET_NAME",
-    ],
-)
-
-raw_bucket_name = parameters["RAW_BUCKET_NAME"]
-alert_obj_key = parameters["ALERT_OBJ_KEY"]
-clean_bucket_name = parameters["CLEAN_BUCKET_NAME"]
+etl_job = EtlJob()

 # NOTE: for now we'll take the alert object key and change out the file
 #       extension for the clean data (leaving all namespacing and such). this
 #       will probably need to change
-clean_obj_key = str(Path(alert_obj_key).with_suffix(".csv"))
-
-# NOTE: May need some creds here
-s3_client = boto3.client("s3")
-
-# try to get the pdf object from S3 and handle any error that would keep us
-# from continuing.
-response = s3_client.get_object(Bucket=raw_bucket_name, Key=alert_obj_key)
-
-status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
-
-if status != 200:
-    err = (
-        f"ERROR - Could not get object {alert_obj_key} from bucket "
-        f"{raw_bucket_name}. ETL Cannot continue."
-    )
-
-    logger.error(err)
-
-    # NOTE: need to properly handle exception stuff here, and we probably want
-    #       this going somewhere very visible (e.g. SNS topic or a perpetual log
-    #       as someone will need to be made aware)
-    raise Exception(err)
-
-logger.info(f"Obtained object {alert_obj_key} from bucket {raw_bucket_name}.")
-
-# handle the document itself...
+clean_obj_key = etl_job.parameters["OBJECT_KEY"].replace(".pdf", ".csv")

 # the response should contain a StreamingBody object that needs to be converted
 # to a file like object to make the pdf libraries happy
-f = io.BytesIO(response.get("Body").read())
+f = io.BytesIO(etl_job.get_raw_file())

 try:
     # get the report date from the 4th line of the pdf
@@ -88,15 +31,16 @@
         f"{err}"
     )

-    logger.error(err_message)
+    etl_job.logger.error(err_message)

     date_reported = ""

 try:
     # get two tables from the pdf
     tables = read_pdf(f, multiple_tables=True, pages=2)
-    mlst_st = tables[0]  # pyright: ignore
-    genes = tables[1]  # pyright: ignore
+    assert isinstance(tables, list)
+    mlst_st = tables[0]
+    genes = tables[1]
 except (IndexError, KeyError) as err:
     err_message = (
         f"ERROR - Could not properly read sequencing PDF tables. "
@@ -104,7 +48,7 @@
         f"{err}"
     )

-    logger.error(err_message)
+    etl_job.logger.error(err_message)

     # NOTE: need to properly handle exception stuff here, and we probably
     #       want this going somewhere very visible (e.g. SNS topic or a
@@ -121,27 +65,4 @@
 # write out the transformed data
 with io.StringIO() as csv_buff:
     interim.to_csv(csv_buff, index=False)
-
-    response = s3_client.put_object(
-        Bucket=clean_bucket_name, Key=clean_obj_key, Body=csv_buff.getvalue()
-    )
-
-    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
-
-    if status != 200:
-        err = (
-            f"ERROR - Could not write transformed data object {clean_obj_key} "
-            f"to bucket {clean_bucket_name}. ETL Cannot continue."
-        )
-
-        logger.error(err)
-
-        # NOTE: need to properly handle exception stuff here, and we probably
-        #       want this going somewhere very visible (e.g. SNS topic or a
-        #       perpetual log as someone will need to be made aware)
-        raise Exception(err)
-
-    logger.info(
-        f"Transformed {raw_bucket_name}/{alert_obj_key} and wrote result "
-        f"to {clean_bucket_name}/{clean_obj_key}"
-    )
+    etl_job.write_clean_file(csv_buff.getvalue(), clean_obj_key)
diff --git a/requirements.txt b/requirements.txt
index de96c13..1f5f4d4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,8 @@
 aws-glue-libs @ git+https://github.com/awslabs/aws-glue-libs@9d8293962e6ffc607e5dc328e246f40b24010fa8
 boto3==1.34.103
+capepy>=1.0.0,<2.0.0
 pandas==2.2.2
 pyspark==3.5.1
 python-docx==1.1.2
 tabula-py==2.9.3
-pypdf==4.3.1
\ No newline at end of file
+pypdf==4.3.1
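
For orientation, here is a minimal sketch of the EtlJob pattern this refactor moves main.py onto. It uses only the capepy calls that already appear in the diff above (EtlJob(), .parameters, .get_raw_file(), .logger, .write_clean_file()); transform_pdf() is a hypothetical placeholder for the pypdf/tabula parsing and is not part of capepy or of this patch.

import io

import pandas as pd
from capepy.aws.glue import EtlJob


def transform_pdf(pdf_file: io.BytesIO) -> pd.DataFrame:
    """Hypothetical stand-in for the pypdf/tabula parsing done in main.py."""
    raise NotImplementedError


etl_job = EtlJob()

# the clean key mirrors the raw object key with the extension swapped, as in the patch
clean_obj_key = etl_job.parameters["OBJECT_KEY"].replace(".pdf", ".csv")

# raw object bytes come straight from the job instead of a hand-rolled boto3 get_object
f = io.BytesIO(etl_job.get_raw_file())

try:
    interim = transform_pdf(f)
except (IndexError, KeyError) as err:
    etl_job.logger.error(f"ERROR - Could not properly read the sequencing PDF. {err}")
    raise

# write the transformed CSV through the job instead of a hand-rolled s3_client.put_object
with io.StringIO() as csv_buff:
    interim.to_csv(csv_buff, index=False)
    etl_job.write_clean_file(csv_buff.getvalue(), clean_obj_key)

The point of the swap is that parameter resolution, the S3 read/write, and their status and error handling now live behind EtlJob, which is why the hand-written Glue context and boto3 boilerplate could be deleted from main.py.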