diff --git a/target_bigquery/__init__.py b/target_bigquery/__init__.py index 061f09f..c875a3b 100644 --- a/target_bigquery/__init__.py +++ b/target_bigquery/__init__.py @@ -121,7 +121,9 @@ def persist_lines(config, lines) -> None: if config.get('add_metadata_columns') or hard_delete_mapping.get(stream, default_hard_delete): record = stream_utils.add_metadata_values_to_record(o) else: - record = stream_utils.remove_metadata_values_from_record(o) + record = stream_utils.remove_metadata_values_from_record( + o, stream_to_sync[stream].stream_schema_message['schema'] + ) # Flatten record record = flatten_record( diff --git a/target_bigquery/stream_utils.py b/target_bigquery/stream_utils.py index b31e3bd..095d29d 100644 --- a/target_bigquery/stream_utils.py +++ b/target_bigquery/stream_utils.py @@ -111,15 +111,18 @@ def parse_datetime(dt): return extended_record -def remove_metadata_values_from_record(record_message): - """Remove any metadata _sdc columns from incoming record message +def remove_metadata_values_from_record(record_message, schema): + """Remove metadata _sdc columns from the incoming record message unless present in the schema The location of the required attributes are fixed in the stream """ - record = record_message['record'] - record.pop('_sdc_extracted_at', None) - record.pop('_sdc_batched_at', None) - record.pop('_sdc_deleted_at', None) - return record + expected_metadata = {k for k in schema['properties'].keys() if k.startswith('_sdc_')} + + reduced_record = record_message['record'] + reduced_record = { + k: v for k, v in reduced_record.items() if not k.startswith('_sdc_') or k in expected_metadata + } + + return reduced_record def stream_name_to_dict(stream_name, separator='-'): """Transform stream name string to dictionary""" diff --git a/tests/integration/test_target_bigquery.py b/tests/integration/test_target_bigquery.py index 63b8831..3659e72 100644 --- a/tests/integration/test_target_bigquery.py +++ b/tests/integration/test_target_bigquery.py @@ -617,7 +617,7 
@@ def test_logical_streams_from_pg_with_hard_delete_mapping(self): # ---------------------------------------------------------------------- # Check rows in table_two # ---------------------------------------------------------------------- - delete_time = datetime.datetime(2019, 10, 13, 14, 6, 31, 838000, tzinfo=timezone.utc) + delete_time = datetime.datetime(2019, 10, 13, 14, 6, 31, 838328, tzinfo=timezone.utc) expected_table_two = [ {'cid': 1, 'cvarchar': "updated row", "_sdc_deleted_at": None}, {'cid': 2, 'cvarchar': 'updated row', "_sdc_deleted_at": None},