Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] [6/N] Fix shared pointer usage for gcs server #48990

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/mock/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ class MockGcsActorManager : public GcsActorManager {
MockGcsActorManager(RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager)
: GcsActorManager(
nullptr,
nullptr,
nullptr,
/*scheduler=*/nullptr,
/*gcs_table_storage=*/nullptr,
/*gcs_publisher=*/nullptr,
runtime_env_manager,
function_manager,
[](const ActorID &) {},
Expand Down
10 changes: 0 additions & 10 deletions src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@
namespace ray {
namespace gcs {

class Mockpair_hash : public pair_hash {
public:
};

} // namespace gcs
} // namespace ray

namespace ray {
namespace gcs {

class MockGcsPlacementGroupSchedulerInterface
: public GcsPlacementGroupSchedulerInterface {
public:
Expand Down
11 changes: 6 additions & 5 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,17 +324,18 @@ const ray::rpc::ActorDeathCause GcsActorManager::GenNodeDiedCause(
/////////////////////////////////////////////////////////////////////////////////////////
GcsActorManager::GcsActorManager(
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher,
GcsTableStorage *gcs_table_storage,
GcsPublisher *gcs_publisher,
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::CoreWorkerClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_publisher_(std::move(gcs_publisher)),
gcs_table_storage_(gcs_table_storage),
gcs_publisher_(gcs_publisher),
worker_client_factory_(worker_client_factory),
destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed),
destroy_owned_placement_group_if_needed_(
std::move(destroy_owned_placement_group_if_needed)),
runtime_env_manager_(runtime_env_manager),
function_manager_(function_manager),
actor_gc_delay_(RayConfig::instance().gcs_actor_table_min_duration_ms()) {
Expand Down
8 changes: 4 additions & 4 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// \param gcs_publisher Used to publish gcs message.
GcsActorManager(
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher,
GcsTableStorage *gcs_table_storage,
GcsPublisher *gcs_publisher,
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
Expand Down Expand Up @@ -687,9 +687,9 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// The scheduler to schedule all registered actors.
std::shared_ptr<GcsActorSchedulerInterface> gcs_actor_scheduler_;
/// Used to update actor information upon creation, deletion, etc.
std::shared_ptr<GcsTableStorage> gcs_table_storage_;
GcsTableStorage *gcs_table_storage_;
/// A publisher for publishing gcs messages.
std::shared_ptr<GcsPublisher> gcs_publisher_;
GcsPublisher *gcs_publisher_;
/// Factory to produce clients to workers. This is used to communicate with
/// actors and their owners.
rpc::CoreWorkerClientFactoryFn worker_client_factory_;
Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_server/gcs_init_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void GcsInitData::AsyncLoadJobTableData(const EmptyCallback &on_done) {
<< job_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(load_job_table_data_callback));
RAY_CHECK_OK(gcs_table_storage_.JobTable().GetAll(load_job_table_data_callback));
}

void GcsInitData::AsyncLoadNodeTableData(const EmptyCallback &on_done) {
Expand All @@ -59,7 +59,7 @@ void GcsInitData::AsyncLoadNodeTableData(const EmptyCallback &on_done) {
<< node_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(load_node_table_data_callback));
RAY_CHECK_OK(gcs_table_storage_.NodeTable().GetAll(load_node_table_data_callback));
}

void GcsInitData::AsyncLoadPlacementGroupTableData(const EmptyCallback &on_done) {
Expand All @@ -72,7 +72,7 @@ void GcsInitData::AsyncLoadPlacementGroupTableData(const EmptyCallback &on_done)
<< placement_group_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().GetAll(
RAY_CHECK_OK(gcs_table_storage_.PlacementGroupTable().GetAll(
load_placement_group_table_data_callback));
}

Expand All @@ -85,7 +85,7 @@ void GcsInitData::AsyncLoadActorTableData(const EmptyCallback &on_done) {
<< actor_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->ActorTable().AsyncRebuildIndexAndGetAll(
RAY_CHECK_OK(gcs_table_storage_.ActorTable().AsyncRebuildIndexAndGetAll(
load_actor_table_data_callback));
}

Expand All @@ -98,9 +98,9 @@ void GcsInitData::AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done)
<< actor_task_spec_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().GetAll(
RAY_CHECK_OK(gcs_table_storage_.ActorTaskSpecTable().GetAll(
load_actor_task_spec_table_data_callback));
}

} // namespace gcs
} // namespace ray
} // namespace ray
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_init_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class GcsInitData {
/// Create a GcsInitData.
///
/// \param gcs_table_storage The storage from which the metadata will be loaded.
explicit GcsInitData(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
: gcs_table_storage_(std::move(gcs_table_storage)) {}
explicit GcsInitData(gcs::GcsTableStorage &gcs_table_storage)
: gcs_table_storage_(gcs_table_storage) {}

/// Load all required metadata from the store into memory at once asynchronously.
///
Expand Down Expand Up @@ -89,7 +89,7 @@ class GcsInitData {

protected:
/// The gcs table storage.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
gcs::GcsTableStorage &gcs_table_storage_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see reference and pointer are both used. What's the rule you are following?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Prefer to use reference as data member, since it guarantee non-null;
  • Have to use pointer in some places due to our testing implementation.

Copy link
Contributor Author

@dentiny dentiny Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our test case, quite a few places are implemented as

Class(/*member1=*/nullptr, /*member2=*/nullptr)

meanwhile mem1 and mem2 have their own dependency which I don't want to touch, so using nullptr in some places, but the ownership should be clear.


/// Job metadata.
absl::flat_hash_map<JobID, rpc::JobTableData> job_table_data_;
Expand Down
18 changes: 9 additions & 9 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request,
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id
<< ", driver pid = " << job_table_data.driver_pid();
} else {
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, /*done=*/nullptr));
RAY_CHECK_OK(gcs_publisher_.PublishJob(job_id, job_table_data, /*done=*/nullptr));
if (job_table_data.config().has_runtime_env_info()) {
runtime_env_manager_.AddURIReference(job_id.Hex(),
job_table_data.config().runtime_env_info());
Expand All @@ -122,7 +122,7 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request,
};

Status status =
gcs_table_storage_->JobTable().Put(job_id, mutable_job_table_data, on_done);
gcs_table_storage_.JobTable().Put(job_id, mutable_job_table_data, on_done);
if (!status.ok()) {
on_done(status);
}
Expand All @@ -143,7 +143,7 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id;
} else {
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, nullptr));
RAY_CHECK_OK(gcs_publisher_.PublishJob(job_id, job_table_data, nullptr));
runtime_env_manager_.RemoveURIReference(job_id.Hex());
ClearJobInfos(job_table_data);
RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id;
Expand All @@ -160,7 +160,7 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
done_callback(status);
};

Status status = gcs_table_storage_->JobTable().Put(job_id, job_table_data, on_done);
Status status = gcs_table_storage_.JobTable().Put(job_id, job_table_data, on_done);
if (!status.ok()) {
on_done(status);
}
Expand All @@ -176,7 +176,7 @@ void GcsJobManager::HandleMarkJobFinished(rpc::MarkJobFinishedRequest request,
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};

Status status = gcs_table_storage_->JobTable().Get(
Status status = gcs_table_storage_.JobTable().Get(
job_id,
[this, job_id, send_reply](const Status &status,
const std::optional<rpc::JobTableData> &result) {
Expand Down Expand Up @@ -423,7 +423,7 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
internal_kv_.MultiGet("job", job_api_data_keys, kv_multi_get_callback);
}
};
Status status = gcs_table_storage_->JobTable().GetAll(on_done);
Status status = gcs_table_storage_.JobTable().GetAll(on_done);
if (!status.ok()) {
on_done(absl::flat_hash_map<JobID, JobTableData>());
}
Expand All @@ -433,14 +433,14 @@ void GcsJobManager::HandleReportJobError(rpc::ReportJobErrorRequest request,
rpc::ReportJobErrorReply *reply,
rpc::SendReplyCallback send_reply_callback) {
auto job_id = JobID::FromBinary(request.job_error().job_id());
RAY_CHECK_OK(gcs_publisher_->PublishError(job_id.Hex(), request.job_error(), nullptr));
RAY_CHECK_OK(gcs_publisher_.PublishError(job_id.Hex(), request.job_error(), nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

void GcsJobManager::HandleGetNextJobID(rpc::GetNextJobIDRequest request,
rpc::GetNextJobIDReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reply->set_job_id(gcs_table_storage_->GetNextJobID());
reply->set_job_id(gcs_table_storage_.GetNextJobID());
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

Expand Down Expand Up @@ -472,7 +472,7 @@ void GcsJobManager::OnNodeDead(const NodeID &node_id) {
};

// make all jobs in current node to finished
RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(on_done));
RAY_CHECK_OK(gcs_table_storage_.JobTable().GetAll(on_done));
}

void GcsJobManager::RecordMetrics() {
Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ using JobFinishListenerCallback = rpc::JobInfoHandler::JobFinishListenerCallback
/// This implementation class of `JobInfoHandler`.
class GcsJobManager : public rpc::JobInfoHandler {
public:
explicit GcsJobManager(std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher,
explicit GcsJobManager(GcsTableStorage &gcs_table_storage,
GcsPublisher &gcs_publisher,
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
InternalKVInterface &internal_kv,
rpc::CoreWorkerClientFactoryFn client_factory = nullptr)
: gcs_table_storage_(std::move(gcs_table_storage)),
gcs_publisher_(std::move(gcs_publisher)),
: gcs_table_storage_(gcs_table_storage),
gcs_publisher_(gcs_publisher),
runtime_env_manager_(runtime_env_manager),
function_manager_(function_manager),
internal_kv_(internal_kv),
Expand Down Expand Up @@ -118,8 +118,8 @@ class GcsJobManager : public rpc::JobInfoHandler {
// Number of finished jobs since start of this GCS Server, used to report metrics.
int64_t finished_jobs_count_ = 0;

std::shared_ptr<GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
GcsTableStorage &gcs_table_storage_;
GcsPublisher &gcs_publisher_;

/// Listeners which monitors the finish of jobs.
std::vector<JobFinishListenerCallback> job_finished_listeners_;
Expand Down
8 changes: 4 additions & 4 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ namespace ray {
namespace gcs {

//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::GcsNodeManager(std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
GcsNodeManager::GcsNodeManager(GcsPublisher *gcs_publisher,
gcs::GcsTableStorage *gcs_table_storage,
rpc::NodeManagerClientPool *raylet_client_pool,
const ClusterID &cluster_id)
: gcs_publisher_(std::move(gcs_publisher)),
gcs_table_storage_(std::move(gcs_table_storage)),
: gcs_publisher_(gcs_publisher),
gcs_table_storage_(gcs_table_storage),
raylet_client_pool_(raylet_client_pool),
cluster_id_(cluster_id) {}

Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
///
/// \param gcs_publisher GCS message publisher.
/// \param gcs_table_storage GCS table external storage accessor.
explicit GcsNodeManager(std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
rpc::NodeManagerClientPool *raylet_client_pool,
const ClusterID &cluster_id);
GcsNodeManager(GcsPublisher *gcs_publisher,
gcs::GcsTableStorage *gcs_table_storage,
rpc::NodeManagerClientPool *raylet_client_pool,
const ClusterID &cluster_id);

/// Handle register rpc request come from raylet.
void HandleGetClusterId(rpc::GetClusterIdRequest request,
Expand Down Expand Up @@ -244,9 +244,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
std::vector<std::function<void(std::shared_ptr<rpc::GcsNodeInfo>)>>
node_removed_listeners_;
/// A publisher for publishing gcs messages.
std::shared_ptr<GcsPublisher> gcs_publisher_;
GcsPublisher *gcs_publisher_;
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
gcs::GcsTableStorage *gcs_table_storage_;
/// Raylet client pool.
rpc::NodeManagerClientPool *raylet_client_pool_ = nullptr;
/// Cluster ID to be shared with clients when connecting.
Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,19 @@ rpc::PlacementGroupStats *GcsPlacementGroup::GetMutableStats() {

/////////////////////////////////////////////////////////////////////////////////////////

GcsPlacementGroupManager::GcsPlacementGroupManager(
instrumented_io_context &io_context, GcsResourceManager &gcs_resource_manager)
: io_context_(io_context), gcs_resource_manager_(gcs_resource_manager) {}

GcsPlacementGroupManager::GcsPlacementGroupManager(
instrumented_io_context &io_context,
GcsPlacementGroupSchedulerInterface *scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
gcs::GcsTableStorage *gcs_table_storage,
GcsResourceManager &gcs_resource_manager,
std::function<std::string(const JobID &)> get_ray_namespace)
: io_context_(io_context),
gcs_placement_group_scheduler_(scheduler),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_table_storage_(gcs_table_storage),
gcs_resource_manager_(gcs_resource_manager),
get_ray_namespace_(std::move(get_ray_namespace)) {
placement_group_state_counter_.reset(
Expand All @@ -205,10 +209,6 @@ GcsPlacementGroupManager::GcsPlacementGroupManager(
Tick();
}

GcsPlacementGroupManager::GcsPlacementGroupManager(
instrumented_io_context &io_context, GcsResourceManager &gcs_resource_manager)
: io_context_(io_context), gcs_resource_manager_(gcs_resource_manager) {}

void GcsPlacementGroupManager::RegisterPlacementGroup(
const std::shared_ptr<GcsPlacementGroup> &placement_group, StatusCallback callback) {
// NOTE: After the abnormal recovery of the network between GCS client and GCS server or
Expand Down
8 changes: 3 additions & 5 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
#include <optional>
#include <utility>

#include "absl/container/btree_map.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/bundle_spec.h"
#include "ray/common/id.h"
Expand Down Expand Up @@ -238,11 +236,11 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// \param get_ray_namespace A callback to get the ray namespace.
GcsPlacementGroupManager(instrumented_io_context &io_context,
GcsPlacementGroupSchedulerInterface *scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
gcs::GcsTableStorage *gcs_table_storage,
GcsResourceManager &gcs_resource_manager,
std::function<std::string(const JobID &)> get_ray_namespace);

~GcsPlacementGroupManager() = default;
~GcsPlacementGroupManager() override = default;

void HandleCreatePlacementGroup(rpc::CreatePlacementGroupRequest request,
rpc::CreatePlacementGroupReply *reply,
Expand Down Expand Up @@ -484,7 +482,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
gcs::GcsPlacementGroupSchedulerInterface *gcs_placement_group_scheduler_ = nullptr;

/// Used to update placement group information upon creation, deletion, etc.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
gcs::GcsTableStorage *gcs_table_storage_ = nullptr;

/// Counter of placement groups broken down by State.
std::shared_ptr<CounterMap<rpc::PlacementGroupTableData::PlacementGroupState>>
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ namespace gcs {

GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
instrumented_io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
gcs::GcsTableStorage &gcs_table_storage,
const gcs::GcsNodeManager &gcs_node_manager,
ClusterResourceScheduler &cluster_resource_scheduler,
rpc::NodeManagerClientPool &raylet_client_pool)
: io_context_(io_context),
return_timer_(io_context),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_table_storage_(gcs_table_storage),
gcs_node_manager_(gcs_node_manager),
cluster_resource_scheduler_(cluster_resource_scheduler),
raylet_client_pool_(raylet_client_pool) {}
Expand Down Expand Up @@ -395,7 +395,7 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(

placement_group->UpdateState(rpc::PlacementGroupTableData::PREPARED);

RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
RAY_CHECK_OK(gcs_table_storage_.PlacementGroupTable().Put(
placement_group_id,
placement_group->GetPlacementGroupTableData(),
[this, lease_status_tracker, schedule_failure_handler, schedule_success_handler](
Expand Down
Loading
Loading