diff --git a/.changes/unreleased/Under the Hood-20240913-213312.yaml b/.changes/unreleased/Under the Hood-20240913-213312.yaml
new file mode 100644
index 00000000000..495b6f8de53
--- /dev/null
+++ b/.changes/unreleased/Under the Hood-20240913-213312.yaml
@@ -0,0 +1,6 @@
+kind: Under the Hood
+body: Add test utility patch_microbatch_end_time for adapters testing
+time: 2024-09-13T21:33:12.482336-04:00
+custom:
+  Author: michelleark
+  Issue: "10713"
diff --git a/core/dbt/tests/util.py b/core/dbt/tests/util.py
index 3844705b974..9c2713aa048 100644
--- a/core/dbt/tests/util.py
+++ b/core/dbt/tests/util.py
@@ -6,12 +6,14 @@
 from datetime import datetime
 from io import StringIO
 from typing import Any, Dict, List, Optional
+from unittest import mock
 
 import yaml
 
 from dbt.adapters.base.relation import BaseRelation
 from dbt.adapters.factory import Adapter
 from dbt.cli.main import dbtRunner
+from dbt.context.providers import BaseResolver
 from dbt.contracts.graph.manifest import Manifest
 from dbt_common.context import _INVOCATION_CONTEXT_VAR, InvocationContext
 from dbt_common.events.base_types import EventLevel
@@ -640,3 +642,8 @@ def safe_set_invocation_context():
     if invocation_var is None:
         invocation_var = _INVOCATION_CONTEXT_VAR
     invocation_var.set(InvocationContext(os.environ))
+
+
+def patch_microbatch_end_time(dt_str: str):
+    dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
+    return mock.patch.object(BaseResolver, "_build_end_time", return_value=dt)
diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py
index 8bdb5abab2c..2970f3d7718 100644
--- a/tests/functional/microbatch/test_microbatch.py
+++ b/tests/functional/microbatch/test_microbatch.py
@@ -2,9 +2,13 @@
 from unittest import mock
 
 import pytest
-from freezegun import freeze_time
 
-from dbt.tests.util import relation_from_name, run_dbt, write_file
+from dbt.tests.util import (
+    patch_microbatch_end_time,
+    relation_from_name,
+    run_dbt,
+    write_file,
+)
 
 input_model_sql = """
 {{ config(materialized='table', event_time='event_time') }}
@@ -133,12 +137,12 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)
     @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
     def test_run_with_event_time(self, project):
         # initial run -- backfills all data
-        with freeze_time("2020-01-03 13:57:00"):
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
             run_dbt(["run"])
         self.assert_row_count(project, "microbatch_model", 3)
 
         # our partition grain is "day" so running the same day without new data should produce the same results
-        with freeze_time("2020-01-03 14:57:00"):
+        with patch_microbatch_end_time("2020-01-03 14:57:00"):
             run_dbt(["run"])
         self.assert_row_count(project, "microbatch_model", 3)
 
@@ -152,17 +156,17 @@ def test_run_with_event_time(self, project):
         self.assert_row_count(project, "input_model", 5)
 
         # re-run without changing current time => no insert
-        with freeze_time("2020-01-03 14:57:00"):
+        with patch_microbatch_end_time("2020-01-03 14:57:00"):
             run_dbt(["run", "--select", "microbatch_model"])
         self.assert_row_count(project, "microbatch_model", 3)
 
         # re-run by advancing time by one day changing current time => insert 1 row
-        with freeze_time("2020-01-04 14:57:00"):
+        with patch_microbatch_end_time("2020-01-04 14:57:00"):
             run_dbt(["run", "--select", "microbatch_model"])
         self.assert_row_count(project, "microbatch_model", 4)
 
         # re-run by advancing time by one more day changing current time => insert 1 more row
-        with freeze_time("2020-01-05 14:57:00"):
+        with patch_microbatch_end_time("2020-01-05 14:57:00"):
             run_dbt(["run", "--select", "microbatch_model"])
         self.assert_row_count(project, "microbatch_model", 5)
 
@@ -198,12 +202,12 @@ def test_run_with_event_time(self, project):
         run_dbt(["seed"])
 
         # initial run -- backfills all data
-        with freeze_time("2020-01-03 13:57:00"):
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
             run_dbt(["run"])
         self.assert_row_count(project, "microbatch_model", 3)
 
         # our partition grain is "day" so running the same day without new data should produce the same results
-        with freeze_time("2020-01-03 14:57:00"):
+        with patch_microbatch_end_time("2020-01-03 14:57:00"):
             run_dbt(["run"])
         self.assert_row_count(project, "microbatch_model", 3)
 
@@ -217,17 +221,17 @@ def test_run_with_event_time(self, project):
         self.assert_row_count(project, "raw_source", 5)
 
         # re-run without changing current time => no insert
-        with freeze_time("2020-01-03 14:57:00"):
+        with patch_microbatch_end_time("2020-01-03 14:57:00"):
             run_dbt(["run", "--select", "microbatch_model"])
         self.assert_row_count(project, "microbatch_model", 3)
 
         # re-run by advancing time by one day changing current time => insert 1 row
-        with freeze_time("2020-01-04 14:57:00"):
+        with patch_microbatch_end_time("2020-01-04 14:57:00"):
             run_dbt(["run", "--select", "microbatch_model"])
         self.assert_row_count(project, "microbatch_model", 4)
 
         # re-run by advancing time by one more day changing current time => insert 1 more row
-        with freeze_time("2020-01-05 14:57:00"):
+        with patch_microbatch_end_time("2020-01-05 14:57:00"):
             run_dbt(["run", "--select", "microbatch_model"])
         self.assert_row_count(project, "microbatch_model", 5)
 
@@ -253,12 +257,12 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)
     @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
     def test_run_with_event_time(self, project):
         # initial run -- backfills all data
-        with freeze_time("2020-01-03 13:57:00"):
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
             run_dbt(["run"])
         self.assert_row_count(project, "microbatch_model", 3)
 
         # our partition grain is "day" so running the same day without new data should produce the same results
-        with freeze_time("2020-01-03 14:57:00"):
+        with patch_microbatch_end_time("2020-01-03 14:57:00"):
             run_dbt(["run"])
         self.assert_row_count(project, "microbatch_model", 3)
 
@@ -272,7 +276,7 @@ def test_run_with_event_time(self, project):
         self.assert_row_count(project, "input_model", 5)
 
         # re-run without changing current time => INSERT BECAUSE INPUT MODEL ISN'T BEING FILTERED
-        with freeze_time("2020-01-03 14:57:00"):
+        with patch_microbatch_end_time("2020-01-03 14:57:00"):
             run_dbt(["run", "--select", "microbatch_model"])
         self.assert_row_count(project, "microbatch_model", 5)
 
@@ -298,12 +302,12 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)
     @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
     def test_run_with_event_time(self, project):
         # initial run -- backfills all data
-        with freeze_time("2020-01-03 13:57:00"):
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
             run_dbt(["run"])
         self.assert_row_count(project, "microbatch_model", 3)
 
         # our partition grain is "day" so running the same day without new data should produce the same results
-        with freeze_time("2020-01-03 14:57:00"):
+        with patch_microbatch_end_time("2020-01-03 14:57:00"):
             run_dbt(["run"])
         self.assert_row_count(project, "microbatch_model", 3)
 
@@ -317,7 +321,7 @@ def test_run_with_event_time(self, project):
         self.assert_row_count(project, "input_model", 5)
 
         # re-run without changing current time => no insert
-        with freeze_time("2020-01-03 14:57:00"):
+        with patch_microbatch_end_time("2020-01-03 14:57:00"):
             run_dbt(["run", "--select", "microbatch_model"])
         self.assert_row_count(project, "microbatch_model", 3)
 
@@ -327,6 +331,6 @@ def test_run_with_event_time(self, project):
         )
 
         # re-run without changing current time => INSERT because .render() skips filtering
-        with freeze_time("2020-01-03 14:57:00"):
+        with patch_microbatch_end_time("2020-01-03 14:57:00"):
             run_dbt(["run", "--select", "microbatch_model"])
         self.assert_row_count(project, "microbatch_model", 5)
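
A minimal sketch of how an adapter test suite might use the new helper in place of freezegun, mirroring the functional tests in this patch; the class name, test name, and the `project` fixture/model setup are hypothetical, assuming dbt's standard functional-test scaffolding:

    import os
    from unittest import mock

    from dbt.tests.util import patch_microbatch_end_time, run_dbt


    class TestMyAdapterMicrobatch:  # hypothetical adapter test class
        @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
        def test_initial_backfill(self, project):
            # Pin the microbatch end time by patching BaseResolver._build_end_time;
            # unlike freeze_time, this leaves the wall clock (and any database
            # timestamps) untouched.
            with patch_microbatch_end_time("2020-01-03 13:57:00"):
                run_dbt(["run"])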