New parquet utils (#5)
* add several new utility functions

* normalize version

* fixing bugs from RCs

* 0.2.0

* add prefix param

* 0.2.0
erikerlandson authored Dec 5, 2021
1 parent 1576952 commit 0167dcb
Showing 7 changed files with 189 additions and 7 deletions.
14 changes: 13 additions & 1 deletion osc_ingest_trino/__init__.py
@@ -5,15 +5,27 @@
"""

# defines the release version for this python package
__version__ = "0.1.1"
__version__ = "0.2.0"

from .sqlcols import *
from .sqltypes import *
from .boto3_utils import *
from .dotenv_utils import *
from .trino_utils import *

__all__ = [
    "sql_compliant_name",
    "enforce_sql_column_names",
    "enforce_partition_column_order",
    "pandas_type_to_sql",
    "create_table_schema_pairs",
    "attach_s3_bucket",
    "upload_directory_to_s3",
    "load_credentials_dotenv",
    "attach_trino_engine",
    "drop_unmanaged_table",
    "drop_unmanaged_data",
    "ingest_unmanaged_parquet",
    "unmanaged_parquet_tabledef",
]
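
With the star imports and the expanded __all__ list, every utility below resolves from the package top level. A minimal import sketch (not part of the diff):

    from osc_ingest_trino import enforce_sql_column_names, attach_trino_engine, ingest_unmanaged_parquet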

29 changes: 29 additions & 0 deletions osc_ingest_trino/boto3_utils.py
@@ -0,0 +1,29 @@
import os
import boto3

__all__ = [
    "upload_directory_to_s3",
    "attach_s3_bucket",
]

def upload_directory_to_s3(path, bucket, prefix, verbose=False):
    path = str(path)
    prefix = str(prefix)
    for subdir, dirs, files in os.walk(path):
        for f in files:
            pfx = subdir.replace(path, prefix)
            src = os.path.join(subdir, f)
            dst = os.path.join(pfx, f)
            if verbose:
                print(f'{src} --> {dst}')
            bucket.upload_file(src, dst)

def attach_s3_bucket(env_var_prefix):
    s3 = boto3.resource(
        service_name="s3",
        endpoint_url=os.environ[f"{env_var_prefix}_ENDPOINT"],
        aws_access_key_id=os.environ[f"{env_var_prefix}_ACCESS_KEY"],
        aws_secret_access_key=os.environ[f"{env_var_prefix}_SECRET_KEY"],
    )
    return s3.Bucket(os.environ[f"{env_var_prefix}_BUCKET"])
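
A usage sketch for the two helpers above, not part of the diff. The S3_LANDING prefix, endpoint, and bucket name are illustrative assumptions; attach_s3_bucket only requires that {prefix}_ENDPOINT, {prefix}_ACCESS_KEY, {prefix}_SECRET_KEY, and {prefix}_BUCKET be set in the environment:

    import os
    from osc_ingest_trino import attach_s3_bucket, upload_directory_to_s3

    # hypothetical environment; in practice these come from a shell profile or credentials.env
    os.environ["S3_LANDING_ENDPOINT"] = "https://s3.example.com"
    os.environ["S3_LANDING_ACCESS_KEY"] = "changeme"
    os.environ["S3_LANDING_SECRET_KEY"] = "changeme"
    os.environ["S3_LANDING_BUCKET"] = "my-landing-bucket"

    bucket = attach_s3_bucket("S3_LANDING")

    # mirror a local directory tree under the given key prefix, logging each upload
    upload_directory_to_s3("/tmp/mydata", bucket, "trino/myschema/mytable", verbose=True)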

16 changes: 16 additions & 0 deletions osc_ingest_trino/dotenv_utils.py
@@ -0,0 +1,16 @@
import os
import pathlib
from dotenv import load_dotenv

__all__ = [
    "load_credentials_dotenv",
]

def load_credentials_dotenv():
    # Load some standard environment variables from a dot-env file, if it exists.
    # If no such file can be found, does not fail, and so allows these environment vars to
    # be populated in some other way
    dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', os.environ.get('PWD', '/opt/app-root/src'))
    dotenv_path = pathlib.Path(dotenv_dir) / 'credentials.env'
    if os.path.exists(dotenv_path):
        load_dotenv(dotenv_path=dotenv_path, override=True)
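
For context, a sketch of how the loader is typically called; the variable names in the comment are assumptions, while the lookup path follows the code above (credentials.env under CREDENTIAL_DOTENV_DIR, falling back to PWD and then /opt/app-root/src):

    from osc_ingest_trino import load_credentials_dotenv

    # credentials.env is a plain KEY=VALUE file, e.g. TRINO_USER=..., S3_LANDING_ACCESS_KEY=...
    # if the file is absent this is a no-op, so variables may also come from the shell or a secret mount
    load_credentials_dotenv()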
17 changes: 17 additions & 0 deletions osc_ingest_trino/sqlcols.py
Expand Up @@ -4,6 +4,7 @@
__all__ = [
    "sql_compliant_name",
    "enforce_sql_column_names",
    "enforce_partition_column_order",
]

_wsdedup = re.compile(r"\s+")
@@ -44,3 +45,19 @@ def enforce_sql_column_names(df, inplace=False, maxlen=63):
    rename_map = dict(list(zip(icols, ocols)))
    return df.rename(columns=rename_map, inplace=inplace)

def enforce_partition_column_order(df, pcols, inplace=False):
    if not isinstance(df, pd.DataFrame):
        raise ValueError("df must be a pandas DataFrame")
    if not isinstance(pcols, list):
        raise ValueError("pcols must be list of column names")
    pcols = [str(e) for e in pcols]
    cols = list(df.columns.values)
    for c in pcols:
        cols.remove(c)
        cols.append(c)
    if not inplace:
        return df[cols]
    for c in cols:
        s = df[c]
        df.drop(columns=[c], inplace=True)
        df[c] = s
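
A small sketch of enforce_partition_column_order, which matters because partitioned parquet/Hive layouts expect partition columns last; the column names are hypothetical:

    import pandas as pd
    from osc_ingest_trino import enforce_partition_column_order

    df = pd.DataFrame({"year": [2020], "value": [1.0], "name": ["a"]})

    # move partition columns to the end, preserving the relative order of the rest;
    # inplace=False returns the reordered selection, inplace=True mutates df column by column
    out = enforce_partition_column_order(df, ["year"])
    assert list(out.columns) == ["value", "name", "year"]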
6 changes: 2 additions & 4 deletions osc_ingest_trino/sqltypes.py
@@ -29,13 +29,11 @@ def pandas_type_to_sql(pt, typemap={}):
        return st
    raise ValueError("unexpected pandas column type '{pt}'".format(pt=pt))

# add ability to specify optional dict for specific fields?
# if column name is present, use specified value?
def create_table_schema_pairs(df, typemap={}):
def create_table_schema_pairs(df, typemap = {}, indent = 4):
    if not isinstance(df, pd.DataFrame):
        raise ValueError("df must be a pandas DataFrame")
    ptypes = [str(e) for e in df.dtypes.to_list()]
    stypes = [pandas_type_to_sql(e, typemap=typemap) for e in ptypes]
    pz = list(zip(df.columns.to_list(), stypes))
    return ",\n".join(["    {n} {t}".format(n=e[0],t=e[1]) for e in pz])
    return ",\n".join([f"{' '*indent}{e[0]} {e[1]}" for e in pz])
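
A sketch of the updated signature; the DataFrame and typemap are assumptions. typemap lets callers override the pandas-dtype-to-SQL mapping for a given dtype name, and indent controls the leading spaces on each generated line:

    import pandas as pd
    from osc_ingest_trino import create_table_schema_pairs

    df = pd.DataFrame({"name": ["a"], "value": [1.0]})

    # map pandas 'object' columns to varchar explicitly; output is one "column type" pair per line
    print(create_table_schema_pairs(df, typemap={"object": "varchar"}, indent=4))
    # expected output (roughly):
    #     name varchar,
    #     value double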

110 changes: 110 additions & 0 deletions osc_ingest_trino/trino_utils.py
@@ -0,0 +1,110 @@
import os
import shutil
import uuid

import trino
import pandas as pd
from sqlalchemy.engine import create_engine

from .boto3_utils import upload_directory_to_s3
from .sqltypes import create_table_schema_pairs

__all__ = [
    "attach_trino_engine",
    "drop_unmanaged_table",
    "drop_unmanaged_data",
    "ingest_unmanaged_parquet",
    "unmanaged_parquet_tabledef",
]

_default_prefix = 'trino/{schema}/{table}'

def _remove_trailing_slash(s):
    s = str(s)
    if len(s) == 0: return s
    if (s[-1] != '/'): return s
    return _remove_trailing_slash(s[:-1])

def _prefix(pfx, schema, table):
    return _remove_trailing_slash(pfx).format(schema = schema, table = table)

def attach_trino_engine(env_var_prefix = 'TRINO'):
    sqlstring = 'trino://{user}@{host}:{port}/'.format(
        user = os.environ[f'{env_var_prefix}_USER'],
        host = os.environ[f'{env_var_prefix}_HOST'],
        port = os.environ[f'{env_var_prefix}_PORT']
    )
    sqlargs = {
        'auth': trino.auth.JWTAuthentication(os.environ[f'{env_var_prefix}_PASSWD']),
        'http_scheme': 'https'
    }
    engine = create_engine(sqlstring, connect_args = sqlargs)
    connection = engine.connect()
    return engine

def drop_unmanaged_table(catalog, schema, table, engine, bucket, prefix=_default_prefix, verbose=False):
    sql = f'drop table if exists {catalog}.{schema}.{table}'
    qres = engine.execute(sql)
    dres = bucket.objects \
        .filter(Prefix = f'{_prefix(prefix, schema, table)}/') \
        .delete()
    if verbose:
        print(dres)
    return qres

def drop_unmanaged_data(schema, table, bucket, prefix=_default_prefix, verbose=False):
    dres = bucket.objects \
        .filter(Prefix = f'{_prefix(prefix, schema, table)}/') \
        .delete()
    if verbose: print(dres)
    return dres

def ingest_unmanaged_parquet(df, schema, table, bucket, partition_columns=[], append=True, workdir='/tmp', prefix=_default_prefix, verbose=False):
    if not isinstance(df, pd.DataFrame):
        raise ValueError("df must be a pandas DataFrame")
    if not isinstance(partition_columns, list):
        raise ValueError("partition_columns must be list of column names")

    s3pfx = _prefix(prefix, schema, table)

    if not append:
        dres = bucket.objects.filter(Prefix = f'{s3pfx}/').delete()
        if verbose: print(dres)

    if len(partition_columns) > 0:
        # tell pandas to write a directory tree, using partitions
        tmp = f'{workdir}/{table}'
        # pandas does not clean out destination directory for you:
        shutil.rmtree(tmp, ignore_errors=True)
        df.to_parquet(tmp,
                      partition_cols=partition_columns,
                      index=False)
        # upload the tree onto S3
        upload_directory_to_s3(tmp, bucket, s3pfx, verbose=verbose)
    else:
        # do not use partitions: a single parquet file is created
        parquet = f'{uuid.uuid4().hex}.parquet'
        tmp = f'{workdir}/{parquet}'
        df.to_parquet(tmp, index=False)
        dst = f'{s3pfx}/{parquet}'
        if verbose: print(f'{tmp} --> {dst}')
        bucket.upload_file(tmp, dst)

def unmanaged_parquet_tabledef(df, catalog, schema, table, bucket, partition_columns = [], verbose = False):
    if not isinstance(df, pd.DataFrame):
        raise ValueError("df must be a pandas DataFrame")
    if not isinstance(partition_columns, list):
        raise ValueError("partition_columns must be list of column names")

    columnschema = create_table_schema_pairs(df)

    tabledef = f"create table if not exists {catalog}.{schema}.{table} (\n"
    tabledef += f"{columnschema}\n"
    tabledef += ") with (\n    format = 'parquet',\n"
    if len(partition_columns) > 0:
        tabledef += f"    partitioned_by = array{partition_columns},\n"
    tabledef += f"    external_location = 's3a://{bucket.name}/trino/{schema}/{table}/'\n)"

    if verbose: print(tabledef)
    return tabledef
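
Putting the pieces together, a hedged end-to-end sketch of the unmanaged-parquet flow using only the functions defined above; the catalog, schema, table, and env-var prefixes are illustrative assumptions:

    import pandas as pd
    from osc_ingest_trino import (
        load_credentials_dotenv, attach_s3_bucket, attach_trino_engine,
        drop_unmanaged_table, ingest_unmanaged_parquet, unmanaged_parquet_tabledef,
    )

    load_credentials_dotenv()                # pick up TRINO_* / S3 credentials if a dot-env file exists
    engine = attach_trino_engine("TRINO")    # reads TRINO_USER, TRINO_HOST, TRINO_PORT, TRINO_PASSWD
    bucket = attach_s3_bucket("S3_LANDING")  # hypothetical env-var prefix for the landing bucket

    df = pd.DataFrame({"value": [1.0, 2.0], "year": [2020, 2021]})

    # start clean: drop any existing table and the backing objects under trino/<schema>/<table>/
    drop_unmanaged_table("hive", "myschema", "mytable", engine, bucket, verbose=True)

    # write parquet partitioned by year and push it under the default prefix trino/myschema/mytable
    ingest_unmanaged_parquet(df, "myschema", "mytable", bucket, partition_columns=["year"])

    # generate the matching external-table DDL and run it against Trino
    sql = unmanaged_parquet_tabledef(df, "hive", "myschema", "mytable", bucket, partition_columns=["year"])
    engine.execute(sql)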

4 changes: 2 additions & 2 deletions setup.py
@@ -10,7 +10,7 @@

setup(
    name = "osc-ingest-tools",
    version = "0.1.1",
    version = "0.2.0",
    description = "python tools to assist with standardized data ingestion workflows for the OS-Climate project",
    long_description = README,
    long_description_content_type = "text/markdown",
@@ -26,7 +26,7 @@
    ],
    packages = find_packages(),
    include_package_data = True,
    install_requires = ["pandas"],
    install_requires = ["pandas", "trino", "boto3", "sqlalchemy", "sqlalchemy-trino", "python-dotenv"],
    entry_points = {
        "console_scripts": []
    },
