diff --git a/.changes/unreleased/Fixes-20250107-173719.yaml b/.changes/unreleased/Fixes-20250107-173719.yaml
new file mode 100644
index 00000000000..2d2310f1bac
--- /dev/null
+++ b/.changes/unreleased/Fixes-20250107-173719.yaml
@@ -0,0 +1,6 @@
+kind: Fixes
+body: Ensure warning about microbatch lacking filter inputs is always fired
+time: 2025-01-07T17:37:19.373261-06:00
+custom:
+  Author: QMalcolm
+  Issue: "11159"
diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py
index ba42c6637d3..023c5db9300 100644
--- a/core/dbt/parser/manifest.py
+++ b/core/dbt/parser/manifest.py
@@ -513,6 +513,7 @@ def load(self) -> Manifest:
             self.check_for_spaces_in_resource_names()
             self.check_for_microbatch_deprecations()
             self.check_forcing_batch_concurrency()
+            self.check_microbatch_model_has_a_filtered_input()

         return self.manifest

@@ -1472,21 +1473,6 @@ def check_valid_microbatch_config(self):
                             f"Microbatch model '{node.name}' optional 'concurrent_batches' config must be of type `bool` if specified, but got: {type(concurrent_batches)})."
                         )

-                    # Validate upstream node event_time (if configured)
-                    has_input_with_event_time_config = False
-                    for input_unique_id in node.depends_on.nodes:
-                        input_node = self.manifest.expect(unique_id=input_unique_id)
-                        input_event_time = input_node.config.event_time
-                        if input_event_time:
-                            if not isinstance(input_event_time, str):
-                                raise dbt.exceptions.ParsingError(
-                                    f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
-                                )
-                            has_input_with_event_time_config = True
-
-                    if not has_input_with_event_time_config:
-                        fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))
-
     def check_forcing_batch_concurrency(self) -> None:
         if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
             adapter = get_adapter(self.root_project)
@@ -1508,6 +1494,28 @@ def check_forcing_batch_concurrency(self) -> None:
                         )
                     )

+    def check_microbatch_model_has_a_filtered_input(self):
+        if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
+            for node in self.manifest.nodes.values():
+                if (
+                    node.config.materialized == "incremental"
+                    and node.config.incremental_strategy == "microbatch"
+                ):
+                    # Validate upstream node event_time (if configured)
+                    has_input_with_event_time_config = False
+                    for input_unique_id in node.depends_on.nodes:
+                        input_node = self.manifest.expect(unique_id=input_unique_id)
+                        input_event_time = input_node.config.event_time
+                        if input_event_time:
+                            if not isinstance(input_event_time, str):
+                                raise dbt.exceptions.ParsingError(
+                                    f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
+                                )
+                            has_input_with_event_time_config = True
+
+                    if not has_input_with_event_time_config:
+                        fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))
+
     def write_perf_info(self, target_path: str):
         path = os.path.join(target_path, PERF_INFO_FILE_NAME)
         write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py
index 953b372b226..ef747138fbd 100644
--- a/tests/functional/microbatch/test_microbatch.py
+++ b/tests/functional/microbatch/test_microbatch.py
@@ -464,9 +464,11 @@ def test_run_with_event_time(self, project):
         assert len(catcher.caught_events) == 1

         # our partition grain is "day" so running the same day without new data should produce the same results
+        catcher.caught_events = []
         with patch_microbatch_end_time("2020-01-03 14:57:00"):
-            run_dbt(["run"])
+            run_dbt(["run"], callbacks=[catcher.catch])
         self.assert_row_count(project, "microbatch_model", 3)
+        assert len(catcher.caught_events) == 1

         # add next two days of data
         test_schema_relation = project.adapter.Relation.create(