Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Snapshot Merge Query #385

Merged
merged 11 commits into from
Jan 6, 2025
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241217-163126.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix the snapshot merge query for several adapters in new_record mode.
time: 2024-12-17T16:31:26.970526-05:00
custom:
Author: peterallenwebb
Issue: "385"
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

from dbt.tests.util import check_relations_equal, run_dbt

# Snapshot source data for the tests in this file
_seed_new_record_mode = """
BEGIN

create table {database}.{schema}.seed (
id INTEGER,
first_name VARCHAR(50),
Expand Down Expand Up @@ -88,6 +91,8 @@
md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id,
'False' as dbt_is_deleted
from {database}.{schema}.seed;

END;
"""

_snapshot_actual_sql = """
Expand Down Expand Up @@ -119,6 +124,8 @@


_invalidate_sql = """
BEGIN

-- update records 11 - 21. Change email and updated_at field
update {schema}.seed set
updated_at = updated_at + interval '1 hour',
Expand All @@ -131,6 +138,7 @@
dbt_valid_to = updated_at + interval '1 hour'
where id >= 10 and id <= 20;

END;
"""

_update_sql = """
Expand Down Expand Up @@ -169,8 +177,14 @@
where id >= 10 and id <= 20;
"""

# SQL to delete a record from the snapshot source data
_delete_sql = """
delete from {schema}.seed where id = 1
delete from {database}.{schema}.seed where id = 1
"""

# If the deletion worked correctly, this should return two rows, with one of them representing the deletion.
_delete_check_sql = """
select dbt_valid_to, dbt_scd_id, dbt_is_deleted from {schema}.snapshot_actual where id = 1
"""


Expand Down Expand Up @@ -222,4 +236,15 @@ def test_snapshot_new_record_mode(
results = run_dbt(["snapshot"])
assert len(results) == 1

# TODO: Further validate results.
check_result = project.run_sql(_delete_check_sql, fetch="all")
valid_to = 0
scd_id = 1
is_deleted = 2
assert len(check_result) == 2
assert sum(
[1 for c in check_result if c[valid_to] is None and c[scd_id] is not None and c[is_deleted] == "True"]
) == 1
assert sum(
[1 for c in check_result if c[valid_to] is not None and c[scd_id] is not None and c[is_deleted] == "False"]
) == 1
assert check_result[0][scd_id] != check_result[1][scd_id]
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}

{% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
with snapshot_query as (

{{ source_sql }}
Expand Down Expand Up @@ -169,12 +169,13 @@
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }},
{{ new_scd_id }} as {{ columns.dbt_scd_id }},
'True' as {{ columns.dbt_is_deleted }}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}

)
{%- endif %}

Expand Down Expand Up @@ -272,6 +273,17 @@
{% endif %}
{% endmacro %}

{% macro unique_key_reverse(unique_key) %}
{% if unique_key | is_list %}
{% for key in unique_key %}
dbt_unique_key_{{ loop.index }} as {{ key }}
{%- if not loop.last %} , {%- endif %}
{% endfor %}
{% else %}
dbt_unique_key as {{ unique_key }}
{% endif %}
{% endmacro %}


{% macro unique_key_join_on(unique_key, identifier, from_identifier) %}
{% if unique_key | is_list %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

merge into {{ target.render() }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }}
on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }} and DBT_INTERNAL_SOURCE.dbt_change_type != 'insert'
adrianburusdbt marked this conversation as resolved.
Show resolved Hide resolved

when matched
{% if config.get("dbt_valid_to_current") %}
Expand Down
Loading