Skip to content

Commit

Permalink
Always emit warning when microbatch models lack any filtered input no…
Browse files Browse the repository at this point in the history
…de (#11196) (#11199)

* Update `TestMicrobatchWithInputWithoutEventTime` to check running again raises warning

The first time the project is run, the appropriate warning about inputs is raised. However,
the warning is only being raised when a full parse happens. When partial parsing happens
the warning isn't getting raised. In the next commit we'll fix this issue. This commit updates
the test to show that the second run (with partial parsing) doesn't raise the update, and thus
the test fails.

* Update manifest loading to _always_ check microbatch model inputs

Of note we are at the point where multiple validations are iterating
all of the nodes in a manifest. We should refactor these _soon_ such that
we are not iterating over the same list multiple times.

* Add changie doc

(cherry picked from commit 2eb1a5c)

Co-authored-by: Quigley Malcolm <[email protected]>
  • Loading branch information
github-actions[bot] and QMalcolm authored Jan 8, 2025
1 parent 6a9e35a commit 42a58bb
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20250107-173719.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Ensure warning about microbatch lacking filter inputs is always fired
time: 2025-01-07T17:37:19.373261-06:00
custom:
Author: QMalcolm
Issue: "11159"
38 changes: 23 additions & 15 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ def load(self) -> Manifest:
self.check_for_spaces_in_resource_names()
self.check_for_microbatch_deprecations()
self.check_forcing_batch_concurrency()
self.check_microbatch_model_has_a_filtered_input()

return self.manifest

Expand Down Expand Up @@ -1472,21 +1473,6 @@ def check_valid_microbatch_config(self):
f"Microbatch model '{node.name}' optional 'concurrent_batches' config must be of type `bool` if specified, but got: {type(concurrent_batches)})."
)

# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time:
if not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def check_forcing_batch_concurrency(self) -> None:
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
adapter = get_adapter(self.root_project)
Expand All @@ -1508,6 +1494,28 @@ def check_forcing_batch_concurrency(self) -> None:
)
)

def check_microbatch_model_has_a_filtered_input(self):
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
for node in self.manifest.nodes.values():
if (
node.config.materialized == "incremental"
and node.config.incremental_strategy == "microbatch"
):
# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time:
if not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
Expand Down
4 changes: 3 additions & 1 deletion tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,11 @@ def test_run_with_event_time(self, project):
assert len(catcher.caught_events) == 1

# our partition grain is "day" so running the same day without new data should produce the same results
catcher.caught_events = []
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run"])
run_dbt(["run"], callbacks=[catcher.catch])
self.assert_row_count(project, "microbatch_model", 3)
assert len(catcher.caught_events) == 1

# add next two days of data
test_schema_relation = project.adapter.Relation.create(
Expand Down

0 comments on commit 42a58bb

Please sign in to comment.