Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor deferral: always merge_from_artifact & support all commands #9040

Merged
merged 9 commits into from
Jan 17, 2024
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20231108-163613.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Consolidate deferral methods & flags
time: 2023-11-08T16:36:13.234324-05:00
custom:
Author: jtcohen6
Issue: 7965 8715
33 changes: 28 additions & 5 deletions core/dbt/cli/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def _get_params_by_source(ctx: Context, source_type: ParameterSource):
def _assign_params(
ctx: Context,
params_assigned_from_default: set,
params_assigned_from_user: set,
deprecated_env_vars: Dict[str, Callable],
):
"""Recursively adds all click params to flag object"""
Expand Down Expand Up @@ -173,15 +174,30 @@ def _assign_params(
object.__setattr__(self, flag_name, param_value)

# Track default assigned params.
if is_default:
# For flags that are accepted at both 'parent' and 'child' levels,
# we need to track user-provided and default values across both,
# to support detection of mutually exclusive flags later on.
if not is_default:
params_assigned_from_user.add(param_name)
if param_name in params_assigned_from_default:
params_assigned_from_default.remove(param_name)
if is_default and param_name not in params_assigned_from_user:
params_assigned_from_default.add(param_name)

if ctx.parent:
_assign_params(ctx.parent, params_assigned_from_default, deprecated_env_vars)
_assign_params(
ctx.parent,
params_assigned_from_default,
params_assigned_from_user,
deprecated_env_vars,
)

params_assigned_from_user = set() # type: Set[str]
params_assigned_from_default = set() # type: Set[str]
deprecated_env_vars: Dict[str, Callable] = {}
_assign_params(ctx, params_assigned_from_default, deprecated_env_vars)
_assign_params(
ctx, params_assigned_from_default, params_assigned_from_user, deprecated_env_vars
)

# Set deprecated_env_var_warnings to be fired later after events have been init.
object.__setattr__(
Expand All @@ -198,7 +214,10 @@ def _assign_params(
invoked_subcommand.ignore_unknown_options = True
invoked_subcommand_ctx = invoked_subcommand.make_context(None, sys.argv)
_assign_params(
invoked_subcommand_ctx, params_assigned_from_default, deprecated_env_vars
invoked_subcommand_ctx,
params_assigned_from_default,
params_assigned_from_user,
deprecated_env_vars,
)

if not user_config:
Expand Down Expand Up @@ -347,7 +366,11 @@ def add_fn(x):
if k == "macro" and command == CliCommand.RUN_OPERATION:
add_fn(v)
# None is a Singleton, False is a Flyweight, only one instance of each.
elif v is None or v is False:
elif (v is None or v is False) and k not in (
# These are None by default but they do not support --no-{flag}
"defer_state",
"log_format",
):
add_fn(f"--no-{spinal_cased}")
elif v is True:
add_fn(f"--{spinal_cased}")
Expand Down
74 changes: 10 additions & 64 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,18 @@ def invoke(self, args: List[str], **kwargs) -> dbtRunnerResult:
def global_flags(func):
@p.cache_selected_only
@p.debug
@p.defer
@p.deprecated_defer
@p.defer_state
@p.deprecated_favor_state
@p.deprecated_print
@p.deprecated_state
@p.enable_legacy_logger
@p.fail_fast
@p.favor_state
dbeatty10 marked this conversation as resolved.
Show resolved Hide resolved
@p.log_cache_events
@p.log_file_max_bytes
@p.log_format
@p.log_format_file
@p.log_level
@p.log_level_file
Expand All @@ -143,12 +150,15 @@ def global_flags(func):
@p.record_timing_info
@p.send_anonymous_usage_stats
@p.single_threaded
@p.state
@p.static_parser
@p.use_colors
@p.use_colors_file
@p.use_experimental_parser
@p.version
@p.version_check
@p.warn_error
@p.warn_error_options
@p.write_json
@functools.wraps(func)
def wrapper(*args, **kwargs):
Expand All @@ -166,9 +176,6 @@ def wrapper(*args, **kwargs):
)
@click.pass_context
@global_flags
@p.warn_error
@p.warn_error_options
@p.log_format
@p.show_resource_report
def cli(ctx, **kwargs):
"""An ELT tool for managing your SQL transformations and data models.
Expand All @@ -180,11 +187,7 @@ def cli(ctx, **kwargs):
@cli.command("build")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.full_refresh
@p.include_saved_query
@p.indirect_selection
Expand All @@ -195,9 +198,6 @@ def cli(ctx, **kwargs):
@p.select
@p.selector
@p.show
@p.state
@p.defer_state
@p.deprecated_state
@p.store_failures
@p.target
@p.target_path
Expand Down Expand Up @@ -259,21 +259,14 @@ def docs(ctx, **kwargs):
@click.pass_context
@global_flags
@p.compile_docs
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.profile
@p.profiles_dir
@p.project_dir
@p.select
@p.selector
@p.empty_catalog
@p.static
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -330,11 +323,7 @@ def docs_serve(ctx, **kwargs):
@cli.command("compile")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.full_refresh
@p.show_output_format
@p.indirect_selection
Expand All @@ -346,9 +335,6 @@ def docs_serve(ctx, **kwargs):
@p.select
@p.selector
@p.inline
@p.state
@p.defer_state
@p.deprecated_state
@p.compile_inject_ephemeral_ctes
@p.target
@p.target_path
Expand Down Expand Up @@ -378,11 +364,7 @@ def compile(ctx, **kwargs):
@cli.command("show")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.full_refresh
@p.show_output_format
@p.show_limit
Expand All @@ -394,9 +376,6 @@ def compile(ctx, **kwargs):
@p.select
@p.selector
@p.inline
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -533,9 +512,6 @@ def init(ctx, **kwargs):
@p.resource_type
@p.raw_select
@p.selector
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.vars
Expand Down Expand Up @@ -591,10 +567,6 @@ def parse(ctx, **kwargs):
@cli.command("run")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.favor_state
@p.deprecated_favor_state
@p.exclude
@p.full_refresh
@p.profile
Expand All @@ -603,9 +575,6 @@ def parse(ctx, **kwargs):
@p.empty
@p.select
@p.selector
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -638,7 +607,6 @@ def run(ctx, **kwargs):
@p.vars
@p.profile
@p.target
@p.state
@p.threads
@requires.postflight
@requires.preflight
Expand All @@ -663,7 +631,6 @@ def retry(ctx, **kwargs):
@cli.command("clone")
@click.pass_context
@global_flags
@p.defer_state
@p.exclude
@p.full_refresh
@p.profile
Expand All @@ -672,7 +639,6 @@ def retry(ctx, **kwargs):
@p.resource_type
@p.select
@p.selector
@p.state # required
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -740,9 +706,6 @@ def run_operation(ctx, **kwargs):
@p.select
@p.selector
@p.show
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand All @@ -769,19 +732,12 @@ def seed(ctx, **kwargs):
@cli.command("snapshot")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.profile
@p.profiles_dir
@p.project_dir
@p.select
@p.selector
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -824,9 +780,6 @@ def source(ctx, **kwargs):
@p.project_dir
@p.select
@p.selector
@p.state
@p.defer_state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
Expand Down Expand Up @@ -860,20 +813,13 @@ def freshness(ctx, **kwargs):
@cli.command("test")
@click.pass_context
@global_flags
@p.defer
@p.deprecated_defer
@p.exclude
@p.favor_state
@p.deprecated_favor_state
@p.indirect_selection
@p.profile
@p.profiles_dir
@p.project_dir
@p.select
@p.selector
@p.state
@p.defer_state
@p.deprecated_state
@p.store_failures
@p.target
@p.target_path
Expand Down
3 changes: 2 additions & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,8 @@ def sql(self) -> Optional[str]:
if self.model.language == ModelLanguage.sql: # type: ignore[union-attr]
# If the model is deferred and the adapter doesn't support zero-copy cloning, then select * from the prod
# relation
if getattr(self.model, "defer_relation", None):
# TODO: avoid routing on args.which if possible
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
if getattr(self.model, "defer_relation", None) and self.config.args.which == "clone":
# TODO https://github.com/dbt-labs/dbt-core/issues/7976
return f"select * from {self.model.defer_relation.relation_name or str(self.defer_relation)}" # type: ignore[union-attr]
elif getattr(self.model, "extra_ctes_injected", None):
Expand Down
28 changes: 8 additions & 20 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,7 @@ def is_invalid_protected_ref(
node.package_name != target_model.package_name and restrict_package_access
)

# Called by RunTask.defer_to_manifest
# Called by GraphRunnableTask.defer_to_manifest
def merge_from_artifact(
self,
adapter,
Expand Down Expand Up @@ -1369,6 +1369,13 @@ def merge_from_artifact(
merged.add(unique_id)
self.nodes[unique_id] = node.replace(deferred=True)

# for all other nodes, add 'defer_relation'
elif current and node.resource_type in refables and not node.is_ephemeral:
defer_relation = DeferRelation(
node.database, node.schema, node.alias, node.relation_name
)
self.nodes[unique_id] = current.replace(defer_relation=defer_relation)

# Rebuild the flat_graph, which powers the 'graph' context variable,
# now that we've deferred some nodes
self.build_flat_graph()
Expand All @@ -1377,25 +1384,6 @@ def merge_from_artifact(
sample = list(islice(merged, 5))
fire_event(MergedFromState(num_merged=len(merged), sample=sample))

# Called by CloneTask.defer_to_manifest
def add_from_artifact(
self,
other: "WritableManifest",
) -> None:
"""Update this manifest by *adding* information about each node's location
in the other manifest.

Only non-ephemeral refable nodes are examined.
"""
refables = set(NodeType.refable())
for unique_id, node in other.nodes.items():
current = self.nodes.get(unique_id)
if current and (node.resource_type in refables and not node.is_ephemeral):
defer_relation = DeferRelation(
node.database, node.schema, node.alias, node.relation_name
)
self.nodes[unique_id] = current.replace(defer_relation=defer_relation)

# Methods that were formerly in ParseResult

def add_macro(self, source_file: SourceFile, macro: Macro):
Expand Down
Loading