WIP
elijahbenizzy committed Nov 28, 2023
1 parent 058189c commit 9c9a941
Showing 10 changed files with 181 additions and 2 deletions.
2 changes: 1 addition & 1 deletion contrib/README.md
@@ -156,7 +156,7 @@ To get started, you'll want to do the following:
- [ ] Commit the files we just added
- [ ] Create a PR
- [ ] Tag one of the maintainers [elijahbenizzy](https://github.com/elijahbenizzy), [skrawcz](https://github.com/skrawcz), or [zilto](https://github.com/zilto) for a review
- [ ] Ping us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg) if you don't hear back within a few hours
- [ ] Ping us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg) if you don't hear back within a few days

#### Username Management

1 change: 1 addition & 0 deletions contrib/hamilton/contrib/user/elijahbenizzy/__init__.py
@@ -0,0 +1 @@
"""elijahbenizzy's dataflows"""
14 changes: 14 additions & 0 deletions contrib/hamilton/contrib/user/elijahbenizzy/author.md
@@ -0,0 +1,14 @@
# elijahbenizzy

Elijah is one of the co-authors of Hamilton! He loves building out tooling for clean, reliable, and scalable dataflows.

In his spare time, he enjoys cycling, cooking, reading antique maps, learning about contemporary history, and hacking with fractals.

# Github
https://github.com/elijahbenizzy

# Linkedin
https://linkedin.com/in/elijahbenizzy

# X (Twitter)
https://twitter.com/elijahbenizzy
@@ -0,0 +1,9 @@
# Purpose of this module
Template module to show what needs to be filled out.

# Configuration Options
This module can be configured with the following options:
[list options]

# Limitations
Write limitations/assumptions/known issues here.
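
For orientation, here is a minimal sketch of how a module built from this template might be driven. It mirrors the driver pattern used elsewhere in this commit; the module name `my_dataflow`, the empty config, and the requested output `some_output` are illustrative placeholders, not part of the template:

```python
# Sketch only: `my_dataflow` and `some_output` are hypothetical placeholders.
from hamilton import base, driver

import my_dataflow  # a module created from this template

dr = driver.Driver(
    {},  # configuration options, as documented above
    my_dataflow,
    adapter=base.DefaultAdapter(),
)
# Request the outputs you need; pass any required inputs to the dataflow.
result = dr.execute(["some_output"], inputs={})
print(result)
```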
@@ -0,0 +1,145 @@
# --- START NOTICES (optional)
# --- END NOTICES
# --- START IMPORT SECTION
import dataclasses
import logging
import os
from pathlib import Path
from typing import List, Optional

logger = logging.getLogger(__name__)

from hamilton import contrib

with contrib.catch_import_errors(__name__, __file__, logger):
    # non-hamilton imports go here
    import boto3
    import pandas as pd
    from boto3 import Session

# hamilton imports go here; check for required version if need be.
from hamilton.htypes import Collect, Parallelizable

# --- END IMPORT SECTION

# --- START HAMILTON DATAFLOW


def s3(aws_profile: str = "dagworks") -> boto3.resource:
    """Returns a boto3 S3 resource for the given AWS profile."""
    # Create a session using the specified profile
    session = Session(profile_name=aws_profile)
    # Use the session to create the S3 resource
    return session.resource("s3")


@dataclasses.dataclass
class ToDownload:
    key: str
    bucket: str


def ensured_save_dir(save_dir: str) -> str:
    """Ensures the save directory exists, creating it (and any parents) if needed."""
    Path(save_dir).mkdir(parents=True, exist_ok=True)
    return save_dir


def downloadable(
    s3: boto3.resource, bucket: str, path_in_bucket: str, slice: Optional[int] = None
) -> Parallelizable[ToDownload]:
    """Lists objects to download from the S3 bucket, optionally truncated to the first `slice` items."""
    bucket_obj = s3.Bucket(bucket)
    objs = list(bucket_obj.objects.filter(Prefix=path_in_bucket).all())
    if slice is not None:
        objs = objs[:slice]
    logger.info(f"Found {len(objs)} objects in {bucket}/{path_in_bucket}")
    for obj in objs:
        yield ToDownload(key=obj.key, bucket=bucket)


def _already_downloaded(path: str) -> bool:
    """Checks if the data is already downloaded."""
    return os.path.exists(path)


def downloaded_data(
    downloadable: ToDownload,
    ensured_save_dir: str,
) -> str:
    """Downloads a single object, short-circuiting if the data already exists locally.

    :param downloadable: the S3 object (key + bucket) to download
    :param ensured_save_dir: local directory to download into
    :return: the local path of the downloaded file
    """
    download_location = os.path.join(ensured_save_dir, downloadable.key)
    if _already_downloaded(download_location):
        logger.info(f"Already downloaded {download_location}")
        return download_location
    parent_path = os.path.dirname(download_location)
    os.makedirs(parent_path, exist_ok=True)
    # Create a fresh resource per download to stay thread-safe -- these nodes run in parallel.
    s3_resource = s3()
    bucket = s3_resource.Bucket(downloadable.bucket)
    bucket.download_file(downloadable.key, download_location)
    logger.info(f"Downloaded {download_location}")
    return download_location


def all_downloaded_data(downloaded_data: Collect[str]) -> List[str]:
    """Returns a list of all downloaded locations."""
    return list(downloaded_data)


def _jsonl_parse(path: str) -> pd.DataFrame:
    """Loads a jsonl file into a dataframe, keeping only the columns we care about."""
    df = pd.read_json(path, lines=True)
    return df[["created_at", "ip", "distinct_id", "timestamp", "person_id"]]


def processed_dataframe(all_downloaded_data: List[str]) -> pd.DataFrame:
    """Parses every downloaded jsonl file and concatenates the results into a single dataframe."""
    return pd.concat([_jsonl_parse(floc) for floc in all_downloaded_data])


# --- END HAMILTON DATAFLOW
# --- START MAIN CODE
if __name__ == "__main__":
    # Code to create an image showing the DAG of this workflow.
    # Run as a script to test Hamilton's execution.
    import __init__ as MODULE_NAME
    from hamilton import base, driver

    dr = driver.Driver(
        {},  # CONFIG: fill as appropriate
        MODULE_NAME,
        adapter=base.DefaultAdapter(),
    )
    # Saves to the current working directory, creating dag.png.
    dr.display_all_functions("dag", {"format": "png", "view": False})
# --- END MAIN CODE
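
Since `downloadable` and `downloaded_data` use `Parallelizable`/`Collect`, executing the dataflow for real requires a driver with dynamic execution enabled. The following is a rough sketch, assuming Hamilton's Builder-based parallelism API (verify against your installed version); the bucket, prefix, and save directory values are placeholders:

```python
# Sketch: end-to-end execution with dynamic (parallel) execution enabled.
# Bucket, prefix, and save_dir values are placeholders.
from hamilton import driver
from hamilton.execution import executors

import __init__ as s3_download  # this module, when run from its directory

dr = (
    driver.Builder()
    .with_modules(s3_download)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=10))
    .build()
)
result = dr.execute(
    ["processed_dataframe"],
    inputs={
        "bucket": "my-bucket",        # placeholder
        "path_in_bucket": "events/",  # placeholder
        "save_dir": "./data",
        "slice": 100,  # optional: limit to the first 100 objects
        # "aws_profile": "default",  # optional: override the default profile
    },
)
print(result["processed_dataframe"].head())
```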
(One changed file could not be displayed.)
@@ -0,0 +1,2 @@
boto3
pandas
@@ -0,0 +1,7 @@
{
    "schema": "1.0",
    "use_case_tags": ["example"],
    "secondary_tags": {
        "language": "English"
    }
}
@@ -0,0 +1 @@
{}
@@ -5,7 +5,7 @@

logger = logging.getLogger(__name__)

from hamilton import contrib # noqa E402
from hamilton import contrib

with contrib.catch_import_errors(__name__, __file__, logger):
# non-hamilton imports go here
