Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement volumes force detach #2242

Merged
merged 8 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/dstack/_internal/cli/services/configurators/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ def apply_configuration(
backends=profile.backends,
regions=profile.regions,
instance_types=profile.instance_types,
reservation=profile.reservation,
spot_policy=profile.spot_policy,
retry_policy=profile.retry_policy,
max_duration=profile.max_duration,
stop_duration=profile.stop_duration,
max_price=profile.max_price,
working_dir=conf.working_dir,
run_name=conf.name,
Expand All @@ -110,6 +112,7 @@ def apply_configuration(
creation_policy=profile.creation_policy,
termination_policy=profile.termination_policy,
termination_policy_idle=profile.termination_idle_time,
idle_duration=profile.idle_duration,
)

print_run_plan(run_plan, offers_limit=configurator_args.max_offers)
Expand Down
4 changes: 4 additions & 0 deletions src/dstack/_internal/cli/utils/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def print_run_plan(run_plan: RunPlan, offers_limit: int = 3):
props.add_column(no_wrap=True) # key
props.add_column() # value

print(job_plan.job_spec.stop_duration)
un-def marked this conversation as resolved.
Show resolved Hide resolved

req = job_plan.job_spec.requirements
pretty_req = req.pretty_format(resources_only=True)
max_price = f"${req.max_price:g}" if req.max_price else "-"
Expand All @@ -40,6 +42,8 @@ def print_run_plan(run_plan: RunPlan, offers_limit: int = 3):

profile = run_plan.run_spec.merged_profile
creation_policy = profile.creation_policy
# FIXME: This assumes the default idle_duration is the same for client and server.
# If the server's default idle_duration changes, old clients will see an incorrect value.
termination_policy, termination_idle_time = get_termination(
profile, DEFAULT_RUN_TERMINATION_IDLE_TIME
)
Expand Down
24 changes: 15 additions & 9 deletions src/dstack/_internal/core/models/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

DEFAULT_INSTANCE_RETRY_DURATION = 60 * 60 * 24 # 24h

DEFAULT_STOP_DURATION = 300


class SpotPolicy(str, Enum):
SPOT = "spot"
Expand All @@ -38,23 +40,27 @@ def parse_duration(v: Optional[Union[int, str]]) -> Optional[int]:
return Duration.parse(v)


def parse_max_duration(v: Optional[Union[int, str]]) -> Optional[Union[str, int]]:
def parse_max_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
return parse_off_duration(v)


def parse_stop_duration(v: Optional[Union[int, str]]) -> Optional[Union[str, int]]:
def parse_stop_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
return parse_off_duration(v)


def parse_off_duration(v: Optional[Union[int, str]]) -> Optional[Union[str, int]]:
if v == "off":
return v
def parse_off_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
if v == "off" or v is False:
return "off"
if v is True:
return None
return parse_duration(v)


def parse_idle_duration(v: Optional[Union[int, str]]) -> Optional[Union[str, int]]:
def parse_idle_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
if v is False:
return -1
if v is True:
return None
return parse_duration(v)


Expand Down Expand Up @@ -143,7 +149,7 @@ class ProfileParams(CoreModel):
Field(description="The policy for resubmitting the run. Defaults to `false`"),
]
max_duration: Annotated[
Optional[Union[Literal["off"], str, int]],
Optional[Union[Literal["off"], str, int, bool]],
Field(
description=(
"The maximum duration of a run (e.g., `2h`, `1d`, etc)."
Expand All @@ -153,7 +159,7 @@ class ProfileParams(CoreModel):
),
]
stop_duration: Annotated[
Optional[Union[Literal["off"], str, int]],
Optional[Union[Literal["off"], str, int, bool]],
Field(
description=(
"The maximum duration of a run gracefull stopping."
Expand All @@ -174,7 +180,7 @@ class ProfileParams(CoreModel):
),
]
idle_duration: Annotated[
Optional[Union[Literal["off"], str, int]],
Optional[Union[Literal["off"], str, int, bool]],
Field(
description=(
"Time to wait before terminating idle instances."
Expand Down
1 change: 1 addition & 0 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class JobSpec(CoreModel):
image_name: str
privileged: bool = False
max_duration: Optional[int]
stop_duration: Optional[int] = None
registry_auth: Optional[RegistryAuth]
requirements: Requirements
retry: Optional[Retry]
Expand Down
7 changes: 0 additions & 7 deletions src/dstack/_internal/core/services/profiles.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from datetime import timedelta
from typing import Optional, Tuple

from dstack._internal.core.models.profiles import (
Expand Down Expand Up @@ -54,9 +53,3 @@ def get_termination(
if termination_policy == TerminationPolicy.DONT_DESTROY:
termination_idle_time = -1
return termination_policy, int(termination_idle_time)


def get_stop_duration(profile: Profile) -> timedelta:
if profile.stop_duration is None:
return timedelta(minutes=5)
return timedelta(seconds=profile.stop_duration)
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ async def _process_next_terminating_job():

async def _process_job(session: AsyncSession, job_model: JobModel):
logger.debug("%s: terminating job", fmt(job_model))
res = await session.execute(
select(JobModel).where(JobModel.id == job_model.id).options(joinedload(JobModel.run))
)
job_model = res.scalar_one()
res = await session.execute(
select(InstanceModel)
.where(InstanceModel.id == job_model.used_instance_id)
Expand Down
18 changes: 9 additions & 9 deletions src/dstack/_internal/server/services/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
JobTerminationReason,
RunSpec,
)
from dstack._internal.core.services.profiles import get_stop_duration
from dstack._internal.server.models import (
InstanceModel,
JobModel,
Expand Down Expand Up @@ -379,7 +378,7 @@ async def _detach_volumes_from_job_instance(
jpd: JobProvisioningData,
instance_model: InstanceModel,
):
run_spec = RunSpec.__response__.parse_raw(job_model.run.run_spec)
job_spec = JobSpec.__response__.parse_raw(job_model.job_spec_data)
backend = await backends_services.get_project_backend_by_type(
project=project,
backend_type=jpd.backend,
Expand All @@ -397,7 +396,7 @@ async def _detach_volumes_from_job_instance(
backend=backend,
job_model=job_model,
jpd=jpd,
run_spec=run_spec,
job_spec=job_spec,
instance_model=instance_model,
volume_model=volume_model,
)
Expand All @@ -416,13 +415,12 @@ async def _detach_volume_from_job_instance(
backend: Backend,
job_model: JobModel,
jpd: JobProvisioningData,
run_spec: RunSpec,
job_spec: JobSpec,
instance_model: InstanceModel,
volume_model: VolumeModel,
) -> bool:
detached = True
volume = volume_model_to_volume(volume_model)
stop_duration = get_stop_duration(run_spec.merged_profile)
if volume.provisioning_data is None or not volume.provisioning_data.detachable:
# Backends without `detach_volume` detach volumes automatically
return detached
Expand All @@ -447,7 +445,7 @@ async def _detach_volume_from_job_instance(
volume=volume,
instance_id=jpd.instance_id,
)
if not detached and _should_force_detach_volume(job_model, stop_duration):
if not detached and _should_force_detach_volume(job_model, job_spec.stop_duration):
logger.info(
"Force detaching volume %s from %s",
volume_model.name,
Expand Down Expand Up @@ -479,14 +477,16 @@ async def _detach_volume_from_job_instance(
MIN_FORCE_DETACH_WAIT_PERIOD = timedelta(seconds=60)


def _should_force_detach_volume(job_model: JobModel, stop_duration: timedelta) -> bool:
def _should_force_detach_volume(job_model: JobModel, stop_duration: Optional[int]) -> bool:
return (
job_model.volumes_detached_at is not None
and common.get_current_datetime()
> job_model.volumes_detached_at.replace(tzinfo=timezone.utc) + MIN_FORCE_DETACH_WAIT_PERIOD
and (
job_model.termination_reason == JobTerminationReason.ABORTED_BY_USER
or common.get_current_datetime()
> job_model.volumes_detached_at.replace(tzinfo=timezone.utc) + stop_duration
or stop_duration is not None
and common.get_current_datetime()
> job_model.volumes_detached_at.replace(tzinfo=timezone.utc)
+ timedelta(seconds=stop_duration)
)
)
16 changes: 13 additions & 3 deletions src/dstack/_internal/server/services/jobs/configurators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
PythonVersion,
RunConfigurationType,
)
from dstack._internal.core.models.profiles import SpotPolicy
from dstack._internal.core.models.profiles import DEFAULT_STOP_DURATION, SpotPolicy
from dstack._internal.core.models.runs import (
AppSpec,
JobSpec,
Expand Down Expand Up @@ -105,6 +105,7 @@ async def _get_job_spec(
user=await self._user(),
privileged=self._privileged(),
max_duration=self._max_duration(),
stop_duration=self._stop_duration(),
registry_auth=self._registry_auth(),
requirements=self._requirements(),
retry=self._retry(),
Expand Down Expand Up @@ -172,12 +173,21 @@ def _privileged(self) -> bool:
return self.run_spec.configuration.privileged

def _max_duration(self) -> Optional[int]:
    """Resolve the job's maximum duration from the merged profile.

    Returns:
        The result of ``self._default_max_duration()`` when the profile
        value is unset (``None``) or ``True``; ``None`` when it is ``"off"``
        or ``False``; otherwise the profile value itself.
    """
    max_duration = self.run_spec.merged_profile.max_duration
    # Identity checks instead of `in [None, True]` / `in ["off", False]`:
    # bool is an int subclass, so `1 == True` and `0 == False`, and a
    # membership test would mistake an explicit 1 for "use the default"
    # and an explicit 0 for "off".
    if max_duration is None or max_duration is True:
        return self._default_max_duration()
    if max_duration is False or max_duration == "off":
        return None
    # pydantic validator ensures this is int
    return max_duration

def _stop_duration(self) -> Optional[int]:
    """Resolve the job's stop duration from the merged profile.

    Returns:
        ``DEFAULT_STOP_DURATION`` when the profile value is unset (``None``)
        or ``True``; ``None`` when it is ``"off"`` or ``False``; otherwise
        the profile value itself.
    """
    stop_duration = self.run_spec.merged_profile.stop_duration
    # Identity checks instead of `in [None, True]` / `in ["off", False]`:
    # bool is an int subclass, so `1 == True` and `0 == False`, and a
    # membership test would mistake an explicit 1 for "use the default"
    # and an explicit 0 for "off".
    if stop_duration is None or stop_duration is True:
        return DEFAULT_STOP_DURATION
    if stop_duration is False or stop_duration == "off":
        return None
    # pydantic validator ensures this is int
    return stop_duration

def _registry_auth(self) -> Optional[RegistryAuth]:
return self.run_spec.configuration.registry_auth

Expand Down
6 changes: 6 additions & 0 deletions src/dstack/api/_public/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,9 @@ def get_plan(
creation_policy: Optional[CreationPolicy] = None,
termination_policy: Optional[TerminationPolicy] = None,
termination_policy_idle: Optional[Union[str, int]] = None,
reservation: Optional[str] = None,
idle_duration: Optional[Union[str, int]] = None,
stop_duration: Optional[Union[str, int]] = None,
) -> RunPlan:
# """
# Get run plan. Same arguments as `submit`
Expand Down Expand Up @@ -520,14 +523,17 @@ def get_plan(
backends=backends,
regions=regions,
instance_types=instance_types,
reservation=reservation,
spot_policy=spot_policy,
retry=None,
retry_policy=retry_policy,
max_duration=max_duration,
stop_duration=stop_duration,
max_price=max_price,
pool_name=pool_name,
instance_name=instance_name,
creation_policy=creation_policy,
idle_duration=idle_duration,
termination_policy=termination_policy,
termination_idle_time=termination_policy_idle,
)
Expand Down
11 changes: 9 additions & 2 deletions src/dstack/api/server/_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def _get_run_spec_excludes(run_spec: RunSpec) -> Optional[dict]:
configuration_excludes: set[str] = set()
profile_excludes: set[str] = set()
configuration = run_spec.configuration
profile = run_spec.profile

# client >= 0.18.18 / server <= 0.18.17 compatibility tweak
if not configuration.privileged:
Expand All @@ -131,13 +132,19 @@ def _get_run_spec_excludes(run_spec: RunSpec) -> Optional[dict]:
if run_spec.configuration.user is None:
configuration_excludes.add("user")
# client >= 0.18.30 / server <= 0.18.29 compatibility tweak
if not configuration.reservation:
if configuration.reservation is None:
configuration_excludes.add("reservation")
if profile is not None and profile.reservation is None:
profile_excludes.add("reservation")
if configuration.idle_duration is None:
configuration_excludes.add("idle_duration")
if profile is not None and profile.idle_duration is None:
profile_excludes.add("idle_duration")

# client >= 0.18.38 / server <= 0.18.37 compatibility tweak
if configuration.stop_duration is None:
configuration_excludes.add("stop_duration")
if profile is not None and profile.stop_duration is None:
profile_excludes.add("stop_duration")
if configuration_excludes:
spec_excludes["configuration"] = configuration_excludes
if profile_excludes:
Expand Down
2 changes: 2 additions & 0 deletions src/tests/_internal/server/routers/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def get_dev_env_run_plan_dict(
"job_num": 0,
"jobs_per_replica": 1,
"max_duration": None,
"stop_duration": 300,
"registry_auth": None,
"requirements": {
"resources": {
Expand Down Expand Up @@ -331,6 +332,7 @@ def get_dev_env_run_dict(
"job_num": 0,
"jobs_per_replica": 1,
"max_duration": None,
"stop_duration": 300,
"registry_auth": None,
"requirements": {
"resources": {
Expand Down