Skip to content

Commit

Permalink
Add DdlFactory and 3 more targets (#6)
Browse files Browse the repository at this point in the history
* First shot at mysql

* Three builds

* SQLite appears to be running

* Use tagged image name

* Add todos
  • Loading branch information
droher authored May 11, 2019
1 parent 903e7f6 commit da7aa6c
Show file tree
Hide file tree
Showing 20 changed files with 441 additions and 115 deletions.
14 changes: 8 additions & 6 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
- Change repo name to containerball

- Validation container to help find bugs/discrepancies in Retrosheet data
- Build targets out of cleansed CSVs in which T/F is converted to 1/0 and blanks to NULL

- Validation container/script to help find bugs/discrepancies in Retrosheet data

- Potentially generate ddl in separate container

- Data bugs to notify Retrosheet about:
- Repeat roster row in BRF1914.ROS (Felix Chouinard)
Expand All @@ -9,21 +13,19 @@

- Logic bugs to notify Chadwick Bureau about:
- cwcomment issue handling multiline comment in 2007 ASG (about Soriano)
- repeat entries in cwgame table (find specifics before sending)
- repeat entries in cwdaily table (find specifics before sending)

- Targets to implement:
- MySQL
- SQLite
- Parquet (will require dtype mapper)
- Flat file dumps (find good host, e.g. Mega/OneDrive)
- Druid
- Drill
- Postgres cstore_fdw
- Clickhouse
- Presto
- Superset (backed by one of the columnar stores above)
- RStudio
- Anaconda/Jupyter
- Spark (1+ language/backend)
- Spark (1+ language, off Hive?)
- Tensorflow
- Keras
- DataFusion
Expand Down
9 changes: 0 additions & 9 deletions data_build/citus_fdw.py

This file was deleted.

3 changes: 3 additions & 0 deletions data_build/ddl_generators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from pathlib import Path

# Directory that the generated DDL files are written into.
OUTPUT_PATH = Path("/ddl")
54 changes: 4 additions & 50 deletions data_build/ddl_generators/ddl_maker.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,9 @@
from pathlib import Path
from sqlalchemy import MetaData
from sqlalchemy.schema import CreateTable, CreateSchema
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.dialects import postgresql

from ddl_generators import OUTPUT_PATH
from ddl_generators.factories import all_factories
from ddl_generators.schemas import all_metadata

# Root directory under which each schema's CSV directory is looked up.
CSV_PATH_PREFIX = Path("/data")
# Directory that the generated DDL file is written into.
OUTPUT_PATH = Path("/ddl")

# Alias used to mark strings that hold generated DDL text.
DdlString = str


def make_load_ddl(metadata: MetaData, dialect: Dialect) -> DdlString:
    """
    Compile the CREATE SCHEMA statement and one CREATE TABLE statement per table.

    :param metadata: Metadata whose schema and tables are compiled.
    :param dialect: Dialect the statements are compiled for.
    :return: The statements joined by ";\\n", with a closing ";\\n".
    """
    # The schema statement always comes first, followed by the tables in metadata order.
    statements = [str(CreateSchema(metadata.schema).compile(dialect=dialect))]
    statements.extend(
        str(CreateTable(tbl).compile(dialect=dialect))
        for tbl in metadata.tables.values()
    )
    return ";\n".join(statements) + ";\n"


def make_postgres_copy_ddl(metadata: MetaData, csv_dir: Path) -> DdlString:
    """
    Build the statements that bulk-load every table from its compressed CSV.

    For each table an ALTER TABLE ... SET UNLOGGED is emitted, followed by a COPY that
    reads the output of ``zstd`` run on ``<csv_dir>/<table name>.csv.zst``.

    :param metadata: Metadata whose tables are loaded.
    :param csv_dir: Directory holding the ``.csv.zst`` files.
    :return: The per-table statements joined by newlines, with a closing ";\\n".
    """
    statement_template = """
ALTER TABLE {full_table_name} SET UNLOGGED;
COPY {full_table_name}({column_names}) FROM PROGRAM '{cmd}' CSV;
"""
    statements = []
    for tbl in metadata.tables.values():
        # Columns flagged autoincrement=True are left out of the COPY column list.
        loadable = [col.name for col in tbl.columns.values()
                    if col.autoincrement is not True]
        archive = csv_dir.joinpath(tbl.name).with_suffix(".csv.zst")
        statements.append(statement_template.format(
            full_table_name=tbl.fullname,
            column_names=", ".join(loadable),
            cmd="zstd --rm -cd {csv_path}".format(csv_path=archive),
        ))
    return "\n".join(statements) + ";\n"


def build_postgres_ddl(*metadatas: MetaData) -> None:
    """
    Append the Postgres create and copy DDL of every metadata object to ``postgres.sql``.

    :param metadatas: Metadata objects to emit, processed in the order given.
    """
    target = OUTPUT_PATH.joinpath("postgres.sql")
    for md in metadatas:
        # Each schema's CSV files live in a directory named after the schema.
        schema_csv_dir = CSV_PATH_PREFIX.joinpath(md.schema)
        # Append mode: successive metadata objects accumulate in the same file.
        with open(target, "a+") as sql_file:
            sql_file.write(make_load_ddl(md, postgresql.dialect()))
            sql_file.write(make_postgres_copy_ddl(md, schema_csv_dir))


if __name__ == "__main__":
    # Make sure the output directory exists before any DDL file is opened.
    OUTPUT_PATH.mkdir(exist_ok=True)
    # Write the Postgres script through the module-level builder.
    build_postgres_ddl(*all_metadata)
    # Let every registered factory write the script for its own target.
    for factory in all_factories:
        factory.build_ddl(*all_metadata)
9 changes: 9 additions & 0 deletions data_build/ddl_generators/factories/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import List
from ddl_generators.factories.target_ddl_factory import TargetDdlFactory
from ddl_generators.factories.postgres import PostgresDdlFactory
from ddl_generators.factories.mysql import MySqlDdlFactory
from ddl_generators.factories.postgres_cstore_fdw import PostgresCstoreFdwDdlFactory
from ddl_generators.factories.sqlite import SqliteDdlFactory

# One instance of every available target factory, in the order they are listed here.
all_factories: List[TargetDdlFactory] = [PostgresDdlFactory(), MySqlDdlFactory(),
PostgresCstoreFdwDdlFactory(), SqliteDdlFactory()]
54 changes: 54 additions & 0 deletions data_build/ddl_generators/factories/mysql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from typing import List
from sqlalchemy import MetaData, Float, Boolean, Column
from sqlalchemy.dialects import mysql
from sqlalchemy.engine.interfaces import Dialect

from ddl_generators.factories.target_ddl_factory import DdlString, TargetDdlFactory


class MySqlDdlFactory(TargetDdlFactory):
    """
    DDL factory for MySQL: tables are created through the MySQL dialect and each one is
    filled from its CSV file with a ``LOAD DATA INFILE`` statement.
    """

    @property
    def target_name(self) -> str:
        """Name of this target, ``"mysql"``."""
        return "mysql"

    @property
    def dialect(self) -> Dialect:
        """A fresh MySQL dialect instance."""
        return mysql.dialect()

    def make_copy_ddl(self, metadata: MetaData) -> DdlString:
        """
        Build one ``LOAD DATA INFILE`` statement per table.

        Every CSV field is read into a user variable and converted in the SET clause,
        because MySQL doesn't allow nulls to be represented as blanks in CSV files,
        floats to have an infinite value, or booleans to be represented by T/F.

        :param metadata: Metadata whose tables are loaded; its schema names the CSV
            directory under ``data_path_prefix``.
        :return: The statements joined by newlines; each one carries its own ``;``.
        """
        # NOTE: the "\n" in the LINES clause is expanded by Python, so the generated SQL
        # contains a real newline between the quotes.
        copy_ddl_template = (
            "\n"
            "LOAD DATA INFILE '{csv_file}' INTO TABLE {full_table_name}\n"
            "FIELDS TERMINATED BY ','\n"
            "OPTIONALLY ENCLOSED BY '\"'\n"
            "LINES TERMINATED BY '\n'\n"
            "({column_vars})\n"
            "SET {exps};\n"
        )
        default_exp = "`{0}` = IF(@{0} = '', NULL, @{0})"
        float_exp = "`{0}` = IF(@{0} IN ('', 'inf'), NULL, @{0})"
        boolean_exp = "`{0}` = IF(@{0} = 'T', 1, IF(@{0} = 'F', 0, IF(@{0} = '', NULL, @{0})))"

        def expression_for(column: Column) -> str:
            # Pick the conversion matching the column type.
            if isinstance(column.type, Float):
                return float_exp.format(column.name)
            if isinstance(column.type, Boolean):
                return boolean_exp.format(column.name)
            return default_exp.format(column.name)

        csv_dir = self.data_path_prefix.joinpath(metadata.schema)
        ddl = []
        for table_obj in metadata.tables.values():
            # Columns flagged autoincrement=True are skipped, matching the other targets,
            # which leave them out of the load; otherwise the CSV fields would shift by one.
            cols: List[Column] = [c for c in table_obj.columns.values()
                                  if c.autoincrement is not True]
            column_vars = ", ".join("@{}".format(c.name) for c in cols)
            exps = ",\n".join(expression_for(c) for c in cols)

            # Append the extension by hand: with_suffix() would cut a name containing a dot.
            csv_file = csv_dir.joinpath(table_obj.name + ".csv")
            ddl.append(copy_ddl_template.format(csv_file=csv_file, column_vars=column_vars,
                                                full_table_name=table_obj.fullname, exps=exps))
        # Each statement already ends with ";" -- adding another one emits an empty statement.
        return "\n".join(ddl)
45 changes: 45 additions & 0 deletions data_build/ddl_generators/factories/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine.interfaces import Dialect

from ddl_generators.factories.target_ddl_factory import DdlString, TargetDdlFactory


class PostgresDdlFactory(TargetDdlFactory):
    """
    DDL factory for PostgreSQL: tables are created through the PostgreSQL dialect and
    each one is filled with ``COPY ... FROM PROGRAM`` reading a zstd-compressed CSV.
    """

    @property
    def target_name(self) -> str:
        """Name of this target, ``"postgres"``."""
        return "postgres"

    @property
    def dialect(self) -> Dialect:
        """A fresh PostgreSQL dialect instance."""
        return postgresql.dialect()

    @staticmethod
    def _split_fullname(table: Table) -> tuple:
        """
        Split the table's full name at its first "." (or, if it has none, its first "_").

        :return: ``(head, tail)``; ``tail`` is empty when the name has no separator.
        """
        name = table.fullname
        separator = "." if "." in name else "_"
        head, _, tail = name.partition(separator)
        return head, tail

    def _get_csv_dir(self, table: Table) -> str:
        """Return the part of the full table name before the first separator."""
        return self._split_fullname(table)[0]

    def _get_csv_stem(self, table: Table) -> str:
        """
        Return everything after the first separator of the full table name.

        Later separators are kept intact for both the "." and the "_" form.
        """
        return self._split_fullname(table)[1]

    def make_copy_ddl(self, metadata: MetaData) -> DdlString:
        """
        Build one ``COPY`` statement per table, reading
        ``<data_path_prefix>/<csv dir>/<csv stem>.csv.zst`` through ``zstd``.

        :param metadata: Metadata whose tables are loaded.
        :return: One statement per line, each terminated by a single ``;``.
        """
        copy_ddl_template = "COPY {full_table_name}({column_names}) FROM PROGRAM '{cmd}' CSV;"
        cmd_template = "zstd --rm -cd {csv_file}"
        ddl = []
        for table_obj in metadata.tables.values():
            csv_dir = self.data_path_prefix.joinpath(self._get_csv_dir(table_obj))
            # Append the extension by hand: with_suffix() would replace whatever follows
            # the last dot of the stem.
            csv_file = csv_dir.joinpath(self._get_csv_stem(table_obj) + ".csv.zst")
            # Columns flagged autoincrement=True are left out of the COPY column list.
            column_names = ", ".join(c.name for c in table_obj.columns.values()
                                     if c.autoincrement is not True)
            cmd = cmd_template.format(csv_file=csv_file)
            ddl.append(copy_ddl_template.format(full_table_name=table_obj.fullname, cmd=cmd,
                                                column_names=column_names))
        # The template already ends with ";" -- adding another one emits an empty statement.
        return "".join(statement + "\n" for statement in ddl)
46 changes: 46 additions & 0 deletions data_build/ddl_generators/factories/postgres_cstore_fdw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from sqlalchemy.schema import MetaData
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy_fdw import ForeignTable
from sqlalchemy_fdw.dialect import PGDialectFdw

from ddl_generators.factories.postgres import PostgresDdlFactory
from ddl_generators.factories.target_ddl_factory import DdlString

# Name of the foreign server that is created and that every foreign table is attached to.
CSTORE_SERVER = "cstore_server"


class PostgresCstoreFdwDdlFactory(PostgresDdlFactory):
    """
    Target for Postgres with the cstore_fdw extension.

    The copy statements are inherited from PostgresDdlFactory unchanged. The create step
    differs: tables are declared as foreign tables, and the extension and its server are
    created ahead of them.
    """

    @property
    def target_name(self) -> str:
        """Name of this target, ``"postgres_cstore_fdw"``."""
        return "postgres_cstore_fdw"

    @property
    def dialect(self) -> Dialect:
        """A fresh FDW-aware Postgres dialect instance."""
        return PGDialectFdw()

    @staticmethod
    def metadata_transform(metadata: MetaData) -> MetaData:
        """
        Rebuild the metadata as schema-less foreign tables named ``<schema>_<table>``.

        :param metadata: Source metadata; it is not modified.
        :return: New metadata holding one ForeignTable per source table.
        """
        foreign_metadata = MetaData()
        fdw_kwargs = {"pgfdw_server": CSTORE_SERVER, "pgfdw_options": {"compression": "pglz"}}
        for source_table in metadata.tables.values():
            kept_columns = []
            for column in source_table.columns.values():
                # Dummy autoincrement columns are dropped: no PKs needed, and they
                # cannot autoincrement here anyway.
                if column.autoincrement is True:
                    continue
                kept_columns.append(column.copy())
            # The schema is folded into the table name because fdw has no schemas.
            ForeignTable(f"{metadata.schema}_{source_table.name}", foreign_metadata,
                         *kept_columns, **fdw_kwargs)
        return foreign_metadata

    def make_create_ddl(self, metadata: MetaData) -> DdlString:
        """
        Return the inherited create DDL, preceded by the extension and server statements.

        :param metadata: Metadata whose tables are compiled.
        """
        table_ddl = super().make_create_ddl(metadata)
        preamble = """
CREATE EXTENSION IF NOT EXISTS cstore_fdw;
CREATE SERVER IF NOT EXISTS {} FOREIGN DATA WRAPPER cstore_fdw;
""".format(CSTORE_SERVER)
        return preamble + "\n" + table_ddl
43 changes: 43 additions & 0 deletions data_build/ddl_generators/factories/sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects import sqlite
from sqlalchemy.engine.interfaces import Dialect

from ddl_generators.factories.target_ddl_factory import DdlString, TargetDdlFactory


class SqliteDdlFactory(TargetDdlFactory):
    """
    DDL factory for SQLite: tables are renamed to ``<schema>_<table>`` and each one is
    filled from its CSV file with the ``.import`` dot-command.
    """

    @property
    def target_name(self) -> str:
        """Name of this target, ``"sqlite"``."""
        return "sqlite"

    @property
    def dialect(self) -> Dialect:
        """A fresh SQLite dialect instance."""
        return sqlite.dialect()

    @staticmethod
    def metadata_transform(metadata: MetaData) -> MetaData:
        """
        Rebuild the metadata as schema-less tables named ``<schema>_<table>``.

        :param metadata: Source metadata; it is not modified.
        :return: New metadata holding one Table per source table.
        """
        new_metadata = MetaData()
        for table in metadata.tables.values():
            # Need to namespace in the tablename because no schemas in sqlite
            table_name = "{}_{}".format(metadata.schema, table.name)
            # Remove dummy cols as no need for PKs (and we can't autoincrement anyway)
            cols = [c.copy() for c in table.columns.values() if c.autoincrement is not True]

            Table(table_name, new_metadata, *cols)
        return new_metadata

    def make_copy_ddl(self, metadata: MetaData) -> DdlString:
        """
        Build the ``.mode csv`` line followed by one ``.import`` line per table.

        The table name is split at its first "_": the head names the CSV directory
        under ``data_path_prefix`` and the rest names the CSV file.

        :param metadata: Transformed metadata whose tables are named ``<schema>_<table>``.
        :return: One command per line, including a final newline.
        :raises ValueError: If a table name contains no "_".
        """
        copy_ddl_template = ".import {csv_file} {full_table_name}"
        ddl = [".mode csv"]
        for table_obj in metadata.tables.values():
            name: str = table_obj.fullname
            schema_name, separator, csv_stem = name.partition("_")
            if not separator:
                raise ValueError("Expected a '<schema>_<table>' table name, got {!r}".format(name))
            # Append the extension by hand: with_suffix() would cut a stem containing a dot.
            csv_file = self.data_path_prefix.joinpath(schema_name, csv_stem + ".csv")
            ddl.append(copy_ddl_template.format(full_table_name=name, csv_file=csv_file))
        # End with a newline: the output is appended to, and a dot-command must stay
        # alone on its line.
        return "\n".join(ddl) + "\n"

    def make_create_ddl(self, metadata: MetaData) -> DdlString:
        """
        Return the inherited create DDL with booleans turned into CHAR(1) T/F columns.

        This is a plain text substitution on the compiled DDL: every "BOOLEAN" becomes
        "CHAR(1)" and every "(0, 1)" value list is widened to also accept 'T' and 'F'.
        """
        existing_ddl = super().make_create_ddl(metadata)
        # Change booleans to CHAR(1) T/F
        return existing_ddl.replace("BOOLEAN", "CHAR(1)").replace("(0, 1)", "('0', '1', 'T', 'F')")
59 changes: 59 additions & 0 deletions data_build/ddl_generators/factories/target_ddl_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from pathlib import Path
from typing import Optional

from sqlalchemy import MetaData
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.sql.ddl import CreateSchema, CreateTable

from ddl_generators import OUTPUT_PATH

# Default root directory returned by TargetDdlFactory.data_path_prefix.
DEFAULT_CSV_PATH_PREFIX = Path("/data")
# Alias used to mark strings that hold generated DDL text.
DdlString = str


class TargetDdlFactory:
    """
    Base class for generating the DDL script of one build target.

    Subclasses supply ``target_name``, ``dialect`` and ``make_copy_ddl``. ``build_ddl``
    then appends the create and copy statements of each metadata object to
    ``<target_name>.<file_format>`` under ``OUTPUT_PATH``.
    """

    @property
    def target_name(self) -> str:
        """Name of the target; used as the stem of the output file."""
        raise NotImplementedError("Each target must implement its own target name (e.g. 'postgres')")

    @property
    def dialect(self) -> Optional[Dialect]:
        """Dialect the create statements are compiled with."""
        raise NotImplementedError("Each target must implement its own dialect")

    @property
    def file_format(self) -> str:
        """Extension of the output file, without the leading dot."""
        return "sql"

    @property
    def data_path_prefix(self) -> Path:
        """Root directory of the CSV data; defaults to ``DEFAULT_CSV_PATH_PREFIX``."""
        return DEFAULT_CSV_PATH_PREFIX

    def make_create_ddl(self, metadata: MetaData) -> DdlString:
        """
        Compile the schema (when the metadata has one) and every table into DDL.

        :param metadata: Metadata whose schema and tables are compiled.
        :return: The statements joined by ";\\n", with a closing ";\\n".
        """
        # Resolve the dialect once instead of once per compiled statement.
        dialect = self.dialect
        ddl = []
        if metadata.schema:
            ddl.append(str(CreateSchema(metadata.schema).compile(dialect=dialect)))
        ddl.extend(str(CreateTable(table_obj).compile(dialect=dialect))
                   for table_obj in metadata.tables.values())
        return ";\n".join(ddl) + ";\n"

    def make_copy_ddl(self, metadata: MetaData) -> DdlString:
        """Return the statements that load the data; must be provided by each target."""
        raise NotImplementedError("Each target must implement its own copy function")

    @staticmethod
    def metadata_transform(metadata: MetaData) -> MetaData:
        """
        Overridable function to transform the metadata into a suitable format, e.g.
        for postgres_cstore_fdw, which requires table-level transformations.
        The base implementation returns the metadata unchanged.
        """
        return metadata

    def build_ddl(self, *metadatas: MetaData) -> None:
        """
        Append the create and copy DDL of every metadata object to the target's file.

        The file is opened in append mode, so running the build again adds to whatever
        an earlier run already wrote.

        :param metadatas: Metadata objects to emit, processed in the order given.
        """
        output_file = OUTPUT_PATH.joinpath("{}.{}".format(self.target_name, self.file_format))
        for metadatum in metadatas:
            transformed_metadata = self.metadata_transform(metadatum)
            with open(output_file, "a", encoding="utf-8") as f:
                f.write(self.make_create_ddl(transformed_metadata))
                f.write(self.make_copy_ddl(transformed_metadata))
Loading

0 comments on commit da7aa6c

Please sign in to comment.