Skip to content

Commit

Permalink
Add microbatch incremental strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
damian3031 committed Nov 29, 2024
1 parent 2e08c8a commit 42c00de
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 2 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/trino/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,4 @@ def get_columns_in_relation(self, relation):
raise

def valid_incremental_strategies(self):
return ["append", "merge", "delete+insert"]
return ["append", "merge", "delete+insert", "microbatch"]
17 changes: 16 additions & 1 deletion dbt/adapters/trino/relation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass, field

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.adapters.base.relation import BaseRelation, EventTimeFilter, Policy
from dbt.adapters.contracts.relation import ComponentName


Expand All @@ -12,3 +12,18 @@ class TrinoRelation(BaseRelation):
# Overridden as Trino converts relation identifiers to lowercase
def _is_exactish_match(self, field: ComponentName, value: str) -> bool:
return self.path.get_lowered_part(field) == value.lower()

# Overridden because Trino cannot compare a TIMESTAMP column with a VARCHAR literal.
def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str:
"""
Returns "" if start and end are both None
"""
filter = ""
if event_time_filter.start and event_time_filter.end:
filter = f"{event_time_filter.field_name} >= TIMESTAMP '{event_time_filter.start}' and {event_time_filter.field_name} < TIMESTAMP '{event_time_filter.end}'"
elif event_time_filter.start:
filter = f"{event_time_filter.field_name} >= TIMESTAMP '{event_time_filter.start}'"
elif event_time_filter.end:
filter = f"{event_time_filter.field_name} < TIMESTAMP '{event_time_filter.end}'"

return filter
31 changes: 31 additions & 0 deletions dbt/include/trino/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,34 @@
)
{% endif %}
{% endmacro %}


{% macro trino__get_incremental_microbatch_sql(arg_dict) %}
{%- set target = arg_dict["target_relation"] -%}
{%- set source = arg_dict["temp_relation"] -%}
{%- set dest_columns = arg_dict["dest_columns"] -%}
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}

{#-- Add additional incremental_predicates to filter for batch --#}
{% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
{% do incremental_predicates.append(model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
{% endif %}
{% if model.config.get("__dbt_internal_microbatch_event_time_end") -%}
{% do incremental_predicates.append(model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
{% endif %}
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

delete from {{ target }}
where (
{% for predicate in incremental_predicates %}
{%- if not loop.first %}and {% endif -%} {{ predicate }}
{% endfor %}
);

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_microbatch import BaseMicrobatch


@pytest.mark.iceberg
class TestTrinoMicrobatchIceberg(BaseMicrobatch):
pass

0 comments on commit 42c00de

Please sign in to comment.