
Commit

Merge branch 'main' into update-devops-tooling
Signed-off-by: Modeseven Industrial Solutions <[email protected]>
ModeSevenIndustrialSolutions authored May 13, 2024
2 parents 9801241 + c6b7645 commit c0f612d
Showing 19 changed files with 2,049 additions and 94 deletions.
1 change: 1 addition & 0 deletions .flake8
@@ -1,2 +1,3 @@
[flake8]
max-line-length = 160
extend-ignore = E203, E501
27 changes: 0 additions & 27 deletions .github/workflows/ci.yml

This file was deleted.

16 changes: 1 addition & 15 deletions .github/workflows/release.yaml
@@ -55,7 +55,7 @@ jobs:
### SIGNING ###

- name: "Sign packages with Sigstore"
uses: sigstore/[email protected].0
uses: sigstore/[email protected].1
with:
inputs: >-
./dist/*.tar.gz
@@ -86,20 +86,6 @@ jobs:
name: ${{ github.ref_name }}
path: dist/

- name: "📦 Publish release to GitHub"
uses: ModeSevenIndustrialSolutions/action-automatic-releases@latest
with:
# Valid inputs are:
# repo_token, automatic_release_tag, draft, prerelease, title, files
repo_token: ${{ secrets.GITHUB_TOKEN }}
prerelease: false
automatic_release_tag: ${{ github.ref_name }}
title: ${{ github.ref_name }}
files: |
dist/*.tar.gz
dist/*.whl
dist/*.sigstore
- name: "📦 Publish artefacts to GitHub"
# https://github.com/softprops/action-gh-release
uses: softprops/action-gh-release@v2
32 changes: 0 additions & 32 deletions .github/workflows/release.yml

This file was deleted.

3 changes: 2 additions & 1 deletion .github/workflows/test-release.yaml
@@ -62,7 +62,8 @@ jobs:
### SIGNING ###

- name: "Sign packages with Sigstore"
uses: sigstore/[email protected]
uses: sigstore/[email protected]

with:
inputs: >-
./dist/*.tar.gz
12 changes: 6 additions & 6 deletions .pre-commit-config.yaml
@@ -121,6 +121,12 @@ repos:
additional_dependencies:
- pep8-naming

- repo: https://github.com/adrienverge/yamllint.git
rev: v1.35.1
hooks:
- id: yamllint
args: [ "-d", "{rules: {line-length: {max: 120}}, ignore-from-file: [.gitignore],}", ]

- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.10.0"
hooks:
@@ -129,12 +135,6 @@ repos:
args: [--show-error-codes]
additional_dependencies: ["pytest", "types-requests"]

- repo: https://github.com/adrienverge/yamllint.git
rev: v1.35.1
hooks:
- id: yamllint
args: [ "-d", "{rules: {line-length: {max: 120}}, ignore-from-file: [.gitignore],}", ]

# Check for misspellings in documentation files
# - repo: https://github.com/codespell-project/codespell
# rev: v2.2.2
5 changes: 4 additions & 1 deletion README.md
@@ -2,7 +2,9 @@

python tools to assist with standardized data ingestion workflows

## Install from PyPi
## Installation, Usage, and Release Management

### Install from PyPI

```console
pip install osc-ingest-tools
@@ -118,6 +120,7 @@ black .
Code can then be tested using tox:

```console
# run static checks and tests
tox
# run only tests
2 changes: 2 additions & 0 deletions osc_ingest_trino/__init__.py
@@ -1,3 +1,5 @@
"""Functions to simplify use of S3-based data, Pandas dataframes, and Trino SQL tables."""

from .boto3_utils import attach_s3_bucket, upload_directory_to_s3
from .dotenv_utils import load_credentials_dotenv
from .sqlcols import (
10 changes: 10 additions & 0 deletions osc_ingest_trino/boto3_utils.py
@@ -1,3 +1,5 @@
"""AWS S3 interoperability functions."""

import os
from pathlib import Path
from typing import Union
@@ -12,6 +14,13 @@


def upload_directory_to_s3(path: Union[Path, str], bucket: Bucket, prefix: str, verbose: bool = False) -> None:
"""Upload files to an S3 bucket.
path -- the directory containing all files to be uploaded.
bucket -- the S3 bucket.
prefix -- the prefix prepended to each filename before uploading.
verbose -- if True, print each file uploaded (with its prefix).
"""
path = str(path)
prefix = str(prefix)
for subdir, dirs, files in os.walk(path):
@@ -25,6 +34,7 @@ def upload_directory_to_s3(path: Union[Path, str], bucket: Bucket, prefix: str,


def attach_s3_bucket(env_var_prefix: str) -> Bucket:
"""Return the S3 Bucket resource asscoiated with env_var_prefix (typically from `credentials.env`)."""
s3 = boto3.resource(
service_name="s3",
endpoint_url=os.environ[f"{env_var_prefix}_ENDPOINT"],
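For orientation, a minimal usage sketch of the two functions documented above. It assumes a `credentials.env` defining `S3_DEV_ENDPOINT` and sibling `S3_DEV_*` variables; the prefix and paths here are hypothetical, not taken from this diff.

```python
from osc_ingest_trino.boto3_utils import attach_s3_bucket, upload_directory_to_s3
from osc_ingest_trino.dotenv_utils import load_credentials_dotenv

load_credentials_dotenv()            # populate S3_DEV_* secrets, if credentials.env exists
bucket = attach_s3_bucket("S3_DEV")  # reads S3_DEV_ENDPOINT (per the code above) and related vars
upload_directory_to_s3("./data", bucket, "ingest/2024-05-13", verbose=True)
```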
9 changes: 6 additions & 3 deletions osc_ingest_trino/dotenv_utils.py
@@ -1,3 +1,5 @@
"""Functions to read credentials files and inject secrets into the environment."""

import os
import pathlib

@@ -9,9 +11,10 @@


def load_credentials_dotenv() -> None:
# Load some standard environment variables from a dot-env file, if it exists.
# If no such file can be found, does not fail, and so allows these environment vars to
# be populated in some other way
"""Load some standard environment variables from a dot-env file, if it exists.
If no such file can be found, do not raise, allowing these environment vars to be populated in some other way.
"""
dotenv_dir = os.environ.get("CREDENTIAL_DOTENV_DIR", os.environ.get("PWD", "/opt/app-root/src"))
dotenv_path = pathlib.Path(dotenv_dir) / "credentials.env"
if os.path.exists(dotenv_path):
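As a quick illustration of the loader's fallback behavior (the directory path here is hypothetical):

```python
import os

from osc_ingest_trino.dotenv_utils import load_credentials_dotenv

# Optional override; otherwise the loader falls back to $PWD and then
# /opt/app-root/src, per the code above.
os.environ["CREDENTIAL_DOTENV_DIR"] = "/path/to/secrets"
load_credentials_dotenv()  # silently a no-op if credentials.env does not exist
```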
5 changes: 5 additions & 0 deletions osc_ingest_trino/sqlcols.py
@@ -1,3 +1,5 @@
"""Functions to translate Pandas column names to SQL column names."""

import re
from typing import List, Union, cast

@@ -16,6 +18,7 @@

# 63 seems to be a common max column name length
def sql_compliant_name(name: Union[List[str], str], maxlen=63) -> Union[List[str], str]:
"""Convert name to a SQL-compliant table or column name, abbreviating some common words."""
if isinstance(name, list):
return [cast(str, sql_compliant_name(e, maxlen=maxlen)) for e in name]
w = str(name).casefold().rstrip().lstrip()
@@ -40,6 +43,7 @@ def sql_compliant_name(name: Union[List[str], str], maxlen=63) -> Union[List[str


def enforce_sql_column_names(df: pd.DataFrame, inplace: bool = False, maxlen: int = 63) -> pd.DataFrame:
"""Ensure that all column names for df are SQL-compliant."""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
icols = df.columns.to_list()
@@ -51,6 +55,7 @@ def enforce_sql_column_names(df: pd.DataFrame, inplace: bool = False, maxlen: in


def enforce_partition_column_order(df: pd.DataFrame, pcols: List[str], inplace: bool = False) -> pd.DataFrame:
"""Reorder columns names of df to match the order given by pcols."""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(pcols, list):
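A short sketch of how these helpers compose; the column names are invented for illustration:

```python
import pandas as pd

from osc_ingest_trino.sqlcols import enforce_sql_column_names, sql_compliant_name

# Produces a lower-cased, SQL-safe name no longer than 63 characters.
print(sql_compliant_name("Total Assets (USD)"))

df = pd.DataFrame({"Company Name": ["Acme"], "Total Assets (USD)": [1.0]})
df = enforce_sql_column_names(df)  # returns a renamed copy unless inplace=True
print(df.columns.to_list())
```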
10 changes: 10 additions & 0 deletions osc_ingest_trino/sqltypes.py
@@ -1,3 +1,5 @@
"""Functions to translate Pandas dataframes to SQL equivalents."""

from typing import Dict

import pandas as pd
@@ -25,6 +27,7 @@


def pandas_type_to_sql(pt: str, typemap: Dict[str, str] = {}):
"""Return the SQL type corresponding to the pandas type `pt` (using special mappings, if any, from `typemap`)."""
if not isinstance(typemap, dict):
raise ValueError("typemap must be a dict")
# user defined typemap overrides _p2smap
@@ -40,6 +43,13 @@ def create_table_schema_pairs(
colmap: Dict[str, str] = {},
indent: int = 4,
) -> str:
"""Create SQL column, type pairs that can appear in a CREATE TABLE operation.
df -- the dataframe to be rendered as a SQL table
typemap -- mappings from dtypes to SQL types above and beyond our defaults
colmap -- mappings of df column names to SQL column names if not using defaults
indent -- how many spaces of indent to make our SQL declarations pretty
"""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(colmap, dict):
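To make the docstring concrete, a hedged sketch of rendering a CREATE TABLE statement; the catalog, schema, and table names are hypothetical:

```python
import pandas as pd

from osc_ingest_trino.sqltypes import create_table_schema_pairs

df = pd.DataFrame({"name": ["a"], "value": [1.5]})
pairs = create_table_schema_pairs(df)  # one "column type" pair per line, indented 4 spaces
print(f"create table if not exists hive.demo.widgets (\n{pairs}\n)")
```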
39 changes: 39 additions & 0 deletions osc_ingest_trino/trino_utils.py
@@ -1,3 +1,5 @@
"""Trino interoperability functions."""

import math
import os
import uuid
@@ -32,6 +34,13 @@ def attach_trino_engine(
schema: Optional[str] = None,
verbose: Optional[bool] = False,
) -> Engine:
"""Return a SQLAlchemy engine object representing a Trino instance.
env_var_prefix -- a prefix for all environment variables related to the Trino instance.
catalog -- the Trino catalog.
schema -- the Trino schema.
verbose -- if True, print the full string used to connect.
"""
sqlstring = "trino://{user}@{host}:{port}".format(
user=os.environ[f"{env_var_prefix}_USER"],
host=os.environ[f"{env_var_prefix}_HOST"],
@@ -57,6 +66,12 @@
def _do_sql(
sql: Union[sqlalchemy.sql.elements.TextClause, str], engine: Engine, verbose: bool = False
) -> Optional[Sequence[Row[Any]]]:
"""Execute SQL query, returning the query result.
sql -- the SQL query.
engine -- the SQLAlchemy engine representing the Trino database.
verbose -- if True, print the values returned from executing the string.
"""
if type(sql) is not sqlalchemy.sql.elements.TextClause:
sql = text(str(sql))
if verbose:
@@ -86,6 +101,22 @@ def fast_pandas_ingest_via_hive( # noqa: C901
colmap: Dict[str, str] = {},
verbose: bool = False,
) -> None:
"""Efficiently export a dataframe into a Trino database.
df -- the dataframe to export.
engine -- the SQLAlchemy engine representing the Trino database.
catalog -- the Trino catalog.
schema -- the Trino schema.
table -- the name of the table created in the schema.
hive_bucket -- the backing store of the Hive metastore.
hive_catalog -- the Hive metastore catalog (where schemas are created).
hive_schema -- the Hive metastore schema (where tables will be created).
partition_columns -- if not empty, defines the partition columns of the table created.
overwrite -- if True, an existing table will be overwritten.
typemap -- used to format types that cannot otherwise be properly inferred.
colmap -- used to format column names that cannot otherwise be properly inferred.
verbose -- if True, print the queries being executed and the results of those queries.
"""
uh8 = uuid.uuid4().hex[:8]
hive_table = f"ingest_temp_{uh8}"

@@ -157,6 +188,8 @@ def fast_pandas_ingest_via_hive( # noqa: C901


class TrinoBatchInsert(object):
"""A class used to bundle together basic Trino parameters."""

def __init__(
self,
catalog: Optional[str] = None,
@@ -165,6 +198,7 @@ def __init__(
optimize: bool = False,
verbose: bool = False,
):
"""Initialize TrinoBatchInsert objects."""
self.catalog = catalog
self.schema = schema
self.batch_size = batch_size
@@ -175,6 +209,7 @@ def __init__(
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
# https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method
def __call__(self, sqltbl: Table, dbcxn: Connection, columns: List[str], data_iter: List[Tuple]) -> None:
"""Implement `callable` interface for row-by-row insertion."""
fqname = self._full_table_name(sqltbl)
batch: List[str] = []
self.ninserts = 0
@@ -201,6 +236,7 @@ def __call__(self, sqltbl: Table, dbcxn: Connection, columns: List[str], data_it
print(f"execute optimize: {x}")

def _do_insert(self, dbcxn: Connection, fqname: str, batch: List[str]) -> None:
"""Implement actual row-by-row insertion of BATCH data into table FQNAME using DBCXN database connection."""
if self.verbose:
print(f"inserting {len(batch)} records")
TrinoBatchInsert._print_batch(batch)
@@ -218,6 +254,7 @@ def _do_insert(self, dbcxn: Connection, fqname: str, batch: List[str]) -> None:
print(f"batch insert result: {x}")

def _full_table_name(self, sqltbl: Table) -> str:
"""Return fully qualified table name for SQLTBL table within this TrinoBatchInsert object."""
# start with table name
name: str = f"{sqltbl.name}"
# prepend schema - allow override from this class
@@ -231,6 +268,7 @@

@staticmethod
def _sqlform(x: Any) -> str:
"""Format the value of x so it can appear in a SQL Values context."""
if x is None:
return "NULL"
if isinstance(x, str):
@@ -254,6 +292,7 @@ def _sqlform(x: Any) -> str:

@staticmethod
def _print_batch(batch: List[str]) -> None:
"""For batch, a list of SQL query lines, print up to the first 5 such."""
if len(batch) > 5:
print("\n".join(f" {e}" for e in batch[:3]))
print(" ...")
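Taken together, the docstrings above suggest a workflow along these lines. This is a sketch only: it assumes a `credentials.env` defining `TRINO_USER`, `TRINO_HOST`, and `TRINO_PORT` (as read by the code above), plus an existing `hive.ingest` schema; all names are hypothetical.

```python
import pandas as pd

from osc_ingest_trino.dotenv_utils import load_credentials_dotenv
from osc_ingest_trino.trino_utils import TrinoBatchInsert, attach_trino_engine

load_credentials_dotenv()
engine = attach_trino_engine("TRINO", catalog="hive", schema="ingest", verbose=True)

df = pd.DataFrame({"name": ["a", "b"], "value": [1.5, 2.5]})
# TrinoBatchInsert implements the `method` callable interface of
# DataFrame.to_sql (see the pandas links cited in the code above).
df.to_sql(
    "demo_table",
    engine,
    index=False,
    if_exists="append",
    method=TrinoBatchInsert(batch_size=1000, verbose=True),
)
```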
2 changes: 2 additions & 0 deletions osc_ingest_trino/unmanaged/__init__.py
@@ -1,3 +1,5 @@
"""Functions create and clean up unmanaged Hive tables."""

from .unmanaged_hive_ingest import (
drop_unmanaged_data,
drop_unmanaged_table,