-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move deferral resolution from merge_from_artifact
to RuntimeRefResolver
#9199
Changes from 8 commits
4f44339
0840647
8b5618e
658999f
6f1ad1e
5513e16
6b63e31
7854bfa
1377431
9db16a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
kind: Under the Hood | ||
body: Split up deferral across parsing (adding 'defer_relation' from state manifest) | ||
and runtime ref resolution" | ||
time: 2024-02-01T00:30:33.573665+01:00 | ||
custom: | ||
Author: jtcohen6 | ||
Issue: "9199" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,12 +57,7 @@ def message(self) -> str: | |
return f"Tracking: {self.user_state}" | ||
|
||
|
||
class MergedFromState(DebugLevel): | ||
def code(self) -> str: | ||
return "A004" | ||
|
||
def message(self) -> str: | ||
return f"Merged {self.num_merged} items from state (sample: {self.sample})" | ||
# Removed A004: MergedFromState | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we're no longer fully overwriting deferred manifest nodes with the stateful manifest's alternative, I didn't see much use in keeping this event around |
||
|
||
|
||
class MissingProfileTarget(InfoLevel): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1886,7 +1886,12 @@ def write_manifest(manifest: Manifest, target_path: str, which: Optional[str] = | |
write_semantic_manifest(manifest=manifest, target_path=target_path) | ||
|
||
|
||
def parse_manifest(runtime_config, write_perf_info, write, write_json): | ||
def parse_manifest( | ||
runtime_config: RuntimeConfig, | ||
write_perf_info: bool, | ||
write: bool, | ||
write_json: bool, | ||
) -> Manifest: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before 6b63e31, this was where I was calling |
||
register_adapter(runtime_config, get_mp_context()) | ||
adapter = get_adapter(runtime_config) | ||
adapter.set_macro_context_generator(generate_runtime_macro_context) | ||
|
@@ -1895,6 +1900,7 @@ def parse_manifest(runtime_config, write_perf_info, write, write_json): | |
write_perf_info=write_perf_info, | ||
) | ||
|
||
# If we should (over)write the manifest in the target path, do that now | ||
if write and write_json: | ||
write_manifest(manifest, runtime_config.project_target_path) | ||
pm = plugins.get_plugin_manager(runtime_config.project_name) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -454,10 +454,10 @@ def print_results_line(self, results, execution_time) -> None: | |
|
||
def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: | ||
with adapter.connection_named("master"): | ||
self.defer_to_manifest() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The purpose of this method is now to add Previously, we needed to know at this point which objects exist in the target database locations (which requires the cache). Now that's cache check is happening within |
||
required_schemas = self.get_model_schemas(adapter, selected_uids) | ||
self.create_schemas(adapter, required_schemas) | ||
self.populate_adapter_cache(adapter, required_schemas) | ||
self.defer_to_manifest(adapter, selected_uids) | ||
self.safe_run_hooks(adapter, RunHookType.Start, {}) | ||
|
||
def after_run(self, adapter, results) -> None: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -127,24 +127,15 @@ def get_selection_spec(self) -> SelectionSpec: | |
def get_node_selector(self) -> NodeSelector: | ||
raise NotImplementedError(f"get_node_selector not implemented for task {type(self)}") | ||
|
||
def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]): | ||
def defer_to_manifest(self): | ||
deferred_manifest = self._get_deferred_manifest() | ||
if deferred_manifest is None: | ||
return | ||
if self.manifest is None: | ||
raise DbtInternalError( | ||
"Expected to defer to manifest, but there is no runtime manifest to defer from!" | ||
) | ||
self.manifest.merge_from_artifact( | ||
adapter=adapter, | ||
other=deferred_manifest, | ||
selected=selected_uids, | ||
favor_state=bool(self.args.favor_state), | ||
) | ||
# We're rewriting the manifest because it's been mutated during merge_from_artifact. | ||
# This is to reflect which nodes had been deferred to (= replaced with) their counterparts. | ||
if self.args.write_json: | ||
write_manifest(self.manifest, self.config.project_target_path) | ||
self.manifest.merge_from_artifact(other=deferred_manifest) | ||
|
||
def get_graph_queue(self) -> GraphQueue: | ||
selector = self.get_node_selector() | ||
|
@@ -479,8 +470,8 @@ def populate_adapter_cache( | |
|
||
def before_run(self, adapter, selected_uids: AbstractSet[str]): | ||
with adapter.connection_named("master"): | ||
self.defer_to_manifest() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See comment on |
||
self.populate_adapter_cache(adapter) | ||
self.defer_to_manifest(adapter, selected_uids) | ||
|
||
def after_run(self, adapter, results): | ||
pass | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confirming this is a minor change and safe to make here from a runtime serialization/deserialization perspective 👍 Independent unit tests here: #10066
Additionally, we're not using this field for anything functional in dbt-core's implementation, it was previously just accessed in testing to assert certain behaviour had occurred during deferral.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving minor schema evolution changes