From 7476a7663fcf8fff238f1a79a7f2c2a237e33f4d Mon Sep 17 00:00:00 2001 From: timchap <13381054+timchap@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:16:48 +0100 Subject: [PATCH 1/2] fix: add cloud run job timeout to instance config (#88) add basic docs --- libraries/dagster-contrib-gcp/README.md | 35 +++++++++++++++++++ .../cloud_run/run_launcher.py | 11 ++++-- .../cloud_run_tests/conftest.py | 1 + .../cloud_run_tests/test_run_launcher.py | 2 ++ libraries/dagster-contrib-gcp/uv.lock | 4 +-- 5 files changed, 49 insertions(+), 4 deletions(-) diff --git a/libraries/dagster-contrib-gcp/README.md b/libraries/dagster-contrib-gcp/README.md index 9bed740..2e79f63 100644 --- a/libraries/dagster-contrib-gcp/README.md +++ b/libraries/dagster-contrib-gcp/README.md @@ -11,3 +11,38 @@ make test ```sh make build ``` + +## Overview + +This package provides integrations with Google Cloud Platform (GCP) services. It currently includes the following +integrations: + +### Cloud Run + +#### Cloud Run run launcher + +Adds support for launching Dagster runs on Google Cloud Run. Usage is as follows: + +1. Create a Cloud Run Job from your Dagster code location image to act as the run worker. If you require multiple + environments/code locations, you can create multiple Cloud Run Jobs. +2. Add `dagster-contrib-gcp` to your Dagster webserver/daemon environment. +3. Add the following configuration to your Dagster instance YAML: + +```yaml +run_launcher: + module: dagster_contrib_gcp.cloud_run.run_launcher + class: CloudRunRunLauncher + config: + project: + env: GOOGLE_CLOUD_PROJECT + region: + env: GOOGLE_CLOUD_REGION + job_name_by_code_location: + my-code-location-1: my-cloud-run-job-1 + my-code-location-2: my-cloud-run-job-2 +``` + +Additional steps may be required for configuring IAM permissions, etc. In particular: +- Ensure that the webserver/daemon environment has the necessary permissions to execute the Cloud Run jobs +- Ensure that the Cloud Run run worker jobs have the necessary permissions to execute your Dagster runs +See the [Cloud Run documentation](https://cloud.google.com/run/docs) for more information. \ No newline at end of file diff --git a/libraries/dagster-contrib-gcp/dagster_contrib_gcp/cloud_run/run_launcher.py b/libraries/dagster-contrib-gcp/dagster_contrib_gcp/cloud_run/run_launcher.py index e5a2b37..53e5243 100644 --- a/libraries/dagster-contrib-gcp/dagster_contrib_gcp/cloud_run/run_launcher.py +++ b/libraries/dagster-contrib-gcp/dagster_contrib_gcp/cloud_run/run_launcher.py @@ -39,6 +39,7 @@ def __init__( region: str, job_name_by_code_location: "dict[str, str]", run_job_retry: "dict[str, int]", + run_timeout: int, inst_data: Optional[ConfigurableClassData] = None, ): self._inst_data = inst_data @@ -48,6 +49,7 @@ def __init__( self.run_job_retry_wait = run_job_retry["wait"] self.run_job_retry_timeout = run_job_retry["timeout"] + self.run_timeout = run_timeout self.jobs_client = run_v2.JobsClient() self.executions_client = run_v2.ExecutionsClient() @@ -102,7 +104,6 @@ def create_execution(self, code_location_name: str, args: Sequence[str]): def execute_job( self, fully_qualified_job_name: str, - timeout: str = "3600s", args: Optional[Sequence[str]] = None, env: Optional["dict[str, str]"] = None, ) -> Operation: @@ -119,7 +120,7 @@ def execute_job( container_overrides = [RunJobRequest.Overrides.ContainerOverride(**overrides)] request.overrides.container_overrides.extend(container_overrides) - request.overrides.timeout = timeout # pyright: ignore + request.overrides.timeout = f"{self.run_timeout}s" # pyright: ignore @tenacity.retry( wait=tenacity.wait_fixed(self.run_job_retry_wait), @@ -215,6 +216,12 @@ def config_type(cls) -> "UserConfigSchema": "Admin API quota is quite low, which makes retries more likely." ), ), + "run_timeout": Field( + int, + is_required=False, + default_value=3600, + description="Timeout for the Cloud Run job execution in seconds", + ), } @classmethod diff --git a/libraries/dagster-contrib-gcp/dagster_contrib_gcp_tests/cloud_run_tests/conftest.py b/libraries/dagster-contrib-gcp/dagster_contrib_gcp_tests/cloud_run_tests/conftest.py index 6d39f4e..abae1f4 100644 --- a/libraries/dagster-contrib-gcp/dagster_contrib_gcp_tests/cloud_run_tests/conftest.py +++ b/libraries/dagster-contrib-gcp/dagster_contrib_gcp_tests/cloud_run_tests/conftest.py @@ -44,6 +44,7 @@ def instance( "region": "test_region", "job_name_by_code_location": {IN_PROCESS_NAME: "test_job_name"}, "run_job_retry": {"wait": 1, "timeout": 60}, + "run_timeout": 7200, } ) as dagster_instance: yield dagster_instance diff --git a/libraries/dagster-contrib-gcp/dagster_contrib_gcp_tests/cloud_run_tests/test_run_launcher.py b/libraries/dagster-contrib-gcp/dagster_contrib_gcp_tests/cloud_run_tests/test_run_launcher.py index 6af0272..2f604ab 100644 --- a/libraries/dagster-contrib-gcp/dagster_contrib_gcp_tests/cloud_run_tests/test_run_launcher.py +++ b/libraries/dagster-contrib-gcp/dagster_contrib_gcp_tests/cloud_run_tests/test_run_launcher.py @@ -11,6 +11,7 @@ GetExecutionRequest, RunJobRequest, ) +import datetime as dt def test_launch_run( @@ -33,6 +34,7 @@ def test_launch_run( assert ( args[0].name == "projects/test_project/locations/test_region/jobs/test_job_name" ) + assert args[0].overrides.timeout == dt.timedelta(seconds=7200) def test_terminate( diff --git a/libraries/dagster-contrib-gcp/uv.lock b/libraries/dagster-contrib-gcp/uv.lock index 3a33437..82c5330 100644 --- a/libraries/dagster-contrib-gcp/uv.lock +++ b/libraries/dagster-contrib-gcp/uv.lock @@ -222,8 +222,8 @@ wheels = [ [[package]] name = "dagster-contrib-gcp" -version = "0.0.1" -source = { virtual = "." } +version = "0.0.2" +source = { editable = "." } dependencies = [ { name = "dagster" }, { name = "google-cloud-run" }, From 6ef3dd16f7f5c3d8151e1c2e00c4d6ec51a1a234 Mon Sep 17 00:00:00 2001 From: colton Date: Mon, 6 Jan 2025 10:19:38 -0500 Subject: [PATCH 2/2] [dagster-contrib-gcp] bump 0.0.3; add changelog entry (#89) --- libraries/dagster-contrib-gcp/CHANGELOG.md | 7 +++++++ libraries/dagster-contrib-gcp/pyproject.toml | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 libraries/dagster-contrib-gcp/CHANGELOG.md diff --git a/libraries/dagster-contrib-gcp/CHANGELOG.md b/libraries/dagster-contrib-gcp/CHANGELOG.md new file mode 100644 index 0000000..dea2e01 --- /dev/null +++ b/libraries/dagster-contrib-gcp/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +## 0.0.3 + +### Updated + +- (pull/88) Added job timeout as dagster.yaml instance config diff --git a/libraries/dagster-contrib-gcp/pyproject.toml b/libraries/dagster-contrib-gcp/pyproject.toml index fac14cf..c9c2488 100644 --- a/libraries/dagster-contrib-gcp/pyproject.toml +++ b/libraries/dagster-contrib-gcp/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "dagster_contrib_gcp" -version = "0.0.2" +version = "0.0.3" description = "Community contributed Google Cloud Platform integration" readme = "README.md" requires-python = ">=3.9"