Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Standard configurable load methods #1893

Merged
merged 13 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class {{ cookiecutter.destination_name }}Connector(SQLConnector):
allow_column_rename: bool = True # Whether RENAME COLUMN is supported.
allow_column_alter: bool = False # Whether altering column types is supported.
allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported.
allow_overwrite: bool = False # Whether overwrite load method is supported.
allow_temp_tables: bool = True # Whether temp tables are supported.

def get_sqlalchemy_url(self, config: dict) -> str:
Expand Down
1 change: 1 addition & 0 deletions samples/sample_target_sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class SQLiteConnector(SQLConnector):
allow_temp_tables = False
allow_column_alter = False
allow_merge_upsert = True
allow_overwrite: bool = False

def get_sqlalchemy_url(self, config: dict[str, t.Any]) -> str:
"""Generates a SQLAlchemy URL for SQLite."""
Expand Down
41 changes: 41 additions & 0 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers.capabilities import TargetLoadMethods

if t.TYPE_CHECKING:
from sqlalchemy.engine.reflection import Inspector
Expand All @@ -37,6 +38,7 @@
allow_column_rename: bool = True # Whether RENAME COLUMN is supported.
allow_column_alter: bool = False # Whether altering column types is supported.
allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported.
allow_overwrite: bool = False # Whether overwrite load method is supported.
allow_temp_tables: bool = True # Whether temp tables are supported.
_cached_engine: Engine | None = None

Expand Down Expand Up @@ -732,6 +734,43 @@
if not schema_exists:
self.create_schema(schema_name)

@staticmethod
def get_truncate_table_ddl(
table_name: str,
) -> sqlalchemy.DDL:
"""Get the truncate table DDL statement.

Override this if your database uses a different syntax for truncating tables.

Args:
table_name: Fully qualified table name of column to alter.

Returns:
A sqlalchemy DDL instance.
"""
return sqlalchemy.DDL(

Check warning on line 751 in singer_sdk/connectors/sql.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/connectors/sql.py#L751

Added line #L751 was not covered by tests
"TRUNCATE TABLE %(table_name)s",
{
"table_name": table_name,
},
)

def truncate_table(self, full_table_name: str) -> None:
"""Truncate target table.

Args:
full_table_name: Fully qualified table name of column to alter.
"""
if not self.allow_overwrite:
msg = "Truncating tables is not supported."

Check warning on line 765 in singer_sdk/connectors/sql.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/connectors/sql.py#L765

Added line #L765 was not covered by tests
raise NotImplementedError(msg)

truncate_table_ddl = self.get_truncate_table_ddl(

Check warning on line 768 in singer_sdk/connectors/sql.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/connectors/sql.py#L768

Added line #L768 was not covered by tests
table_name=full_table_name,
)
with self._connect() as conn:
conn.execute(truncate_table_ddl)

Check warning on line 772 in singer_sdk/connectors/sql.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/connectors/sql.py#L772

Added line #L772 was not covered by tests

def prepare_table(
self,
full_table_name: str,
Expand All @@ -758,6 +797,8 @@
as_temp_table=as_temp_table,
)
return
elif self.config["load_method"] == TargetLoadMethods.OVERWRITE:
self.truncate_table(full_table_name=full_table_name)

Check warning on line 801 in singer_sdk/connectors/sql.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/connectors/sql.py#L801

Added line #L801 was not covered by tests

for property_name, property_def in schema["properties"].items():
self.prepare_column(
Expand Down
34 changes: 34 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,40 @@
).to_dict()


class TargetLoadMethods(str, Enum):
"""Target-specific capabilities."""

# always write all input records whether that records already exists or not
APPEND_ONLY = "append-only"

# update existing records and insert new records
UPSERT = "upsert"

# delete all existing records and insert all input records
OVERWRITE = "overwrite"


TARGET_LOAD_METHOD_CONFIG = PropertiesList(
Property(
"load_method",
StringType(),
description=(
"The method to use when loading data into the destination. "
"`append-only` will always write all input records whether that records "
"already exists or not. `upsert` will update existing records and insert "
"new records. `overwrite` will delete all existing records and insert all "
"input records."
),
allowed_values=[
TargetLoadMethods.APPEND_ONLY,
TargetLoadMethods.UPSERT,
TargetLoadMethods.OVERWRITE,
],
default=TargetLoadMethods.APPEND_ONLY,
),
).to_dict()


class DeprecatedEnum(Enum):
"""Base class for capabilities enumeration."""

Expand Down
2 changes: 2 additions & 0 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from singer_sdk.helpers.capabilities import (
ADD_RECORD_METADATA_CONFIG,
BATCH_CONFIG,
TARGET_LOAD_METHOD_CONFIG,
TARGET_SCHEMA_CONFIG,
CapabilitiesEnum,
PluginCapabilities,
Expand Down Expand Up @@ -597,6 +598,7 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
target_jsonschema["properties"][k] = v

_merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema)
_merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema)

capabilities = cls.capabilities

Expand Down