fix: add cloud run job timeout to instance config #88

Merged
merged 1 commit into from
Jan 6, 2025

35 changes: 35 additions & 0 deletions libraries/dagster-contrib-gcp/README.md
@@ -11,3 +11,38 @@ make test
```sh
make build
```

## Overview

This package provides integrations with Google Cloud Platform (GCP) services. It currently includes the following
integrations:

### Cloud Run

#### Cloud Run run launcher

Adds support for launching Dagster runs on Google Cloud Run. Usage is as follows:

1. Create a Cloud Run Job from your Dagster code location image to act as the run worker. If you require multiple
environments/code locations, you can create multiple Cloud Run Jobs.
2. Add `dagster-contrib-gcp` to your Dagster webserver/daemon environment.
3. Add the following configuration to your Dagster instance YAML:

```yaml
run_launcher:
  module: dagster_contrib_gcp.cloud_run.run_launcher
  class: CloudRunRunLauncher
  config:
    project:
      env: GOOGLE_CLOUD_PROJECT
    region:
      env: GOOGLE_CLOUD_REGION
    job_name_by_code_location:
      my-code-location-1: my-cloud-run-job-1
      my-code-location-2: my-cloud-run-job-2
```
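
The config schema changed in this PR also accepts an optional `run_timeout` key (an integer number of seconds, defaulting to 3600). As a minimal sketch, the same `run_launcher` mapping can be built in Python with that key set. `build_run_launcher_config` is a hypothetical helper written for this illustration, not part of the package, and the positive-value check is an assumption rather than something the schema enforces:

```python
from typing import Any, Optional


def build_run_launcher_config(
    job_name_by_code_location: "dict[str, str]",
    run_timeout: Optional[int] = None,
) -> "dict[str, Any]":
    """Hypothetical helper: build the `run_launcher` mapping from the YAML above."""
    config: "dict[str, Any]" = {
        "project": {"env": "GOOGLE_CLOUD_PROJECT"},
        "region": {"env": "GOOGLE_CLOUD_REGION"},
        "job_name_by_code_location": dict(job_name_by_code_location),
    }
    if run_timeout is not None:
        # Assumption: reject non-positive values early; the schema only says "int".
        if run_timeout <= 0:
            raise ValueError("run_timeout must be a positive number of seconds")
        config["run_timeout"] = run_timeout
    return {
        "module": "dagster_contrib_gcp.cloud_run.run_launcher",
        "class": "CloudRunRunLauncher",
        "config": config,
    }


launcher = build_run_launcher_config(
    {"my-code-location-1": "my-cloud-run-job-1"},
    run_timeout=7200,  # two hours, the value used in this PR's test fixture
)
print(launcher["config"]["run_timeout"])
```

Leaving `run_timeout` out of the mapping keeps the schema default of 3600 seconds.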

Additional steps may be required for configuring IAM permissions, etc. In particular:

- Ensure that the webserver/daemon environment has the necessary permissions to execute the Cloud Run jobs.
- Ensure that the Cloud Run run worker jobs have the necessary permissions to execute your Dagster runs.

See the [Cloud Run documentation](https://cloud.google.com/run/docs) for more information.
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ def __init__(
         region: str,
         job_name_by_code_location: "dict[str, str]",
         run_job_retry: "dict[str, int]",
+        run_timeout: int,
         inst_data: Optional[ConfigurableClassData] = None,
     ):
         self._inst_data = inst_data
@@ -48,6 +49,7 @@ def __init__(

         self.run_job_retry_wait = run_job_retry["wait"]
         self.run_job_retry_timeout = run_job_retry["timeout"]
+        self.run_timeout = run_timeout

         self.jobs_client = run_v2.JobsClient()
         self.executions_client = run_v2.ExecutionsClient()
@@ -102,7 +104,6 @@ def create_execution(self, code_location_name: str, args: Sequence[str]):
     def execute_job(
         self,
         fully_qualified_job_name: str,
-        timeout: str = "3600s",
         args: Optional[Sequence[str]] = None,
         env: Optional["dict[str, str]"] = None,
     ) -> Operation:
@@ -119,7 +120,7 @@ def execute_job(
         container_overrides = [RunJobRequest.Overrides.ContainerOverride(**overrides)]

         request.overrides.container_overrides.extend(container_overrides)
-        request.overrides.timeout = timeout  # pyright: ignore
+        request.overrides.timeout = f"{self.run_timeout}s"  # pyright: ignore

         @tenacity.retry(
             wait=tenacity.wait_fixed(self.run_job_retry_wait),
@@ -215,6 +216,12 @@ def config_type(cls) -> "UserConfigSchema":
                     "Admin API quota is quite low, which makes retries more likely."
                 ),
             ),
+            "run_timeout": Field(
+                int,
+                is_required=False,
+                default_value=3600,
+                description="Timeout for the Cloud Run job execution in seconds",
+            ),
         }

     @classmethod
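
The change above writes the override as a string of the form `"<seconds>s"`, while the updated test compares the same field against `dt.timedelta(seconds=7200)`. The sketch below only illustrates that the two forms describe the same duration. In the real launcher the conversion is left to the Cloud Run client library, and both `format_timeout` and `parse_timeout` are hypothetical helpers that do not appear in this PR:

```python
import datetime as dt


def format_timeout(run_timeout: int) -> str:
    """Render whole seconds in the "<n>s" string form used for the override."""
    return f"{run_timeout}s"


def parse_timeout(value: str) -> dt.timedelta:
    """Hypothetical inverse: turn a whole-second "<n>s" string into a timedelta."""
    if not value.endswith("s") or not value[:-1].isdigit():
        raise ValueError(f"expected a value like '3600s', got {value!r}")
    return dt.timedelta(seconds=int(value[:-1]))


# The schema default (3600) and the test fixture value (7200) round-trip cleanly.
default = parse_timeout(format_timeout(3600))
configured = parse_timeout(format_timeout(7200))
```

Because `run_timeout` is typed as an `int` in the config schema, only whole-second values reach the formatting step.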
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@ def instance(
             "region": "test_region",
             "job_name_by_code_location": {IN_PROCESS_NAME: "test_job_name"},
             "run_job_retry": {"wait": 1, "timeout": 60},
+            "run_timeout": 7200,
         }
     ) as dagster_instance:
         yield dagster_instance
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
     GetExecutionRequest,
     RunJobRequest,
 )
+import datetime as dt


 def test_launch_run(
@@ -33,6 +34,7 @@ def test_launch_run(
     assert (
         args[0].name == "projects/test_project/locations/test_region/jobs/test_job_name"
     )
+    assert args[0].overrides.timeout == dt.timedelta(seconds=7200)


 def test_terminate(
4 changes: 2 additions & 2 deletions libraries/dagster-contrib-gcp/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
