Skip to content

Commit

Permalink
Preserve offers order during the merge (#782)
Browse files Browse the repository at this point in the history
  • Loading branch information
Egor-S authored Nov 14, 2023
1 parent dc8fae1 commit fae7937
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 42 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def get_long_description():
"docker>=6.0.0",
"dnspython",
"grpcio>=1.50", # indirect
"gpuhunt==0.0.1rc7",
"gpuhunt==0.0.1rc8",
"sentry-sdk[fastapi]",
]

Expand Down
33 changes: 15 additions & 18 deletions src/dstack/_internal/core/backends/azure/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,30 +244,27 @@ def _get_offers_with_availability(
config_locations: List[str],
offers: List[InstanceOffer],
) -> List[InstanceOfferWithAvailability]:
availability_offers = {}
locations = set()

for offer in offers:
location = offer.region
if location not in config_locations:
continue
locations.add(location)
instance_name = offer.instance.name
spot = offer.instance.resources.spot
availability_offers[(instance_name, location, spot)] = InstanceOfferWithAvailability(
**offer.dict(), availability=InstanceAvailability.NO_QUOTA
)
offers = [offer for offer in offers if offer.region in config_locations]
locations = set(offer.region for offer in offers)

has_quota = set()
for location in locations:
resources = compute_client.resource_skus.list(filter=f"location eq '{location}'")
for resource in resources:
if resource.resource_type != "virtualMachines" or not _vm_type_available(resource):
continue
for spot in (True, False):
key = (resource.name, location, spot)
if key in availability_offers:
availability_offers[key].availability = InstanceAvailability.UNKNOWN
return list(availability_offers.values())
has_quota.add((resource.name, location))

offers_with_availability = []
for offer in offers:
availability = InstanceAvailability.NO_QUOTA
if (offer.instance.name, offer.region) in has_quota:
availability = InstanceAvailability.UNKNOWN
offers_with_availability.append(
InstanceOfferWithAvailability(**offer.dict(), availability=availability)
)

return offers_with_availability


def _vm_type_available(vm_resource: ResourceSku) -> bool:
Expand Down
28 changes: 14 additions & 14 deletions src/dstack/_internal/core/backends/gcp/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,25 @@ def get_offers(
for region in self.regions_client.list(project=self.config.project_id):
for quota in region.quotas:
quotas[region.name][quota.metric] = quota.limit - quota.usage
offers_with_availability: Dict[Tuple[str, str], InstanceOfferWithAvailability] = {}

seen_region_offers = set()
offers_with_availability = []
for offer in offers:
region = offer.region[:-2] # strip zone
key = (
_unique_instance_name(offer.instance),
region,
)
if key in offers_with_availability:
key = (_unique_instance_name(offer.instance), region)
if key in seen_region_offers:
continue
availability = InstanceAvailability.UNKNOWN
if not _has_gpu_quota(quotas[region], offer.instance.resources):
availability = InstanceAvailability.NO_QUOTA
seen_region_offers.add(key)
availability = InstanceAvailability.NO_QUOTA
if _has_gpu_quota(quotas[region], offer.instance.resources):
availability = InstanceAvailability.UNKNOWN
# todo quotas: cpu, memory, global gpu

offers_with_availability[key] = InstanceOfferWithAvailability(
**offer.dict(), availability=availability
offers_with_availability.append(
InstanceOfferWithAvailability(**offer.dict(), availability=availability)
)
offers_with_availability[key].region = region
return list(offers_with_availability.values())
offers_with_availability[-1].region = region

return offers_with_availability

def terminate_instance(
self, instance_id: str, region: str, backend_data: Optional[str] = None
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/local/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import List, Optional

from dstack._internal.core.backends.base.compute import Compute, get_dstack_runner_version
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.instances import (
InstanceAvailability,
InstanceOfferWithAvailability,
Expand All @@ -22,6 +23,7 @@ def get_offers(
) -> List[InstanceOfferWithAvailability]:
return [
InstanceOfferWithAvailability(
backend=BackendType.LOCAL,
instance=InstanceType(
name="local",
resources=Resources(cpus=4, memory_mib=8192, gpus=[], spot=False),
Expand Down
2 changes: 1 addition & 1 deletion src/dstack/_internal/core/backends/vastai/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, config: VastAIConfig):
extra_filters={
"direct_port_count": {"gte": 1},
"reliability2": {"gte": 0.9},
"inet_down": {"gt": 100},
"inet_down": {"gt": 128},
"verified": {"eq": True},
"cuda_max_good": {"gte": 11.8},
"disk_space": {"gte": DISK_SIZE},
Expand Down
21 changes: 13 additions & 8 deletions src/dstack/_internal/server/services/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import heapq
from typing import List, Optional, Tuple, Type

from sqlalchemy import delete, update
Expand Down Expand Up @@ -269,14 +270,18 @@ async def get_instance_offers(
"""
Returns list of instances satisfying minimal resource requirements sorted by price
"""
offers = []
tasks = [
run_async(backend.compute().get_offers, job.job_spec.requirements) for backend in backends
]
for backend, backend_offers in zip(backends, await asyncio.gather(*tasks)):
for offer in backend_offers:
if not exclude_not_available or offer.availability not in _NOT_AVAILABLE:
offers.append((backend, offer))

# Put NOT_AVAILABLE and NO_QUOTA instances at the end
return sorted(offers, key=lambda i: (i[1].availability in _NOT_AVAILABLE, i[1].price))
offers_by_backend = [
[
(backend, offer)
for offer in backend_offers
if not exclude_not_available or offer.availability not in _NOT_AVAILABLE
]
for backend, backend_offers in zip(backends, await asyncio.gather(*tasks))
]
# Merge preserving order for every backend
offers = heapq.merge(*offers_by_backend, key=lambda i: i[1].price)
# Put NOT_AVAILABLE and NO_QUOTA instances at the end, do not sort by price
return sorted(offers, key=lambda i: i[1].availability in _NOT_AVAILABLE)

0 comments on commit fae7937

Please sign in to comment.