diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index eec513cea969..66bbc814db6c 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -16,6 +16,7 @@ import ray import ray._private.ray_constants as ray_constants +from ray._private.utils import binary_to_hex from ray.autoscaler._private.constants import ( AUTOSCALER_HEARTBEAT_TIMEOUT_S, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, @@ -491,7 +492,25 @@ def terminate_nodes_to_enforce_config_constraints(self, now: float): assert self.provider last_used = self.load_metrics.last_used_time_by_ip - horizon = now - (60 * self.config["idle_timeout_minutes"]) + + # local import to avoid circular dependencies + from ray.autoscaler.v2.sdk import get_cluster_resource_state + + # Note: The `last_used` metric only considers resource occupation, + # which can misreport nodes as idle when: + # 1. Tasks without assigned resources run on the node. + # 2. All tasks are blocked on `get` or `wait` operations. + # Using idle_duration_ms reported by ralyet instead + # ref: https://github.com/ray-project/ray/pull/39582 + # Use get_cluster_resource_state from autocaler v2 sdk + # to get idle_duration_ms from raylet + ray_state = get_cluster_resource_state(self.gcs_client) + ray_nodes_idle_duration_ms_by_id = { + binary_to_hex(node.node_id): node.idle_duration_ms + for node in ray_state.node_states + } + + idle_timeout_ms = 60 * 1000 * self.config["idle_timeout_minutes"] # Sort based on last used to make sure to keep min_workers that # were most recently used. Otherwise, _keep_min_workers_of_node_type @@ -539,7 +558,11 @@ def keep_node(node_id: NodeID) -> None: continue node_ip = self.provider.internal_ip(node_id) - if node_ip in last_used and last_used[node_ip] < horizon: + + if ( + node_id in ray_nodes_idle_duration_ms_by_id + and ray_nodes_idle_duration_ms_by_id[node_id] > idle_timeout_ms + ): self.schedule_node_termination(node_id, "idle", logger.info) # Get the local time of the node's last use as a string. formatted_last_used_time = time.asctime( diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 7870d804ead5..05a4f3685b65 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -70,6 +70,7 @@ ) from ray.exceptions import RpcError +from ray.core.generated import autoscaler_pb2 WORKER_FILTER = {TAG_RAY_NODE_KIND: NODE_KIND_WORKER} @@ -110,6 +111,8 @@ def __init__(self, drain_node_outcome=DrainNodeOutcome.Succeeded): # Tracks how many times DrainNode returned a successful RPC response. self.drain_node_reply_success = 0 + self.custom_cluster_state = None + def drain_nodes(self, raylet_ids_to_drain, timeout: int): """Simulate NodeInfo stub's DrainNode call. @@ -143,6 +146,17 @@ def drain_nodes(self, raylet_ids_to_drain, timeout: int): # Shouldn't land here. assert False, "Possible drain node outcomes exhausted." + def get_cluster_resource_state(self): + """Mock get_cluster_resource_state to return a ClusterResourceState. + + Returns an empty state by default or custom state if provided. + """ + if self.custom_cluster_state is not None: + return self.custom_cluster_state.SerializeToString() + + # Default empty ClusterResourceState + return autoscaler_pb2.ClusterResourceState().SerializeToString() + def mock_raylet_id() -> bytes: """Random raylet id to pass to load_metrics.update."""