Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Refactors WorkerPool with Prestarts. #48677

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,11 @@ RAY_CONFIG(bool, enable_worker_prestart, false)
/// TODO(clarng): reconcile with enable_worker_prestart
RAY_CONFIG(bool, prestart_worker_first_driver, true)

/// For a PrestartWorkers request to a single raylet, the maximum number of workers to
/// prestart. If a request asks for more workers than this, the num of workers will be
/// capped.
RAY_CONFIG(uint64_t, restart_workers_api_max_num_workers, 10)
rynewang marked this conversation as resolved.
Show resolved Hide resolved

/// The interval of periodic idle worker killing. Value of 0 means worker capping is
/// disabled.
RAY_CONFIG(uint64_t, kill_idle_workers_interval_ms, 200)
Expand Down
21 changes: 21 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "ray/core_worker/core_worker.h"

#include <future>

#ifndef _WIN32
#include <unistd.h>
#endif
Expand Down Expand Up @@ -2163,6 +2165,25 @@ void CoreWorker::BuildCommonTaskSpec(
}
}

Status CoreWorker::PrestartWorkers(const std::string &serialized_runtime_env_info,
uint64_t keep_alive_duration_secs,
size_t num_workers) {
rpc::PrestartWorkersRequest request;
request.set_language(GetLanguage());
request.set_job_id(GetCurrentJobId().Binary());
jjyao marked this conversation as resolved.
Show resolved Hide resolved
*request.mutable_runtime_env_info() =
*OverrideTaskOrActorRuntimeEnvInfo(serialized_runtime_env_info);
request.set_keep_alive_duration_secs(keep_alive_duration_secs);
request.set_num_workers(num_workers);
// this is sync
std::promise<Status> promise;
local_raylet_client_->PrestartWorkers(
request, [&promise](const Status &status, const rpc::PrestartWorkersReply &reply) {
promise.set_value(status);
});
return promise.get_future().get();
}

std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
Expand Down
14 changes: 13 additions & 1 deletion src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#include "ray/pubsub/subscriber.h"
#include "ray/raylet_client/raylet_client.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/worker/core_worker_client.h"
#include "ray/rpc/worker/core_worker_server.h"
#include "ray/util/process.h"
#include "src/ray/protobuf/pubsub.pb.h"
Expand Down Expand Up @@ -916,6 +915,19 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
const std::string &error_message,
double timestamp);

// Prestart workers. The workers:
// - uses current language.
// - uses current JobID.
// - does NOT support root_detached_actor_id.
// - uses provided runtime_env_info applied to the job runtime env, as if it's a task
// request.
//
// This API is sync. It blocks until raylet replies. But it provides no guarantee that
rynewang marked this conversation as resolved.
Show resolved Hide resolved
// the workers are actually started.
Status PrestartWorkers(const std::string &serialized_runtime_env_info,
uint64_t keep_alive_duration_secs,
size_t num_workers);

/// Submit a normal task.
///
/// \param[in] function The remote function to execute.
Expand Down
3 changes: 2 additions & 1 deletion src/ray/protobuf/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ proto_library(
":autoscaler_proto",
":common_proto",
":gcs_proto",
":runtime_env_common_proto",
],
)

Expand Down Expand Up @@ -249,7 +250,7 @@ proto_library(
name = "export_event_proto",
srcs = ["export_api/export_event.proto"],
deps = [
":export_task_event_proto",
":export_task_event_proto",
":export_node_event_proto",
":export_actor_event_proto",
":export_driver_job_event_proto",
Expand Down
20 changes: 20 additions & 0 deletions src/ray/protobuf/node_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package ray.rpc;
import "src/ray/protobuf/common.proto";
import "src/ray/protobuf/gcs.proto";
import "src/ray/protobuf/autoscaler.proto";
import "src/ray/protobuf/runtime_env_common.proto";

message WorkerBacklogReport {
// TaskSpec indicating the scheduling class.
Expand Down Expand Up @@ -94,6 +95,23 @@ message RequestWorkerLeaseReply {
string scheduling_failure_message = 10;
}

// Request to prestart workers. At this time we don't yet know the resource, or task type.
message PrestartWorkersRequest {
Language language = 1;
// Owner: job or root detached actor.
optional bytes job_id = 2;
optional bytes root_detached_actor_id = 3;
rynewang marked this conversation as resolved.
Show resolved Hide resolved
RuntimeEnvInfo runtime_env_info = 4;
// Started idle workers will be kept alive for this duration. Reset on task assignment.
uint64 keep_alive_duration_secs = 5;
// Raylet will try to start `num_workers` workers.
uint64 num_workers = 6;
}

message PrestartWorkersReply {
}


message PrepareBundleResourcesRequest {
// Bundles that containing the requested resources.
repeated Bundle bundle_specs = 1;
Expand Down Expand Up @@ -385,6 +403,8 @@ service NodeManagerService {
rpc GetResourceLoad(GetResourceLoadRequest) returns (GetResourceLoadReply);
// Request a worker from the raylet.
rpc RequestWorkerLease(RequestWorkerLeaseRequest) returns (RequestWorkerLeaseReply);
// Request to prestart workers.
rpc PrestartWorkers(PrestartWorkersRequest) returns (PrestartWorkersReply);
// Report task backlog information from a worker to the raylet
rpc ReportWorkerBacklog(ReportWorkerBacklogRequest) returns (ReportWorkerBacklogReply);
// Release a worker back to its raylet.
Expand Down
47 changes: 47 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
#include <csignal>
#include <fstream>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "absl/functional/bind_front.h"
#include "absl/time/clock.h"
Expand All @@ -40,6 +43,7 @@
#include "ray/util/event.h"
#include "ray/util/event_label.h"
#include "ray/util/util.h"
#include "src/ray/raylet/worker_pool.h"

namespace {

Expand Down Expand Up @@ -1865,6 +1869,49 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques
send_reply_callback_wrapper);
}

void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request,
rpc::PrestartWorkersReply *reply,
rpc::SendReplyCallback send_reply_callback) {
const uint64_t num_workers = std::min(
RayConfig::instance().restart_workers_api_max_num_workers(), request.num_workers());
if (num_workers < request.num_workers()) {
RAY_LOG(WARNING) << "Requested to prestart " << request.num_workers()
<< " workers, but only " << num_workers
<< " workers are allowed to prestart. See "
"RAY_restart_workers_api_max_num_workers";
}

auto pop_worker_request = std::make_shared<PopWorkerRequest>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PopWorkerRequest is supposed to be a private thing inside worker_pool. Let's just make StartNewWorkers to accept needed parameters to construct PopWorkerRequest inside worker pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StartNewWorker(PopWorkerRequest) is also used by PopWorker(PopWorkerRequest) internally. The Request does not have anything private so I guess we can just expose it and allow node_manager.cc to use it? It will be much easier. If we just expose another method with the scattered 10 arguments it doesn't have any added values any way.

request.language(),
rpc::WorkerType::WORKER,
request.has_job_id() ? JobID::FromBinary(request.job_id()) : JobID::Nil(),
request.has_root_detached_actor_id()
? ActorID::FromBinary(request.root_detached_actor_id())
: ActorID::Nil(),
/*gpu=*/std::nullopt,
/*actor_worker=*/std::nullopt,
request.runtime_env_info(),
/*options=*/std::vector<std::string>{},
absl::Seconds(request.keep_alive_duration_secs()),
/*callback=*/
[request](const std::shared_ptr<WorkerInterface> &worker,
PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) {
// This callback does not use the worker.
RAY_LOG(ERROR) << "Prestart worker started! token " <<
rynewang marked this conversation as resolved.
Show resolved Hide resolved

worker->GetStartupToken() << ", id " << worker->WorkerId() << ", status "
<< status << ", runtime_env_setup_error_message "
<< runtime_env_setup_error_message;
return false;
});

for (int64_t i = 0; i < num_workers; i++) {
worker_pool_.StartNewWorker(pop_worker_request);
}
send_reply_callback(Status::OK(), nullptr, nullptr);
}

void NodeManager::HandlePrepareBundleResources(
rpc::PrepareBundleResourcesRequest request,
rpc::PrepareBundleResourcesReply *reply,
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandlePrestartWorkers(rpc::PrestartWorkersRequest request,
rpc::PrestartWorkersReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `ReportWorkerBacklog` request.
void HandleReportWorkerBacklog(rpc::ReportWorkerBacklogRequest request,
rpc::ReportWorkerBacklogReply *reply,
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ray/raylet/worker.h"

#include <boost/bind/bind.hpp>
#include <optional>
#include <utility>

#include "ray/raylet/format/node_manager_generated.h"
Expand Down
19 changes: 19 additions & 0 deletions src/ray/raylet/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ class WorkerInterface {

virtual const ActorID &GetRootDetachedActorId() const = 0;

virtual void SetIdleKeepAliveDeadline(absl::Time deadline) = 0;
virtual bool IsIdleKillable(absl::Time now) const = 0;

rynewang marked this conversation as resolved.
Show resolved Hide resolved
protected:
virtual void SetStartupToken(StartupToken startup_token) = 0;

Expand Down Expand Up @@ -234,6 +237,7 @@ class Worker : public WorkerInterface {
SetIsActorWorker(task_spec.IsActorCreationTask());
assigned_task_ = assigned_task;
root_detached_actor_id_ = assigned_task.GetTaskSpecification().RootDetachedActorId();
idle_keep_alive_deadline_ = std::nullopt;
}

absl::Time GetAssignedTaskTime() const { return task_assign_time_; };
Expand All @@ -256,6 +260,17 @@ class Worker : public WorkerInterface {
void SetIsGpu(bool is_gpu);
void SetIsActorWorker(bool is_actor_worker);

bool IsIdleKillable(absl::Time now) const {
if (!idle_keep_alive_deadline_.has_value()) {
return true;
}
return now > idle_keep_alive_deadline_.value();
}

void SetIdleKeepAliveDeadline(absl::Time deadline) {
idle_keep_alive_deadline_ = deadline;
}

protected:
void SetStartupToken(StartupToken startup_token);

Expand Down Expand Up @@ -330,6 +345,10 @@ class Worker : public WorkerInterface {
std::optional<bool> is_actor_worker_ = std::nullopt;
/// If true, a RPC need to be sent to notify the worker about GCS restarting.
bool notify_gcs_restarted_ = false;
/// If set, the worker is not eligible for killing even if it's idle. This status is
/// reset when the worker is assigned a new task. Note if the job is finished, the
/// worker is still killable.
std::optional<absl::Time> idle_keep_alive_deadline_ = std::nullopt;
};

} // namespace raylet
Expand Down
Loading