Skip to content

Commit

Permalink
[dg] add definitions validate command
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Feb 14, 2025
1 parent 30db274 commit ee23a34
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dagster_dg.cli.component import component_group
from dagster_dg.cli.component_type import component_type_group
from dagster_dg.cli.deployment import deployment_group
from dagster_dg.cli.dev import dev_command
from dagster_dg.cli.dev import definitions_cli, dev_command
from dagster_dg.cli.global_options import dg_global_options
from dagster_dg.component import RemoteComponentRegistry
from dagster_dg.config import normalize_cli_config
Expand All @@ -26,6 +26,7 @@ def create_dg_cli():
"component-type": component_type_group,
"deployment": deployment_group,
"dev": dev_command,
"definitions": definitions_cli,
},
context_settings={
"max_content_width": DG_CLI_MAX_OUTPUT_WIDTH,
Expand Down
100 changes: 99 additions & 1 deletion python_modules/libraries/dagster-dg/dagster_dg/cli/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from contextlib import contextmanager, nullcontext
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Optional, TypeVar
from typing import Any, Optional, TypeVar

import click
import psutil
Expand All @@ -18,6 +18,7 @@
from dagster_dg.context import DgContext
from dagster_dg.error import DgError
from dagster_dg.utils import DgClickCommand, exit_with_error, get_uv_run_executable_path, pushd
from dagster_dg.utils.filesystem import watch_paths

T = TypeVar("T")

Expand Down Expand Up @@ -216,3 +217,100 @@ def _open_subprocess(command: Sequence[str]) -> "subprocess.Popen":
command,
creationflags=creationflags,
)


@click.group(name="definitions")
def definitions_cli():
"""Commands for working with Dagster definitions."""


@definitions_cli.command(name="validate", cls=DgClickCommand)
@click.option(
"--log-level",
help="Set the log level for dagster services.",
show_default=True,
default="info",
type=click.Choice(["critical", "error", "warning", "info", "debug"], case_sensitive=False),
)
@click.option(
"--log-format",
type=click.Choice(["colored", "json", "rich"], case_sensitive=False),
show_default=True,
required=False,
default="colored",
help="Format of the logs for dagster services",
)
@click.option("--watch", is_flag=True, help="Watch for changes and validate on change.")
@dg_global_options
@click.pass_context
def validate_command(
context: click.Context,
log_level: str,
log_format: str,
watch: bool,
**global_options: Mapping[str, object],
) -> None:
"""Loads and validates your Dagster definitions using a Dagster instance.
If run inside a deployment directory, this command will launch all code locations in the
deployment. If launched inside a code location directory, it will launch only that code
location.
When running, this command sets the environment variable `DAGSTER_IS_DEFS_VALIDATION_CLI=1`.
This environment variable can be used to control the behavior of your code in validation mode.
This command returns an exit code 1 when errors are found, otherwise an exit code 0.
"""
cli_config = normalize_cli_config(global_options, context)
dg_context = DgContext.for_deployment_or_code_location_environment(Path.cwd(), cli_config)

forward_options = [
*_format_forwarded_option("--log-level", log_level),
*_format_forwarded_option("--log-format", log_format),
]

# In a code location context, we can just run `dagster definitions validate` directly, using `dagster` from the
# code location's environment.
if dg_context.is_code_location:
cmd = ["uv", "run", "dagster", "definitions", "validate", *forward_options]
cmd_location = get_uv_run_executable_path("dagster")
temp_workspace_file_cm = nullcontext()
watched_paths = [dg_context.root_path] + dg_context.component_registry_paths()

# In a deployment context, dg validate will construct a temporary
# workspace file that points at all defined code locations and invoke:
#
# uv tool run --with dagster-webserver dagster definitions validate
elif dg_context.is_deployment:
cmd = [
"uv",
"tool",
"run",
"dagster",
"definitions",
"validate",
*forward_options,
]
cmd_location = "ephemeral dagster definitions validate"
temp_workspace_file_cm = _temp_workspace_file(dg_context)
watched_paths = [dg_context.root_path] + dg_context.component_registry_paths()
else:
exit_with_error("This command must be run inside a code location or deployment directory.")

with pushd(dg_context.root_path), temp_workspace_file_cm as workspace_file:
print(f"Using {cmd_location}") # noqa: T201
if workspace_file: # only non-None deployment context
cmd.extend(["--workspace", workspace_file])

if watch:

def run_validate(_: Any = None):
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError:
pass

watch_paths(watched_paths, run_validate)
else:
subprocess.run(cmd, check=True)

0 comments on commit ee23a34

Please sign in to comment.