From ee23a34d21320086c311712cbffc5c16b77be9f8 Mon Sep 17 00:00:00 2001 From: benpankow Date: Fri, 14 Feb 2025 15:26:50 -0800 Subject: [PATCH] [dg] add definitions validate command --- .../dagster-dg/dagster_dg/cli/__init__.py | 3 +- .../dagster-dg/dagster_dg/cli/dev.py | 100 +++++++++++++++++- 2 files changed, 101 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-dg/dagster_dg/cli/__init__.py b/python_modules/libraries/dagster-dg/dagster_dg/cli/__init__.py index 829dde8a09123..956dc08302ef4 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg/cli/__init__.py +++ b/python_modules/libraries/dagster-dg/dagster_dg/cli/__init__.py @@ -6,7 +6,7 @@ from dagster_dg.cli.component import component_group from dagster_dg.cli.component_type import component_type_group from dagster_dg.cli.deployment import deployment_group -from dagster_dg.cli.dev import dev_command +from dagster_dg.cli.dev import definitions_cli, dev_command from dagster_dg.cli.global_options import dg_global_options from dagster_dg.component import RemoteComponentRegistry from dagster_dg.config import normalize_cli_config @@ -26,6 +26,7 @@ def create_dg_cli(): "component-type": component_type_group, "deployment": deployment_group, "dev": dev_command, + "definitions": definitions_cli, }, context_settings={ "max_content_width": DG_CLI_MAX_OUTPUT_WIDTH, diff --git a/python_modules/libraries/dagster-dg/dagster_dg/cli/dev.py b/python_modules/libraries/dagster-dg/dagster_dg/cli/dev.py index 8a109a2457835..37c394f39f8a3 100644 --- a/python_modules/libraries/dagster-dg/dagster_dg/cli/dev.py +++ b/python_modules/libraries/dagster-dg/dagster_dg/cli/dev.py @@ -7,7 +7,7 @@ from contextlib import contextmanager, nullcontext from pathlib import Path from tempfile import NamedTemporaryFile -from typing import Optional, TypeVar +from typing import Any, Optional, TypeVar import click import psutil @@ -18,6 +18,7 @@ from dagster_dg.context import DgContext from dagster_dg.error import DgError from dagster_dg.utils import DgClickCommand, exit_with_error, get_uv_run_executable_path, pushd +from dagster_dg.utils.filesystem import watch_paths T = TypeVar("T") @@ -216,3 +217,100 @@ def _open_subprocess(command: Sequence[str]) -> "subprocess.Popen": command, creationflags=creationflags, ) + + +@click.group(name="definitions") +def definitions_cli(): + """Commands for working with Dagster definitions.""" + + +@definitions_cli.command(name="validate", cls=DgClickCommand) +@click.option( + "--log-level", + help="Set the log level for dagster services.", + show_default=True, + default="info", + type=click.Choice(["critical", "error", "warning", "info", "debug"], case_sensitive=False), +) +@click.option( + "--log-format", + type=click.Choice(["colored", "json", "rich"], case_sensitive=False), + show_default=True, + required=False, + default="colored", + help="Format of the logs for dagster services", +) +@click.option("--watch", is_flag=True, help="Watch for changes and validate on change.") +@dg_global_options +@click.pass_context +def validate_command( + context: click.Context, + log_level: str, + log_format: str, + watch: bool, + **global_options: Mapping[str, object], +) -> None: + """Loads and validates your Dagster definitions using a Dagster instance. + + If run inside a deployment directory, this command will launch all code locations in the + deployment. If launched inside a code location directory, it will launch only that code + location. + + When running, this command sets the environment variable `DAGSTER_IS_DEFS_VALIDATION_CLI=1`. + This environment variable can be used to control the behavior of your code in validation mode. + + This command returns an exit code 1 when errors are found, otherwise an exit code 0. + + """ + cli_config = normalize_cli_config(global_options, context) + dg_context = DgContext.for_deployment_or_code_location_environment(Path.cwd(), cli_config) + + forward_options = [ + *_format_forwarded_option("--log-level", log_level), + *_format_forwarded_option("--log-format", log_format), + ] + + # In a code location context, we can just run `dagster definitions validate` directly, using `dagster` from the + # code location's environment. + if dg_context.is_code_location: + cmd = ["uv", "run", "dagster", "definitions", "validate", *forward_options] + cmd_location = get_uv_run_executable_path("dagster") + temp_workspace_file_cm = nullcontext() + watched_paths = [dg_context.root_path] + dg_context.component_registry_paths() + + # In a deployment context, dg validate will construct a temporary + # workspace file that points at all defined code locations and invoke: + # + # uv tool run --with dagster-webserver dagster definitions validate + elif dg_context.is_deployment: + cmd = [ + "uv", + "tool", + "run", + "dagster", + "definitions", + "validate", + *forward_options, + ] + cmd_location = "ephemeral dagster definitions validate" + temp_workspace_file_cm = _temp_workspace_file(dg_context) + watched_paths = [dg_context.root_path] + dg_context.component_registry_paths() + else: + exit_with_error("This command must be run inside a code location or deployment directory.") + + with pushd(dg_context.root_path), temp_workspace_file_cm as workspace_file: + print(f"Using {cmd_location}") # noqa: T201 + if workspace_file: # only non-None deployment context + cmd.extend(["--workspace", workspace_file]) + + if watch: + + def run_validate(_: Any = None): + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError: + pass + + watch_paths(watched_paths, run_validate) + else: + subprocess.run(cmd, check=True)