Fix incorrectly terminating nodes misclassified as idle in autoscaler v1
Signed-off-by: Mimi Liao <[email protected]>
mimiliaogo committed Nov 3, 2024
1 parent c7263e4 commit 8cd0be3
Showing 2 changed files with 37 additions and 2 deletions.
25 changes: 23 additions & 2 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -16,6 +16,7 @@

import ray
import ray._private.ray_constants as ray_constants
from ray._private.utils import binary_to_hex
from ray.autoscaler._private.constants import (
AUTOSCALER_HEARTBEAT_TIMEOUT_S,
AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
@@ -491,7 +492,23 @@ def terminate_nodes_to_enforce_config_constraints(self, now: float):
assert self.provider

last_used = self.load_metrics.last_used_time_by_ip
horizon = now - (60 * self.config["idle_timeout_minutes"])

# local import to avoid circular dependencies
from ray.autoscaler.v2.sdk import get_cluster_resource_state

# Note: The `last_used` metric only considers resource occupation, which can misreport nodes as idle when:
# 1. Tasks without assigned resources run on the node.
# 2. All tasks are blocked on `get` or `wait` operations.
# Use idle_duration_ms reported by the raylet instead (ref: https://github.com/ray-project/ray/pull/39582).
# Use get_cluster_resource_state from the autoscaler v2 SDK to fetch idle_duration_ms from the raylet.
# TODO(mimi): should we maintain ray_state at a higher level?
ray_state = get_cluster_resource_state(self.gcs_client)
ray_nodes_idle_duration_ms_by_id = {
binary_to_hex(node.node_id): node.idle_duration_ms
for node in ray_state.node_states
}

idle_timeout_ms = 60 * 1000 * self.config["idle_timeout_minutes"]

# Sort based on last used to make sure to keep min_workers that
# were most recently used. Otherwise, _keep_min_workers_of_node_type
@@ -539,7 +556,11 @@ def keep_node(node_id: NodeID) -> None:
continue

node_ip = self.provider.internal_ip(node_id)
if node_ip in last_used and last_used[node_ip] < horizon:

if (
node_id in ray_nodes_idle_duration_ms_by_id
and ray_nodes_idle_duration_ms_by_id[node_id] > idle_timeout_ms
):
self.schedule_node_termination(node_id, "idle", logger.info)
# Get the local time of the node's last use as a string.
formatted_last_used_time = time.asctime(
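For reference, a minimal standalone sketch of the idle check introduced above. The mapping and timeout mirror the diff; the helper name and the example values are illustrative only, not part of the commit.

def is_idle_past_timeout(node_id, idle_duration_ms_by_id, idle_timeout_ms):
    """Return True if the raylet reported this node idle longer than the timeout."""
    idle_ms = idle_duration_ms_by_id.get(node_id)
    return idle_ms is not None and idle_ms > idle_timeout_ms

# Example: with idle_timeout_minutes = 5, a node idle for 6 minutes is scheduled for termination.
idle_timeout_ms = 60 * 1000 * 5
assert is_idle_past_timeout("node-a", {"node-a": 6 * 60 * 1000}, idle_timeout_ms)
assert not is_idle_past_timeout("node-b", {"node-a": 6 * 60 * 1000}, idle_timeout_ms)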
14 changes: 14 additions & 0 deletions python/ray/tests/test_autoscaler.py
@@ -70,6 +70,7 @@
)
from ray.exceptions import RpcError

from ray.core.generated import autoscaler_pb2

WORKER_FILTER = {TAG_RAY_NODE_KIND: NODE_KIND_WORKER}

@@ -110,6 +111,8 @@ def __init__(self, drain_node_outcome=DrainNodeOutcome.Succeeded):
# Tracks how many times DrainNode returned a successful RPC response.
self.drain_node_reply_success = 0

self.custom_cluster_state = None

def drain_nodes(self, raylet_ids_to_drain, timeout: int):
"""Simulate NodeInfo stub's DrainNode call.
@@ -143,6 +146,17 @@ def drain_nodes(self, raylet_ids_to_drain, timeout: int):
# Shouldn't land here.
assert False, "Possible drain node outcomes exhausted."

def get_cluster_resource_state(self):
"""Mock get_cluster_resource_state to return a ClusterResourceState.
Returns an empty state by default, or the custom state if provided.
"""
if self.custom_cluster_state is not None:
return self.custom_cluster_state.SerializeToString()

# Default empty ClusterResourceState
return autoscaler_pb2.ClusterResourceState().SerializeToString()


def mock_raylet_id() -> bytes:
"""Random raylet id to pass to load_metrics.update."""
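A hypothetical usage of the mock above: a test could build a ClusterResourceState with one node reported idle past the timeout and assign it to custom_cluster_state. The node id and idle duration below are placeholder values, not part of the commit.

from ray.core.generated import autoscaler_pb2

state = autoscaler_pb2.ClusterResourceState()
node_state = state.node_states.add()
node_state.node_id = b"\x01" * 28  # placeholder raylet id bytes
node_state.idle_duration_ms = 20 * 60 * 1000  # reported idle for 20 minutes

# gcs_client.custom_cluster_state = state  # the mock then serializes and returns this state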
