Skip to content

Commit

Permalink
fix ping-and-up test for den launched clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandra Belousov authored and Alexandra Belousov committed Feb 10, 2025
1 parent 8f939af commit c3c29d2
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 9 deletions.
2 changes: 1 addition & 1 deletion runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def check_connect_server():

if not self._http_client and not check_connect_server():
if self.__class__.__name__ == "OnDemandCluster":
self._update_from_sky_status(dryrun=False)
self._update_from_status(dryrun=False)
if not self._ping(retry=False):
raise ConnectionError(
f"Could not reach {self.name} {self.head_ip}. Is cluster up?"
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/hardware/launcher_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ def up(cls, cluster, verbose: bool = True):
**cluster.sky_kwargs.get("launch", {}),
)

cluster._update_from_sky_status()
cluster._update_from_status()
if cluster.domain:
logger.info(
f"Cluster has been launched with the custom domain '{cluster.domain}'. "
Expand Down
50 changes: 45 additions & 5 deletions runhouse/resources/hardware/on_demand_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
DEFAULT_SERVER_PORT,
LOCAL_HOSTS,
)

from runhouse.globals import configs, obj_store, rns_client
from runhouse.logger import get_logger
from runhouse.resources.hardware.utils import (
Expand All @@ -36,6 +35,7 @@
ServerConnectionType,
up_cluster_helper,
)
from runhouse.rns.utils.api import read_resp_data
from .cluster import Cluster
from .launcher_utils import DenLauncher, LocalLauncher

Expand Down Expand Up @@ -136,7 +136,7 @@ def __init__(
# Checks if state info is in local sky db, populates if so.
if not dryrun and not self.ips and self.launcher == LauncherType.LOCAL:
# Cluster status is set to INIT in the Sky DB right after starting, so we need to refresh once
self._update_from_sky_status(dryrun=True)
self._update_from_status(dryrun=True)

@property
def ips(self):
Expand All @@ -153,7 +153,7 @@ def client(self):
except ValueError as e:
if not self.ips:
# Try loading in from local Sky DB
self._update_from_sky_status(dryrun=True)
self._update_from_status(dryrun=True)
if not self.ips:
raise ValueError(
f"Could not determine ips for ondemand cluster <{self.name}>. "
Expand Down Expand Up @@ -405,7 +405,7 @@ def _sky_status(self, refresh: bool = True, retry: bool = True):

def _start_ray_workers(self, ray_port, env_vars):
if not self.internal_ips:
self._update_from_sky_status()
self._update_from_status()

super()._start_ray_workers(ray_port, env_vars)

Expand Down Expand Up @@ -544,6 +544,45 @@ def _setup_default_creds(self):
For Den launching we load the default ssh creds, and for local launching we let Sky handle it."""
return DenLauncher.load_creds() if self.launcher == LauncherType.DEN else None

def _update_from_status(self, dryrun: bool = False):

if self._is_shared:
# If the cluster is shared can ignore, since the sky data will only be saved on the machine where
# the cluster was initially upped
return

if self.launcher == LauncherType.LOCAL:
# Try to get the cluster status from SkyDB
cluster_dict = self._sky_status(refresh=not dryrun)
self._populate_connection_from_status_dict(cluster_dict)
if self.launcher == LauncherType.DEN:
# checking if the function is called during the init process, before the cluster is saved to den.
if not self.rns_address:
return
cluster_uri = rns_client.format_rns_address(self.rns_address)
resp = requests.get(
f"{rns_client.api_server_url}/resource/{cluster_uri}/cluster/status",
headers=rns_client.request_headers(),
)
if resp.status_code != 200:
logger.warning(
f"Received [{resp.status_code}] from Den GET '{cluster_uri}': Failed to load cluster status"
)
return
cluster_den_status: list = read_resp_data(resp)

if not cluster_den_status:
logger.warning("Failed to update cluster info")
return

resource_info = cluster_den_status[0].get("resource_info")
if not resource_info:
logger.warning("Failed to load resource info from cluster status")
return

cluster_dict = resource_info.get("cluster_config")
DenLauncher._update_from_den_response(cluster=self, config=cluster_dict)

def get_instance_type(self):
"""Returns instance type of the cluster."""
if self.instance_type and "--" in self.instance_type: # K8s specific syntax
Expand Down Expand Up @@ -828,6 +867,7 @@ def _ping(self, timeout=5, retry=False):
return True

if retry:
self._update_from_sky_status(dryrun=False)
dryrun = False if self.launcher == LauncherType.LOCAL else None
self._update_from_status(dryrun=dryrun)
return super()._ping(timeout=timeout, retry=False)
return False
2 changes: 0 additions & 2 deletions tests/fixtures/on_demand_cluster_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ def den_launched_ondemand_aws_k8s_cluster(request, test_rns_folder):
"launcher": LauncherType.DEN,
"context": os.getenv("EKS_ARN"),
}

cluster = setup_test_cluster(args, request)
yield cluster
teardown_cluster_fixture(request, cluster)
Expand All @@ -256,7 +255,6 @@ def den_launched_ondemand_gcp_k8s_cluster(request, test_rns_folder):
"launcher": LauncherType.DEN,
"context": "gke_testing",
}

cluster = setup_test_cluster(args, request)
yield cluster
teardown_cluster_fixture(request, cluster)
Expand Down

0 comments on commit c3c29d2

Please sign in to comment.