diff --git a/src/dagcellent/operators/sql_s3.py b/src/dagcellent/operators/sql_s3.py index d38a385..579eb62 100644 --- a/src/dagcellent/operators/sql_s3.py +++ b/src/dagcellent/operators/sql_s3.py @@ -103,13 +103,13 @@ class SqlToS3Operator(AWSSqlToS3Operator): def __init__( self, - database: str, table_name: str | None, chunksize: int = 10**6, fix_dtypes: bool = True, where_clause: str | None = None, join_clause: str | None = None, type_mapping: PyarrowMapping = Pyarrow2redshift, + database: str | None = None, **kwargs: Any, ) -> None: # type: ignore """Override constructor with extra chunksize argument.""" @@ -135,8 +135,11 @@ def _supported_source_connections(self) -> list[str]: raise ValueError("Unsupported connection type.") def _sql_hook_with_db(self) -> Engine: + """Add self.database to SQLAlchemy engine if it is not None.""" sql_hook = self._get_hook() engine = sql_hook.get_sqlalchemy_engine() # type: ignore + if self.database is None: + return engine # inject database name if not defined in connection URI # This works, but cannot cross query/reflect properly return safe_add_database_to_connection(engine, self.database)