Skip to content

Commit

Permalink
Fix test_logical_streams_from_pg_with_hard_delete_mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
judahrand committed Feb 22, 2022
1 parent e440b8b commit c484a7b
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
4 changes: 3 additions & 1 deletion target_bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ def persist_lines(config, lines) -> None:
if config.get('add_metadata_columns') or hard_delete_mapping.get(stream, default_hard_delete):
record = stream_utils.add_metadata_values_to_record(o)
else:
record = stream_utils.remove_metadata_values_from_record(o)
record = stream_utils.remove_metadata_values_from_record(
o, stream_to_sync[stream].stream_schema_message['schema']
)

# Flatten record
record = flatten_record(
Expand Down
17 changes: 10 additions & 7 deletions target_bigquery/stream_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,18 @@ def parse_datetime(dt):

return extended_record

def remove_metadata_values_from_record(record_message):
"""Remove any metadata _sdc columns from incoming record message
def remove_metadata_values_from_record(record_message, schema):
"""Remove metadata _sdc columns from incoming record message if not in SCHEMA
The location of the required attributes are fixed in the stream
"""
record = record_message['record']
record.pop('_sdc_extracted_at', None)
record.pop('_sdc_batched_at', None)
record.pop('_sdc_deleted_at', None)
return record
expected_metadata = {k for k in schema['properties'].keys() if k.startswith('_sdc_')}

reduced_record = record_message['record']
reduced_record = {
k: v for k, v in reduced_record.items() if not k.startswith('_sdc_') or k in expected_metadata
}

return reduced_record

def stream_name_to_dict(stream_name, separator='-'):
"""Transform stream name string to dictionary"""
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_target_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ def test_logical_streams_from_pg_with_hard_delete_mapping(self):
# ----------------------------------------------------------------------
# Check rows in table_two
# ----------------------------------------------------------------------
delete_time = datetime.datetime(2019, 10, 13, 14, 6, 31, 838000, tzinfo=timezone.utc)
delete_time = datetime.datetime(2019, 10, 13, 14, 6, 31, 838328, tzinfo=timezone.utc)
expected_table_two = [
{'cid': 1, 'cvarchar': "updated row", "_sdc_deleted_at": None},
{'cid': 2, 'cvarchar': 'updated row', "_sdc_deleted_at": None},
Expand Down

0 comments on commit c484a7b

Please sign in to comment.