Skip to content

Commit

Permalink
fix: Respect standard Singer stream metadata for key properties, repl…
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Nov 20, 2024
1 parent 8e8a44f commit 1696cf7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
1 change: 1 addition & 0 deletions singer_sdk/_singerlib/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class StreamMetadata(Metadata):
"""Stream metadata."""

table_key_properties: t.Sequence[str] | None = None
replication_key: str | None = None
forced_replication_method: str | None = None
valid_replication_keys: list[str] | None = None
schema_name: str | None = None
Expand Down
25 changes: 21 additions & 4 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
if t.TYPE_CHECKING:
import logging

from singer_sdk._singerlib.catalog import StreamMetadata
from singer_sdk.helpers import types
from singer_sdk.helpers._compat import Traversable
from singer_sdk.tap_base import Tap
Expand Down Expand Up @@ -1277,10 +1278,26 @@ def apply_catalog(self, catalog: singer.Catalog) -> None:

catalog_entry = catalog.get_stream(self.name)
if catalog_entry:
self.primary_keys = catalog_entry.key_properties
self.replication_key = catalog_entry.replication_key
if catalog_entry.replication_method:
self.forced_replication_method = catalog_entry.replication_method
stream_metadata: StreamMetadata | None
if stream_metadata := catalog_entry.metadata.get(()): # type: ignore[assignment]
table_key_properties = stream_metadata.table_key_properties
table_replication_key = stream_metadata.replication_key
table_replication_method = stream_metadata.forced_replication_method
else:
table_key_properties = None
table_replication_key = None
table_replication_method = None

self.primary_keys = catalog_entry.key_properties or table_key_properties
self.replication_key = (
catalog_entry.replication_key or table_replication_key
)

replication_method = (
catalog_entry.replication_method or table_replication_method
)
if replication_method:
self.forced_replication_method = replication_method

def _get_state_partition_context(
self,
Expand Down

0 comments on commit 1696cf7

Please sign in to comment.