diff --git a/README.md b/README.md
index 112833d..5a21423 100644
--- a/README.md
+++ b/README.md
@@ -44,6 +44,7 @@ For a high level introduction to `etlhelper`, see the FOSS4GUK 2019 presentation
 + [Installation](#installation)
 + [Connect to databases](#connect-to-databases)
 + [Transfer data](#transfer-data)
++ [Utilities](#utilities)
 + [Recipes](#recipes)
 + [Development](#development)
 + [References](#references)
@@ -559,6 +560,39 @@ a list.
 Data transformation can then be performed via [memory-efficient
 iterator-chains](https://dbader.org/blog/python-iterator-chains).
 
+## Utilities
+
+The following utility functions provide useful database metadata.
+
+
+### Table info
+
+
+The `table_info` function provides basic metadata for a table. An optional schema
+name can be provided. Note that for `sqlite` the schema value is currently ignored.
+
+```python
+from etlhelper.utils import table_info
+
+with ORACLEDB.connect("ORA_PASSWORD") as conn:
+    columns = table_info('my_table', conn, schema='my_schema')
+```
+
+The returned value is a list of named tuples of four values. Each tuple represents
+one column in the table, giving its name, type, whether it has a NOT NULL constraint
+and whether it has a DEFAULT value. For example,
+
+```python
+[
+    Column(name='ID', type='NUMBER', not_null=1, has_default=0),
+    Column(name='VALUE', type='VARCHAR2', not_null=0, has_default=1),
+]
+```
+
+Here the ID column is of type NUMBER and has a NOT NULL constraint but no DEFAULT value,
+while the VALUE column is of type VARCHAR2, can be NULL but does have a DEFAULT value.
+
+
 ## Recipes
 
 The following recipes demonstrate how `etlhelper` can be used.
diff --git a/etlhelper/__init__.py b/etlhelper/__init__.py
index b3a962b..8c45e81 100644
--- a/etlhelper/__init__.py
+++ b/etlhelper/__init__.py
@@ -26,6 +26,9 @@
     get_connection_string,
     get_sqlalchemy_connection_string,
 )
+from etlhelper.utils import (
+    table_info,
+)
 from . import _version
 
 __version__ = _version.get_versions()['version']
diff --git a/etlhelper/db_helpers/db_helper.py b/etlhelper/db_helpers/db_helper.py
index b041c99..8c9a2bd 100644
--- a/etlhelper/db_helpers/db_helper.py
+++ b/etlhelper/db_helpers/db_helper.py
@@ -16,8 +16,9 @@ class DbHelper(metaclass=ABCMeta):
     """
    sql_exceptions = None
    connect_exceptions = None
+    table_info_query = None
     # The following are used to help create parameterized queries. Although
-    # paramstyle is require by DBAPI2, most drivers support both a named and
+    # paramstyle is required by DBAPI2, most drivers support both a named and
     # positional style.
     paramstyle = None
     named_paramstyle = None
diff --git a/etlhelper/db_helpers/mssql.py b/etlhelper/db_helpers/mssql.py
index b9cc5d7..db2a083 100644
--- a/etlhelper/db_helpers/mssql.py
+++ b/etlhelper/db_helpers/mssql.py
@@ -2,6 +2,7 @@
 Database helper for mssql
 """
 import warnings
+from textwrap import dedent
 
 from etlhelper.db_helpers.db_helper import DbHelper
 
@@ -9,6 +10,19 @@ class MSSQLDbHelper(DbHelper):
     """
     MS Sql server helper class
     """
+    table_info_query = dedent("""
+        SELECT
+          column_name as name,
+          data_type as type,
+          (case when is_nullable = 'NO' then 1 else 0 end) as not_null,
+          (case when column_default is not null then 1 else 0 end) as has_default
+        FROM INFORMATION_SCHEMA.COLUMNS
+        WHERE LOWER(table_name) = LOWER(?)
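+          -- if no schema is given, the parameter is NULL and COALESCE
+          -- falls back to '%%', which LIKE-matches any schema name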
+          AND LOWER(table_schema) LIKE COALESCE(LOWER(?), '%%')
+        """).strip()
+
     def __init__(self):
         super().__init__()
         self.required_params = {'host', 'port', 'dbname', 'user', 'odbc_driver'}
diff --git a/etlhelper/db_helpers/oracle.py b/etlhelper/db_helpers/oracle.py
index e563435..6445675 100644
--- a/etlhelper/db_helpers/oracle.py
+++ b/etlhelper/db_helpers/oracle.py
@@ -1,6 +1,7 @@
 """
 Database helper for Oracle
 """
+from textwrap import dedent
 import warnings
 
 from etlhelper.db_helpers.db_helper import DbHelper
 
@@ -9,6 +10,17 @@ class OracleDbHelper(DbHelper):
     """
     Oracle DB helper class
     """
+    table_info_query = dedent("""
+        SELECT
+          column_name as name,
+          data_type as type,
+          (case when nullable = 'N' then 1 else 0 end) as not_null,
+          (case when data_default is not null then 1 else 0 end) as has_default
+        FROM all_tab_columns
+        WHERE LOWER(table_name) = LOWER(:1)
+          AND REGEXP_LIKE(LOWER(owner), '^' || COALESCE(LOWER(:2), '.*') || '$')
+        """).strip()
+
     def __init__(self):
         super().__init__()
         self.required_params = {'host', 'port', 'dbname', 'user'}
diff --git a/etlhelper/db_helpers/postgres.py b/etlhelper/db_helpers/postgres.py
index 43419c6..ea94875 100644
--- a/etlhelper/db_helpers/postgres.py
+++ b/etlhelper/db_helpers/postgres.py
@@ -1,6 +1,7 @@
 """
 Database helper for PostgreSQL
 """
+from textwrap import dedent
 import warnings
 
 from etlhelper.db_helpers.db_helper import DbHelper
 
@@ -9,6 +10,27 @@ class PostgresDbHelper(DbHelper):
     """
     Postgres db helper class
     """
+    table_info_query = dedent("""
+        SELECT
+            pg_attribute.attname AS name,
+            pg_catalog.format_type(pg_attribute.atttypid, pg_attribute.atttypmod) AS type,
+            (case when pg_attribute.attnotnull then 1 else 0 end) as not_null,
+            (case when pg_attribute.atthasdef then 1 else 0 end) as has_default
+        FROM
+            pg_catalog.pg_attribute
+        INNER JOIN
+            pg_catalog.pg_class ON pg_class.oid = pg_attribute.attrelid
+        INNER JOIN
+            pg_catalog.pg_namespace ON pg_namespace.oid = pg_class.relnamespace
+        WHERE
+            pg_attribute.attnum > 0
+            AND NOT pg_attribute.attisdropped
+            AND pg_class.relname = %s
+            AND pg_namespace.nspname ~ COALESCE(%s, '.*')
+        ORDER BY
+            attnum ASC;
+        """).strip()
+
     def __init__(self):
         super().__init__()
         self.required_params = {'host', 'port', 'dbname', 'user'}
diff --git a/etlhelper/db_helpers/sqlite.py b/etlhelper/db_helpers/sqlite.py
index df4d90e..b2240f6 100644
--- a/etlhelper/db_helpers/sqlite.py
+++ b/etlhelper/db_helpers/sqlite.py
@@ -2,6 +2,7 @@
 Database helper for SQLite
 """
 from contextlib import contextmanager
+from textwrap import dedent
 import warnings
 
 from etlhelper.db_helpers.db_helper import DbHelper
 
@@ -10,6 +11,21 @@ class SQLiteDbHelper(DbHelper):
     """
     SQLite DB helper class
     """
+    # schema_name is not used for SQLite but is required as a parameter to be
+    # consistent with the other databases. The WHERE clause is always true,
+    # whether schema_name is NULL or not.
+    table_info_query = dedent("""
+        SELECT
+            name,
+            type,
+            "notnull" as not_null,
+            (case when dflt_value is not null then 1 else 0 end) as has_default
+        FROM pragma_table_info(:table_name)
+        -- this effectively ignores the unused schema_name
+        -- parameter since schemas are not used in sqlite
+        WHERE COALESCE(TRUE, :schema_name)
+        ;""").strip()
+
     def __init__(self):
         super().__init__()
         self.required_params = {'filename'}
diff --git a/etlhelper/utils.py b/etlhelper/utils.py
new file mode 100644
index 0000000..12aab37
--- /dev/null
+++ b/etlhelper/utils.py
@@ -0,0 +1,46 @@
+"""
+Utility functions to help with tasks such as programmatically generating SQL queries.
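+
+For example, the column names returned by table_info can be used to build a
+simple SELECT statement ('my_table' and 'conn' below are placeholders):
+
+    columns = table_info('my_table', conn)
+    column_names = ', '.join(column.name for column in columns)
+    select_sql = f"SELECT {column_names} FROM my_table"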
+""" +from collections import namedtuple + +from etlhelper import fetchall +from etlhelper.exceptions import ETLHelperQueryError +from etlhelper.db_helper_factory import DB_HELPER_FACTORY + +Column = namedtuple('Column', ['name', 'type', 'not_null', 'has_default']) + + +def table_info(table, conn, schema=None): + """ + Return basic metadata for each of the columns of 'table' on 'conn'. + + :param table: str, the table to describe + :param conn: dbapi connection + :param schema: str, optional name of schema for table + :returns columns: list, tuples of (name, type, not_null, has_default) + """ + helper = DB_HELPER_FACTORY.from_conn(conn) + + params = (table, schema) + result = fetchall(helper.table_info_query, conn, parameters=params) + columns = [Column(*row) for row in result] + + if not columns: + schema_table = f"{schema}.{table}" if schema else table + msg = f"Table name '{schema_table}' not found." + raise ETLHelperQueryError(msg) + + # If same table exists in another schema, duplicate columns may be returned + if len(columns) > len(set(col.name for col in columns)): + msg = (f"Table name {table} is not unique in database. " + "Please specify the schema.") + raise ETLHelperQueryError(msg) + + return columns diff --git a/test/conftest.py b/test/conftest.py index fc34d46..42c1d15 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -47,8 +47,8 @@ def pgtestdb_test_tables(test_table_data, pgtestdb_conn, pgtestdb_insert_sql): CREATE TABLE src ( id integer primary key, - value double precision, - simple_text text, + value double precision not null, + simple_text text default 'default', utf8_text text, day date, date_time timestamp without time zone diff --git a/test/integration/db/test_mssql.py b/test/integration/db/test_mssql.py index b055755..d6800ce 100644 --- a/test/integration/db/test_mssql.py +++ b/test/integration/db/test_mssql.py @@ -19,6 +19,7 @@ generate_insert_sql, load, ) +from etlhelper.utils import table_info, Column from etlhelper.exceptions import ( ETLHelperConnectionError, ETLHelperInsertError, @@ -144,9 +145,10 @@ def test_copy_table_rows_happy_path_fast_true( def test_copy_table_rows_on_error(test_tables, testdb_conn, test_table_data): # Arrange duplicate_id_row_sql = """ - INSERT INTO dest (id) + INSERT INTO dest (id, value) VALUES ( - 1 + 1, + 1.234 )""".strip() execute(duplicate_id_row_sql, testdb_conn) @@ -240,6 +242,54 @@ def test_generate_insert_sql_dictionary(testdb_conn): generate_insert_sql('my_table', data, testdb_conn) +def test_table_info_no_schema_no_duplicates(testdb_conn, test_tables): + # Arrange + expected = [ + Column(name='id', type='int', not_null=0, has_default=0), + Column(name='value', type='float', not_null=1, has_default=0), + Column(name='simple_text', type='nvarchar', not_null=0, has_default=1), + Column(name='utf8_text', type='nvarchar', not_null=0, has_default=0), + Column(name='day', type='date', not_null=0, has_default=0), + Column(name='date_time', type='datetime2', not_null=0, has_default=0) + ] + + # Act + columns = table_info('src', testdb_conn) + + # Assert + assert columns == expected + + +def test_table_info_with_schema_no_duplicates(testdb_conn, test_tables): + # Arrange + expected = [ + Column(name='id', type='int', not_null=0, has_default=0), + Column(name='value', type='float', not_null=1, has_default=0), + Column(name='simple_text', type='nvarchar', not_null=0, has_default=1), + Column(name='utf8_text', type='nvarchar', not_null=0, has_default=0), + Column(name='day', type='date', not_null=0, has_default=0), + 
+        Column(name='date_time', type='datetime2', not_null=0, has_default=0)
+    ]
+
+    # Act
+    columns = table_info('src', testdb_conn, schema='etlhelper')
+
+    # Assert
+    assert columns == expected
+
+
+def test_table_info_bad_table_name_no_schema(testdb_conn, test_tables):
+    # Arrange, act and assert
+    with pytest.raises(ETLHelperQueryError, match=r"Table name 'bad_table' not found."):
+        table_info('bad_table', testdb_conn)
+
+
+def test_table_info_bad_table_name_with_schema(testdb_conn, test_tables):
+    # Arrange, act and assert
+    with pytest.raises(ETLHelperQueryError, match=r"Table name 'etlhelper.bad_table' not found."):
+        table_info('bad_table', testdb_conn, schema='etlhelper')
+
+
 # -- Fixtures here --
 
 INSERT_SQL = dedent("""
@@ -297,8 +347,8 @@ def test_tables(test_table_data, testdb_conn):
         CREATE TABLE src
           (
            id integer unique,
-            value double precision,
-            simple_text nvarchar(max),
+            value double precision not null,
+            simple_text nvarchar(max) default 'default',
             utf8_text nvarchar(max),
             day date,
             date_time datetime2(6)
diff --git a/test/integration/db/test_oracle.py b/test/integration/db/test_oracle.py
index 13f5fc0..82eb33c 100644
--- a/test/integration/db/test_oracle.py
+++ b/test/integration/db/test_oracle.py
@@ -19,6 +19,7 @@
     generate_insert_sql,
     load,
 )
+from etlhelper.utils import table_info, Column
 from etlhelper.exceptions import (
     ETLHelperConnectionError,
     ETLHelperInsertError,
@@ -108,11 +109,10 @@ def test_copy_table_rows_happy_path(test_tables, testdb_conn, test_table_data):
 def test_copy_table_rows_on_error(test_tables, testdb_conn, test_table_data):
     # Arrange
     duplicate_id_row_sql = """
-        INSERT INTO dest (id, day, date_time)
+        INSERT INTO dest (id, value)
         VALUES (
             1,
-            TO_DATE('2003/05/03 21:02:44', 'yyyy/mm/dd hh24:mi:ss'),
-            TO_DATE('2003/05/03 21:02:44', 'yyyy/mm/dd hh24:mi:ss')
+            1.234
             )""".strip()
     execute(duplicate_id_row_sql, testdb_conn)
 
@@ -125,23 +125,23 @@ def test_copy_table_rows_on_error(test_tables, testdb_conn, test_table_data):
     sql = "SELECT * FROM dest"
     result = get_rows(sql, testdb_conn)
 
+    # Check that first row was caught as error, noting that Oracle
+    # changes the case of column names
+    row, exception = errors[0]
+    assert row.ID == 1
+    assert "unique" in str(exception).lower()
+
+    # Check that other rows were inserted correctly
     # Fix result date and datetime strings to native classes
     fixed_dates = []
-    for row in result:
+    for row in result[1:]:
         fixed_dates.append((
             *row[:4],
             row.DAY.date(),
             row.DATE_TIME
         ))
 
-    # Check that first row was caught as error, noting that Oracle
-    # changes the case of column names
-    row, exception = errors[0]
-    assert row.ID == 1
-    assert "unique" in str(exception).lower()
-
-    # Check that other rows were inserted correctly
-    assert fixed_dates[1:] == test_table_data[1:]
+    assert fixed_dates == test_table_data[1:]
 
 
 def test_get_rows_with_parameters(test_tables, testdb_conn,
@@ -247,6 +247,55 @@ def test_generate_insert_sql_dictionary(testdb_conn):
     assert sql == expected
 
 
+def test_table_info_no_schema_no_duplicates(testdb_conn, test_tables):
+    # Arrange
+    expected = [
+        Column(name='ID', type='NUMBER', not_null=0, has_default=0),
+        Column(name='VALUE', type='NUMBER', not_null=1, has_default=0),
+        Column(name='SIMPLE_TEXT', type='VARCHAR2', not_null=0, has_default=1),
+        Column(name='UTF8_TEXT', type='VARCHAR2', not_null=0, has_default=0),
+        Column(name='DAY', type='DATE', not_null=0, has_default=0),
+        Column(name='DATE_TIME', type='DATE', not_null=0, has_default=0)
+    ]
+
+    # Act
+    columns = table_info('src', testdb_conn)
+
+    # Assert
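+    # (Oracle folds unquoted identifiers to upper case, hence the names above)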
+    assert columns == expected
+
+
+def test_table_info_with_schema_no_duplicates(testdb_conn, test_tables):
+    # Arrange
+    expected = [
+        Column(name='ID', type='NUMBER', not_null=0, has_default=0),
+        Column(name='VALUE', type='NUMBER', not_null=1, has_default=0),
+        Column(name='SIMPLE_TEXT', type='VARCHAR2', not_null=0, has_default=1),
+        Column(name='UTF8_TEXT', type='VARCHAR2', not_null=0, has_default=0),
+        Column(name='DAY', type='DATE', not_null=0, has_default=0),
+        Column(name='DATE_TIME', type='DATE', not_null=0, has_default=0)
+    ]
+
+    # Act
+    columns = table_info('src', testdb_conn, schema='etlhelper')
+
+    # Assert
+    assert columns == expected
+
+
+def test_table_info_bad_table_name_no_schema(testdb_conn, test_tables):
+    # Arrange, act and assert
+    with pytest.raises(ETLHelperQueryError, match=r"Table name 'bad_table' not found."):
+        table_info('bad_table', testdb_conn)
+
+
+def test_table_info_bad_table_name_with_schema(testdb_conn, test_tables):
+    # Arrange, act and assert
+    with pytest.raises(ETLHelperQueryError, match=r"Table name 'etlhelper.bad_table' not found."):
+        table_info('bad_table', testdb_conn, schema='etlhelper')
+
+
 # -- Fixtures here --
 
 INSERT_SQL = dedent("""
@@ -284,8 +333,8 @@ def test_tables(test_table_data, testdb_conn):
         CREATE TABLE src
           (
            id NUMBER UNIQUE,
-            value NUMBER,
-            simple_text VARCHAR2(100),
+            value NUMBER not null,
+            simple_text VARCHAR2(100) default 'default',
             utf8_text VARCHAR2(100),
             day DATE,
             date_time DATE
diff --git a/test/integration/db/test_sqlite.py b/test/integration/db/test_sqlite.py
index 9a4eb5c..f8c0648 100644
--- a/test/integration/db/test_sqlite.py
+++ b/test/integration/db/test_sqlite.py
@@ -20,6 +20,7 @@
     generate_insert_sql,
     load,
 )
+from etlhelper.utils import table_info, Column
 from etlhelper.exceptions import (
     ETLHelperConnectionError,
     ETLHelperInsertError,
@@ -93,9 +94,10 @@ def test_copy_table_rows_happy_path(test_tables, testdb_conn, test_table_data):
 def test_copy_table_rows_on_error(test_tables, testdb_conn, test_table_data):
     # Arrange
     duplicate_id_row_sql = """
-        INSERT INTO dest (id)
+        INSERT INTO dest (id, value)
         VALUES (
-            1
+            1,
+            1.234
             )""".strip()
     execute(duplicate_id_row_sql, testdb_conn)
 
@@ -195,6 +197,54 @@ def test_generate_insert_sql_dictionary(testdb_conn):
     assert sql == expected
 
 
+def test_table_info_no_schema_no_duplicates(testdb_conn, test_tables):
+    # Arrange
+    expected = [
+        Column(name='id', type='integer', not_null=0, has_default=0),
+        Column(name='value', type='float', not_null=1, has_default=0),
+        Column(name='simple_text', type='text', not_null=0, has_default=1),
+        Column(name='utf8_text', type='text', not_null=0, has_default=0),
+        Column(name='day', type='date', not_null=0, has_default=0),
+        Column(name='date_time', type='timestamp', not_null=0, has_default=0)
+    ]
+
+    # Act
+    columns = table_info('src', testdb_conn)
+
+    # Assert
+    assert columns == expected
+
+
+def test_table_info_with_schema_no_duplicates(testdb_conn, test_tables):
+    # Arrange
+    expected = [
+        Column(name='id', type='integer', not_null=0, has_default=0),
+        Column(name='value', type='float', not_null=1, has_default=0),
+        Column(name='simple_text', type='text', not_null=0, has_default=1),
+        Column(name='utf8_text', type='text', not_null=0, has_default=0),
+        Column(name='day', type='date', not_null=0, has_default=0),
+        Column(name='date_time', type='timestamp', not_null=0, has_default=0)
+    ]
+
+    # Act
+    columns = table_info('src', testdb_conn, schema='etlhelper')
+
+    # Assert
+    assert columns == expected
+
+
+def test_table_info_bad_table_name_no_schema(testdb_conn, test_tables):
+    # Arrange, act and assert
+    with pytest.raises(ETLHelperQueryError, match=r"Table name 'bad_table' not found."):
+        table_info('bad_table', testdb_conn)
+
+
+def test_table_info_bad_table_name_with_schema(testdb_conn, test_tables):
+    # Arrange, act and assert
+    with pytest.raises(ETLHelperQueryError, match=r"Table name 'etlhelper.bad_table' not found."):
+        table_info('bad_table', testdb_conn, schema='etlhelper')
+
+
 # -- Fixtures here --
 
 INSERT_SQL = dedent("""
@@ -238,9 +288,9 @@ def test_tables(test_table_data, testdb_conn):
     create_src_sql = dedent("""
         CREATE TABLE src
           (
-            id INTEGER PRIMARY KEY,
-            value float,
-            simple_text text,
+            id integer primary key,
+            value float not null,
+            simple_text text default 'default',
             utf8_text text,
             day date,
             date_time timestamp
diff --git a/test/integration/etl/test_etl_transform.py b/test/integration/etl/test_etl_transform.py
index 1f5fe3d..51abad3 100644
--- a/test/integration/etl/test_etl_transform.py
+++ b/test/integration/etl/test_etl_transform.py
@@ -49,10 +49,8 @@ def test_copy_table_rows_on_error(pgtestdb_test_tables, pgtestdb_conn,
                                   test_table_data):
     # Arrange
     duplicate_id_row_sql = """
-        INSERT INTO dest (id)
-        VALUES (
-            1
-            )""".strip()
+        INSERT INTO dest (id, value)
+        VALUES (1, 1.234)""".strip()
     execute(duplicate_id_row_sql, pgtestdb_conn)
 
     # Act
@@ -78,12 +76,12 @@ def test_copy_table_rows_on_error(pgtestdb_test_tables, pgtestdb_conn,
 
 def transform_return_list(rows):
     # Simple transform function that changes size and number of rows
-    return [(row.id,) for row in rows if row.id > 1]
+    return [(row.id, row.value) for row in rows if row.id > 1]
 
 
 def transform_return_generator(rows):
     # Simple transform function that changes size and number of rows
-    return ((row.id,) for row in rows if row.id > 1)
+    return ((row.id, row.value) for row in rows if row.id > 1)
 
 
 @pytest.mark.parametrize('my_transform',
@@ -91,10 +89,10 @@ def transform_return_generator(rows):
 def test_copy_rows_transform(pgtestdb_conn, pgtestdb_test_tables, my_transform):
     # Arrange
     select_sql = "SELECT * FROM src"
-    insert_sql = "INSERT INTO dest (id) VALUES (%s)"
+    insert_sql = "INSERT INTO dest (id, value) VALUES (%s, %s)"
 
-    expected = [(2, None, None, None, None, None),
-                (3, None, None, None, None, None)]
+    expected = [(2, 2.234, 'default', None, None, None),
+                (3, 2.234, 'default', None, None, None)]
 
     # Act
     copy_rows(select_sql, pgtestdb_conn, insert_sql, pgtestdb_conn,
diff --git a/test/integration/test_utils.py b/test/integration/test_utils.py
new file mode 100644
index 0000000..aea7b22
--- /dev/null
+++ b/test/integration/test_utils.py
@@ -0,0 +1,99 @@
+"""Tests for utils functions."""
+import pytest
+
+from etlhelper.exceptions import ETLHelperQueryError
+from etlhelper.utils import Column, table_info
+
+# pylint: disable=unused-argument, missing-docstring
+
+
+def test_table_info_no_schema_no_duplicates(pgtestdb_conn, pgtestdb_test_tables):
+    # Arrange
+    expected = [
+        Column('id', 'integer', not_null=1, has_default=0),
+        Column('value', 'double precision', not_null=1, has_default=0),
+        Column('simple_text', 'text', not_null=0, has_default=1),
+        Column('utf8_text', 'text', not_null=0, has_default=0),
+        Column('day', 'date', not_null=0, has_default=0),
+        Column('date_time', 'timestamp without time zone', not_null=0, has_default=0)
+    ]
+
+    # Act
+    columns = table_info('src', pgtestdb_conn)
+
+    # Assert
+    assert columns == expected
+
+
+def test_table_info_with_schema_no_duplicates(pgtestdb_conn,
+                                              pgtestdb_test_tables):
+    # Arrange
+    expected = [
+        Column('id', 'integer', not_null=1, has_default=0),
+        Column('value', 'double precision', not_null=1, has_default=0),
+        Column('simple_text', 'text', not_null=0, has_default=1),
+        Column('utf8_text', 'text', not_null=0, has_default=0),
+        Column('day', 'date', not_null=0, has_default=0),
+        Column('date_time', 'timestamp without time zone', not_null=0, has_default=0)
+    ]
+
+    # Act
+    columns = table_info('src', pgtestdb_conn, schema='public')
+
+    # Assert
+    assert columns == expected
+
+
+def test_table_info_no_schema_with_duplicates(pgtestdb_conn, duplicate_schema):
+    # Arrange, act and assert
+    with pytest.raises(ETLHelperQueryError, match=r'Table name src is not unique'):
+        table_info('src', pgtestdb_conn)
+
+
+def test_table_info_with_schema_with_duplicates(pgtestdb_conn, duplicate_schema):
+    # Arrange
+    expected = [
+        Column('id', 'integer', not_null=1, has_default=0),
+        Column('value', 'double precision', not_null=1, has_default=0),
+        Column('simple_text', 'text', not_null=0, has_default=1),
+        Column('utf8_text', 'text', not_null=0, has_default=0),
+        Column('day', 'date', not_null=0, has_default=0),
+        Column('date_time', 'timestamp without time zone', not_null=0, has_default=0)
+    ]
+
+    # Act
+    columns = table_info('src', pgtestdb_conn, schema='public')
+
+    # Assert
+    assert columns == expected
+
+
+def test_table_info_bad_table_name_no_schema(pgtestdb_conn, pgtestdb_test_tables):
+    # Arrange, act and assert
+    with pytest.raises(ETLHelperQueryError, match=r"Table name 'bad_table' not found."):
+        table_info('bad_table', pgtestdb_conn)
+
+
+def test_table_info_bad_table_name_with_schema(pgtestdb_conn, pgtestdb_test_tables):
+    # Arrange, act and assert
+    with pytest.raises(ETLHelperQueryError, match=r"Table name 'public.bad_table' not found."):
+        table_info('bad_table', pgtestdb_conn, schema='public')
+
+
+# Fixtures here
+
+@pytest.fixture(scope='function')
+def duplicate_schema(pgtestdb_conn, pgtestdb_test_tables):
+    # Set up
+    with pgtestdb_conn.cursor() as cursor:
+        # Create a duplicate of the test tables in a new schema
+        cursor.execute("CREATE SCHEMA IF NOT EXISTS duplicate")
+        cursor.execute("SELECT * INTO duplicate.src FROM src")
+        pgtestdb_conn.commit()
+
+    # Return control to run test
+    yield
+
+    # Tear down
+    with pgtestdb_conn.cursor() as cursor:
+        cursor.execute("DROP SCHEMA IF EXISTS duplicate CASCADE")
+        pgtestdb_conn.commit()