
[Core] Retryable grpc client #47981

Merged: 39 commits, Dec 3, 2024
Changes from 5 commits
Commits (39)
b47dac0  Retryable grpc client (jjyao, Oct 10, 2024)
3ead394  up (jjyao, Oct 11, 2024)
e7fc94b  up (jjyao, Oct 11, 2024)
e944542  up (jjyao, Oct 11, 2024)
b072f62  up (jjyao, Oct 11, 2024)
4bf5f52  up (jjyao, Oct 11, 2024)
8ea5340  Merge branch 'master' of github.com:ray-project/ray into jjyao/rettttry (jjyao, Oct 17, 2024)
bf8577e  up (jjyao, Oct 17, 2024)
2befc08  up (jjyao, Oct 19, 2024)
78a399b  up (jjyao, Oct 19, 2024)
ed8051b  comments (jjyao, Oct 19, 2024)
d94e08e  lock (jjyao, Oct 22, 2024)
3bf9089  Merge branch 'master' of github.com:ray-project/ray into jjyao/rettttry (jjyao, Oct 22, 2024)
2d9af67  Merge branch 'master' of github.com:ray-project/ray into jjyao/rettttry (jjyao, Oct 31, 2024)
16326f8  up (jjyao, Oct 31, 2024)
3e60b53  fix (jjyao, Nov 5, 2024)
0d7e1d5  up (jjyao, Nov 7, 2024)
0635cd6  Merge branch 'master' of github.com:ray-project/ray into jjyao/rettttry (jjyao, Nov 8, 2024)
c3a2c04  up (jjyao, Nov 8, 2024)
2d3f419  up (jjyao, Nov 8, 2024)
e910fa0  up (jjyao, Nov 9, 2024)
72f2541  up (jjyao, Nov 12, 2024)
dc1fa24  fix (jjyao, Nov 12, 2024)
6102de7  fix (jjyao, Nov 12, 2024)
5fac510  up (jjyao, Nov 15, 2024)
4ddf163  up (jjyao, Nov 18, 2024)
cabae18  no mutex (jjyao, Nov 18, 2024)
536fe8c  up (jjyao, Nov 18, 2024)
c4ebc2b  up (jjyao, Nov 18, 2024)
c9c077b  Merge branch 'master' of github.com:ray-project/ray into jjyao/rettttry (jjyao, Nov 18, 2024)
381a626  up (jjyao, Nov 19, 2024)
f90f6a5  up (jjyao, Nov 22, 2024)
4464a1d  up (jjyao, Nov 26, 2024)
fd90c4c  up (jjyao, Nov 27, 2024)
cda3b67  Merge branch 'master' of github.com:ray-project/ray into jjyao/rettttry (jjyao, Nov 27, 2024)
b7e4181  Merge branch 'master' of github.com:ray-project/ray into jjyao/rettttry (jjyao, Dec 3, 2024)
e5fc8cd  up (jjyao, Dec 3, 2024)
4eececd  up (jjyao, Dec 3, 2024)
1ed3421  up (jjyao, Dec 3, 2024)
1 change: 1 addition & 0 deletions BUILD.bazel
@@ -98,6 +98,7 @@ ray_cc_library(
"src/ray/rpc/client_call.h",
"src/ray/rpc/common.h",
"src/ray/rpc/grpc_client.h",
"src/ray/rpc/retryable_grpc_client.h",
"src/ray/rpc/grpc_server.h",
"src/ray/rpc/metrics_agent_client.h",
"src/ray/rpc/server_call.h",
2 changes: 2 additions & 0 deletions python/ray/_raylet.pyx
@@ -417,6 +417,8 @@ class ObjectRefGenerator:
return False
else:
return True
else:
return False

"""
Private APIs
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.cc
@@ -3300,13 +3300,13 @@ Status CoreWorker::ReportGeneratorItemReturns(
if (status.ok()) {
num_objects_consumed = reply.total_num_object_consumed();
} else {
// TODO(sang): Handle network error more gracefully.
// If the request fails, we should just resume until task finishes without
// backpressure.
num_objects_consumed = waiter->TotalObjectGenerated();
RAY_LOG(WARNING).WithField(return_id)
<< "Failed to report streaming generator return "
"to the caller. The yield'ed ObjectRef may not be usable.";
"to the caller. The yield'ed ObjectRef may not be usable. "
<< status;
}
waiter->HandleObjectReported(num_objects_consumed);
});
256 changes: 53 additions & 203 deletions src/ray/rpc/gcs_server/gcs_rpc_client.h
@@ -21,41 +21,13 @@

#include "absl/container/btree_map.h"
#include "ray/common/grpc_util.h"
#include "ray/common/network_util.h"
#include "ray/rpc/grpc_client.h"
#include "ray/rpc/retryable_grpc_client.h"
#include "src/ray/protobuf/autoscaler.grpc.pb.h"
#include "src/ray/protobuf/gcs_service.grpc.pb.h"

namespace ray {
namespace rpc {

class GcsRpcClient;

/// \class Executor
/// Executor saves an operation and supports retries.
class Executor {
public:
Executor(std::function<void(const ray::Status &)> abort_callback)
: abort_callback_(std::move(abort_callback)) {}

/// This function is used to execute the given operation.
///
/// \param operation The operation to be executed.
void Execute(std::function<void()> operation) {
operation_ = std::move(operation);
operation_();
}

/// This function is used to retry the given operation.
void Retry() { operation_(); }

void Abort(const ray::Status &status) { abort_callback_(status); }

private:
std::function<void(ray::Status)> abort_callback_;
std::function<void()> operation_;
};
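
A minimal usage sketch of this Executor pattern, with placeholder lambdas standing in for the real GCS call sites:

    // Sketch only; not part of the diff.
    auto *executor = new Executor(/*abort_callback=*/[](const ray::Status &status) {
      // Invoked through Abort() when a queued request times out.
      RAY_LOG(WARNING) << "Request aborted: " << status;
    });
    // Execute() stores the operation before running it, so Retry() can replay it.
    executor->Execute([]() { /* issue the gRPC call here */ });
    // On a retryable failure the client queues the executor; once the channel
    // recovers it calls:
    executor->Retry();  // re-runs the stored operation verbatim
    delete executor;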

/// Convenience macro to invoke VOID_GCS_RPC_CLIENT_METHOD_FULL with defaults.
///
/// Creates a Sync and an Async method just like in VOID_GCS_RPC_CLIENT_METHOD_FULL,
@@ -166,23 +138,17 @@ class GcsRpcClient {
GcsRpcClient(const std::string &address,
const int port,
ClientCallManager &client_call_manager)
: gcs_address_(address),
gcs_port_(port),
io_context_(&client_call_manager.GetMainService()),
timer_(std::make_unique<boost::asio::deadline_timer>(*io_context_)) {
: gcs_address_(address), gcs_port_(port) {
channel_ = CreateGcsChannel(address, port);
// If not connected, the reconnection will continue to work.
auto deadline =
std::chrono::system_clock::now() +
std::chrono::seconds(::RayConfig::instance().gcs_rpc_server_connect_timeout_s());
if (!channel_->WaitForConnected(deadline)) {
RAY_LOG(ERROR) << "Failed to connect to GCS at address " << address << ":" << port
<< " within "
<< ::RayConfig::instance().gcs_rpc_server_connect_timeout_s()
<< " seconds.";
gcs_is_down_ = true;
} else {
gcs_is_down_ = false;
RAY_LOG(WARNING) << "Failed to connect to GCS at address " << address << ":" << port
<< " within "
<< ::RayConfig::instance().gcs_rpc_server_connect_timeout_s()
<< " seconds.";
}

job_info_grpc_client_ =
Expand Down Expand Up @@ -212,7 +178,31 @@ class GcsRpcClient {
runtime_env_grpc_client_ =
std::make_unique<GrpcClient<RuntimeEnvGcsService>>(channel_, client_call_manager);

SetupCheckTimer();
retryable_grpc_client_ = RetryableGrpcClient::Create(
channel_,
client_call_manager.GetMainService(),
/*max_pending_requests_bytes=*/
::RayConfig::instance().gcs_grpc_max_request_queued_max_bytes(),
/*check_channel_status_interval_milliseconds=*/
::RayConfig::instance()
.gcs_client_check_connection_status_interval_milliseconds(),
/*server_unavailable_timeout_seconds=*/
::RayConfig::instance().gcs_rpc_server_reconnect_timeout_s(),
/*server_unavailable_timeout_callback=*/
[]() {
RAY_LOG(ERROR) << "Failed to connect to GCS within "
<< ::RayConfig::instance().gcs_rpc_server_reconnect_timeout_s()
<< " seconds. "
<< "GCS may have been killed. It's either GCS is terminated by "
"`ray stop` or "
<< "is killed unexpectedly. If it is killed unexpectedly, "
<< "see the log file gcs_server.out. "
<< "https://docs.ray.io/en/master/ray-observability/user-guides/"
"configure-logging.html#logging-directory-structure. "
<< "The program will terminate.";
std::_Exit(EXIT_FAILURE);
},
/*server_name=*/"GCS");
}
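
The Create() arguments above are the knobs for the retry machinery this PR factors out of this file. A sketch of the same call for an arbitrary server, with illustrative values in place of the RayConfig lookups (every name here other than RetryableGrpcClient::Create is an assumption):

    // Illustrative only: the real client reads these values from RayConfig, as above.
    auto retryable_client = RetryableGrpcClient::Create(
        channel,     // grpc::Channel to the target server
        io_service,  // event loop that drives the periodic channel-status check
        /*max_pending_requests_bytes=*/100 * 1024 * 1024,  // cap on bytes queued
                                                           // while the server is down
        /*check_channel_status_interval_milliseconds=*/100,
        /*server_unavailable_timeout_seconds=*/60,
        /*server_unavailable_timeout_callback=*/
        []() { RAY_LOG(ERROR) << "Server unreachable for too long."; },
        /*server_name=*/"MyService");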

template <typename Service,
@@ -226,64 +216,27 @@
const Request &request,
const ClientCallback<Reply> &callback,
const int64_t timeout_ms) {
auto executor = new Executor(
[callback](const ray::Status &status) { callback(status, Reply()); });
auto operation_callback = [this, request, callback, executor, timeout_ms](
const ray::Status &status, Reply &&reply) {
if (status.ok()) {
if constexpr (handle_payload_status) {
Status st =
(reply.status().code() == (int)StatusCode::OK)
? Status()
: Status(StatusCode(reply.status().code()), reply.status().message());
callback(st, std::move(reply));
} else {
callback(status, std::move(reply));
}
delete executor;
} else if (!IsGrpcRetryableStatus(status)) {
callback(status, std::move(reply));
delete executor;
} else {
// In case of GCS failure, we queue the request and these requests will be
// executed once GCS is back.
gcs_is_down_ = true;
auto request_bytes = request.ByteSizeLong();
if (pending_requests_bytes_ + request_bytes >
::RayConfig::instance().gcs_grpc_max_request_queued_max_bytes()) {
RAY_LOG(WARNING) << "Pending queue for failed GCS request has reached the "
<< "limit. Blocking the current thread until GCS is back";
while (gcs_is_down_ && !shutdown_) {
CheckChannelStatus(false);
std::this_thread::sleep_for(std::chrono::milliseconds(
::RayConfig::instance()
.gcs_client_check_connection_status_interval_milliseconds()));
}
if (shutdown_) {
callback(Status::Disconnected("GCS client has been disconnected."),
std::move(reply));
delete executor;
retryable_grpc_client_->template CallMethod<Service, Request, Reply>(
prepare_async_function,
grpc_client,
call_name,
request,
[callback](const Status &status, Reply &&reply) {
if (status.ok()) {
if constexpr (handle_payload_status) {
Status st = (reply.status().code() == (int)StatusCode::OK)
? Status()
: Status(StatusCode(reply.status().code()),
reply.status().message());
callback(st, std::move(reply));
} else {
callback(status, std::move(reply));
}
} else {
executor->Retry();
callback(status, std::move(reply));
}
} else {
pending_requests_bytes_ += request_bytes;
auto timeout = timeout_ms == -1 ? absl::InfiniteFuture()
: absl::Now() + absl::Milliseconds(timeout_ms);
pending_requests_.emplace(timeout, std::make_pair(executor, request_bytes));
}
}
};
auto operation = [prepare_async_function,
&grpc_client,
call_name,
request,
operation_callback,
timeout_ms]() {
grpc_client.template CallMethod<Request, Reply>(
prepare_async_function, request, operation_callback, call_name, timeout_ms);
};
executor->Execute(std::move(operation));
},
timeout_ms);
}
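
A hypothetical call site, assuming the VOID_GCS_RPC_CLIENT_METHOD macros generate one async method per RPC (e.g. AddJob) that forwards to the CallMethod wrapper above:

    rpc::AddJobRequest request;
    gcs_rpc_client.AddJob(request, [](const Status &status, rpc::AddJobReply &&reply) {
      // With handle_payload_status, a non-OK reply.status() in the payload is
      // surfaced here as a non-OK `status`. Retryable transport failures are
      // absorbed by retryable_grpc_client_ until they succeed or time out.
      if (!status.ok()) {
        RAY_LOG(WARNING) << "AddJob failed: " << status;
      }
    });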

/// Add job info to GCS Service.
@@ -596,15 +549,7 @@ class GcsRpcClient {
runtime_env_grpc_client_,
/*method_timeout_ms*/ -1, )

void Shutdown() {
if (!shutdown_.exchange(true)) {
// First call to shut down this GCS RPC client.
absl::MutexLock lock(&timer_mu_);
timer_->cancel();
} else {
RAY_LOG(DEBUG) << "GCS RPC client has already shutdown.";
}
}
void Shutdown() { retryable_grpc_client_->Shutdown(); }

std::pair<std::string, int64_t> GetAddress() const {
return std::make_pair(gcs_address_, gcs_port_);
@@ -613,97 +558,10 @@ class GcsRpcClient {
std::shared_ptr<grpc::Channel> GetChannel() const { return channel_; }

private:
void SetupCheckTimer() {
auto duration = boost::posix_time::milliseconds(
::RayConfig::instance()
.gcs_client_check_connection_status_interval_milliseconds());
absl::MutexLock lock(&timer_mu_);
timer_->expires_from_now(duration);
timer_->async_wait([this](boost::system::error_code error) {
if (error == boost::system::errc::success) {
CheckChannelStatus();
}
});
}

void CheckChannelStatus(bool reset_timer = true) {
if (shutdown_) {
return;
}

auto status = channel_->GetState(false);
// https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html
// https://grpc.github.io/grpc/core/connectivity__state_8h_source.html
if (status != GRPC_CHANNEL_READY) {
RAY_LOG(DEBUG) << "GCS channel status: " << status;
}

// We need to cleanup all the pending requests which are timeout.
auto now = absl::Now();
while (!pending_requests_.empty()) {
auto iter = pending_requests_.begin();
if (iter->first > now) {
break;
}
auto [executor, request_bytes] = iter->second;
executor->Abort(
ray::Status::TimedOut("Timed out while waiting for GCS to become available."));
pending_requests_bytes_ -= request_bytes;
delete executor;
pending_requests_.erase(iter);
}

switch (status) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_CONNECTING:
if (!gcs_is_down_) {
gcs_is_down_ = true;
} else {
if (absl::ToInt64Seconds(absl::Now() - gcs_last_alive_time_) >=
::RayConfig::instance().gcs_rpc_server_reconnect_timeout_s()) {
RAY_LOG(ERROR) << "Failed to connect to GCS within "
<< ::RayConfig::instance().gcs_rpc_server_reconnect_timeout_s()
<< " seconds. "
<< "GCS may have been killed. It's either GCS is terminated by "
"`ray stop` or "
<< "is killed unexpectedly. If it is killed unexpectedly, "
<< "see the log file gcs_server.out. "
<< "https://docs.ray.io/en/master/ray-observability/user-guides/"
"configure-logging.html#logging-directory-structure. "
<< "The program will terminate.";
std::_Exit(EXIT_FAILURE);
}
}
break;
case GRPC_CHANNEL_SHUTDOWN:
RAY_CHECK(shutdown_) << "Channel should never go to this status.";
break;
case GRPC_CHANNEL_READY:
case GRPC_CHANNEL_IDLE:
gcs_last_alive_time_ = absl::Now();
gcs_is_down_ = false;
// Retry the one queued.
while (!pending_requests_.empty()) {
pending_requests_.begin()->second.first->Retry();
pending_requests_.erase(pending_requests_.begin());
}
pending_requests_bytes_ = 0;
break;
default:
RAY_LOG(FATAL) << "Not covered status: " << status;
}
SetupCheckTimer();
}
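
The switch above follows gRPC's public connectivity-state model; a stripped-down sketch of the same polling idiom, assuming only the standard grpc::Channel API:

    // Poll the channel without forcing a reconnect attempt.
    grpc_connectivity_state state = channel->GetState(/*try_to_connect=*/false);
    switch (state) {
      case GRPC_CHANNEL_TRANSIENT_FAILURE:
      case GRPC_CHANNEL_CONNECTING:
        // Treat the server as down; keep counting toward the reconnect timeout.
        break;
      case GRPC_CHANNEL_READY:
      case GRPC_CHANNEL_IDLE:
        // Server reachable again: replay any queued requests.
        break;
      default:
        break;  // GRPC_CHANNEL_SHUTDOWN is only expected during teardown.
    }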

const std::string gcs_address_;
const int64_t gcs_port_;

instrumented_io_context *const io_context_;

// Timer can be called from either the GCS RPC event loop, or the application's
// main thread. It needs to be protected by a mutex.
absl::Mutex timer_mu_;
const std::unique_ptr<boost::asio::deadline_timer> timer_;
std::shared_ptr<grpc::Channel> channel_;
std::shared_ptr<RetryableGrpcClient> retryable_grpc_client_;

/// The gRPC-generated stub.
std::unique_ptr<GrpcClient<JobInfoGcsService>> job_info_grpc_client_;
Expand All @@ -720,14 +578,6 @@ class GcsRpcClient {
std::unique_ptr<GrpcClient<autoscaler::AutoscalerStateService>>
autoscaler_state_grpc_client_;

std::shared_ptr<grpc::Channel> channel_;
bool gcs_is_down_ = false;
absl::Time gcs_last_alive_time_ = absl::Now();

std::atomic<bool> shutdown_ = false;
absl::btree_multimap<absl::Time, std::pair<Executor *, size_t>> pending_requests_;
size_t pending_requests_bytes_ = 0;

friend class GcsClientReconnectionTest;
FRIEND_TEST(GcsClientReconnectionTest, ReconnectionBackoff);
};