Add/fix docstrings to all files. #62

Merged
12 changes: 7 additions & 5 deletions README.md
@@ -2,9 +2,11 @@

python tools to assist with standardized data ingestion workflows

## Installation, Usage, and Release Management

### Install from PyPi

```
```shell
pip install osc-ingest-tools
```

@@ -100,14 +102,14 @@ checks.
Enabling automatic formatting via [pre-commit](https://pre-commit.com/) is
recommended:

```
```shell
pip install black isort pre-commit
pre-commit install
```

To ensure compliance with static check tools, developers may wish to run:

```
```shell
pip install black isort
# auto-sort imports
isort .
@@ -117,7 +119,7 @@ black .

Code can then be tested using tox.

```
```shell
# run static checks and tests
tox
# run only tests
@@ -139,7 +141,7 @@ To release a new version of this library, authorized developers should:

E.g.,

```
```shell
git commit -sm "Release v0.3.4"
git tag v0.3.4
git push --follow-tags
Expand Down
2 changes: 2 additions & 0 deletions osc_ingest_trino/__init__.py
@@ -1,3 +1,5 @@
"""Functions to simplify use of S3-based data, Pandas dataframes, and Trino SQL tables."""

from .boto3_utils import attach_s3_bucket, upload_directory_to_s3
from .dotenv_utils import load_credentials_dotenv
from .sqlcols import (
Expand Down
10 changes: 10 additions & 0 deletions osc_ingest_trino/boto3_utils.py
@@ -1,3 +1,5 @@
"""AWS S3 interoperability functions."""

import os
from pathlib import Path
from typing import Union
@@ -12,6 +14,13 @@


def upload_directory_to_s3(path: Union[Path, str], bucket: Bucket, prefix: str, verbose: bool = False) -> None:
"""Upload files to an S3 bucket.

path -- the directory containing all files to be uploaded.
bucket -- the S3 bucket.
prefix -- the prefix prepended to each filename before uploading.
verbose -- if True, print each file uploaded (with its prefix).
"""
path = str(path)
prefix = str(prefix)
for subdir, dirs, files in os.walk(path):
@@ -25,6 +34,7 @@ def upload_directory_to_s3(path: Union[Path, str], bucket: Bucket, prefix: str,


def attach_s3_bucket(env_var_prefix: str) -> Bucket:
"""Return the S3 Bucket resource asscoiated with env_var_prefix (typically from `credentials.env`)."""
s3 = boto3.resource(
service_name="s3",
endpoint_url=os.environ[f"{env_var_prefix}_ENDPOINT"],
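As a quick illustration of how the two boto3 helpers fit together, a minimal sketch; the `S3_DEV` environment-variable prefix, the `./data` directory, and the key prefix are illustrative assumptions, not part of this PR:

```python
from osc_ingest_trino import attach_s3_bucket, load_credentials_dotenv, upload_directory_to_s3

# Populate credentials (e.g. S3_DEV_* variables) from credentials.env, if present.
load_credentials_dotenv()

# attach_s3_bucket reads environment variables named <prefix>_ENDPOINT, etc.
bucket = attach_s3_bucket("S3_DEV")

# Upload every file under ./data, prepending "raw/my-dataset" to each object key.
upload_directory_to_s3("./data", bucket, "raw/my-dataset", verbose=True)
```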
9 changes: 6 additions & 3 deletions osc_ingest_trino/dotenv_utils.py
@@ -1,3 +1,5 @@
"""Functions to read credentials files and inject secrets into the environment."""

import os
import pathlib

@@ -9,9 +11,10 @@


def load_credentials_dotenv() -> None:
# Load some standard environment variables from a dot-env file, if it exists.
# If no such file can be found, does not fail, and so allows these environment vars to
# be populated in some other way
"""Load some standard environment variables from a dot-env file, if it exists.

If no such file can be found, do not raise, allowing these environment vars to be populated in some other way.
"""
dotenv_dir = os.environ.get("CREDENTIAL_DOTENV_DIR", os.environ.get("PWD", "/opt/app-root/src"))
dotenv_path = pathlib.Path(dotenv_dir) / "credentials.env"
if os.path.exists(dotenv_path):
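A small usage sketch for the loader; the directory path below is a hypothetical example, and by default the loader checks `$CREDENTIAL_DOTENV_DIR`, then `$PWD`, then `/opt/app-root/src`:

```python
import os

from osc_ingest_trino import load_credentials_dotenv

# Point the loader at a specific directory containing credentials.env
# (otherwise it falls back to $PWD, then /opt/app-root/src).
os.environ["CREDENTIAL_DOTENV_DIR"] = "/path/to/secrets"  # hypothetical path

# Silently does nothing if no credentials.env file is found there.
load_credentials_dotenv()
```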
5 changes: 5 additions & 0 deletions osc_ingest_trino/sqlcols.py
@@ -1,3 +1,5 @@
"""Functions to translate Pandas column names to SQL column names."""

import re
from typing import List, Union, cast

@@ -16,6 +18,7 @@

# 63 seems to be a common max column name length
def sql_compliant_name(name: Union[List[str], str], maxlen=63) -> Union[List[str], str]:
"""Convert name to a SQL-compliant table or column name, abbreviating some common words."""
if isinstance(name, list):
return [cast(str, sql_compliant_name(e, maxlen=maxlen)) for e in name]
w = str(name).casefold().rstrip().lstrip()
@@ -40,6 +43,7 @@ def sql_compliant_name(name: Union[List[str], str], maxlen=63) -> Union[List[str


def enforce_sql_column_names(df: pd.DataFrame, inplace: bool = False, maxlen: int = 63) -> pd.DataFrame:
"""Ensure that all column names for df are SQL-compliant."""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
icols = df.columns.to_list()
@@ -51,6 +55,7 @@ def enforce_sql_column_names(df: pd.DataFrame, inplace: bool = False, maxlen: in


def enforce_partition_column_order(df: pd.DataFrame, pcols: List[str], inplace: bool = False) -> pd.DataFrame:
"""Reorder columns names of df to match the order given by pcols."""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(pcols, list):
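A brief sketch of the column-name helpers; the input column names and the exact rewritten forms are illustrative, as this PR does not change the rewriting rules:

```python
import pandas as pd

from osc_ingest_trino.sqlcols import (
    enforce_partition_column_order,
    enforce_sql_column_names,
    sql_compliant_name,
)

# Rewrite a single name (or a list of names) into a SQL-safe form,
# truncated to at most maxlen characters.
name = sql_compliant_name("Total Emissions (Scope 1)", maxlen=63)

df = pd.DataFrame({"Company Name": ["A"], "Reporting Year": [2021]})

# Make every column name SQL-compliant; a new dataframe is returned unless inplace=True.
df = enforce_sql_column_names(df)

# Reorder columns to follow the partition-column order expected by the ingest helpers.
df = enforce_partition_column_order(df, ["reporting_year"])
```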
10 changes: 10 additions & 0 deletions osc_ingest_trino/sqltypes.py
@@ -1,3 +1,5 @@
"""Functions to translate Pandas dataframes to SQL equivalents."""

from typing import Dict

import pandas as pd
@@ -25,6 +27,7 @@


def pandas_type_to_sql(pt: str, typemap: Dict[str, str] = {}):
"""Return the SQL type corresponding to the pandas type `pt` (using special mappings, if any, from `typemap`)."""
if not isinstance(typemap, dict):
raise ValueError("typemap must be a dict")
# user defined typemap overrides _p2smap
@@ -40,6 +43,13 @@ def create_table_schema_pairs(
colmap: Dict[str, str] = {},
indent: int = 4,
) -> str:
"""Create SQL column, type pairs that can appear in a CREATE TABLE operation.

df -- the dataframe to be rendered as a SQL table
typemap -- mappings from dtypes to SQL types above and beyond our defaults
colmap -- mappings of df column names to SQL column names if not using defaults
indent -- how many spaces of indent to make our SQL declarations pretty
"""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(colmap, dict):
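For context, a minimal sketch of how `create_table_schema_pairs` is typically used when assembling a `CREATE TABLE` statement; the catalog, schema, and table names are placeholders:

```python
import pandas as pd

from osc_ingest_trino.sqltypes import create_table_schema_pairs

df = pd.DataFrame({"company": ["A", "B"], "year": [2020, 2021], "co2e": [1.5, 2.0]})

# Render "column type" pairs suitable for a CREATE TABLE body;
# typemap/colmap can override inferred SQL types and column names.
pairs = create_table_schema_pairs(df, indent=4)

sql = f"create table if not exists demo_catalog.demo_schema.demo_table (\n{pairs}\n)"
print(sql)
```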
39 changes: 39 additions & 0 deletions osc_ingest_trino/trino_utils.py
@@ -1,3 +1,5 @@
"""Trino interoperability functions."""

import math
import os
import uuid
@@ -32,6 +34,13 @@ def attach_trino_engine(
schema: Optional[str] = None,
verbose: Optional[bool] = False,
) -> Engine:
"""Return a SQLAlchemy engine object representing a Trino instance.

env_var_prefix -- a prefix for all environment variables related to the Trino instance.
catalog -- the Trino catalog.
schema -- the Trino schema.
verbose -- if True, print the full string used to connect.
"""
sqlstring = "trino://{user}@{host}:{port}".format(
user=os.environ[f"{env_var_prefix}_USER"],
host=os.environ[f"{env_var_prefix}_HOST"],
@@ -57,6 +66,12 @@ def attach_trino_engine(
def _do_sql(
sql: Union[sqlalchemy.sql.elements.TextClause, str], engine: Engine, verbose: bool = False
) -> Optional[Sequence[Row[Any]]]:
"""Execute SQL query, returning the query result.

sql -- the SQL query.
engine -- the SQLAlchemy engine representing the Trino database.
verbose -- if True, print the values returned from executing the string.
"""
if type(sql) is not sqlalchemy.sql.elements.TextClause:
sql = text(str(sql))
if verbose:
@@ -86,6 +101,22 @@ def fast_pandas_ingest_via_hive( # noqa: C901
colmap: Dict[str, str] = {},
verbose: bool = False,
) -> None:
"""Efficiently export a dataframe into a Trino database.

df -- the dataframe to export.
engine -- the SQLAlchemy engine representing the Trino database.
catalog -- the Trino catalog.
schema -- the Trino schema.
table -- the name of the table created in the schema.
hive_bucket -- the backing store of the Hive metastore.
hive_catalog -- the Hive metastore catalog (where schemas are created).
hive_schema -- the Hive metastore schema (where tables will be created).
partition_columns -- if not empty, defines the partition columns of the table created.
overwrite -- if True, an existing table will be overwritten.
typemap -- used to format types that cannot otherwise be properly inferred.
colmap -- used to format column names that cannot otherwise be properly inferred.
verbose -- if True, print the queries being executed and the results of those queries.
"""
uh8 = uuid.uuid4().hex[:8]
hive_table = f"ingest_temp_{uh8}"

@@ -157,6 +188,8 @@ def fast_pandas_ingest_via_hive( # noqa: C901


class TrinoBatchInsert(object):
"""A class used to bundle together basic Trino parameters."""

def __init__(
self,
catalog: Optional[str] = None,
@@ -165,6 +198,7 @@ def __init__(
optimize: bool = False,
verbose: bool = False,
):
"""Initialize TrinoBatchInsert objects."""
self.catalog = catalog
self.schema = schema
self.batch_size = batch_size
@@ -175,6 +209,7 @@ def __init__(
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
# https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method
def __call__(self, sqltbl: Table, dbcxn: Connection, columns: List[str], data_iter: List[Tuple]) -> None:
"""Implement `callable` interface for row-by-row insertion."""
fqname = self._full_table_name(sqltbl)
batch: List[str] = []
self.ninserts = 0
@@ -201,6 +236,7 @@ def __call__(self, sqltbl: Table, dbcxn: Connection, columns: List[str], data_it
print(f"execute optimize: {x}")

def _do_insert(self, dbcxn: Connection, fqname: str, batch: List[str]) -> None:
"""Implement actual row-by-row insertion of BATCH data into table FQNAME using DBCXN database connection."""
if self.verbose:
print(f"inserting {len(batch)} records")
TrinoBatchInsert._print_batch(batch)
@@ -218,6 +254,7 @@ def _do_insert(self, dbcxn: Connection, fqname: str, batch: List[str]) -> None:
print(f"batch insert result: {x}")

def _full_table_name(self, sqltbl: Table) -> str:
"""Return fully qualified table name for SQLTBL table within this TrinoBatchInsert object."""
# start with table name
name: str = f"{sqltbl.name}"
# prepend schema - allow override from this class
@@ -231,6 +268,7 @@ def _full_table_name(self, sqltbl: Table) -> str:

@staticmethod
def _sqlform(x: Any) -> str:
"""Format the value of x so it can appear in a SQL Values context."""
if x is None:
return "NULL"
if isinstance(x, str):
@@ -254,6 +292,7 @@ def _sqlform(x: Any) -> str:

@staticmethod
def _print_batch(batch: List[str]) -> None:
"""For batch, a list of SQL query lines, print up to the first 5 such."""
if len(batch) > 5:
print("\n".join(f" {e}" for e in batch[:3]))
print(" ...")
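To make the intent of `TrinoBatchInsert` concrete, a usage sketch along the lines of the pandas `to_sql` documentation linked above; the `TRINO` environment-variable prefix, table/schema names, and `batch_size` value are illustrative assumptions:

```python
import pandas as pd

from osc_ingest_trino.trino_utils import TrinoBatchInsert, attach_trino_engine

# Reads TRINO_USER, TRINO_HOST, TRINO_PORT, etc. from the environment.
engine = attach_trino_engine(env_var_prefix="TRINO", catalog="hive", schema="demo", verbose=True)

df = pd.DataFrame({"company": ["A", "B"], "co2e": [1.5, 2.0]})

# TrinoBatchInsert implements the callable interface that DataFrame.to_sql
# accepts via its `method` argument, batching INSERT statements against Trino.
df.to_sql(
    "demo_table",
    con=engine,
    schema="demo",
    index=False,
    if_exists="append",
    method=TrinoBatchInsert(batch_size=1000, verbose=True),
)
```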
2 changes: 2 additions & 0 deletions osc_ingest_trino/unmanaged/__init__.py
@@ -1,3 +1,5 @@
"""Functions create and clean up unmanaged Hive tables."""

from .unmanaged_hive_ingest import (
drop_unmanaged_data,
drop_unmanaged_table,
8 changes: 8 additions & 0 deletions osc_ingest_trino/unmanaged/unmanaged_hive_ingest.py
@@ -1,3 +1,5 @@
"""Functions to create, ingest, and drop unmanaged Hive tables."""

import shutil
import uuid

@@ -17,6 +19,7 @@


def _remove_trailing_slash(s):
"""Remove trailing slash from s."""
s = str(s)
if len(s) == 0:
return s
@@ -26,10 +29,12 @@ def _remove_trailing_slash(s):


def _prefix(pfx, schema, table):
"""Translate pfx, schema, and table names into S3 bucket name."""
return _remove_trailing_slash(pfx).format(schema=schema, table=table)


def drop_unmanaged_table(catalog, schema, table, engine, bucket, prefix=_default_prefix, verbose=False):
"""Drop catalog.schema.table from Hive metastore and also delete its S3 backing store."""
sql = text(f"drop table if exists {catalog}.{schema}.{table}")
with engine.begin() as cxn:
qres = cxn.execute(sql)
@@ -40,6 +45,7 @@ def drop_unmanaged_table(catalog, schema, table, engine, bucket, prefix=_default


def drop_unmanaged_data(schema, table, bucket, prefix=_default_prefix, verbose=False):
"""Delete data that may have been orphaned when its table was dropped in Hive metastore."""
dres = bucket.objects.filter(Prefix=f"{_prefix(prefix, schema, table)}/").delete()
if verbose:
print(dres)
@@ -49,6 +55,7 @@ def drop_unmanaged_data(schema, table, bucket, prefix=_default_prefix, verbose=F
def ingest_unmanaged_parquet(
df, schema, table, bucket, partition_columns=[], append=True, workdir="/tmp", prefix=_default_prefix, verbose=False
):
"""Ingest data from df into Hive metastore table with backing store bucket."""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(partition_columns, list):
@@ -83,6 +90,7 @@ def ingest_unmanaged_parquet(
def unmanaged_parquet_tabledef(
df, catalog, schema, table, bucket, partition_columns=[], typemap={}, colmap={}, verbose=False
):
"""Return a SQL string that would create a table suitable for ingesting df into Hive metastore backed by bucket."""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(partition_columns, list):
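Finally, a sketch of the full unmanaged-table round trip using these helpers; the environment-variable prefixes, catalog/schema/table names, and dataframe contents are placeholders, not values prescribed by this PR:

```python
import pandas as pd
from sqlalchemy import text

from osc_ingest_trino import attach_s3_bucket
from osc_ingest_trino.trino_utils import attach_trino_engine
from osc_ingest_trino.unmanaged.unmanaged_hive_ingest import (
    drop_unmanaged_table,
    ingest_unmanaged_parquet,
    unmanaged_parquet_tabledef,
)

engine = attach_trino_engine(env_var_prefix="TRINO", catalog="hive", schema="demo_schema")
bucket = attach_s3_bucket("S3_HIVE")  # illustrative env-var prefix

df = pd.DataFrame({"company": ["A", "B"], "year": [2021, 2021]})

# Build the CREATE TABLE statement for the unmanaged table and run it against Trino...
tabledef = unmanaged_parquet_tabledef(df, "hive", "demo_schema", "demo_table", bucket)
with engine.begin() as cxn:
    cxn.execute(text(tabledef))

# ...write the dataframe as parquet into the table's S3 backing store...
ingest_unmanaged_parquet(df, "demo_schema", "demo_table", bucket, append=True)

# ...and eventually drop the table along with its backing data.
drop_unmanaged_table("hive", "demo_schema", "demo_table", engine, bucket, verbose=True)
```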