Skip to content

Commit

Permalink
fix: Respect standard Singer stream metadata for key properties, repl…
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Nov 20, 2024
1 parent 8e8a44f commit 9b7a6bf
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
1 change: 1 addition & 0 deletions singer_sdk/_singerlib/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class StreamMetadata(Metadata):
"""Stream metadata."""

table_key_properties: t.Sequence[str] | None = None
replication_key: str | None = None
forced_replication_method: str | None = None
valid_replication_keys: list[str] | None = None
schema_name: str | None = None
Expand Down
19 changes: 15 additions & 4 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1277,10 +1277,21 @@ def apply_catalog(self, catalog: singer.Catalog) -> None:

catalog_entry = catalog.get_stream(self.name)
if catalog_entry:
self.primary_keys = catalog_entry.key_properties
self.replication_key = catalog_entry.replication_key
if catalog_entry.replication_method:
self.forced_replication_method = catalog_entry.replication_method
stream_metadata = catalog_entry.metadata.root

self.primary_keys = (
catalog_entry.key_properties or stream_metadata.table_key_properties
)
self.replication_key = (
catalog_entry.replication_key or stream_metadata.replication_key
)

replication_method = (
catalog_entry.replication_method
or stream_metadata.forced_replication_method
)
if replication_method:
self.forced_replication_method = replication_method

def _get_state_partition_context(
self,
Expand Down

0 comments on commit 9b7a6bf

Please sign in to comment.