Commit ad034bc

Use on_table_exists_logic macro for full-refresh in incremental materialization

damian3031 committed May 7, 2024
1 parent 2931724 commit ad034bc
Showing 3 changed files with 133 additions and 21 deletions.
8 changes: 8 additions & 0 deletions .changes/unreleased/Features-20240430-120758.yaml
@@ -0,0 +1,8 @@
kind: Features
body: on_table_exists modes (rename, drop, replace) determine how table is recreated
during a full-refresh run of an incremental model
time: 2024-04-30T12:07:58.484083+02:00
custom:
Author: damian3031
Issue: "395"
PR: "406"
52 changes: 31 additions & 21 deletions dbt/include/trino/macros/materializations/incremental.sql
@@ -20,28 +20,42 @@

{% materialization incremental, adapter='trino', supported_languages=['sql'] -%}

{#-- Set vars --#}
{#-- relations --#}
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}
{#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
{%- set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) -%}
{%- set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) -%}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

{#-- configs --#}
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}
{%- set language = model['language'] -%}
{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{%- set on_table_exists = config.get('on_table_exists', 'rename') -%}
{% if on_table_exists not in ['rename', 'drop', 'replace'] %}
{%- set log_message = 'Invalid value for on_table_exists (%s) specified. Setting default value (%s).' % (on_table_exists, 'rename') -%}
{% do log(log_message) %}
{%- set on_table_exists = 'rename' -%}
{% endif %}

{#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
{%- set unique_key = config.get('unique_key') -%}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) %}
{% set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) %}
-- the temp_ relation should not already exist in the database; get_relation
{#-- the temp_ and backup_ relation should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation.
-- later, before we try to use this name for the current operation.#}
{%- set preexisting_tmp_relation = load_cached_relation(tmp_relation)-%}
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}

{#--- grab current tables grants config for comparison later on#}
{% set grant_config = config.get('grants') %}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

-- drop the temp relation if it exists already in the database
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(preexisting_tmp_relation) }}
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

{{ run_hooks(pre_hooks) }}

@@ -58,11 +72,8 @@
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{% elif full_refresh_mode %}
{#-- Can't replace a table - we must drop --#}
{% do adapter.drop_relation(existing_relation) %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{#-- Create table with given `on_table_exists` mode #}
{% do on_table_exists_logic(on_table_exists, existing_relation, intermediate_relation, backup_relation, target_relation) %}

{% else %}
{#-- Create the temp relation, either as a view or as a temp table --#}
@@ -86,6 +97,7 @@
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
@@ -94,9 +106,7 @@
{{ strategy_sql_macro_func(strategy_arg_dict) }}
{%- endcall -%}
{% endif %}

{% do drop_relation_if_exists(tmp_relation) %}

{% do drop_relation_if_exists(tmp_relation) %}
{{ run_hooks(post_hooks) }}

{% set should_revoke =
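
The substantive change in the full-refresh branch above is that the materialization no longer hard-codes drop-then-create: it delegates to the adapter's on_table_exists_logic macro, passing the existing, intermediate, backup, and target relations. A simplified, hypothetical sketch of the dispatch such a macro performs (not the adapter's actual implementation; compiled_code and language are assumed to come from the calling materialization context):

{% macro on_table_exists_logic_sketch(on_table_exists, existing_relation, intermediate_relation, backup_relation, target_relation) %}
  {% if on_table_exists == 'rename' %}
    {#-- build under an intermediate name, swap it in via renames, then drop the backup --#}
    {%- call statement('main', language=language) -%}
      {{ create_table_as(False, intermediate_relation, compiled_code, language) }}
    {%- endcall -%}
    {% do adapter.rename_relation(existing_relation, backup_relation) %}
    {% do adapter.rename_relation(intermediate_relation, target_relation) %}
    {% do drop_relation_if_exists(backup_relation) %}
  {% elif on_table_exists == 'drop' %}
    {#-- drop the existing table first; it is briefly absent between the two statements --#}
    {% do drop_relation_if_exists(existing_relation) %}
    {%- call statement('main', language=language) -%}
      {{ create_table_as(False, target_relation, compiled_code, language) }}
    {%- endcall -%}
  {% else %}
    {#-- 'replace': lean on CREATE OR REPLACE TABLE where the connector supports it --#}
    {%- call statement('main', language=language) -%}
      {{ create_table_as(False, target_relation, compiled_code, language) }}
    {%- endcall -%}
  {% endif %}
{% endmacro %}

The functional tests below line up with this behaviour: the drop variant asserts that a "drop table if exists" statement appears in the logs only on the full-refresh run, and the replace variant asserts that "create or replace table" appears only on the full-refresh run.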
94 changes: 94 additions & 0 deletions tests/functional/adapter/materialization/test_on_table_exists.py
@@ -60,6 +60,49 @@ def test_run_seed_test(self, project):
check_relations_equal(project.adapter, ["seed", "materialization"])


class TestOnTableExistsDropIncrementalFullRefresh(BaseOnTableExists):
"""
Testing on_table_exists = `drop` configuration for incremental materialization and full refresh flag,
using dbt seed, run and tests commands and validate data load correctness.
"""

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "table_drop",
"models": {"+materialized": "incremental", "+on_table_exists": "drop"},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1
# run models two times to check on_table_exists = 'drop'
results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
assert (
f'drop table if exists "{project.database}"."{project.test_schema}"."materialization"'
not in logs
)
results, logs = run_dbt_and_capture(["--debug", "run", "--full-refresh"], expect_pass=True)
assert len(results) == 1
assert (
f'drop table if exists "{project.database}"."{project.test_schema}"."materialization"'
in logs
)
# test tests
results = run_dbt(["test"], expect_pass=True)
assert len(results) == 3

# check if the data was loaded correctly
check_relations_equal(project.adapter, ["seed", "materialization"])


class BaseOnTableExistsReplace(BaseOnTableExists):
"""
Testing on_table_exists = `replace` configuration for table materialization,
@@ -105,3 +148,54 @@ class TestOnTableExistsReplaceIceberg(BaseOnTableExistsReplace):
@pytest.mark.delta
class TestOnTableExistsReplaceDelta(BaseOnTableExistsReplace):
pass


class BaseOnTableExistsReplaceIncrementalFullRefresh(BaseOnTableExists):
"""
Testing on_table_exists = `replace` configuration for incremental materialization and full refresh flag,
using dbt seed, run and tests commands and validate data load correctness.
"""

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "table_drop",
"models": {"+materialized": "incremental", "+on_table_exists": "replace"},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1
# run models two times to check on_table_exists = 'replace'
results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
assert "create or replace table" not in logs
results, logs = run_dbt_and_capture(["--debug", "run", "--full-refresh"], expect_pass=True)
assert len(results) == 1
assert "create or replace table" in logs
# test tests
results = run_dbt(["test"], expect_pass=True)
assert len(results) == 3

# check if the data was loaded correctly
check_relations_equal(project.adapter, ["seed", "materialization"])


@pytest.mark.iceberg
class TestOnTableExistsReplaceIcebergIncrementalFullRefresh(
BaseOnTableExistsReplaceIncrementalFullRefresh
):
pass


@pytest.mark.delta
class TestOnTableExistsReplaceDeltaIncrementalFullRefresh(
BaseOnTableExistsReplaceIncrementalFullRefresh
):
pass
