Skip to content

Commit

Permalink
add patch_microbatch_end_time (#10712)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk authored Sep 14, 2024
1 parent 139b9ac commit 6c111f2
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 19 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240913-213312.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add test utility patch_microbatch_end_time for adapters testing
time: 2024-09-13T21:33:12.482336-04:00
custom:
Author: michelleark
Issue: "10713"
7 changes: 7 additions & 0 deletions core/dbt/tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
from datetime import datetime
from io import StringIO
from typing import Any, Dict, List, Optional
from unittest import mock

import yaml

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.factory import Adapter
from dbt.cli.main import dbtRunner
from dbt.context.providers import BaseResolver
from dbt.contracts.graph.manifest import Manifest
from dbt_common.context import _INVOCATION_CONTEXT_VAR, InvocationContext
from dbt_common.events.base_types import EventLevel
Expand Down Expand Up @@ -640,3 +642,8 @@ def safe_set_invocation_context():
if invocation_var is None:
invocation_var = _INVOCATION_CONTEXT_VAR
invocation_var.set(InvocationContext(os.environ))


def patch_microbatch_end_time(dt_str: str):
    """Return a patcher that pins ``BaseResolver._build_end_time`` to a fixed datetime.

    Intended for adapter tests: use as a context manager (or start/stop it)
    so microbatch runs see a deterministic "current" end time instead of wall-clock.

    :param dt_str: timestamp in ``YYYY-MM-DD HH:MM:SS`` form (naive, no timezone).
    :return: an unstarted ``unittest.mock`` patcher object.
    """
    fixed_end_time = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
    patcher = mock.patch.object(BaseResolver, "_build_end_time", return_value=fixed_end_time)
    return patcher
42 changes: 23 additions & 19 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
from unittest import mock

import pytest
from freezegun import freeze_time

from dbt.tests.util import relation_from_name, run_dbt, write_file
from dbt.tests.util import (
patch_microbatch_end_time,
relation_from_name,
run_dbt,
write_file,
)

input_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
Expand Down Expand Up @@ -133,12 +137,12 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# initial run -- backfills all data
with freeze_time("2020-01-03 13:57:00"):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# our partition grain is "day" so running the same day without new data should produce the same results
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

Expand All @@ -152,17 +156,17 @@ def test_run_with_event_time(self, project):
self.assert_row_count(project, "input_model", 5)

# re-run without changing current time => no insert
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 3)

# re-run by advancing time by one day changing current time => insert 1 row
with freeze_time("2020-01-04 14:57:00"):
with patch_microbatch_end_time("2020-01-04 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 4)

# re-run by advancing time by one more day changing current time => insert 1 more row
with freeze_time("2020-01-05 14:57:00"):
with patch_microbatch_end_time("2020-01-05 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 5)

Expand Down Expand Up @@ -198,12 +202,12 @@ def test_run_with_event_time(self, project):
run_dbt(["seed"])

# initial run -- backfills all data
with freeze_time("2020-01-03 13:57:00"):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# our partition grain is "day" so running the same day without new data should produce the same results
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

Expand All @@ -217,17 +221,17 @@ def test_run_with_event_time(self, project):
self.assert_row_count(project, "raw_source", 5)

# re-run without changing current time => no insert
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 3)

# re-run by advancing time by one day changing current time => insert 1 row
with freeze_time("2020-01-04 14:57:00"):
with patch_microbatch_end_time("2020-01-04 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 4)

# re-run by advancing time by one more day changing current time => insert 1 more row
with freeze_time("2020-01-05 14:57:00"):
with patch_microbatch_end_time("2020-01-05 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 5)

Expand All @@ -253,12 +257,12 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# initial run -- backfills all data
with freeze_time("2020-01-03 13:57:00"):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# our partition grain is "day" so running the same day without new data should produce the same results
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

Expand All @@ -272,7 +276,7 @@ def test_run_with_event_time(self, project):
self.assert_row_count(project, "input_model", 5)

# re-run without changing current time => INSERT BECAUSE INPUT MODEL ISN'T BEING FILTERED
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 5)

Expand All @@ -298,12 +302,12 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# initial run -- backfills all data
with freeze_time("2020-01-03 13:57:00"):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# our partition grain is "day" so running the same day without new data should produce the same results
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

Expand All @@ -317,7 +321,7 @@ def test_run_with_event_time(self, project):
self.assert_row_count(project, "input_model", 5)

# re-run without changing current time => no insert
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 3)

Expand All @@ -327,6 +331,6 @@ def test_run_with_event_time(self, project):
)

# re-run without changing current time => INSERT because .render() skips filtering
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 5)

0 comments on commit 6c111f2

Please sign in to comment.