From 3c7b2ab1a91f3e21ae4fd6af4ffea820e308f739 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BCl=20Bonet?= Date: Fri, 15 Mar 2024 12:43:33 +0100 Subject: [PATCH 1/8] add load_method overwrite behaviour --- target_postgres/connector.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index c9d250f6..4105884f 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -31,6 +31,7 @@ VARCHAR, TypeDecorator, ) +from singer_sdk.helpers.capabilities import TargetLoadMethods from sshtunnel import SSHTunnelForwarder @@ -117,6 +118,10 @@ def prepare_table( # type: ignore[override] _, schema_name, table_name = self.parse_full_table_name(full_table_name) meta = sa.MetaData(schema=schema_name) table: sa.Table + + if self.config["load_method"] == TargetLoadMethods.OVERWRITE: + self.get_table(full_table_name=full_table_name).drop(self._engine) + if not self.table_exists(full_table_name=full_table_name): table = self.create_empty_table( table_name=table_name, From 44854389bbe5da042e9907c37234bb374652f110 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BCl=20Bonet?= Date: Fri, 15 Mar 2024 22:23:06 +0100 Subject: [PATCH 2/8] refactor: get_table_columns same signature as sdk --- target_postgres/connector.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 4105884f..3d122805 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -42,6 +42,7 @@ class PostgresConnector(SQLConnector): allow_column_rename: bool = True # Whether RENAME COLUMN is supported. allow_column_alter: bool = False # Whether altering column types is supported. allow_merge_upsert: bool = True # Whether MERGE UPSERT is supported. + allow_overwrite: bool = True # Whether overwrite load method is supported. allow_temp_tables: bool = True # Whether temp tables are supported. def __init__(self, config: dict) -> None: @@ -120,7 +121,8 @@ def prepare_table( # type: ignore[override] table: sa.Table if self.config["load_method"] == TargetLoadMethods.OVERWRITE: - self.get_table(full_table_name=full_table_name).drop(self._engine) + if self.table_exists(full_table_name=full_table_name): + self.get_table(full_table_name=full_table_name).drop(self._engine) if not self.table_exists(full_table_name=full_table_name): table = self.create_empty_table( @@ -779,10 +781,9 @@ def _get_column_type( # type: ignore[override] def get_table_columns( # type: ignore[override] self, - schema_name: str, - table_name: str, - connection: sa.engine.Connection, + full_table_name: str, column_names: list[str] | None = None, + connection: sa.engine.Connection | None = None, ) -> dict[str, sa.Column]: """Return a list of table columns. @@ -797,6 +798,11 @@ def get_table_columns( # type: ignore[override] Returns: An ordered list of column objects. """ + + if not connection: + return super().get_table_columns(full_table_name, column_names) + + _, schema_name, table_name = self.parse_full_table_name(full_table_name) inspector = sa.inspect(connection) columns = inspector.get_columns(table_name, schema_name) From 6ddc536b4125ad8e4e9ef5be8a7bdd0e62b4fbde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BCl=20Bonet?= Date: Fri, 15 Mar 2024 23:05:43 +0100 Subject: [PATCH 3/8] fix: all calls to function with the new syntax --- target_postgres/connector.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 3d122805..8866c20d 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -141,8 +141,7 @@ def prepare_table( # type: ignore[override] ] # So we don't mess up the casing of the Table reference columns = self.get_table_columns( - schema_name=cast(str, schema_name), - table_name=table_name, + full_table_name=full_table_name, connection=connection, ) @@ -837,7 +836,8 @@ def column_exists( # type: ignore[override] assert schema_name is not None assert table_name is not None return column_name in self.get_table_columns( - schema_name=schema_name, table_name=table_name, connection=connection + full_table_name=full_table_name, + connection=connection ) From 4f81681c6706a7e5aa52aef06f31eb17ba70a282 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BCl=20Bonet?= Date: Sat, 16 Mar 2024 22:02:56 +0100 Subject: [PATCH 4/8] this version works --- target_postgres/connector.py | 131 +++++++++++++++++++++++------------ target_postgres/sinks.py | 17 +++-- 2 files changed, 97 insertions(+), 51 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 8866c20d..83462826 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -94,6 +94,33 @@ def interpret_content_encoding(self) -> bool: """ return self.config.get("interpret_content_encoding", False) + # def get_table(self, full_table_name: str, connection: sa.engine.Connection) -> sa.Table: + # """Return the table object. + + # Args: + # full_table_name: the fully qualified table name. + + # Returns: + # The table object. + # """ + # _, schema_name, table_name = self.parse_full_table_name(full_table_name) + # meta = sa.MetaData(schema=schema_name) + # meta.reflect(connection, only=[table_name]) + + def get_table_v2(self, full_table_name: str, connection: sa.engine.Connection) -> sa.Table: + """Return the table object. + + Args: + full_table_name: the fully qualified table name. + + Returns: + The table object. + """ + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + meta = sa.MetaData(schema=schema_name) + meta.reflect(connection, only=[table_name]) + return meta.tables[full_table_name] + def prepare_table( # type: ignore[override] self, full_table_name: str, @@ -119,13 +146,24 @@ def prepare_table( # type: ignore[override] _, schema_name, table_name = self.parse_full_table_name(full_table_name) meta = sa.MetaData(schema=schema_name) table: sa.Table - - if self.config["load_method"] == TargetLoadMethods.OVERWRITE: - if self.table_exists(full_table_name=full_table_name): - self.get_table(full_table_name=full_table_name).drop(self._engine) if not self.table_exists(full_table_name=full_table_name): - table = self.create_empty_table( + self.create_empty_table( + full_table_name=full_table_name, + schema=schema, + primary_keys=primary_keys, + partition_keys=partition_keys, + as_temp_table=as_temp_table, + ) + return + + if self.config["load_method"] == TargetLoadMethods.OVERWRITE: + self.logger.info("I ENTERED MORE THAN ONCE") + self.get_table(full_table_name=full_table_name).drop(self._engine) + # meta.reflect(connection, only=[table_name]) + # meta.tables[full_table_name].drop(self._enigne) + # self.get_table(full_table_name=full_table_name).drop(self._engine) + self.create_empty_table( table_name=table_name, meta=meta, schema=schema, @@ -134,7 +172,8 @@ def prepare_table( # type: ignore[override] as_temp_table=as_temp_table, connection=connection, ) - return table + return + meta.reflect(connection, only=[table_name]) table = meta.tables[ full_table_name @@ -778,44 +817,48 @@ def _get_column_type( # type: ignore[override] return t.cast(sa.types.TypeEngine, column.type) - def get_table_columns( # type: ignore[override] - self, - full_table_name: str, - column_names: list[str] | None = None, - connection: sa.engine.Connection | None = None, - ) -> dict[str, sa.Column]: - """Return a list of table columns. - - Overrode to support schema_name - - Args: - schema_name: schema name. - table_name: table name to get columns for. - connection: database connection. - column_names: A list of column names to filter to. - - Returns: - An ordered list of column objects. - """ - - if not connection: - return super().get_table_columns(full_table_name, column_names) - - _, schema_name, table_name = self.parse_full_table_name(full_table_name) - inspector = sa.inspect(connection) - columns = inspector.get_columns(table_name, schema_name) - - return { - col_meta["name"]: sa.Column( - col_meta["name"], - col_meta["type"], - nullable=col_meta.get("nullable", False), - ) - for col_meta in columns - if not column_names - or col_meta["name"].casefold() in {col.casefold() for col in column_names} - } - + # def get_table_columns( # type: ignore[override] + # self, + # full_table_name: str, + # column_names: list[str] | None = None, + # connection: sa.engine.Connection | None = None, + # ) -> dict[str, sa.Column]: + # """Return a list of table columns. + + # Overrode to support schema_name + + # Args: + # schema_name: schema name. + # table_name: table name to get columns for. + # connection: database connection. + # column_names: A list of column names to filter to. + + # Returns: + # An ordered list of column objects. + # """ + # self.logger.info("THIS IS ANOTHER TEEEST") + + # if not connection: + # return super().get_table_columns(full_table_name, column_names) + + # if full_table_name not in self._table_cols_cache: + # _, schema_name, table_name = self.parse_full_table_name(full_table_name) + # inspector = sa.inspect(connection) + # columns = inspector.get_columns(table_name, schema_name) + + # self._table_cols_cache[full_table_name] = { + # col_meta["name"]: sa.Column( + # col_meta["name"], + # col_meta["type"], + # nullable=col_meta.get("nullable", False), + # ) + # for col_meta in columns + # if not column_names + # or col_meta["name"].casefold() + # in {col.casefold() for col in column_names} + # } + + # return self._table_cols_cache[full_table_name] def column_exists( # type: ignore[override] self, full_table_name: str, diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index ea8b8df3..85ffa243 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -74,13 +74,16 @@ def process_batch(self, context: dict) -> None: # Use one connection so we do this all in a single transaction with self.connector._connect() as connection, connection.begin(): # Check structure of table - table: sa.Table = self.connector.prepare_table( - full_table_name=self.full_table_name, - schema=self.schema, - primary_keys=self.key_properties, - as_temp_table=False, - connection=connection, - ) + # table: sa.Table = self.connector.prepare_table( + # full_table_name=self.full_table_name, + # schema=self.schema, + # primary_keys=self.key_properties, + # as_temp_table=False, + # connection=connection, + # ) + + table = self.connector.get_table_v2(self.full_table_name, connection) + # Create a temp table (Creates from the table above) temp_table: sa.Table = self.connector.copy_table_structure( full_table_name=self.temp_table_name, From c50ad538419613c0901c31cff99c0aadd2373586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BCl=20Bonet?= Date: Sat, 16 Mar 2024 22:15:04 +0100 Subject: [PATCH 5/8] cleanup --- target_postgres/connector.py | 24 ++++++------------------ target_postgres/sinks.py | 14 ++++---------- 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 83462826..1408a1cf 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -94,21 +94,12 @@ def interpret_content_encoding(self) -> bool: """ return self.config.get("interpret_content_encoding", False) - # def get_table(self, full_table_name: str, connection: sa.engine.Connection) -> sa.Table: - # """Return the table object. - - # Args: - # full_table_name: the fully qualified table name. - - # Returns: - # The table object. - # """ - # _, schema_name, table_name = self.parse_full_table_name(full_table_name) - # meta = sa.MetaData(schema=schema_name) - # meta.reflect(connection, only=[table_name]) - - def get_table_v2(self, full_table_name: str, connection: sa.engine.Connection) -> sa.Table: - """Return the table object. + def get_table_from_metadata( + self, + full_table_name: str, + connection: sa.engine.Connection + ) -> sa.Table: + """Returns an existing table object from the database Args: full_table_name: the fully qualified table name. @@ -160,9 +151,6 @@ def prepare_table( # type: ignore[override] if self.config["load_method"] == TargetLoadMethods.OVERWRITE: self.logger.info("I ENTERED MORE THAN ONCE") self.get_table(full_table_name=full_table_name).drop(self._engine) - # meta.reflect(connection, only=[table_name]) - # meta.tables[full_table_name].drop(self._enigne) - # self.get_table(full_table_name=full_table_name).drop(self._engine) self.create_empty_table( table_name=table_name, meta=meta, diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 85ffa243..27cdc819 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -73,18 +73,12 @@ def process_batch(self, context: dict) -> None: """ # Use one connection so we do this all in a single transaction with self.connector._connect() as connection, connection.begin(): - # Check structure of table - # table: sa.Table = self.connector.prepare_table( - # full_table_name=self.full_table_name, - # schema=self.schema, - # primary_keys=self.key_properties, - # as_temp_table=False, - # connection=connection, - # ) - - table = self.connector.get_table_v2(self.full_table_name, connection) + + table = self.connector.get_table_from_metadata(self.full_table_name, connection) # Create a temp table (Creates from the table above) + # TODO: maybe we should not even create the table copying the structure + # but just create the temp table using the schema temp_table: sa.Table = self.connector.copy_table_structure( full_table_name=self.temp_table_name, from_table=table, From 4f952c92d5a824a3d7f5a306fe3172216802a817 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BCl=20Bonet?= Date: Sat, 16 Mar 2024 22:15:33 +0100 Subject: [PATCH 6/8] prepare_table no longer returns a table object --- target_postgres/connector.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 1408a1cf..258f0d13 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -184,8 +184,6 @@ def prepare_table( # type: ignore[override] column_object=column_object, ) - return meta.tables[full_table_name] - def copy_table_structure( self, full_table_name: str, From 4c78c4d4af51a4b32ff22ece6120a4cf63cf09e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BCl=20Bonet?= Date: Sat, 16 Mar 2024 22:20:37 +0100 Subject: [PATCH 7/8] cleanup --- target_postgres/connector.py | 44 +----------------------------------- 1 file changed, 1 insertion(+), 43 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 258f0d13..b945850f 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -149,7 +149,6 @@ def prepare_table( # type: ignore[override] return if self.config["load_method"] == TargetLoadMethods.OVERWRITE: - self.logger.info("I ENTERED MORE THAN ONCE") self.get_table(full_table_name=full_table_name).drop(self._engine) self.create_empty_table( table_name=table_name, @@ -803,48 +802,7 @@ def _get_column_type( # type: ignore[override] return t.cast(sa.types.TypeEngine, column.type) - # def get_table_columns( # type: ignore[override] - # self, - # full_table_name: str, - # column_names: list[str] | None = None, - # connection: sa.engine.Connection | None = None, - # ) -> dict[str, sa.Column]: - # """Return a list of table columns. - - # Overrode to support schema_name - - # Args: - # schema_name: schema name. - # table_name: table name to get columns for. - # connection: database connection. - # column_names: A list of column names to filter to. - - # Returns: - # An ordered list of column objects. - # """ - # self.logger.info("THIS IS ANOTHER TEEEST") - - # if not connection: - # return super().get_table_columns(full_table_name, column_names) - - # if full_table_name not in self._table_cols_cache: - # _, schema_name, table_name = self.parse_full_table_name(full_table_name) - # inspector = sa.inspect(connection) - # columns = inspector.get_columns(table_name, schema_name) - - # self._table_cols_cache[full_table_name] = { - # col_meta["name"]: sa.Column( - # col_meta["name"], - # col_meta["type"], - # nullable=col_meta.get("nullable", False), - # ) - # for col_meta in columns - # if not column_names - # or col_meta["name"].casefold() - # in {col.casefold() for col in column_names} - # } - - # return self._table_cols_cache[full_table_name] + def column_exists( # type: ignore[override] self, full_table_name: str, From 860b099264c83c2671438235913b53d8498ec283 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BCl=20Bonet?= Date: Sat, 16 Mar 2024 23:14:40 +0100 Subject: [PATCH 8/8] some signatures changed --- target_postgres/connector.py | 141 +++++++---------------------------- target_postgres/sinks.py | 2 - 2 files changed, 28 insertions(+), 115 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index b945850f..0a4ed9a4 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -120,7 +120,7 @@ def prepare_table( # type: ignore[override] connection: sa.engine.Connection, partition_keys: list[str] | None = None, as_temp_table: bool = False, - ) -> sa.Table: + ) -> None: """Adapt target table to provided schema if possible. Args: @@ -141,17 +141,19 @@ def prepare_table( # type: ignore[override] if not self.table_exists(full_table_name=full_table_name): self.create_empty_table( full_table_name=full_table_name, + meta=meta, schema=schema, primary_keys=primary_keys, partition_keys=partition_keys, as_temp_table=as_temp_table, + connection=connection, ) return if self.config["load_method"] == TargetLoadMethods.OVERWRITE: self.get_table(full_table_name=full_table_name).drop(self._engine) self.create_empty_table( - table_name=table_name, + full_table_name=full_table_name, meta=meta, schema=schema, primary_keys=primary_keys, @@ -168,7 +170,6 @@ def prepare_table( # type: ignore[override] columns = self.get_table_columns( full_table_name=full_table_name, - connection=connection, ) for property_name, property_def in schema["properties"].items(): @@ -361,7 +362,7 @@ def pick_best_sql_type(sql_type_array: list): def create_empty_table( # type: ignore[override] self, - table_name: str, + full_table_name: str, meta: sa.MetaData, schema: dict, connection: sa.engine.Connection, @@ -387,6 +388,9 @@ def create_empty_table( # type: ignore[override] NotImplementedError: if temp tables are unsupported and as_temp_table=True. RuntimeError: if a variant schema is passed with no properties defined. """ + + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + columns: list[sa.Column] = [] primary_keys = primary_keys or [] try: @@ -440,66 +444,31 @@ def prepare_column( _, schema_name, table_name = self.parse_full_table_name(full_table_name) column_exists = column_object is not None or self.column_exists( - full_table_name, column_name, connection=connection + full_table_name, column_name, ) if not column_exists: self._create_empty_column( # We should migrate every function to use sa.Table # instead of having to know what the function wants - table_name=table_name, + full_table_name=full_table_name, column_name=column_name, sql_type=sql_type, - schema_name=cast(str, schema_name), - connection=connection, ) return self._adapt_column_type( - schema_name=cast(str, schema_name), - table_name=table_name, + full_table_name=full_table_name, column_name=column_name, sql_type=sql_type, connection=connection, column_object=column_object, ) - def _create_empty_column( # type: ignore[override] - self, - schema_name: str, - table_name: str, - column_name: str, - sql_type: sa.types.TypeEngine, - connection: sa.engine.Connection, - ) -> None: - """Create a new column. - - Args: - schema_name: The schema name. - table_name: The table name. - column_name: The name of the new column. - sql_type: SQLAlchemy type engine to be used in creating the new column. - connection: The database connection. - - Raises: - NotImplementedError: if adding columns is not supported. - """ - if not self.allow_column_add: - msg = "Adding columns is not supported." - raise NotImplementedError(msg) - - column_add_ddl = self.get_column_add_ddl( - schema_name=schema_name, - table_name=table_name, - column_name=column_name, - column_type=sql_type, - ) - connection.execute(column_add_ddl) def get_column_add_ddl( # type: ignore[override] self, table_name: str, - schema_name: str, column_name: str, column_type: sa.types.TypeEngine, ) -> sa.DDL: @@ -514,6 +483,8 @@ def get_column_add_ddl( # type: ignore[override] Returns: A sqlalchemy DDL instance. """ + _, schema_name, table_name = self.parse_full_table_name(table_name) + column = sa.Column(column_name, column_type) return sa.DDL( @@ -531,12 +502,11 @@ def get_column_add_ddl( # type: ignore[override] def _adapt_column_type( # type: ignore[override] self, - schema_name: str, - table_name: str, + full_table_name: str, column_name: str, sql_type: sa.types.TypeEngine, - connection: sa.engine.Connection, - column_object: sa.Column | None, + connection: sa.engine.Connection | None = None, + column_object: sa.Column | None = None, ) -> None: """Adapt table column type to support the new JSON schema type. @@ -551,15 +521,21 @@ def _adapt_column_type( # type: ignore[override] Raises: NotImplementedError: if altering columns is not supported. """ + if connection is None: + super()._adapt_column_type( + full_table_name=full_table_name, + column_name=column_name, + sql_type=sql_type, + ) + return + current_type: sa.types.TypeEngine if column_object is not None: current_type = t.cast(sa.types.TypeEngine, column_object.type) else: current_type = self._get_column_type( - schema_name=schema_name, - table_name=table_name, + full_table_name=full_table_name, column_name=column_name, - connection=connection, ) # remove collation if present and save it @@ -586,14 +562,13 @@ def _adapt_column_type( # type: ignore[override] if not self.allow_column_alter: msg = ( "Altering columns is not supported. Could not convert column " - f"'{schema_name}.{table_name}.{column_name}' from '{current_type}' to " + f"'{full_table_name}.{column_name}' from '{current_type}' to " f"'{compatible_sql_type}'." ) raise NotImplementedError(msg) alter_column_ddl = self.get_column_alter_ddl( - schema_name=schema_name, - table_name=table_name, + table_name=full_table_name, column_name=column_name, column_type=compatible_sql_type, ) @@ -601,7 +576,6 @@ def _adapt_column_type( # type: ignore[override] def get_column_alter_ddl( # type: ignore[override] self, - schema_name: str, table_name: str, column_name: str, column_type: sa.types.TypeEngine, @@ -619,6 +593,7 @@ def get_column_alter_ddl( # type: ignore[override] Returns: A sqlalchemy DDL instance. """ + _, schema_name, _ = self.parse_full_table_name(table_name) column = sa.Column(column_name, column_type) return sa.DDL( ( @@ -766,66 +741,6 @@ def catch_signal(self, signum, frame) -> None: """ exit(1) # Calling this to be sure atexit is called, so clean_up gets called - def _get_column_type( # type: ignore[override] - self, - schema_name: str, - table_name: str, - column_name: str, - connection: sa.engine.Connection, - ) -> sa.types.TypeEngine: - """Get the SQL type of the declared column. - - Args: - schema_name: The schema name. - table_name: The table name. - column_name: The name of the column. - connection: The database connection. - - Returns: - The type of the column. - - Raises: - KeyError: If the provided column name does not exist. - """ - try: - column = self.get_table_columns( - schema_name=schema_name, - table_name=table_name, - connection=connection, - )[column_name] - except KeyError as ex: - msg = ( - f"Column `{column_name}` does not exist in table" - "`{schema_name}.{table_name}`." - ) - raise KeyError(msg) from ex - - return t.cast(sa.types.TypeEngine, column.type) - - - def column_exists( # type: ignore[override] - self, - full_table_name: str, - column_name: str, - connection: sa.engine.Connection, - ) -> bool: - """Determine if the target column already exists. - - Args: - full_table_name: the target table name. - column_name: the target column name. - connection: the database connection. - - Returns: - True if table exists, False if not. - """ - _, schema_name, table_name = self.parse_full_table_name(full_table_name) - assert schema_name is not None - assert table_name is not None - return column_name in self.get_table_columns( - full_table_name=full_table_name, - connection=connection - ) class NOTYPE(TypeDecorator): diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 27cdc819..8f4e5da2 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -335,7 +335,6 @@ def activate_version(self, new_version: int) -> None: if not self.connector.column_exists( full_table_name=self.full_table_name, column_name=self.version_column_name, - connection=connection, ): raise RuntimeError( f"{self.version_column_name} is required for activate version " @@ -346,7 +345,6 @@ def activate_version(self, new_version: int) -> None: or self.connector.column_exists( full_table_name=self.full_table_name, column_name=self.soft_delete_column_name, - connection=connection, ) ): raise RuntimeError(