diff --git a/osc_ingest_trino/__init__.py b/osc_ingest_trino/__init__.py
index 04b20ea..404d828 100644
--- a/osc_ingest_trino/__init__.py
+++ b/osc_ingest_trino/__init__.py
@@ -5,15 +5,27 @@
 """
 
 # defines the release version for this python package
-__version__ = "0.1.1"
+__version__ = "0.2.0"
 
 from .sqlcols import *
 from .sqltypes import *
+from .boto3_utils import *
+from .dotenv_utils import *
+from .trino_utils import *
 
 __all__ = [
     "sql_compliant_name",
     "enforce_sql_column_names",
+    "enforce_partition_column_order",
     "pandas_type_to_sql",
     "create_table_schema_pairs",
+    "attach_s3_bucket",
+    "upload_directory_to_s3",
+    "load_credentials_dotenv",
+    "attach_trino_engine",
+    "drop_unmanaged_table",
+    "drop_unmanaged_data",
+    "ingest_unmanaged_parquet",
+    "unmanaged_parquet_tabledef",
 ]
 
diff --git a/osc_ingest_trino/boto3_utils.py b/osc_ingest_trino/boto3_utils.py
new file mode 100644
index 0000000..3737ae0
--- /dev/null
+++ b/osc_ingest_trino/boto3_utils.py
@@ -0,0 +1,29 @@
+import os
+import boto3
+
+__all__ = [
+    "upload_directory_to_s3",
+    "attach_s3_bucket",
+]
+
+def upload_directory_to_s3(path, bucket, prefix, verbose=False):
+    path = str(path)
+    prefix = str(prefix)
+    for subdir, dirs, files in os.walk(path):
+        for f in files:
+            pfx = subdir.replace(path, prefix)
+            src = os.path.join(subdir, f)
+            dst = os.path.join(pfx, f)
+            if verbose:
+                print(f'{src} --> {dst}')
+            bucket.upload_file(src, dst)
+
+def attach_s3_bucket(env_var_prefix):
+    s3 = boto3.resource(
+        service_name="s3",
+        endpoint_url=os.environ[f"{env_var_prefix}_ENDPOINT"],
+        aws_access_key_id=os.environ[f"{env_var_prefix}_ACCESS_KEY"],
+        aws_secret_access_key=os.environ[f"{env_var_prefix}_SECRET_KEY"],
+    )
+    return s3.Bucket(os.environ[f"{env_var_prefix}_BUCKET"])
+
diff --git a/osc_ingest_trino/dotenv_utils.py b/osc_ingest_trino/dotenv_utils.py
new file mode 100644
index 0000000..e14dd05
--- /dev/null
+++ b/osc_ingest_trino/dotenv_utils.py
@@ -0,0 +1,16 @@
+import os
+import pathlib
+from dotenv import load_dotenv
+
+__all__ = [
+    "load_credentials_dotenv",
+]
+
+def load_credentials_dotenv():
+    # Load some standard environment variables from a dot-env file, if it exists.
+    # If no such file can be found, does not fail, and so allows these environment vars to
+    # be populated in some other way
+    dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', os.environ.get('PWD', '/opt/app-root/src'))
+    dotenv_path = pathlib.Path(dotenv_dir) / 'credentials.env'
+    if os.path.exists(dotenv_path):
+        load_dotenv(dotenv_path=dotenv_path, override=True)
diff --git a/osc_ingest_trino/sqlcols.py b/osc_ingest_trino/sqlcols.py
index aa5283d..c5fde16 100644
--- a/osc_ingest_trino/sqlcols.py
+++ b/osc_ingest_trino/sqlcols.py
@@ -4,6 +4,7 @@
 __all__ = [
     "sql_compliant_name",
     "enforce_sql_column_names",
+    "enforce_partition_column_order",
 ]
 
 _wsdedup = re.compile(r"\s+")
@@ -44,3 +45,19 @@ def enforce_sql_column_names(df, inplace=False, maxlen=63):
     rename_map = dict(list(zip(icols, ocols)))
     return df.rename(columns=rename_map, inplace=inplace)
 
+def enforce_partition_column_order(df, pcols, inplace=False):
+    if not isinstance(df, pd.DataFrame):
+        raise ValueError("df must be a pandas DataFrame")
+    if not isinstance(pcols, list):
+        raise ValueError("pcols must be list of column names")
+    pcols = [str(e) for e in pcols]
+    cols = list(df.columns.values)
+    for c in pcols:
+        cols.remove(c)
+        cols.append(c)
+    if not inplace:
+        return df[cols]
+    for c in cols:
+        s = df[c]
+        df.drop(columns=[c], inplace=True)
+        df[c] = s
diff --git a/osc_ingest_trino/sqltypes.py b/osc_ingest_trino/sqltypes.py
index 288097c..c6cd264 100644
--- a/osc_ingest_trino/sqltypes.py
+++ b/osc_ingest_trino/sqltypes.py
@@ -29,13 +29,11 @@ def pandas_type_to_sql(pt, typemap={}):
         return st
     raise ValueError("unexpected pandas column type '{pt}'".format(pt=pt))
 
-# add ability to specify optional dict for specific fields?
-# if column name is present, use specified value?
-def create_table_schema_pairs(df, typemap={}):
+def create_table_schema_pairs(df, typemap = {}, indent = 4):
     if not isinstance(df, pd.DataFrame):
         raise ValueError("df must be a pandas DataFrame")
     ptypes = [str(e) for e in df.dtypes.to_list()]
     stypes = [pandas_type_to_sql(e, typemap=typemap) for e in ptypes]
     pz = list(zip(df.columns.to_list(), stypes))
-    return ",\n".join(["    {n} {t}".format(n=e[0],t=e[1]) for e in pz])
+    return ",\n".join([f"{' '*indent}{e[0]} {e[1]}" for e in pz])
 
diff --git a/osc_ingest_trino/trino_utils.py b/osc_ingest_trino/trino_utils.py
new file mode 100644
index 0000000..4965ccd
--- /dev/null
+++ b/osc_ingest_trino/trino_utils.py
@@ -0,0 +1,110 @@
+import os
+import shutil
+import uuid
+
+import trino
+import pandas as pd
+from sqlalchemy.engine import create_engine
+
+from .boto3_utils import upload_directory_to_s3
+from .sqltypes import create_table_schema_pairs
+
+__all__ = [
+    "attach_trino_engine",
+    "drop_unmanaged_table",
+    "drop_unmanaged_data",
+    "ingest_unmanaged_parquet",
+    "unmanaged_parquet_tabledef",
+]
+
+_default_prefix = 'trino/{schema}/{table}'
+
+def _remove_trailing_slash(s):
+    s = str(s)
+    if len(s) == 0: return s
+    if (s[-1] != '/'): return s
+    return _remove_trailing_slash(s[:-1])
+
+def _prefix(pfx, schema, table):
+    return _remove_trailing_slash(pfx).format(schema = schema, table = table)
+
+def attach_trino_engine(env_var_prefix = 'TRINO'):
+    sqlstring = 'trino://{user}@{host}:{port}/'.format(
+        user = os.environ[f'{env_var_prefix}_USER'],
+        host = os.environ[f'{env_var_prefix}_HOST'],
+        port = os.environ[f'{env_var_prefix}_PORT']
+    )
+    sqlargs = {
+        'auth': trino.auth.JWTAuthentication(os.environ[f'{env_var_prefix}_PASSWD']),
+        'http_scheme': 'https'
+    }
+    engine = create_engine(sqlstring, connect_args = sqlargs)
+    connection = engine.connect()
+    return engine
+
+def drop_unmanaged_table(catalog, schema, table, engine, bucket, prefix=_default_prefix, verbose=False):
+    sql = f'drop table if exists {catalog}.{schema}.{table}'
+    qres = engine.execute(sql)
+    dres = bucket.objects \
+        .filter(Prefix = f'{_prefix(prefix, schema, table)}/') \
+        .delete()
+    if verbose:
+        print(dres)
+    return qres
+
+def drop_unmanaged_data(schema, table, bucket, prefix=_default_prefix, verbose=False):
+    dres = bucket.objects \
+        .filter(Prefix = f'{_prefix(prefix, schema, table)}/') \
+        .delete()
+    if verbose: print(dres)
+    return dres
+
+def ingest_unmanaged_parquet(df, schema, table, bucket, partition_columns=[], append=True, workdir='/tmp', prefix=_default_prefix, verbose=False):
+    if not isinstance(df, pd.DataFrame):
+        raise ValueError("df must be a pandas DataFrame")
+    if not isinstance(partition_columns, list):
+        raise ValueError("partition_columns must be list of column names")
+
+    s3pfx = _prefix(prefix, schema, table)
+
+    if not append:
+        dres = bucket.objects.filter(Prefix = f'{s3pfx}/').delete()
+        if verbose: print(dres)
+
+    if len(partition_columns) > 0:
+        # tell pandas to write a directory tree, using partitions
+        tmp = f'{workdir}/{table}'
+        # pandas does not clean out destination directory for you:
+        shutil.rmtree(tmp, ignore_errors=True)
+        df.to_parquet(tmp,
+                      partition_cols=partition_columns,
+                      index=False)
+        # upload the tree onto S3
+        upload_directory_to_s3(tmp, bucket, s3pfx, verbose=verbose)
+    else:
+        # do not use partitions: a single parquet file is created
+        parquet = f'{uuid.uuid4().hex}.parquet'
+        tmp = f'{workdir}/{parquet}'
+        df.to_parquet(tmp, index=False)
+        dst = f'{s3pfx}/{parquet}'
+        if verbose: print(f'{tmp} --> {dst}')
+        bucket.upload_file(tmp, dst)
+
+def unmanaged_parquet_tabledef(df, catalog, schema, table, bucket, partition_columns = [], verbose = False):
+    if not isinstance(df, pd.DataFrame):
+        raise ValueError("df must be a pandas DataFrame")
+    if not isinstance(partition_columns, list):
+        raise ValueError("partition_columns must be list of column names")
+
+    columnschema = create_table_schema_pairs(df)
+
+    tabledef = f"create table if not exists {catalog}.{schema}.{table} (\n"
+    tabledef += f"{columnschema}\n"
+    tabledef += ") with (\n    format = 'parquet',\n"
+    if len(partition_columns) > 0:
+        tabledef += f"    partitioned_by = array{partition_columns},\n"
+    tabledef += f"    external_location = 's3a://{bucket.name}/trino/{schema}/{table}/'\n)"
+
+    if verbose: print(tabledef)
+    return tabledef
+
diff --git a/setup.py b/setup.py
index b3ee0d9..bab9f4f 100644
--- a/setup.py
+++ b/setup.py
@@ -10,7 +10,7 @@
 
 setup(
     name = "osc-ingest-tools",
-    version = "0.1.1",
+    version = "0.2.0",
     description = "python tools to assist with standardized data ingestion workflows for the OS-Climate project",
     long_description = README,
     long_description_content_type = "text/markdown",
@@ -26,7 +26,7 @@
     ],
     packages = find_packages(),
     include_package_data = True,
-    install_requires = ["pandas"],
+    install_requires = ["pandas", "trino", "boto3", "sqlalchemy", "sqlalchemy-trino", "python-dotenv"],
     entry_points = {
         "console_scripts": []
     },
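For context, a minimal end-to-end sketch of how the new helpers fit together. The `DEMO` bucket prefix, the `hive`/`demo`/`emissions` names, and the sample DataFrame are hypothetical; the environment variable names follow the conventions read by `attach_s3_bucket` and `attach_trino_engine` in this diff.

```python
import pandas as pd
import osc_ingest_trino as osc

# pull TRINO_* / DEMO_* credentials from credentials.env, if present
osc.load_credentials_dotenv()

# hypothetical "DEMO" prefix -> DEMO_ENDPOINT, DEMO_ACCESS_KEY, DEMO_SECRET_KEY, DEMO_BUCKET
bucket = osc.attach_s3_bucket("DEMO")
# reads TRINO_USER, TRINO_HOST, TRINO_PORT, TRINO_PASSWD
engine = osc.attach_trino_engine()

df = pd.DataFrame({"Total Emissions": [1.0, 2.5], "Year": [2020, 2021]})
df = osc.enforce_sql_column_names(df)                  # e.g. "Total Emissions" -> "total_emissions"
df = osc.enforce_partition_column_order(df, ["year"])  # partition columns must come last

catalog, schema, table = "hive", "demo", "emissions"   # hypothetical names
osc.ingest_unmanaged_parquet(df, schema, table, bucket, partition_columns=["year"], verbose=True)
tabledef = osc.unmanaged_parquet_tabledef(df, catalog, schema, table, bucket, partition_columns=["year"])
engine.execute(tabledef)
```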