From 9c9a9411d3209a55ecd5e2b02521bba026c13a59 Mon Sep 17 00:00:00 2001
From: elijahbenizzy
Date: Mon, 27 Nov 2023 20:53:41 -0800
Subject: [PATCH] WIP

---
 contrib/README.md | 2 +-
 .../contrib/user/elijahbenizzy/__init__.py | 1 +
 .../contrib/user/elijahbenizzy/author.md | 14 ++
 .../s3_parallel_dataframe/README.md | 9 ++
 .../s3_parallel_dataframe/__init__.py | 145 ++++++++++++++++++
 .../s3_parallel_dataframe/dag.png | 0
 .../s3_parallel_dataframe/requirements.txt | 2 +
 .../s3_parallel_dataframe/tags.json | 7 +
 .../s3_parallel_dataframe/valid_configs.jsonl | 1 +
 .../dataflow_template/__init__.py | 2 +-
 10 files changed, 181 insertions(+), 2 deletions(-)
 create mode 100644 contrib/hamilton/contrib/user/elijahbenizzy/__init__.py
 create mode 100644 contrib/hamilton/contrib/user/elijahbenizzy/author.md
 create mode 100644 contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/README.md
 create mode 100644 contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/__init__.py
 create mode 100644 contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/dag.png
 create mode 100644 contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/requirements.txt
 create mode 100644 contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/tags.json
 create mode 100644 contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/valid_configs.jsonl

diff --git a/contrib/README.md b/contrib/README.md
index ceb79d80a..05136d159 100644
--- a/contrib/README.md
+++ b/contrib/README.md
@@ -156,7 +156,7 @@ To get started, you'll want to do the following:
    - [ ] Commit the files we just added
    - [ ] Create a PR
    - [ ] Tag one of the maintainers [elijahbenizzy](https://github.com/elijahbenizzy), [skrawcz](https://github.com/skrawcz), or [zilto](https://github.com/zilto) for a review
-   - [ ] Ping us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg) if you don't hear back within a few hours
+   - [ ] Ping us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg) if you don't hear back within a few days
 
 #### Username Management
 
diff --git a/contrib/hamilton/contrib/user/elijahbenizzy/__init__.py b/contrib/hamilton/contrib/user/elijahbenizzy/__init__.py
new file mode 100644
index 000000000..3190f2fba
--- /dev/null
+++ b/contrib/hamilton/contrib/user/elijahbenizzy/__init__.py
@@ -0,0 +1 @@
+"""elijahbenizzy's dataflows"""
diff --git a/contrib/hamilton/contrib/user/elijahbenizzy/author.md b/contrib/hamilton/contrib/user/elijahbenizzy/author.md
new file mode 100644
index 000000000..4a0392171
--- /dev/null
+++ b/contrib/hamilton/contrib/user/elijahbenizzy/author.md
@@ -0,0 +1,14 @@
+# elijahbenizzy
+
+Elijah is one of the co-authors of Hamilton! He loves building out tooling for clean, reliable, and scalable dataflows.
+
+In his spare time, he enjoys cycling, cooking, reading antique maps, learning about contemporary history, and hacking with fractals.
+
+# Github
+https://github.com/elijahbenizzy
+
+# Linkedin
+https://linkedin.com/in/elijahenizzy
+
+# X (Twitter)
+https://twitter.com/elijahbenizzy
diff --git a/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/README.md b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/README.md
new file mode 100644
index 000000000..956f7e639
--- /dev/null
+++ b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/README.md
@@ -0,0 +1,9 @@
+# Purpose of this module
+This module lists files under a prefix in an S3 bucket, downloads them in parallel, and concatenates them into a single pandas dataframe.
+
+# Configuration Options
+This module does not use any `config` -- parameterize it through its inputs:
+`bucket`, `path_in_bucket`, and `save_dir` are required; `aws_profile` (default `dagworks`) and `slice` (limits the number of files) are optional.
+
+# Limitations
+Assumes the objects under the prefix are `.jsonl` files and keeps only the `created_at`, `ip`, `distinct_id`, `timestamp`, and `person_id` columns.
diff --git a/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/__init__.py b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/__init__.py
new file mode 100644
index 000000000..4f2e2fd99
--- /dev/null
+++ b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/__init__.py
@@ -0,0 +1,145 @@
+# --- START NOTICES (optional)
+# --- END NOTICES
+# --- START IMPORT SECTION
+import dataclasses
+import logging
+import os
+from pathlib import Path
+from typing import List, Optional
+
+logger = logging.getLogger(__name__)
+
+from hamilton import contrib
+
+with contrib.catch_import_errors(__name__, __file__, logger):
+    # non-hamilton imports go here
+    import boto3
+    import pandas as pd
+    from boto3 import Session
+
+# hamilton imports go here; check for required version if need be.
+from hamilton.htypes import Collect, Parallelizable
+
+# --- END IMPORT SECTION
+
+# --- START HAMILTON DATAFLOW
+
+
+def s3(aws_profile: str = "dagworks") -> boto3.resource:
+    """Returns a boto3 S3 resource for the given AWS profile"""
+
+    # Create a session using the requested profile
+    session = Session(profile_name=aws_profile)
+
+    # Use the session to create the S3 resource
+    return session.resource("s3")
+
+
+@dataclasses.dataclass
+class ToDownload:
+    key: str
+    bucket: str
+
+
+def ensured_save_dir(save_dir: str) -> str:
+    """Ensures the save directory exists, creating it if necessary"""
+    if not os.path.exists(save_dir):
+        Path(save_dir).mkdir()
+    return save_dir
+
+
+def downloadable(
+    s3: boto3.resource, bucket: str, path_in_bucket: str, slice: Optional[int] = None
+) -> Parallelizable[ToDownload]:
+    """Lists downloadables from the s3 bucket -- each one becomes a parallel branch"""
+
+    bucket_obj = s3.Bucket(bucket)
+    objs = list(bucket_obj.objects.filter(Prefix=path_in_bucket).all())
+    if slice is not None:
+        objs = objs[:slice]
+    logger.info(f"Found {len(objs)} objects in {bucket}/{path_in_bucket}")
+    for obj in objs:
+        yield ToDownload(key=obj.key, bucket=bucket)
+
+
+def _already_downloaded(path: str) -> bool:
+    """Checks if the data is already downloaded"""
+    return os.path.exists(path)
+
+
+def downloaded_data(
+    downloadable: ToDownload,
+    ensured_save_dir: str,
+) -> str:
+    """Downloads one object, short-circuiting if the data already exists locally.
+
+    :param downloadable: the S3 object (bucket + key) to download
+    :param ensured_save_dir: local directory to download the data into
+    :return: the local path the object was downloaded to
+    """
+    download_location = os.path.join(ensured_save_dir, downloadable.key)
+    if _already_downloaded(download_location):
+        logger.info(f"Already downloaded {download_location}")
+        return download_location
+    parent_path = os.path.dirname(download_location)
+    if not os.path.exists(parent_path):
+        os.makedirs(parent_path, exist_ok=True)
+    s3_resource = s3()  # we want to ensure thread safety -- we could share resources via a pool,
+    # but for now we create one per call because these downloads run in parallel
+    bucket = s3_resource.Bucket(downloadable.bucket)
+    bucket.download_file(downloadable.key, download_location)
+    logger.info(f"Downloaded {download_location}")
+    return download_location
+
+
+def all_downloaded_data(downloaded_data: Collect[str]) -> List[str]:
+    """Returns a list of all downloaded locations"""
+    out = []
+    for path in downloaded_data:
+        out.append(path)
+    return out
+
+
+def _jsonl_parse(path: str) -> pd.DataFrame:
+    """Loads a jsonl file into a dataframe, keeping a fixed set of columns"""
+    df = pd.read_json(path, lines=True)
+    return df[["created_at", "ip", "distinct_id", "timestamp", "person_id"]]
+
+
+def processed_dataframe(all_downloaded_data: List[str]) -> pd.DataFrame:
+    """Concatenates all downloaded files into a single dataframe"""
+    out = []
+    for floc in all_downloaded_data:
+        out.append(_jsonl_parse(floc))
+    return pd.concat(out)
+
+
+# --- END HAMILTON DATAFLOW
+# --- START MAIN CODE
+if __name__ == "__main__":
+    # Code to create an image showing the structure of the DAG.
+    # run as a script to test Hamilton's execution
+    import __init__ as MODULE_NAME
+
+    from hamilton import base, driver
+
+    dr = driver.Driver(
+        {},  # CONFIG: fill as appropriate
+        MODULE_NAME,
+        adapter=base.DefaultAdapter(),
+    )
+    # saves to current working directory creating dag.png.
+    dr.display_all_functions("dag", {"format": "png", "view": False})
+# --- END MAIN CODE
diff --git a/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/dag.png b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/dag.png
new file mode 100644
index 000000000..e69de29bb
diff --git a/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/requirements.txt b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/requirements.txt
new file mode 100644
index 000000000..b4fd8fec0
--- /dev/null
+++ b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/requirements.txt
@@ -0,0 +1,2 @@
+boto3
+pandas
diff --git a/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/tags.json b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/tags.json
new file mode 100644
index 000000000..e0b903290
--- /dev/null
+++ b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/tags.json
@@ -0,0 +1,7 @@
+{
+    "schema": "1.0",
+    "use_case_tags": ["example"],
+    "secondary_tags": {
+        "language": "English"
+    }
+}
diff --git a/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/valid_configs.jsonl b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/valid_configs.jsonl
new file mode 100644
index 000000000..0967ef424
--- /dev/null
+++ b/contrib/hamilton/contrib/user/elijahbenizzy/s3_parallel_dataframe/valid_configs.jsonl
@@ -0,0 +1 @@
+{}
diff --git a/contrib/hamilton/contrib/user/example_dataflow_template/dataflow_template/__init__.py b/contrib/hamilton/contrib/user/example_dataflow_template/dataflow_template/__init__.py
index 55f8dedcb..bd696ec34 100644
--- a/contrib/hamilton/contrib/user/example_dataflow_template/dataflow_template/__init__.py
+++ b/contrib/hamilton/contrib/user/example_dataflow_template/dataflow_template/__init__.py
@@ -5,7 +5,7 @@
 logger = logging.getLogger(__name__)
 
-from hamilton import contrib  # noqa E402
+from hamilton import contrib
 
 with contrib.catch_import_errors(__name__, __file__, logger):
     # non-hamilton imports go here
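
Usage sketch (not part of the patch): because this dataflow uses Parallelizable/Collect, it has to be run with Hamilton's dynamic execution enabled rather than the plain Driver used above to render dag.png. Something like the snippet below should work; the import path mirrors the files added in this patch, and the bucket, prefix, and save-directory values are placeholders.

from hamilton import driver
from hamilton.contrib.user.elijahbenizzy import s3_parallel_dataframe
from hamilton.execution import executors

# Parallelizable/Collect nodes need dynamic execution plus a task executor.
dr = (
    driver.Builder()
    .with_modules(s3_parallel_dataframe)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=10))  # threads for the parallel downloads
    .build()
)

result = dr.execute(
    ["processed_dataframe"],
    inputs={
        "bucket": "my-example-bucket",  # placeholder bucket name
        "path_in_bucket": "events/",  # placeholder prefix to list objects under
        "save_dir": "./s3_data",  # placeholder local download directory
        "aws_profile": "default",  # optional -- the module defaults to "dagworks"
    },
)
print(result["processed_dataframe"].head())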