Skip to content

Commit

Permalink
expose typemap, add colmap (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikerlandson authored Dec 10, 2021
1 parent 0472f18 commit df7a3f3
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 13 deletions.
2 changes: 1 addition & 1 deletion osc_ingest_trino/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""

# defines the release version for this python package
__version__ = "0.2.1snap1"
__version__ = "0.2.1"

from .sqlcols import *
from .sqltypes import *
Expand Down
14 changes: 7 additions & 7 deletions osc_ingest_trino/sqltypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
'Int64': 'bigint',
'bool': 'boolean',
'category': 'varchar',
'datetime64[ns, UTC]': 'timestamp(6)',
'datetime64[ns, UTC]': 'timestamp',
}

def pandas_type_to_sql(pt, typemap={}):
Expand All @@ -29,11 +29,11 @@ def pandas_type_to_sql(pt, typemap={}):
return st
raise ValueError("unexpected pandas column type '{pt}'".format(pt=pt))

def create_table_schema_pairs(df, typemap = {}, indent = 4):
def create_table_schema_pairs(df, typemap = {}, colmap = {}, indent = 4):
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
ptypes = [str(e) for e in df.dtypes.to_list()]
stypes = [pandas_type_to_sql(e, typemap=typemap) for e in ptypes]
pz = list(zip(df.columns.to_list(), stypes))
return ",\n".join([f"{' '*indent}{e[0]} {e[1]}" for e in pz])

if not isinstance(colmap, dict):
raise ValueError("colmap must be a dict")
columns = df.columns.to_list()
types = [colmap.get(col, pandas_type_to_sql(str(df[col].dtype), typemap=typemap)) for col in columns]
return ",\n".join([f"{' '*indent}{e[0]} {e[1]}" for e in zip(columns, types)])
21 changes: 17 additions & 4 deletions osc_ingest_trino/trino_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,26 @@ def _remove_trailing_slash(s):
def _prefix(pfx, schema, table):
return _remove_trailing_slash(pfx).format(schema = schema, table = table)

def attach_trino_engine(env_var_prefix = 'TRINO'):
sqlstring = 'trino://{user}@{host}:{port}/'.format(
def attach_trino_engine(env_var_prefix = 'TRINO', catalog = None, schema = None, verbose = False):
sqlstring = 'trino://{user}@{host}:{port}'.format(
user = os.environ[f'{env_var_prefix}_USER'],
host = os.environ[f'{env_var_prefix}_HOST'],
port = os.environ[f'{env_var_prefix}_PORT']
)
if catalog is not None:
sqlstring += f'/{catalog}'
if schema is not None:
if catalog is None:
raise ValueError(f'connection schema specified without a catalog')
sqlstring += f'/{schema}'

sqlargs = {
'auth': trino.auth.JWTAuthentication(os.environ[f'{env_var_prefix}_PASSWD']),
'http_scheme': 'https'
}

if verbose: print(f'using connect string: {sqlstring}')

engine = create_engine(sqlstring, connect_args = sqlargs)
connection = engine.connect()
return engine
Expand Down Expand Up @@ -90,13 +100,16 @@ def ingest_unmanaged_parquet(df, schema, table, bucket, partition_columns=[], ap
if verbose: print(f'{tmp} --> {dst}')
bucket.upload_file(tmp, dst)

def unmanaged_parquet_tabledef(df, catalog, schema, table, bucket, partition_columns = [], verbose = False):
def unmanaged_parquet_tabledef(df, catalog, schema, table, bucket,
partition_columns = [],
typemap = {}, colmap = {},
verbose = False):
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(partition_columns, list):
raise ValueError("partition_columns must be list of column names")

columnschema = create_table_schema_pairs(df)
columnschema = create_table_schema_pairs(df, typemap=typemap, colmap=colmap)

tabledef = f"create table if not exists {catalog}.{schema}.{table} (\n"
tabledef += f"{columnschema}\n"
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

setup(
name = "osc-ingest-tools",
version = "0.2.1snap1",
version = "0.2.1",
description = "python tools to assist with standardized data ingestion workflows for the OS-Climate project",
long_description = README,
long_description_content_type = "text/markdown",
Expand Down

0 comments on commit df7a3f3

Please sign in to comment.