From 096596b4a0241719b6104dbca07b7473e8af3203 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 21 Jan 2025 15:02:00 +0500 Subject: [PATCH] Add termination reason and message to the runner API (#2204) * Introduce TerminationReason and JobState types * Handle runner API not available when stopping Maybe relevant for local runner when the runner container or shim was stopped * Set max duration exceeded in termination message * Add max_duration_exceeded termination reason * Update shim OpenAPI spec * Revert using TerminationReason enum in shim The shim may expect any termination reason from the server * Send termination_reason.value to shim --- runner/consts/states/state.go | 9 --- runner/docs/shim.openapi.yaml | 13 +-- runner/internal/executor/base.go | 9 ++- runner/internal/executor/executor.go | 47 ++++++++--- runner/internal/schemas/schemas.go | 12 ++- runner/internal/shim/docker.go | 17 ++-- runner/internal/shim/task.go | 2 +- runner/internal/types/types.go | 23 ++++++ src/dstack/_internal/core/models/runs.py | 2 + .../background/tasks/process_running_jobs.py | 20 ++--- ..._add_jobterminationreason_max_duration_.py | 81 +++++++++++++++++++ src/dstack/_internal/server/schemas/runner.py | 2 + .../server/services/jobs/__init__.py | 8 +- 13 files changed, 193 insertions(+), 52 deletions(-) delete mode 100644 runner/consts/states/state.go create mode 100644 runner/internal/types/types.go create mode 100644 src/dstack/_internal/server/migrations/versions/543d78854db7_add_jobterminationreason_max_duration_.py diff --git a/runner/consts/states/state.go b/runner/consts/states/state.go deleted file mode 100644 index a1a682313..000000000 --- a/runner/consts/states/state.go +++ /dev/null @@ -1,9 +0,0 @@ -package states - -const ( - Done = "done" - Failed = "failed" - Running = "running" - Terminated = "terminated" - Terminating = "terminating" -) diff --git a/runner/docs/shim.openapi.yaml b/runner/docs/shim.openapi.yaml index f46071f56..9cd818b84 100644 --- 
a/runner/docs/shim.openapi.yaml +++ b/runner/docs/shim.openapi.yaml @@ -177,12 +177,13 @@ components: TerminationReason: type: string enum: - - EXECUTOR_ERROR - - CREATING_CONTAINER_ERROR - - CONTAINER_EXITED_WITH_ERROR - - DONE_BY_RUNNER - - TERMINATED_BY_USER - - TERMINATED_BY_SERVER + - executor_error + - creating_container_error + - container_exited_with_error + - done_by_runner + - terminated_by_user + - terminated_by_server + - max_duration_exceeded GpuID: description: > diff --git a/runner/internal/executor/base.go b/runner/internal/executor/base.go index bf3eeb291..a8e150552 100644 --- a/runner/internal/executor/base.go +++ b/runner/internal/executor/base.go @@ -4,6 +4,7 @@ import ( "context" "github.com/dstackai/dstack/runner/internal/schemas" + "github.com/dstackai/dstack/runner/internal/types" ) type Executor interface { @@ -13,7 +14,13 @@ type Executor interface { Run(ctx context.Context) error SetCodePath(codePath string) SetJob(job schemas.SubmitBody) - SetJobState(ctx context.Context, state string) + SetJobState(ctx context.Context, state types.JobState) + SetJobStateWithTerminationReason( + ctx context.Context, + state types.JobState, + termination_reason types.TerminationReason, + termination_message string, + ) SetRunnerState(state string) Lock() RLock() diff --git a/runner/internal/executor/executor.go b/runner/internal/executor/executor.go index f0bbf26b6..6e7666436 100644 --- a/runner/internal/executor/executor.go +++ b/runner/internal/executor/executor.go @@ -18,10 +18,10 @@ import ( "github.com/creack/pty" "github.com/dstackai/dstack/runner/consts" - "github.com/dstackai/dstack/runner/consts/states" "github.com/dstackai/dstack/runner/internal/gerrors" "github.com/dstackai/dstack/runner/internal/log" "github.com/dstackai/dstack/runner/internal/schemas" + "github.com/dstackai/dstack/runner/internal/types" ) type RunExecutor struct { @@ -79,14 +79,14 @@ func NewRunExecutor(tempDir string, homeDir string, workingDir string) (*RunExec func (ex 
*RunExecutor) Run(ctx context.Context) (err error) { runnerLogFile, err := log.CreateAppendFile(filepath.Join(ex.tempDir, consts.RunnerLogFileName)) if err != nil { - ex.SetJobState(ctx, states.Failed) + ex.SetJobState(ctx, types.JobStateFailed) return gerrors.Wrap(err) } defer func() { _ = runnerLogFile.Close() }() jobLogFile, err := log.CreateAppendFile(filepath.Join(ex.tempDir, consts.RunnerJobLogFileName)) if err != nil { - ex.SetJobState(ctx, states.Failed) + ex.SetJobState(ctx, types.JobStateFailed) return gerrors.Wrap(err) } defer func() { _ = jobLogFile.Close() }() @@ -95,7 +95,7 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) { // recover goes after runnerLogFile.Close() to keep the log if r := recover(); r != nil { log.Error(ctx, "Executor PANIC", "err", r) - ex.SetJobState(ctx, states.Failed) + ex.SetJobState(ctx, types.JobStateFailed) err = gerrors.Newf("recovered: %v", r) } // no more logs will be written after this @@ -115,17 +115,17 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) { log.Info(ctx, "Run job", "log_level", log.GetLogger(ctx).Logger.Level.String()) if err := ex.setupRepo(ctx); err != nil { - ex.SetJobState(ctx, states.Failed) + ex.SetJobState(ctx, types.JobStateFailed) return gerrors.Wrap(err) } cleanupCredentials, err := ex.setupCredentials(ctx) if err != nil { - ex.SetJobState(ctx, states.Failed) + ex.SetJobState(ctx, types.JobStateFailed) return gerrors.Wrap(err) } defer cleanupCredentials() - ex.SetJobState(ctx, states.Running) + ex.SetJobState(ctx, types.JobStateRunning) timeoutCtx := ctx var cancelTimeout context.CancelFunc if ex.jobSpec.MaxDuration != 0 { @@ -136,7 +136,7 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) { select { case <-ctx.Done(): log.Error(ctx, "Job canceled") - ex.SetJobState(ctx, states.Terminated) + ex.SetJobState(ctx, types.JobStateTerminated) return gerrors.Wrap(err) default: } @@ -144,18 +144,25 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) { 
select { case <-timeoutCtx.Done(): log.Error(ctx, "Max duration exceeded", "max_duration", ex.jobSpec.MaxDuration) - ex.SetJobState(ctx, states.Terminated) + // We do not set "max_duration_exceeded" termination reason yet for backward compatibility + // TODO: Set it several releases after 0.18.36 + ex.SetJobStateWithTerminationReason( + ctx, + types.JobStateTerminated, + types.TerminationReasonContainerExitedWithError, + "Max duration exceeded", + ) return gerrors.Wrap(err) default: } // todo fail reason? log.Error(ctx, "Exec failed", "err", err) - ex.SetJobState(ctx, states.Failed) + ex.SetJobState(ctx, types.JobStateFailed) return gerrors.Wrap(err) } - ex.SetJobState(ctx, states.Done) + ex.SetJobState(ctx, types.JobStateDone) return nil } @@ -173,9 +180,23 @@ func (ex *RunExecutor) SetCodePath(codePath string) { ex.state = WaitRun } -func (ex *RunExecutor) SetJobState(ctx context.Context, state string) { +func (ex *RunExecutor) SetJobState(ctx context.Context, state types.JobState) { + ex.SetJobStateWithTerminationReason(ctx, state, "", "") +} + +func (ex *RunExecutor) SetJobStateWithTerminationReason( + ctx context.Context, state types.JobState, termination_reason types.TerminationReason, termination_message string, +) { ex.mu.Lock() - ex.jobStateHistory = append(ex.jobStateHistory, schemas.JobStateEvent{State: state, Timestamp: ex.timestamp.Next()}) + ex.jobStateHistory = append( + ex.jobStateHistory, + schemas.JobStateEvent{ + State: state, + Timestamp: ex.timestamp.Next(), + TerminationReason: termination_reason, + TerminationMessage: termination_message, + }, + ) ex.mu.Unlock() log.Info(ctx, "Job state changed", "new", state) } diff --git a/runner/internal/schemas/schemas.go b/runner/internal/schemas/schemas.go index d152738cd..a51be87fc 100644 --- a/runner/internal/schemas/schemas.go +++ b/runner/internal/schemas/schemas.go @@ -1,10 +1,16 @@ package schemas -import "strings" +import ( + "strings" + + "github.com/dstackai/dstack/runner/internal/types" +) 
type JobStateEvent struct { - State string `json:"state"` - Timestamp int64 `json:"timestamp"` + State types.JobState `json:"state"` + Timestamp int64 `json:"timestamp"` + TerminationReason types.TerminationReason `json:"termination_reason"` + TerminationMessage string `json:"termination_message"` } type LogEvent struct { diff --git a/runner/internal/shim/docker.go b/runner/internal/shim/docker.go index ab01b79f0..f82f02984 100644 --- a/runner/internal/shim/docker.go +++ b/runner/internal/shim/docker.go @@ -33,6 +33,7 @@ import ( "github.com/dstackai/dstack/runner/internal/log" "github.com/dstackai/dstack/runner/internal/shim/backends" "github.com/dstackai/dstack/runner/internal/shim/host" + "github.com/dstackai/dstack/runner/internal/types" bytesize "github.com/inhies/go-bytesize" "github.com/ztrue/tracerr" ) @@ -260,7 +261,7 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { gpuIDs, err := d.gpuLock.Acquire(ctx, cfg.GPU) if err != nil { log.Error(ctx, err.Error()) - task.SetStatusTerminated("EXECUTOR_ERROR", err.Error()) + task.SetStatusTerminated(string(types.TerminationReasonExecutorError), err.Error()) return tracerr.Wrap(err) } task.gpuIDs = gpuIDs @@ -279,7 +280,7 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { if err := ak.AppendPublicKeys(cfg.HostSshKeys); err != nil { errMessage := fmt.Sprintf("ak.AppendPublicKeys error: %s", err.Error()) log.Error(ctx, errMessage) - task.SetStatusTerminated("EXECUTOR_ERROR", errMessage) + task.SetStatusTerminated(string(types.TerminationReasonExecutorError), errMessage) return tracerr.Wrap(err) } defer func(cfg TaskConfig) { @@ -299,14 +300,14 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { if err != nil { errMessage := fmt.Sprintf("prepareVolumes error: %s", err.Error()) log.Error(ctx, errMessage) - task.SetStatusTerminated("EXECUTOR_ERROR", errMessage) + task.SetStatusTerminated(string(types.TerminationReasonExecutorError), errMessage) return 
tracerr.Wrap(err) } err = prepareInstanceMountPoints(cfg) if err != nil { errMessage := fmt.Sprintf("prepareInstanceMountPoints error: %s", err.Error()) log.Error(ctx, errMessage) - task.SetStatusTerminated("EXECUTOR_ERROR", errMessage) + task.SetStatusTerminated(string(types.TerminationReasonExecutorError), errMessage) return tracerr.Wrap(err) } @@ -320,7 +321,7 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { if err = pullImage(pullCtx, d.client, cfg); err != nil { errMessage := fmt.Sprintf("pullImage error: %s", err.Error()) log.Error(ctx, errMessage) - task.SetStatusTerminated("CREATING_CONTAINER_ERROR", errMessage) + task.SetStatusTerminated(string(types.TerminationReasonCreatingContainerError), errMessage) return tracerr.Wrap(err) } @@ -332,7 +333,7 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { if err := d.createContainer(ctx, &task); err != nil { errMessage := fmt.Sprintf("createContainer error: %s", err.Error()) log.Error(ctx, errMessage) - task.SetStatusTerminated("CREATING_CONTAINER_ERROR", errMessage) + task.SetStatusTerminated(string(types.TerminationReasonCreatingContainerError), errMessage) return tracerr.Wrap(err) } @@ -358,12 +359,12 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { log.Error(ctx, "getContainerLastLogs error", "err", err) errMessage = "" } - task.SetStatusTerminated("CONTAINER_EXITED_WITH_ERROR", errMessage) + task.SetStatusTerminated(string(types.TerminationReasonContainerExitedWithError), errMessage) return tracerr.Wrap(err) } log.Debug(ctx, "Container finished successfully", "task", task.ID, "name", task.containerName) - task.SetStatusTerminated("DONE_BY_RUNNER", "") + task.SetStatusTerminated(string(types.TerminationReasonDoneByRunner), "") return nil } diff --git a/runner/internal/shim/task.go b/runner/internal/shim/task.go index 5cd297925..cd2cd9265 100644 --- a/runner/internal/shim/task.go +++ b/runner/internal/shim/task.go @@ -31,7 +31,7 @@ const ( 
type Task struct { ID string Status TaskStatus - TerminationReason string // TODO: enum + TerminationReason string TerminationMessage string config TaskConfig diff --git a/runner/internal/types/types.go b/runner/internal/types/types.go new file mode 100644 index 000000000..e8a9519eb --- /dev/null +++ b/runner/internal/types/types.go @@ -0,0 +1,23 @@ +package types + +type TerminationReason string + +const ( + TerminationReasonExecutorError TerminationReason = "executor_error" + TerminationReasonCreatingContainerError TerminationReason = "creating_container_error" + TerminationReasonContainerExitedWithError TerminationReason = "container_exited_with_error" + TerminationReasonDoneByRunner TerminationReason = "done_by_runner" + TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user" + TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server" + TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded" +) + +type JobState string + +const ( + JobStateDone JobState = "done" + JobStateFailed JobState = "failed" + JobStateRunning JobState = "running" + JobStateTerminated JobState = "terminated" + JobStateTerminating JobState = "terminating" +) diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 77115157a..a2f1e25e3 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -117,6 +117,7 @@ class JobTerminationReason(str, Enum): PORTS_BINDING_FAILED = "ports_binding_failed" CREATING_CONTAINER_ERROR = "creating_container_error" EXECUTOR_ERROR = "executor_error" + MAX_DURATION_EXCEEDED = "max_duration_exceeded" def to_status(self) -> JobStatus: mapping = { @@ -135,6 +136,7 @@ def to_status(self) -> JobStatus: self.PORTS_BINDING_FAILED: JobStatus.FAILED, self.CREATING_CONTAINER_ERROR: JobStatus.FAILED, self.EXECUTOR_ERROR: JobStatus.FAILED, + self.MAX_DURATION_EXCEEDED: JobStatus.TERMINATED, } return mapping[self] diff 
--git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 40bb2fbef..9172c0116 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -514,7 +514,7 @@ def _process_pulling_with_shim( task.termination_message, ) logger.debug("task status: %s", task.dict()) - job_model.termination_reason = JobTerminationReason[task.termination_reason.upper()] + job_model.termination_reason = JobTerminationReason(task.termination_reason.lower()) job_model.termination_reason_message = task.termination_message return False @@ -547,7 +547,7 @@ def _process_pulling_with_shim( shim_status.result.reason_message, ) logger.debug("shim status: %s", shim_status.dict()) - job_model.termination_reason = JobTerminationReason[shim_status.result.reason.upper()] + job_model.termination_reason = JobTerminationReason(shim_status.result.reason.lower()) job_model.termination_reason_message = shim_status.result.reason_message return False @@ -598,18 +598,20 @@ def _process_running( job_logs=resp.job_logs, ) if len(resp.job_states) > 0: - latest_status = resp.job_states[-1].state - # TODO(egor-s): refactor dstack-runner to return compatible statuses and reasons + latest_state_event = resp.job_states[-1] + latest_status = latest_state_event.state if latest_status == JobStatus.DONE: job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.DONE_BY_RUNNER - # let the CLI pull logs? - # delay_job_instance_termination(job_model) - elif latest_status in {JobStatus.FAILED, JobStatus.ABORTED, JobStatus.TERMINATED}: + elif latest_status in {JobStatus.FAILED, JobStatus.TERMINATED}: job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.CONTAINER_EXITED_WITH_ERROR - # let the CLI pull logs? 
- # delay_job_instance_termination(job_model) + if latest_state_event.termination_reason: + job_model.termination_reason = JobTerminationReason( + latest_state_event.termination_reason.lower() + ) + if latest_state_event.termination_message: + job_model.termination_reason_message = latest_state_event.termination_message logger.info("%s: now is %s", fmt(job_model), job_model.status.name) return True diff --git a/src/dstack/_internal/server/migrations/versions/543d78854db7_add_jobterminationreason_max_duration_.py b/src/dstack/_internal/server/migrations/versions/543d78854db7_add_jobterminationreason_max_duration_.py new file mode 100644 index 000000000..48980d667 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/543d78854db7_add_jobterminationreason_max_duration_.py @@ -0,0 +1,81 @@ +# """Add JobTerminationReason.MAX_DURATION_EXCEEDED + +# Revision ID: ffa99edd1988 +# Revises: 803c7e9ed85d +# Create Date: 2025-01-21 10:53:22.338540 + +# """ + +from alembic import op +from alembic_postgresql_enum import TableReference + +# revision identifiers, used by Alembic. +revision = "ffa99edd1988" +down_revision = "803c7e9ed85d" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + op.sync_enum_values( + "public", + "jobterminationreason", + [ + "FAILED_TO_START_DUE_TO_NO_CAPACITY", + "INTERRUPTED_BY_NO_CAPACITY", + "WAITING_INSTANCE_LIMIT_EXCEEDED", + "WAITING_RUNNER_LIMIT_EXCEEDED", + "TERMINATED_BY_USER", + "VOLUME_ERROR", + "GATEWAY_ERROR", + "SCALED_DOWN", + "DONE_BY_RUNNER", + "ABORTED_BY_USER", + "TERMINATED_BY_SERVER", + "CONTAINER_EXITED_WITH_ERROR", + "PORTS_BINDING_FAILED", + "CREATING_CONTAINER_ERROR", + "EXECUTOR_ERROR", + "MAX_DURATION_EXCEEDED", + ], + [ + TableReference( + table_schema="public", table_name="jobs", column_name="termination_reason" + ) + ], + enum_values_to_rename=[], + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.sync_enum_values( + "public", + "jobterminationreason", + [ + "FAILED_TO_START_DUE_TO_NO_CAPACITY", + "INTERRUPTED_BY_NO_CAPACITY", + "WAITING_INSTANCE_LIMIT_EXCEEDED", + "WAITING_RUNNER_LIMIT_EXCEEDED", + "TERMINATED_BY_USER", + "VOLUME_ERROR", + "GATEWAY_ERROR", + "SCALED_DOWN", + "DONE_BY_RUNNER", + "ABORTED_BY_USER", + "TERMINATED_BY_SERVER", + "CONTAINER_EXITED_WITH_ERROR", + "PORTS_BINDING_FAILED", + "CREATING_CONTAINER_ERROR", + "EXECUTOR_ERROR", + ], + [ + TableReference( + table_schema="public", table_name="jobs", column_name="termination_reason" + ) + ], + enum_values_to_rename=[], + ) + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/schemas/runner.py b/src/dstack/_internal/server/schemas/runner.py index c05090909..3e7a40a0b 100644 --- a/src/dstack/_internal/server/schemas/runner.py +++ b/src/dstack/_internal/server/schemas/runner.py @@ -14,6 +14,8 @@ class JobStateEvent(CoreModel): timestamp: int state: JobStatus + termination_reason: Optional[str] = None + termination_message: Optional[str] = None class LogEvent(CoreModel): diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index 5adc75570..5101ad565 
100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -4,6 +4,7 @@ from datetime import timezone from typing import Dict, Iterable, List, Optional, Tuple +import requests from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload @@ -188,7 +189,10 @@ def _stop_runner( ): logger.debug("%s: stopping runner", fmt(job_model)) runner_client = client.RunnerClient(port=ports[DSTACK_RUNNER_HTTP_PORT]) - runner_client.stop() + try: + runner_client.stop() + except requests.RequestException: + logger.exception("%s: failed to stop runner gracefully", fmt(job_model)) async def process_terminating_job(session: AsyncSession, job_model: JobModel): @@ -306,7 +310,7 @@ def _shim_submit_stop(ports: Dict[int, int], job_model: JobModel): if job_model.termination_reason is None: reason = None else: - reason = job_model.termination_reason.name + reason = job_model.termination_reason.value shim_client.terminate_task( task_id=job_model.id, reason=reason,