Skip to content

Commit

Permalink
Merge branch 'master' into hjiang/cache-runtime-env-core-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
dentiny committed Nov 28, 2024
2 parents f3db2bc + c6493b6 commit aee7696
Show file tree
Hide file tree
Showing 75 changed files with 2,156 additions and 937 deletions.
40 changes: 36 additions & 4 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2404,11 +2404,43 @@ ray_cc_test(
)

ray_cc_test(
name = "gcs_export_event_test",
name = "gcs_job_manager_export_event_test",
size = "small",
srcs = glob([
"src/ray/gcs/gcs_server/test/export_api/*.cc",
]),
srcs = ["src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc"],
tags = [
"no_windows",
"team:core"
],
deps = [
":gcs_server_lib",
":gcs_server_test_util",
":gcs_test_util_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)

# Test target built from the single source file
# gcs_actor_manager_export_event_test.cc. It is tagged "no_windows" and
# "team:core" and links against the GCS server library and its test utilities.
ray_cc_test(
name = "gcs_actor_manager_export_event_test",
size = "small",
srcs = ["src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc"],
tags = [
"no_windows",
"team:core"
],
deps = [
":gcs_server_lib",
":gcs_server_test_util",
":gcs_test_util_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)

ray_cc_test(
name = "gcs_node_manager_export_event_test",
size = "small",
srcs = ["src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc"],
tags = [
"no_windows",
"team:core"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ helm repo add grafana https://grafana.github.io/helm-charts
helm repo update

# Install Loki with single replica mode
helm install loki grafana/loki -f https://raw.githubusercontent.com/grafana/loki/refs/heads/main/production/helm/loki/single-binary-values.yaml
helm install loki grafana/loki --version 6.21.0 -f https://raw.githubusercontent.com/grafana/loki/refs/heads/main/production/helm/loki/single-binary-values.yaml
```

### Configure log processing
Expand Down Expand Up @@ -48,7 +48,7 @@ Deploy the Fluent Bit deployment with the [Helm chart repository](https://github
helm repo add fluent https://fluent.github.io/helm-charts
helm repo update

helm install fluent-bit fluent/fluent-bit -f fluent-bit-config.yaml
helm install fluent-bit fluent/fluent-bit --version 0.48.2 -f fluent-bit-config.yaml
```

### Install the KubeRay Operator
Expand All @@ -75,7 +75,7 @@ Deploy the Grafana deployment with the [Helm chart repository](https://github.co
helm repo add grafana https://grafana.github.io/helm-charts
helm repo update

helm install grafana grafana/grafana -f datasource-config.yaml
helm install grafana grafana/grafana --version 8.6.2 -f datasource-config.yaml
```

### Check the Grafana Dashboard
Expand Down
3 changes: 3 additions & 0 deletions python/ray/_private/ray_logging/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class LogKey(str, Enum):
NODE_ID = "node_id"
ACTOR_ID = "actor_id"
TASK_ID = "task_id"
ACTOR_NAME = "actor_name"
TASK_NAME = "task_name"
TASK_FUNCTION_NAME = "task_func_name"

# Logger built-in context
ASCTIME = "asctime"
Expand Down
9 changes: 9 additions & 0 deletions python/ray/_private/ray_logging/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,13 @@ def filter(self, record):
task_id = runtime_context.get_task_id()
if task_id is not None:
setattr(record, LogKey.TASK_ID.value, task_id)
task_name = runtime_context.get_task_name()
if task_name is not None:
setattr(record, LogKey.TASK_NAME.value, task_name)
task_function_name = runtime_context.get_task_function_name()
if task_function_name is not None:
setattr(record, LogKey.TASK_FUNCTION_NAME.value, task_function_name)
actor_name = runtime_context.get_actor_name()
if actor_name is not None:
setattr(record, LogKey.ACTOR_NAME.value, actor_name)
return True
8 changes: 4 additions & 4 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,8 +834,8 @@ def start_ray_process(
use_valgrind_profiler: bool = False,
use_perftools_profiler: bool = False,
use_tmux: bool = False,
stdout_file: Optional[str] = None,
stderr_file: Optional[str] = None,
stdout_file: Optional[IO[AnyStr]] = None,
stderr_file: Optional[IO[AnyStr]] = None,
pipe_stdin: bool = False,
):
"""Start one of the Ray processes.
Expand Down Expand Up @@ -1443,8 +1443,8 @@ def start_gcs_server(
redis_address: str,
log_dir: str,
session_name: str,
stdout_file: Optional[str] = None,
stderr_file: Optional[str] = None,
stdout_file: Optional[IO[AnyStr]] = None,
stderr_file: Optional[IO[AnyStr]] = None,
redis_password: Optional[str] = None,
config: Optional[dict] = None,
fate_share: Optional[bool] = None,
Expand Down
8 changes: 8 additions & 0 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ def actor_name(self):
def current_task_id(self):
    """Return the current task ID as reported by the core worker."""
    core_worker = self.core_worker
    return core_worker.get_current_task_id()

@property
def current_task_name(self):
    """Return the current task name as reported by the core worker."""
    core_worker = self.core_worker
    return core_worker.get_current_task_name()

@property
def current_task_function_name(self):
    """Return the current task's function name as reported by the core worker."""
    core_worker = self.core_worker
    return core_worker.get_current_task_function_name()

@property
def current_node_id(self):
    """Return the current node ID as reported by the core worker."""
    core_worker = self.core_worker
    return core_worker.get_current_node_id()
Expand Down
55 changes: 53 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ cdef optional[ObjectIDIndexType] NULL_PUT_INDEX = nullopt
# https://docs.python.org/3/library/contextvars.html#contextvars.ContextVar
# It is thread-safe.
async_task_id = contextvars.ContextVar('async_task_id', default=None)
async_task_name = contextvars.ContextVar('async_task_name', default=None)
async_task_function_name = contextvars.ContextVar('async_task_function_name',
default=None)


class DynamicObjectRefGenerator:
Expand Down Expand Up @@ -1815,7 +1818,8 @@ cdef void execute_task(
return core_worker.run_async_func_or_coro_in_event_loop(
async_function, function_descriptor,
name_of_concurrency_group_to_execute, task_id=task_id,
func_args=(actor, *arguments), func_kwargs=kwarguments)
task_name=task_name, func_args=(actor, *arguments),
func_kwargs=kwarguments)

return function(actor, *arguments, **kwarguments)

Expand Down Expand Up @@ -1927,7 +1931,8 @@ cdef void execute_task(
execute_streaming_generator_async(context),
function_descriptor,
name_of_concurrency_group_to_execute,
task_id=task_id)
task_id=task_id,
task_name=task_name)
else:
execute_streaming_generator_sync(context)

Expand Down Expand Up @@ -3415,6 +3420,48 @@ cdef class CoreWorker:
with nogil:
CCoreWorkerProcess.GetCoreWorker().Exit(c_exit_type, detail, null_ptr)

def get_current_task_name(self) -> str:
    """Return the name of the task that is currently executing.

    Resolution order:
      * Async actor: the name stored in the ``async_task_name`` context
        variable for the current asyncio task.
      * Normal task / threaded actor: the name reported by the core worker
        (main thread for a normal task, current thread for a threaded actor).
    """
    # The core worker's GetCurrentTask API has no asyncio context, so inside
    # an asyncio task only the contextvar holds the correct name. Check it
    # first.
    name_from_context = async_task_name.get()
    if name_from_context is not None:
        return name_from_context

    # Not within an asyncio context: fall back to the task name obtainable
    # from the core worker.
    return CCoreWorkerProcess.GetCoreWorker().GetCurrentTaskName().decode("utf-8")

def get_current_task_function_name(self) -> str:
    """Return the function name of the task that is currently executing.

    Resolution order:
      * Async actor: the function name stored in the
        ``async_task_function_name`` context variable for the current
        asyncio task.
      * Normal task / threaded actor: the function name reported by the core
        worker (main thread for a normal task, current thread for a threaded
        actor).
    """
    # The core worker's GetCurrentTask API has no asyncio context, so inside
    # an asyncio task only the contextvar holds the correct function name.
    # Check it first.
    function_name_from_context = async_task_function_name.get()
    if function_name_from_context is not None:
        return function_name_from_context

    # Not within an asyncio context: fall back to the task function name
    # obtainable from the core worker.
    return (
        CCoreWorkerProcess.GetCoreWorker()
        .GetCurrentTaskFunctionName()
        .decode("utf-8")
    )

def get_current_task_id(self) -> TaskID:
"""Return the current task ID.

Expand Down Expand Up @@ -4822,6 +4869,7 @@ cdef class CoreWorker:
specified_cgname: str,
*,
task_id: Optional[TaskID] = None,
task_name: Optional[str] = None,
func_args: Optional[Tuple] = None,
func_kwargs: Optional[Dict] = None,
):
Expand Down Expand Up @@ -4868,6 +4916,9 @@ cdef class CoreWorker:
try:
if task_id:
async_task_id.set(task_id)
if task_name is not None:
async_task_name.set(task_name)
async_task_function_name.set(function_descriptor.repr)

if inspect.isawaitable(func_or_coro):
coroutine = func_or_coro
Expand Down
10 changes: 0 additions & 10 deletions python/ray/autoscaler/_private/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,16 +604,6 @@ def log_resource_batch_data_if_desired(
parser.add_argument(
"--gcs-address", required=False, type=str, help="The address (ip:port) of GCS."
)
parser.add_argument(
"--redis-address", required=False, type=str, help="This is deprecated"
)
parser.add_argument(
"--redis-password",
required=False,
type=str,
default=None,
help="This is deprecated",
)
parser.add_argument(
"--autoscaling-config",
required=False,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/autoscaler/kuberay/ray-cluster.complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ spec:
workerGroupSpecs:
# the number of pod replicas in this worker group
- replicas: 1
minReplicas: 1
minReplicas: 0
maxReplicas: 300
# logical group name (here, small-group); it can also be a functional name
groupName: small-group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,21 +209,25 @@ def _initialize_scale_request(
cur_instances = self.instances

# Get the worker groups that have pending deletes and the worker groups that
# have finished deletes.
# have finished deletes, and the set of workers included in the workersToDelete
# field of any worker group.
(
worker_groups_with_pending_deletes,
worker_groups_without_pending_deletes,
) = self._get_workers_groups_with_deletes(
ray_cluster, set(cur_instances.keys())
)
worker_to_delete_set,
) = self._get_workers_delete_info(ray_cluster, set(cur_instances.keys()))

# Calculate the desired number of workers by type.
num_workers_dict = defaultdict(int)
for _, cur_instance in cur_instances.items():
if cur_instance.node_kind == NodeKind.HEAD:
# Only track workers.
continue
num_workers_dict[cur_instance.node_type] += 1
worker_groups = ray_cluster["spec"].get("workerGroupSpecs", [])
for worker_group in worker_groups:
node_type = worker_group["groupName"]
# Handle the case where users manually increase `minReplicas`
# to scale up the number of worker Pods. In this scenario,
# `replicas` will be smaller than `minReplicas`.
num_workers_dict[node_type] = max(
worker_group["replicas"], worker_group["minReplicas"]
)

# Add to launch nodes.
for node_type, count in to_launch.items():
Expand All @@ -242,6 +246,11 @@ def _initialize_scale_request(
# Not possible to delete head node.
continue

if to_delete_instance.cloud_instance_id in worker_to_delete_set:
# If the instance is already in the workersToDelete field of
# any worker group, skip it.
continue

num_workers_dict[to_delete_instance.node_type] -= 1
assert num_workers_dict[to_delete_instance.node_type] >= 0
to_delete_instances_by_type[to_delete_instance.node_type].append(
Expand Down Expand Up @@ -321,6 +330,7 @@ def _submit_scale_request(
# No patch required.
return

logger.info(f"Submitting a scale request: {scale_request}")
self._patch(f"rayclusters/{self._cluster_name}", patch_payload)

def _add_launch_errors(
Expand Down Expand Up @@ -392,9 +402,9 @@ def instances(self) -> Dict[CloudInstanceId, CloudInstance]:
return copy.deepcopy(self._cached_instances)

@staticmethod
def _get_workers_groups_with_deletes(
def _get_workers_delete_info(
ray_cluster_spec: Dict[str, Any], node_set: Set[CloudInstanceId]
) -> Tuple[Set[NodeType], Set[NodeType]]:
) -> Tuple[Set[NodeType], Set[NodeType], Set[CloudInstanceId]]:
"""
Gets the worker groups that have pending deletes, the worker groups that
have finished deletes, and the set of workers listed for deletion.
Expand All @@ -404,10 +414,13 @@ def _get_workers_groups_with_deletes(
deletes.
worker_groups_with_finished_deletes: The worker groups that have finished
deletes.
worker_to_delete_set: A set of Pods that are included in the workersToDelete
field of any worker group.
"""

worker_groups_with_pending_deletes = set()
worker_groups_with_deletes = set()
worker_to_delete_set = set()

worker_groups = ray_cluster_spec["spec"].get("workerGroupSpecs", [])
for worker_group in worker_groups:
Expand All @@ -422,14 +435,19 @@ def _get_workers_groups_with_deletes(
worker_groups_with_deletes.add(node_type)

for worker in workersToDelete:
    # Record every worker listed in workersToDelete. The loop must not
    # stop at the first pending worker: breaking early would leave the
    # remaining entries out of worker_to_delete_set, so callers that use
    # the set to skip already-requested deletions would miss them.
    worker_to_delete_set.add(worker)
    if worker in node_set:
        # The worker still exists among the current instances, so this
        # group's delete is still pending. Set insertion is idempotent,
        # so repeated adds for the same group are harmless.
        worker_groups_with_pending_deletes.add(node_type)

worker_groups_with_finished_deletes = (
worker_groups_with_deletes - worker_groups_with_pending_deletes
)
return worker_groups_with_pending_deletes, worker_groups_with_finished_deletes
return (
worker_groups_with_pending_deletes,
worker_groups_with_finished_deletes,
worker_to_delete_set,
)

def _fetch_instances(self) -> Dict[CloudInstanceId, CloudInstance]:
"""
Expand Down
10 changes: 0 additions & 10 deletions python/ray/autoscaler/v2/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,6 @@ def record_autoscaler_v2_usage(gcs_client: GcsClient) -> None:
parser.add_argument(
"--gcs-address", required=False, type=str, help="The address (ip:port) of GCS."
)
parser.add_argument(
"--redis-address", required=False, type=str, help="This is deprecated"
)
parser.add_argument(
"--redis-password",
required=False,
type=str,
default=None,
help="This is deprecated",
)
parser.add_argument(
"--autoscaling-config",
required=False,
Expand Down
Loading

0 comments on commit aee7696

Please sign in to comment.