Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ping-and-up test for den launched clusters #1684

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def check_connect_server():

if not self._http_client and not check_connect_server():
if self.__class__.__name__ == "OnDemandCluster":
self._update_from_sky_status(dryrun=False)
self._update_from_status(dryrun=False)
if not self._ping(retry=False):
raise ConnectionError(
f"Could not reach {self.name} {self.head_ip}. Is cluster up?"
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/hardware/launcher_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ def up(cls, cluster, verbose: bool = True):
**cluster.sky_kwargs.get("launch", {}),
)

cluster._update_from_sky_status()
cluster._update_from_status()
if cluster.domain:
logger.info(
f"Cluster has been launched with the custom domain '{cluster.domain}'. "
Expand Down
50 changes: 45 additions & 5 deletions runhouse/resources/hardware/on_demand_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
DEFAULT_SERVER_PORT,
LOCAL_HOSTS,
)

from runhouse.globals import configs, obj_store, rns_client
from runhouse.logger import get_logger
from runhouse.resources.hardware.utils import (
Expand All @@ -36,6 +35,7 @@
ServerConnectionType,
up_cluster_helper,
)
from runhouse.rns.utils.api import read_resp_data
from .cluster import Cluster
from .launcher_utils import DenLauncher, LocalLauncher

Expand Down Expand Up @@ -136,7 +136,7 @@ def __init__(
# Checks if state info is in local sky db, populates if so.
if not dryrun and not self.ips and self.launcher == LauncherType.LOCAL:
# Cluster status is set to INIT in the Sky DB right after starting, so we need to refresh once
self._update_from_sky_status(dryrun=True)
self._update_from_status(dryrun=True)

@property
def ips(self):
Expand All @@ -153,7 +153,7 @@ def client(self):
except ValueError as e:
if not self.ips:
# Try loading in from local Sky DB
self._update_from_sky_status(dryrun=True)
self._update_from_status(dryrun=True)
if not self.ips:
raise ValueError(
f"Could not determine ips for ondemand cluster <{self.name}>. "
Expand Down Expand Up @@ -405,7 +405,7 @@ def _sky_status(self, refresh: bool = True, retry: bool = True):

def _start_ray_workers(self, ray_port, env_vars):
if not self.internal_ips:
self._update_from_sky_status()
self._update_from_status()

super()._start_ray_workers(ray_port, env_vars)

Expand Down Expand Up @@ -544,6 +544,45 @@ def _setup_default_creds(self):
For Den launching we load the default ssh creds, and for local launching we let Sky handle it."""
return DenLauncher.load_creds() if self.launcher == LauncherType.DEN else None

def _update_from_status(self, dryrun: bool = False):

if self._is_shared:
# If the cluster is shared, skip the update: the Sky data is only saved on the machine where
# the cluster was initially upped.
return

if self.launcher == LauncherType.LOCAL:
# Try to get the cluster status from SkyDB
cluster_dict = self._sky_status(refresh=not dryrun)
self._populate_connection_from_status_dict(cluster_dict)
if self.launcher == LauncherType.DEN:
# checking if the function is called during the init process, before the cluster is saved to den.
if not self.rns_address:
return
cluster_uri = rns_client.format_rns_address(self.rns_address)
resp = requests.get(
f"{rns_client.api_server_url}/resource/{cluster_uri}/cluster/status",
headers=rns_client.request_headers(),
)
if resp.status_code != 200:
logger.warning(
f"Received [{resp.status_code}] from Den GET '{cluster_uri}': Failed to load cluster status"
)
return
cluster_den_status: list = read_resp_data(resp)

if not cluster_den_status:
logger.warning("Failed to update cluster info")
Copy link
Contributor Author

@BelSasha BelSasha Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to go with "Failed to update cluster info" since we are updating cluster info (such as compute properties, etc.), and not just the cluster status.
@jlewitt1

return

resource_info = cluster_den_status[0].get("resource_info")
if not resource_info:
logger.warning("Failed to load resource info from cluster status")
return

cluster_dict = resource_info.get("cluster_config")
DenLauncher._update_from_den_response(cluster=self, config=cluster_dict)

def get_instance_type(self):
"""Returns instance type of the cluster."""
if self.instance_type and "--" in self.instance_type: # K8s specific syntax
Expand Down Expand Up @@ -828,6 +867,7 @@ def _ping(self, timeout=5, retry=False):
return True

if retry:
self._update_from_sky_status(dryrun=False)
dryrun = False if self.launcher == LauncherType.LOCAL else None
self._update_from_status(dryrun=dryrun)
return super()._ping(timeout=timeout, retry=False)
return False
2 changes: 0 additions & 2 deletions tests/fixtures/on_demand_cluster_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ def den_launched_ondemand_aws_k8s_cluster(request, test_rns_folder):
"launcher": LauncherType.DEN,
"context": os.getenv("EKS_ARN"),
}

cluster = setup_test_cluster(args, request)
yield cluster
teardown_cluster_fixture(request, cluster)
Expand All @@ -256,7 +255,6 @@ def den_launched_ondemand_gcp_k8s_cluster(request, test_rns_folder):
"launcher": LauncherType.DEN,
"context": "gke_testing",
}

cluster = setup_test_cluster(args, request)
yield cluster
teardown_cluster_fixture(request, cluster)
Expand Down
Loading