diff --git a/src/mock/ray/gcs/gcs_server/gcs_actor_manager.h b/src/mock/ray/gcs/gcs_server/gcs_actor_manager.h index 528d0d1af6d9..48e909111092 100644 --- a/src/mock/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/mock/ray/gcs/gcs_server/gcs_actor_manager.h @@ -30,9 +30,9 @@ class MockGcsActorManager : public GcsActorManager { MockGcsActorManager(RuntimeEnvManager &runtime_env_manager, GcsFunctionManager &function_manager) : GcsActorManager( - nullptr, - nullptr, - nullptr, + /*scheduler=*/nullptr, + /*gcs_table_storage=*/nullptr, + /*gcs_publisher=*/nullptr, runtime_env_manager, function_manager, [](const ActorID &) {}, diff --git a/src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index 1fd3d4b5bb5f..a0d6f84d1663 100644 --- a/src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -15,16 +15,6 @@ namespace ray { namespace gcs { -class Mockpair_hash : public pair_hash { - public: -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - class MockGcsPlacementGroupSchedulerInterface : public GcsPlacementGroupSchedulerInterface { public: diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 8bf3f3d484f1..e6ff1ef9fca4 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -324,17 +324,18 @@ const ray::rpc::ActorDeathCause GcsActorManager::GenNodeDiedCause( ///////////////////////////////////////////////////////////////////////////////////////// GcsActorManager::GcsActorManager( std::shared_ptr scheduler, - std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_publisher, + GcsTableStorage *gcs_table_storage, + GcsPublisher *gcs_publisher, RuntimeEnvManager &runtime_env_manager, GcsFunctionManager &function_manager, std::function destroy_owned_placement_group_if_needed, const rpc::CoreWorkerClientFactoryFn &worker_client_factory) : gcs_actor_scheduler_(std::move(scheduler)), - gcs_table_storage_(std::move(gcs_table_storage)), - gcs_publisher_(std::move(gcs_publisher)), + gcs_table_storage_(gcs_table_storage), + gcs_publisher_(gcs_publisher), worker_client_factory_(worker_client_factory), - destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed), + destroy_owned_placement_group_if_needed_( + std::move(destroy_owned_placement_group_if_needed)), runtime_env_manager_(runtime_env_manager), function_manager_(function_manager), actor_gc_delay_(RayConfig::instance().gcs_actor_table_min_duration_ms()) { diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index afdf55be80d6..fcc1202f8ee3 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -314,8 +314,8 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// \param gcs_publisher Used to publish gcs message. GcsActorManager( std::shared_ptr scheduler, - std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_publisher, + GcsTableStorage *gcs_table_storage, + GcsPublisher *gcs_publisher, RuntimeEnvManager &runtime_env_manager, GcsFunctionManager &function_manager, std::function destroy_owned_placement_group_if_needed, @@ -687,9 +687,9 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// The scheduler to schedule all registered actors. std::shared_ptr gcs_actor_scheduler_; /// Used to update actor information upon creation, deletion, etc. - std::shared_ptr gcs_table_storage_; + GcsTableStorage *gcs_table_storage_; /// A publisher for publishing gcs messages. - std::shared_ptr gcs_publisher_; + GcsPublisher *gcs_publisher_; /// Factory to produce clients to workers. This is used to communicate with /// actors and their owners. rpc::CoreWorkerClientFactoryFn worker_client_factory_; diff --git a/src/ray/gcs/gcs_server/gcs_init_data.cc b/src/ray/gcs/gcs_server/gcs_init_data.cc index 60aceeb4178c..76bac0670648 100644 --- a/src/ray/gcs/gcs_server/gcs_init_data.cc +++ b/src/ray/gcs/gcs_server/gcs_init_data.cc @@ -47,7 +47,7 @@ void GcsInitData::AsyncLoadJobTableData(const EmptyCallback &on_done) { << job_table_data_.size(); on_done(); }; - RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(load_job_table_data_callback)); + RAY_CHECK_OK(gcs_table_storage_.JobTable().GetAll(load_job_table_data_callback)); } void GcsInitData::AsyncLoadNodeTableData(const EmptyCallback &on_done) { @@ -59,7 +59,7 @@ void GcsInitData::AsyncLoadNodeTableData(const EmptyCallback &on_done) { << node_table_data_.size(); on_done(); }; - RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(load_node_table_data_callback)); + RAY_CHECK_OK(gcs_table_storage_.NodeTable().GetAll(load_node_table_data_callback)); } void GcsInitData::AsyncLoadPlacementGroupTableData(const EmptyCallback &on_done) { @@ -72,7 +72,7 @@ void GcsInitData::AsyncLoadPlacementGroupTableData(const EmptyCallback &on_done) << placement_group_table_data_.size(); on_done(); }; - RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().GetAll( + RAY_CHECK_OK(gcs_table_storage_.PlacementGroupTable().GetAll( load_placement_group_table_data_callback)); } @@ -85,7 +85,7 @@ void GcsInitData::AsyncLoadActorTableData(const EmptyCallback &on_done) { << actor_table_data_.size(); on_done(); }; - RAY_CHECK_OK(gcs_table_storage_->ActorTable().AsyncRebuildIndexAndGetAll( + RAY_CHECK_OK(gcs_table_storage_.ActorTable().AsyncRebuildIndexAndGetAll( load_actor_table_data_callback)); } @@ -98,9 +98,9 @@ void GcsInitData::AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done) << actor_task_spec_table_data_.size(); on_done(); }; - RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().GetAll( + RAY_CHECK_OK(gcs_table_storage_.ActorTaskSpecTable().GetAll( load_actor_task_spec_table_data_callback)); } } // namespace gcs -} // namespace ray \ No newline at end of file +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_init_data.h b/src/ray/gcs/gcs_server/gcs_init_data.h index 7ee0df9da447..96890aeee5ae 100644 --- a/src/ray/gcs/gcs_server/gcs_init_data.h +++ b/src/ray/gcs/gcs_server/gcs_init_data.h @@ -31,8 +31,8 @@ class GcsInitData { /// Create a GcsInitData. /// /// \param gcs_table_storage The storage from which the metadata will be loaded. - explicit GcsInitData(std::shared_ptr gcs_table_storage) - : gcs_table_storage_(std::move(gcs_table_storage)) {} + explicit GcsInitData(gcs::GcsTableStorage &gcs_table_storage) + : gcs_table_storage_(gcs_table_storage) {} /// Load all required metadata from the store into memory at once asynchronously. /// @@ -89,7 +89,7 @@ class GcsInitData { protected: /// The gcs table storage. - std::shared_ptr gcs_table_storage_; + gcs::GcsTableStorage &gcs_table_storage_; /// Job metadata. absl::flat_hash_map job_table_data_; diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index f68a764f600c..48a1969046e1 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -102,7 +102,7 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request, RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id << ", driver pid = " << job_table_data.driver_pid(); } else { - RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, /*done=*/nullptr)); + RAY_CHECK_OK(gcs_publisher_.PublishJob(job_id, job_table_data, /*done=*/nullptr)); if (job_table_data.config().has_runtime_env_info()) { runtime_env_manager_.AddURIReference(job_id.Hex(), job_table_data.config().runtime_env_info()); @@ -122,7 +122,7 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request, }; Status status = - gcs_table_storage_->JobTable().Put(job_id, mutable_job_table_data, on_done); + gcs_table_storage_.JobTable().Put(job_id, mutable_job_table_data, on_done); if (!status.ok()) { on_done(status); } @@ -143,7 +143,7 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data, if (!status.ok()) { RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id; } else { - RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, nullptr)); + RAY_CHECK_OK(gcs_publisher_.PublishJob(job_id, job_table_data, nullptr)); runtime_env_manager_.RemoveURIReference(job_id.Hex()); ClearJobInfos(job_table_data); RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id; @@ -160,7 +160,7 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data, done_callback(status); }; - Status status = gcs_table_storage_->JobTable().Put(job_id, job_table_data, on_done); + Status status = gcs_table_storage_.JobTable().Put(job_id, job_table_data, on_done); if (!status.ok()) { on_done(status); } @@ -176,7 +176,7 @@ void GcsJobManager::HandleMarkJobFinished(rpc::MarkJobFinishedRequest request, GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; - Status status = gcs_table_storage_->JobTable().Get( + Status status = gcs_table_storage_.JobTable().Get( job_id, [this, job_id, send_reply](const Status &status, const std::optional &result) { @@ -423,7 +423,7 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request, internal_kv_.MultiGet("job", job_api_data_keys, kv_multi_get_callback); } }; - Status status = gcs_table_storage_->JobTable().GetAll(on_done); + Status status = gcs_table_storage_.JobTable().GetAll(on_done); if (!status.ok()) { on_done(absl::flat_hash_map()); } @@ -433,14 +433,14 @@ void GcsJobManager::HandleReportJobError(rpc::ReportJobErrorRequest request, rpc::ReportJobErrorReply *reply, rpc::SendReplyCallback send_reply_callback) { auto job_id = JobID::FromBinary(request.job_error().job_id()); - RAY_CHECK_OK(gcs_publisher_->PublishError(job_id.Hex(), request.job_error(), nullptr)); + RAY_CHECK_OK(gcs_publisher_.PublishError(job_id.Hex(), request.job_error(), nullptr)); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); } void GcsJobManager::HandleGetNextJobID(rpc::GetNextJobIDRequest request, rpc::GetNextJobIDReply *reply, rpc::SendReplyCallback send_reply_callback) { - reply->set_job_id(gcs_table_storage_->GetNextJobID()); + reply->set_job_id(gcs_table_storage_.GetNextJobID()); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); } @@ -472,7 +472,7 @@ void GcsJobManager::OnNodeDead(const NodeID &node_id) { }; // make all jobs in current node to finished - RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(on_done)); + RAY_CHECK_OK(gcs_table_storage_.JobTable().GetAll(on_done)); } void GcsJobManager::RecordMetrics() { diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.h b/src/ray/gcs/gcs_server/gcs_job_manager.h index b558cbc9c64d..0ae53270ca90 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.h +++ b/src/ray/gcs/gcs_server/gcs_job_manager.h @@ -50,14 +50,14 @@ using JobFinishListenerCallback = rpc::JobInfoHandler::JobFinishListenerCallback /// This implementation class of `JobInfoHandler`. class GcsJobManager : public rpc::JobInfoHandler { public: - explicit GcsJobManager(std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_publisher, + explicit GcsJobManager(GcsTableStorage &gcs_table_storage, + GcsPublisher &gcs_publisher, RuntimeEnvManager &runtime_env_manager, GcsFunctionManager &function_manager, InternalKVInterface &internal_kv, rpc::CoreWorkerClientFactoryFn client_factory = nullptr) - : gcs_table_storage_(std::move(gcs_table_storage)), - gcs_publisher_(std::move(gcs_publisher)), + : gcs_table_storage_(gcs_table_storage), + gcs_publisher_(gcs_publisher), runtime_env_manager_(runtime_env_manager), function_manager_(function_manager), internal_kv_(internal_kv), @@ -118,8 +118,8 @@ class GcsJobManager : public rpc::JobInfoHandler { // Number of finished jobs since start of this GCS Server, used to report metrics. int64_t finished_jobs_count_ = 0; - std::shared_ptr gcs_table_storage_; - std::shared_ptr gcs_publisher_; + GcsTableStorage &gcs_table_storage_; + GcsPublisher &gcs_publisher_; /// Listeners which monitors the finish of jobs. std::vector job_finished_listeners_; diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 15aa488cb65f..d73bbfb99985 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -29,12 +29,12 @@ namespace ray { namespace gcs { ////////////////////////////////////////////////////////////////////////////////////////// -GcsNodeManager::GcsNodeManager(std::shared_ptr gcs_publisher, - std::shared_ptr gcs_table_storage, +GcsNodeManager::GcsNodeManager(GcsPublisher *gcs_publisher, + gcs::GcsTableStorage *gcs_table_storage, rpc::NodeManagerClientPool *raylet_client_pool, const ClusterID &cluster_id) - : gcs_publisher_(std::move(gcs_publisher)), - gcs_table_storage_(std::move(gcs_table_storage)), + : gcs_publisher_(gcs_publisher), + gcs_table_storage_(gcs_table_storage), raylet_client_pool_(raylet_client_pool), cluster_id_(cluster_id) {} diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index b924fec264c9..e6c9bd2f7c0c 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -48,10 +48,10 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// /// \param gcs_publisher GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. - explicit GcsNodeManager(std::shared_ptr gcs_publisher, - std::shared_ptr gcs_table_storage, - rpc::NodeManagerClientPool *raylet_client_pool, - const ClusterID &cluster_id); + GcsNodeManager(GcsPublisher *gcs_publisher, + gcs::GcsTableStorage *gcs_table_storage, + rpc::NodeManagerClientPool *raylet_client_pool, + const ClusterID &cluster_id); /// Handle register rpc request come from raylet. void HandleGetClusterId(rpc::GetClusterIdRequest request, @@ -244,9 +244,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler { std::vector)>> node_removed_listeners_; /// A publisher for publishing gcs messages. - std::shared_ptr gcs_publisher_; + GcsPublisher *gcs_publisher_; /// Storage for GCS tables. - std::shared_ptr gcs_table_storage_; + gcs::GcsTableStorage *gcs_table_storage_; /// Raylet client pool. rpc::NodeManagerClientPool *raylet_client_pool_ = nullptr; /// Cluster ID to be shared with clients when connecting. diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index 1aec60e9603c..46e19ff7a857 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -181,15 +181,19 @@ rpc::PlacementGroupStats *GcsPlacementGroup::GetMutableStats() { ///////////////////////////////////////////////////////////////////////////////////////// +GcsPlacementGroupManager::GcsPlacementGroupManager( + instrumented_io_context &io_context, GcsResourceManager &gcs_resource_manager) + : io_context_(io_context), gcs_resource_manager_(gcs_resource_manager) {} + GcsPlacementGroupManager::GcsPlacementGroupManager( instrumented_io_context &io_context, GcsPlacementGroupSchedulerInterface *scheduler, - std::shared_ptr gcs_table_storage, + gcs::GcsTableStorage *gcs_table_storage, GcsResourceManager &gcs_resource_manager, std::function get_ray_namespace) : io_context_(io_context), gcs_placement_group_scheduler_(scheduler), - gcs_table_storage_(std::move(gcs_table_storage)), + gcs_table_storage_(gcs_table_storage), gcs_resource_manager_(gcs_resource_manager), get_ray_namespace_(std::move(get_ray_namespace)) { placement_group_state_counter_.reset( @@ -205,10 +209,6 @@ GcsPlacementGroupManager::GcsPlacementGroupManager( Tick(); } -GcsPlacementGroupManager::GcsPlacementGroupManager( - instrumented_io_context &io_context, GcsResourceManager &gcs_resource_manager) - : io_context_(io_context), gcs_resource_manager_(gcs_resource_manager) {} - void GcsPlacementGroupManager::RegisterPlacementGroup( const std::shared_ptr &placement_group, StatusCallback callback) { // NOTE: After the abnormal recovery of the network between GCS client and GCS server or diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index a7d91388e264..728b1eacf741 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -18,9 +18,7 @@ #include #include -#include "absl/container/btree_map.h" #include "absl/container/flat_hash_map.h" -#include "absl/container/flat_hash_set.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/bundle_spec.h" #include "ray/common/id.h" @@ -238,11 +236,11 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// \param get_ray_namespace A callback to get the ray namespace. GcsPlacementGroupManager(instrumented_io_context &io_context, GcsPlacementGroupSchedulerInterface *scheduler, - std::shared_ptr gcs_table_storage, + gcs::GcsTableStorage *gcs_table_storage, GcsResourceManager &gcs_resource_manager, std::function get_ray_namespace); - ~GcsPlacementGroupManager() = default; + ~GcsPlacementGroupManager() override = default; void HandleCreatePlacementGroup(rpc::CreatePlacementGroupRequest request, rpc::CreatePlacementGroupReply *reply, @@ -484,7 +482,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { gcs::GcsPlacementGroupSchedulerInterface *gcs_placement_group_scheduler_ = nullptr; /// Used to update placement group information upon creation, deletion, etc. - std::shared_ptr gcs_table_storage_; + gcs::GcsTableStorage *gcs_table_storage_ = nullptr; /// Counter of placement groups broken down by State. std::shared_ptr> diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 85a94f863598..e58850e7e6a4 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -23,13 +23,13 @@ namespace gcs { GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( instrumented_io_context &io_context, - std::shared_ptr gcs_table_storage, + gcs::GcsTableStorage &gcs_table_storage, const gcs::GcsNodeManager &gcs_node_manager, ClusterResourceScheduler &cluster_resource_scheduler, rpc::NodeManagerClientPool &raylet_client_pool) : io_context_(io_context), return_timer_(io_context), - gcs_table_storage_(std::move(gcs_table_storage)), + gcs_table_storage_(gcs_table_storage), gcs_node_manager_(gcs_node_manager), cluster_resource_scheduler_(cluster_resource_scheduler), raylet_client_pool_(raylet_client_pool) {} @@ -395,7 +395,7 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned( placement_group->UpdateState(rpc::PlacementGroupTableData::PREPARED); - RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put( + RAY_CHECK_OK(gcs_table_storage_.PlacementGroupTable().Put( placement_group_id, placement_group->GetPlacementGroupTableData(), [this, lease_status_tracker, schedule_failure_handler, schedule_success_handler]( diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index df16f025d082..2fc76f9a2e22 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -291,7 +291,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// scheduling. /// \param lease_client_factory Factory to create remote lease client. GcsPlacementGroupScheduler(instrumented_io_context &io_context, - std::shared_ptr gcs_table_storage, + gcs::GcsTableStorage &gcs_table_storage, const GcsNodeManager &gcs_node_manager, ClusterResourceScheduler &cluster_resource_scheduler, rpc::NodeManagerClientPool &raylet_client_pool); @@ -485,7 +485,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { boost::asio::deadline_timer return_timer_; /// Used to update placement group information upon creation, deletion, etc. - std::shared_ptr gcs_table_storage_; + gcs::GcsTableStorage &gcs_table_storage_; /// Reference of GcsNodeManager. const GcsNodeManager &gcs_node_manager_; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 2d2574bc395e..efc84f5a0e7b 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -73,11 +73,11 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, RAY_LOG(INFO) << "GCS storage type is " << storage_type_; switch (storage_type_) { case StorageType::IN_MEMORY: - gcs_table_storage_ = std::make_shared( + gcs_table_storage_ = std::make_unique( io_context_provider_.GetDefaultIOContext()); break; case StorageType::REDIS_PERSIST: - gcs_table_storage_ = std::make_shared(GetOrConnectRedis()); + gcs_table_storage_ = std::make_unique(GetOrConnectRedis()); break; default: RAY_LOG(FATAL) << "Unexpected storage type: " << storage_type_; @@ -123,7 +123,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, /*publish_batch_size_=*/RayConfig::instance().publish_batch_size(), /*publisher_id=*/NodeID::FromRandom()); - gcs_publisher_ = std::make_shared(std::move(inner_publisher)); + gcs_publisher_ = std::make_unique(std::move(inner_publisher)); } GcsServer::~GcsServer() { Stop(); } @@ -138,7 +138,7 @@ RedisClientOptions GcsServer::GetRedisClientOptions() const { void GcsServer::Start() { // Load gcs tables data asynchronously. - auto gcs_init_data = std::make_shared(gcs_table_storage_); + auto gcs_init_data = std::make_shared(*gcs_table_storage_); // Init KV Manager. This needs to be initialized first here so that // it can be used to retrieve the cluster ID. InitKVManager(); @@ -288,8 +288,8 @@ void GcsServer::Stop() { void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_publisher_); - gcs_node_manager_ = std::make_unique(gcs_publisher_, - gcs_table_storage_, + gcs_node_manager_ = std::make_unique(gcs_publisher_.get(), + gcs_table_storage_.get(), raylet_client_pool_.get(), rpc_server_.GetClusterId()); // Initialize by gcs tables data. @@ -417,8 +417,8 @@ void GcsServer::InitGcsJobManager(const GcsInitData &gcs_init_data) { }); }; RAY_CHECK(gcs_table_storage_ && gcs_publisher_); - gcs_job_manager_ = std::make_unique(gcs_table_storage_, - gcs_publisher_, + gcs_job_manager_ = std::make_unique(*gcs_table_storage_, + *gcs_publisher_, *runtime_env_manager_, *function_manager_, kv_manager_->GetInstance(), @@ -472,12 +472,11 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) { [this](const NodeID &node_id, const rpc::ResourcesData &resources) { gcs_resource_manager_->UpdateNodeNormalTaskResources(node_id, resources); }); - gcs_actor_manager_ = std::make_unique( std::move(scheduler), - gcs_table_storage_, - gcs_publisher_, + gcs_table_storage_.get(), + gcs_publisher_.get(), *runtime_env_manager_, *function_manager_, [this](const ActorID &actor_id) { @@ -504,7 +503,7 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_node_manager_); gcs_placement_group_scheduler_ = std::make_unique( io_context_provider_.GetDefaultIOContext(), - gcs_table_storage_, + *gcs_table_storage_, *gcs_node_manager_, *cluster_resource_scheduler_, *raylet_client_pool_); @@ -512,7 +511,7 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) { gcs_placement_group_manager_ = std::make_unique( io_context_provider_.GetDefaultIOContext(), gcs_placement_group_scheduler_.get(), - gcs_table_storage_, + gcs_table_storage_.get(), *gcs_resource_manager_, [this](const JobID &job_id) { return gcs_job_manager_->GetJobConfig(job_id)->ray_namespace(); @@ -598,7 +597,7 @@ void GcsServer::InitKVService() { void GcsServer::InitPubSubHandler() { auto &io_context = io_context_provider_.GetIOContext(); - pubsub_handler_ = std::make_unique(io_context, gcs_publisher_); + pubsub_handler_ = std::make_unique(io_context, *gcs_publisher_); pubsub_service_ = std::make_unique(io_context, *pubsub_handler_); // Register service. @@ -646,7 +645,7 @@ void GcsServer::InitRuntimeEnvManager() { void GcsServer::InitGcsWorkerManager() { gcs_worker_manager_ = - std::make_unique(gcs_table_storage_, gcs_publisher_); + std::make_unique(*gcs_table_storage_, *gcs_publisher_); // Register service. worker_info_service_.reset(new rpc::WorkerInfoGrpcService( io_context_provider_.GetDefaultIOContext(), *gcs_worker_manager_)); diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 5c079bf8b3c8..194ba9351525 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -80,10 +80,14 @@ class GcsAutoscalerStateManager; /// and the management of actor creation. /// For more details, please see the design document. /// https://docs.google.com/document/d/1d-9qBlsh2UQHo-AWMWR0GptI_Ajwu4SKx0Q0LHKPpeI/edit#heading=h.csi0gaglj2pv +/// +/// Notes on lifecycle: +/// 1. Gcs server contains a lot of data member, gcs server outlives all of them. +/// 2. Gcs table storage and all gcs managers share a lifetime, that starts from a +/// `DoStart` call to `Stop`. class GcsServer { public: - explicit GcsServer(const GcsServerConfig &config, - instrumented_io_context &main_service); + GcsServer(const GcsServerConfig &config, instrumented_io_context &main_service); virtual ~GcsServer(); /// Start gcs server. @@ -289,13 +293,13 @@ class GcsServer { /// Backend client. std::shared_ptr redis_client_; /// A publisher for publishing gcs messages. - std::shared_ptr gcs_publisher_; + std::unique_ptr gcs_publisher_; /// Grpc based pubsub's periodical runner. PeriodicalRunner pubsub_periodical_runner_; /// The runner to run function periodically. PeriodicalRunner periodical_runner_; /// The gcs table storage. - std::shared_ptr gcs_table_storage_; + std::unique_ptr gcs_table_storage_; /// Stores references to URIs stored by the GCS for runtime envs. std::unique_ptr runtime_env_manager_; /// Gcs service state flag, which is used for ut. diff --git a/src/ray/gcs/gcs_server/gcs_worker_manager.cc b/src/ray/gcs/gcs_server/gcs_worker_manager.cc index 00bb62a9e35a..61819281fab3 100644 --- a/src/ray/gcs/gcs_server/gcs_worker_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_worker_manager.cc @@ -19,10 +19,12 @@ namespace ray { namespace gcs { +namespace { bool IsIntentionalWorkerFailure(rpc::WorkerExitType exit_type) { return exit_type == rpc::WorkerExitType::INTENDED_USER_EXIT || exit_type == rpc::WorkerExitType::INTENDED_SYSTEM_EXIT; } +} // namespace void GcsWorkerManager::HandleReportWorkerFailure( rpc::ReportWorkerFailureRequest request, @@ -94,7 +96,7 @@ void GcsWorkerManager::HandleReportWorkerFailure( worker_failure.set_raylet_id( worker_failure_data->worker_address().raylet_id()); RAY_CHECK_OK( - gcs_publisher_->PublishWorkerFailure(worker_id, worker_failure, nullptr)); + gcs_publisher_.PublishWorkerFailure(worker_id, worker_failure, nullptr)); } GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; @@ -103,7 +105,7 @@ void GcsWorkerManager::HandleReportWorkerFailure( // receives the worker registration information first and then the worker failure // message, so we delete the get operation. Related issues: // https://github.com/ray-project/ray/pull/11599 - Status status = gcs_table_storage_->WorkerTable().Put( + Status status = gcs_table_storage_.WorkerTable().Put( worker_id, *worker_failure_data, on_done); if (!status.ok()) { on_done(status); @@ -196,7 +198,7 @@ void GcsWorkerManager::HandleGetAllWorkerInfo( RAY_LOG(DEBUG) << "Finished getting all worker info."; GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); }; - Status status = gcs_table_storage_->WorkerTable().GetAll(on_done); + Status status = gcs_table_storage_.WorkerTable().GetAll(on_done); if (!status.ok()) { on_done(absl::flat_hash_map()); } @@ -220,7 +222,7 @@ void GcsWorkerManager::HandleAddWorkerInfo(rpc::AddWorkerInfoRequest request, GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; - Status status = gcs_table_storage_->WorkerTable().Put(worker_id, *worker_data, on_done); + Status status = gcs_table_storage_.WorkerTable().Put(worker_id, *worker_data, on_done); if (!status.ok()) { on_done(status); } @@ -257,7 +259,7 @@ void GcsWorkerManager::HandleUpdateWorkerDebuggerPort( auto worker_data = std::make_shared(); worker_data->CopyFrom(*result); worker_data->set_debugger_port(debugger_port); - Status status = gcs_table_storage_->WorkerTable().Put( + Status status = gcs_table_storage_.WorkerTable().Put( worker_id, *worker_data, on_worker_update_done); if (!status.ok()) { GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); @@ -265,7 +267,7 @@ void GcsWorkerManager::HandleUpdateWorkerDebuggerPort( } }; - Status status = gcs_table_storage_->WorkerTable().Get(worker_id, on_worker_get_done); + Status status = gcs_table_storage_.WorkerTable().Get(worker_id, on_worker_get_done); if (!status.ok()) { GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); } @@ -313,7 +315,7 @@ void GcsWorkerManager::HandleUpdateWorkerNumPausedThreads( worker_data->has_num_paused_threads() ? worker_data->num_paused_threads() : 0; worker_data->set_num_paused_threads(current_num_paused_threads + num_paused_threads_delta); - Status status = gcs_table_storage_->WorkerTable().Put( + Status status = gcs_table_storage_.WorkerTable().Put( worker_id, *worker_data, on_worker_update_done); if (!status.ok()) { GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); @@ -321,7 +323,7 @@ void GcsWorkerManager::HandleUpdateWorkerNumPausedThreads( } }; - Status status = gcs_table_storage_->WorkerTable().Get(worker_id, on_worker_get_done); + Status status = gcs_table_storage_.WorkerTable().Get(worker_id, on_worker_get_done); if (!status.ok()) { GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); } @@ -347,7 +349,7 @@ void GcsWorkerManager::GetWorkerInfo( } }; - Status status = gcs_table_storage_->WorkerTable().Get(worker_id, on_done); + Status status = gcs_table_storage_.WorkerTable().Get(worker_id, on_done); if (!status.ok()) { on_done(status, std::nullopt); } diff --git a/src/ray/gcs/gcs_server/gcs_worker_manager.h b/src/ray/gcs/gcs_server/gcs_worker_manager.h index a835b892a682..5b2ef7e019f1 100644 --- a/src/ray/gcs/gcs_server/gcs_worker_manager.h +++ b/src/ray/gcs/gcs_server/gcs_worker_manager.h @@ -26,8 +26,7 @@ namespace gcs { /// This implementation class of `WorkerInfoHandler`. class GcsWorkerManager : public rpc::WorkerInfoHandler { public: - explicit GcsWorkerManager(std::shared_ptr gcs_table_storage, - std::shared_ptr &gcs_publisher) + GcsWorkerManager(gcs::GcsTableStorage &gcs_table_storage, GcsPublisher &gcs_publisher) : gcs_table_storage_(gcs_table_storage), gcs_publisher_(gcs_publisher) {} void HandleReportWorkerFailure(rpc::ReportWorkerFailureRequest request, @@ -68,8 +67,8 @@ class GcsWorkerManager : public rpc::WorkerInfoHandler { const WorkerID &worker_id, std::function &)> callback) const; - std::shared_ptr gcs_table_storage_; - std::shared_ptr gcs_publisher_; + gcs::GcsTableStorage &gcs_table_storage_; + GcsPublisher &gcs_publisher_; UsageStatsClient *usage_stats_client_; std::vector)>> worker_dead_listeners_; diff --git a/src/ray/gcs/gcs_server/pubsub_handler.cc b/src/ray/gcs/gcs_server/pubsub_handler.cc index d926b051102c..be224cd78299 100644 --- a/src/ray/gcs/gcs_server/pubsub_handler.cc +++ b/src/ray/gcs/gcs_server/pubsub_handler.cc @@ -17,19 +17,16 @@ namespace ray { namespace gcs { -InternalPubSubHandler::InternalPubSubHandler( - instrumented_io_context &io_service, - const std::shared_ptr &gcs_publisher) - : io_service_(io_service), gcs_publisher_(gcs_publisher) { - RAY_CHECK(gcs_publisher_); -} +InternalPubSubHandler::InternalPubSubHandler(instrumented_io_context &io_service, + gcs::GcsPublisher &gcs_publisher) + : io_service_(io_service), gcs_publisher_(gcs_publisher) {} void InternalPubSubHandler::HandleGcsPublish(rpc::GcsPublishRequest request, rpc::GcsPublishReply *reply, rpc::SendReplyCallback send_reply_callback) { RAY_LOG(DEBUG) << "received publish request: " << request.DebugString(); for (const auto &msg : request.pub_messages()) { - gcs_publisher_->GetPublisher().Publish(msg); + gcs_publisher_.GetPublisher().Publish(msg); } send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -47,7 +44,7 @@ void InternalPubSubHandler::HandleGcsSubscriberPoll( pubsub_req.set_max_processed_sequence_id(request.max_processed_sequence_id()); auto pubsub_reply = std::make_shared(); auto pubsub_reply_ptr = pubsub_reply.get(); - gcs_publisher_->GetPublisher().ConnectToSubscriber( + gcs_publisher_.GetPublisher().ConnectToSubscriber( pubsub_req, pubsub_reply_ptr, [reply, @@ -83,13 +80,13 @@ void InternalPubSubHandler::HandleGcsSubscriberCommandBatch( for (const auto &command : request.commands()) { if (command.has_unsubscribe_message()) { - gcs_publisher_->GetPublisher().UnregisterSubscription( + gcs_publisher_.GetPublisher().UnregisterSubscription( command.channel_type(), subscriber_id, command.key_id().empty() ? std::nullopt : std::make_optional(command.key_id())); iter->second.erase(subscriber_id); } else if (command.has_subscribe_message()) { - gcs_publisher_->GetPublisher().RegisterSubscription( + gcs_publisher_.GetPublisher().RegisterSubscription( command.channel_type(), subscriber_id, command.key_id().empty() ? std::nullopt : std::make_optional(command.key_id())); @@ -108,7 +105,7 @@ void InternalPubSubHandler::HandleGcsUnregisterSubscriber( rpc::GcsUnregisterSubscriberReply *reply, rpc::SendReplyCallback send_reply_callback) { const auto subscriber_id = UniqueID::FromBinary(request.subscriber_id()); - gcs_publisher_->GetPublisher().UnregisterSubscriber(subscriber_id); + gcs_publisher_.GetPublisher().UnregisterSubscriber(subscriber_id); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -118,7 +115,7 @@ void InternalPubSubHandler::RemoveSubscriberFrom(const std::string &sender_id) { return; } for (auto &subscriber_id : iter->second) { - gcs_publisher_->GetPublisher().UnregisterSubscriber(subscriber_id); + gcs_publisher_.GetPublisher().UnregisterSubscriber(subscriber_id); } sender_to_subscribers_.erase(iter); } diff --git a/src/ray/gcs/gcs_server/pubsub_handler.h b/src/ray/gcs/gcs_server/pubsub_handler.h index a92209a6954c..979c75db498d 100644 --- a/src/ray/gcs/gcs_server/pubsub_handler.h +++ b/src/ray/gcs/gcs_server/pubsub_handler.h @@ -29,7 +29,7 @@ namespace gcs { class InternalPubSubHandler : public rpc::InternalPubSubHandler { public: InternalPubSubHandler(instrumented_io_context &io_service, - const std::shared_ptr &gcs_publisher); + gcs::GcsPublisher &gcs_publisher); void HandleGcsPublish(rpc::GcsPublishRequest request, rpc::GcsPublishReply *reply, @@ -54,7 +54,7 @@ class InternalPubSubHandler : public rpc::InternalPubSubHandler { private: /// Not owning the io service, to allow sharing it with pubsub::Publisher. instrumented_io_context &io_service_; - std::shared_ptr gcs_publisher_; + gcs::GcsPublisher &gcs_publisher_; absl::flat_hash_map> sender_to_subscribers_; }; diff --git a/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc index ce6955fed38f..b4aa5ced1c63 100644 --- a/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc +++ b/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc @@ -37,7 +37,7 @@ using json = nlohmann::json; class MockActorScheduler : public gcs::GcsActorSchedulerInterface { public: - MockActorScheduler() {} + MockActorScheduler() = default; void Schedule(std::shared_ptr actor) { actors.push_back(actor); } void Reschedule(std::shared_ptr actor) {} @@ -147,15 +147,15 @@ class GcsActorManagerTest : public ::testing::Test { /*subscriber_timeout_ms=*/absl::ToInt64Microseconds(absl::Seconds(30)), /*batch_size=*/100); - gcs_publisher_ = std::make_shared(std::move(publisher)); + gcs_publisher_ = std::make_unique(std::move(publisher)); store_client_ = std::make_shared(io_service_); - gcs_table_storage_ = std::make_shared(io_service_); + gcs_table_storage_ = std::make_unique(io_service_); kv_ = std::make_unique(); function_manager_ = std::make_unique(*kv_); gcs_actor_manager_ = std::make_unique( mock_actor_scheduler_, - gcs_table_storage_, - gcs_publisher_, + gcs_table_storage_.get(), + gcs_publisher_.get(), *runtime_env_mgr_, *function_manager_, [](const ActorID &actor_id) {}, diff --git a/src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc index 832490d2f81e..06a2483e888e 100644 --- a/src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc +++ b/src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc @@ -107,14 +107,14 @@ TEST_F(GcsJobManagerTest, TestExportDriverJobEvents) { log_dir_, "warning", false); - gcs::GcsJobManager gcs_job_manager(gcs_table_storage_, - gcs_publisher_, + gcs::GcsJobManager gcs_job_manager(*gcs_table_storage_, + *gcs_publisher_, runtime_env_manager_, *function_manager_, *fake_kv_, client_factory_); - gcs::GcsInitData gcs_init_data(gcs_table_storage_); + gcs::GcsInitData gcs_init_data(*gcs_table_storage_); gcs_job_manager.Initialize(/*init_data=*/gcs_init_data); auto job_api_job_id = JobID::FromInt(100); diff --git a/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc index 61d2d0e8b932..14d1ddf4b582 100644 --- a/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc +++ b/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc @@ -43,9 +43,9 @@ class GcsNodeManagerExportAPITest : public ::testing::Test { raylet_client_ = std::make_shared(); client_pool_ = std::make_unique( [this](const rpc::Address &) { return raylet_client_; }); - gcs_publisher_ = std::make_shared( + gcs_publisher_ = std::make_unique( std::make_unique()); - gcs_table_storage_ = std::make_shared(io_service_); + gcs_table_storage_ = std::make_unique(io_service_); RayConfig::instance().initialize( R"( @@ -70,7 +70,7 @@ class GcsNodeManagerExportAPITest : public ::testing::Test { } protected: - std::shared_ptr gcs_table_storage_; + std::unique_ptr gcs_table_storage_; std::shared_ptr raylet_client_; std::unique_ptr client_pool_; std::shared_ptr gcs_publisher_; @@ -80,8 +80,10 @@ class GcsNodeManagerExportAPITest : public ::testing::Test { TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) { // Test export event is written when a node is added with HandleRegisterNode - gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_.get(), ClusterID::Nil()); + gcs::GcsNodeManager node_manager(gcs_publisher_.get(), + gcs_table_storage_.get(), + client_pool_.get(), + ClusterID::Nil()); auto node = Mocker::GenNodeInfo(); rpc::RegisterNodeRequest register_request; @@ -102,8 +104,10 @@ TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) { TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) { // Test export event is written when a node is removed with HandleUnregisterNode - gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_.get(), ClusterID::Nil()); + gcs::GcsNodeManager node_manager(gcs_publisher_.get(), + gcs_table_storage_.get(), + client_pool_.get(), + ClusterID::Nil()); auto node = Mocker::GenNodeInfo(); auto node_id = NodeID::FromBinary(node->node_id()); node_manager.AddNode(node); diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index 9bb274af97bd..044dfa588a01 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -142,15 +142,15 @@ class GcsActorManagerTest : public ::testing::Test { /*subscriber_timeout_ms=*/absl::ToInt64Microseconds(absl::Seconds(30)), /*batch_size=*/100); - gcs_publisher_ = std::make_shared(std::move(publisher)); + gcs_publisher_ = std::make_unique(std::move(publisher)); store_client_ = std::make_shared(io_service_); - gcs_table_storage_ = std::make_shared(io_service_); + gcs_table_storage_ = std::make_unique(io_service_); kv_ = std::make_unique(); function_manager_ = std::make_unique(*kv_); gcs_actor_manager_ = std::make_unique( mock_actor_scheduler_, - gcs_table_storage_, - gcs_publisher_, + gcs_table_storage_.get(), + gcs_publisher_.get(), *runtime_env_mgr_, *function_manager_, [](const ActorID &actor_id) {}, diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index 6302ee02ed63..58f0c1fe1635 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -39,8 +39,10 @@ class GcsActorSchedulerTest : public ::testing::Test { std::make_unique()); store_client_ = std::make_shared(io_service_); gcs_table_storage_ = std::make_shared(io_service_); - gcs_node_manager_ = std::make_shared( - gcs_publisher_, gcs_table_storage_, raylet_client_pool_.get(), ClusterID::Nil()); + gcs_node_manager_ = std::make_shared(gcs_publisher_.get(), + gcs_table_storage_.get(), + raylet_client_pool_.get(), + ClusterID::Nil()); gcs_actor_table_ = std::make_shared(store_client_); local_node_id_ = NodeID::FromRandom(); diff --git a/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc index b18658dffc95..c49dbac3f89f 100644 --- a/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc @@ -50,7 +50,7 @@ class GcsJobManagerTest : public ::testing::Test { }); promise.get_future().get(); - gcs_publisher_ = std::make_shared( + gcs_publisher_ = std::make_unique( std::make_unique()); store_client_ = std::make_shared(io_service_); gcs_table_storage_ = std::make_shared(store_client_); @@ -102,14 +102,14 @@ TEST_F(GcsJobManagerTest, TestFakeInternalKV) { } TEST_F(GcsJobManagerTest, TestIsRunningTasks) { - gcs::GcsJobManager gcs_job_manager(gcs_table_storage_, - gcs_publisher_, + gcs::GcsJobManager gcs_job_manager(*gcs_table_storage_, + *gcs_publisher_, runtime_env_manager_, *function_manager_, *fake_kv_, client_factory_); - gcs::GcsInitData gcs_init_data(gcs_table_storage_); + gcs::GcsInitData gcs_init_data(*gcs_table_storage_); gcs_job_manager.Initialize(/*init_data=*/gcs_init_data); // Add 100 jobs. Job i should have i running tasks. @@ -166,14 +166,14 @@ TEST_F(GcsJobManagerTest, TestIsRunningTasks) { } TEST_F(GcsJobManagerTest, TestGetAllJobInfo) { - gcs::GcsJobManager gcs_job_manager(gcs_table_storage_, - gcs_publisher_, + gcs::GcsJobManager gcs_job_manager(*gcs_table_storage_, + *gcs_publisher_, runtime_env_manager_, *function_manager_, *fake_kv_, client_factory_); - gcs::GcsInitData gcs_init_data(gcs_table_storage_); + gcs::GcsInitData gcs_init_data(*gcs_table_storage_); gcs_job_manager.Initialize(/*init_data=*/gcs_init_data); // Add 100 jobs. @@ -343,8 +343,8 @@ TEST_F(GcsJobManagerTest, TestGetAllJobInfo) { } TEST_F(GcsJobManagerTest, TestGetAllJobInfoWithFilter) { - gcs::GcsJobManager gcs_job_manager(gcs_table_storage_, - gcs_publisher_, + gcs::GcsJobManager gcs_job_manager(*gcs_table_storage_, + *gcs_publisher_, runtime_env_manager_, *function_manager_, *fake_kv_, @@ -352,7 +352,7 @@ TEST_F(GcsJobManagerTest, TestGetAllJobInfoWithFilter) { auto job_id1 = JobID::FromInt(1); auto job_id2 = JobID::FromInt(2); - gcs::GcsInitData gcs_init_data(gcs_table_storage_); + gcs::GcsInitData gcs_init_data(*gcs_table_storage_); gcs_job_manager.Initialize(/*init_data=*/gcs_init_data); rpc::AddJobReply empty_reply; @@ -428,8 +428,8 @@ TEST_F(GcsJobManagerTest, TestGetAllJobInfoWithFilter) { } TEST_F(GcsJobManagerTest, TestGetAllJobInfoWithLimit) { - gcs::GcsJobManager gcs_job_manager(gcs_table_storage_, - gcs_publisher_, + gcs::GcsJobManager gcs_job_manager(*gcs_table_storage_, + *gcs_publisher_, runtime_env_manager_, *function_manager_, *fake_kv_, @@ -437,7 +437,7 @@ TEST_F(GcsJobManagerTest, TestGetAllJobInfoWithLimit) { auto job_id1 = JobID::FromInt(1); auto job_id2 = JobID::FromInt(2); - gcs::GcsInitData gcs_init_data(gcs_table_storage_); + gcs::GcsInitData gcs_init_data(*gcs_table_storage_); gcs_job_manager.Initialize(/*init_data=*/gcs_init_data); rpc::AddJobReply empty_reply; @@ -531,8 +531,8 @@ TEST_F(GcsJobManagerTest, TestGetAllJobInfoWithLimit) { } TEST_F(GcsJobManagerTest, TestGetJobConfig) { - gcs::GcsJobManager gcs_job_manager(gcs_table_storage_, - gcs_publisher_, + gcs::GcsJobManager gcs_job_manager(*gcs_table_storage_, + *gcs_publisher_, runtime_env_manager_, *function_manager_, *kv_, @@ -540,7 +540,7 @@ TEST_F(GcsJobManagerTest, TestGetJobConfig) { auto job_id1 = JobID::FromInt(1); auto job_id2 = JobID::FromInt(2); - gcs::GcsInitData gcs_init_data(gcs_table_storage_); + gcs::GcsInitData gcs_init_data(*gcs_table_storage_); gcs_job_manager.Initialize(/*init_data=*/gcs_init_data); rpc::AddJobReply empty_reply; @@ -573,15 +573,15 @@ TEST_F(GcsJobManagerTest, TestGetJobConfig) { } TEST_F(GcsJobManagerTest, TestPreserveDriverInfo) { - gcs::GcsJobManager gcs_job_manager(gcs_table_storage_, - gcs_publisher_, + gcs::GcsJobManager gcs_job_manager(*gcs_table_storage_, + *gcs_publisher_, runtime_env_manager_, *function_manager_, *fake_kv_, client_factory_); auto job_id = JobID::FromInt(1); - gcs::GcsInitData gcs_init_data(gcs_table_storage_); + gcs::GcsInitData gcs_init_data(*gcs_table_storage_); gcs_job_manager.Initialize(/*init_data=*/gcs_init_data); auto add_job_request = Mocker::GenAddJobRequest(job_id, "namespace"); @@ -640,8 +640,8 @@ TEST_F(GcsJobManagerTest, TestPreserveDriverInfo) { } TEST_F(GcsJobManagerTest, TestNodeFailure) { - gcs::GcsJobManager gcs_job_manager(gcs_table_storage_, - gcs_publisher_, + gcs::GcsJobManager gcs_job_manager(*gcs_table_storage_, + *gcs_publisher_, runtime_env_manager_, *function_manager_, *fake_kv_, @@ -649,7 +649,7 @@ TEST_F(GcsJobManagerTest, TestNodeFailure) { auto job_id1 = JobID::FromInt(1); auto job_id2 = JobID::FromInt(2); - gcs::GcsInitData gcs_init_data(gcs_table_storage_); + gcs::GcsInitData gcs_init_data(*gcs_table_storage_); gcs_job_manager.Initialize(/*init_data=*/gcs_init_data); rpc::AddJobReply empty_reply; diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index eb12d59dbdb3..ed3b052e9af1 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -30,20 +30,22 @@ class GcsNodeManagerTest : public ::testing::Test { raylet_client_ = std::make_shared(); client_pool_ = std::make_unique( [this](const rpc::Address &) { return raylet_client_; }); - gcs_publisher_ = std::make_shared( + gcs_publisher_ = std::make_unique( std::make_unique()); } protected: - std::shared_ptr gcs_table_storage_; + std::unique_ptr gcs_table_storage_; std::shared_ptr raylet_client_; std::unique_ptr client_pool_; - std::shared_ptr gcs_publisher_; + std::unique_ptr gcs_publisher_; }; TEST_F(GcsNodeManagerTest, TestManagement) { - gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_.get(), ClusterID::Nil()); + gcs::GcsNodeManager node_manager(gcs_publisher_.get(), + gcs_table_storage_.get(), + client_pool_.get(), + ClusterID::Nil()); // Test Add/Get/Remove functionality. auto node = Mocker::GenNodeInfo(); auto node_id = NodeID::FromBinary(node->node_id()); @@ -57,8 +59,10 @@ TEST_F(GcsNodeManagerTest, TestManagement) { } TEST_F(GcsNodeManagerTest, TestListener) { - gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_.get(), ClusterID::Nil()); + gcs::GcsNodeManager node_manager(gcs_publisher_.get(), + gcs_table_storage_.get(), + client_pool_.get(), + ClusterID::Nil()); // Test AddNodeAddedListener. int node_count = 1000; std::vector> added_nodes; diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc index 1e3ef61060c8..346ce1ba58a8 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc @@ -48,7 +48,7 @@ class GcsPlacementGroupManagerMockTest : public Test { gcs_placement_group_manager_ = std::make_unique(io_context_, gcs_placement_group_scheduler_.get(), - gcs_table_storage_, + gcs_table_storage_.get(), *resource_manager_, [](auto &) { return ""; }); counter_.reset(new CounterMap()); diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index ad808b644b67..4f50cea43cc8 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -83,14 +83,14 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { cluster_resource_manager_(io_service_) { gcs_publisher_ = std::make_shared(std::make_unique()); - gcs_table_storage_ = std::make_shared(io_service_); + gcs_table_storage_ = std::make_unique(io_service_); gcs_node_manager_ = std::make_shared(); gcs_resource_manager_ = std::make_shared( io_service_, cluster_resource_manager_, *gcs_node_manager_, NodeID::FromRandom()); gcs_placement_group_manager_.reset(new gcs::GcsPlacementGroupManager( io_service_, mock_placement_group_scheduler_.get(), - gcs_table_storage_, + gcs_table_storage_.get(), *gcs_resource_manager_, [this](const JobID &job_id) { return job_namespace_table_[job_id]; })); counter_.reset(new CounterMap()); @@ -148,7 +148,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { } std::shared_ptr LoadDataFromDataStorage() { - auto gcs_init_data = std::make_shared(gcs_table_storage_); + auto gcs_init_data = std::make_shared(*gcs_table_storage_); std::promise promise; gcs_init_data->AsyncLoad([&promise] { promise.set_value(); }); RunIOService(); @@ -166,7 +166,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { std::shared_ptr> counter_; protected: - std::shared_ptr gcs_table_storage_; + std::unique_ptr gcs_table_storage_; private: instrumented_io_context io_service_; diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 093bdaf13fcc..9dcedcfb7793 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -54,8 +54,10 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { /*is_node_available_fn=*/ [](auto) { return true; }, /*is_local_node_with_raylet=*/false); - gcs_node_manager_ = std::make_shared( - gcs_publisher_, gcs_table_storage_, raylet_client_pool_.get(), ClusterID::Nil()); + gcs_node_manager_ = std::make_shared(gcs_publisher_.get(), + gcs_table_storage_.get(), + raylet_client_pool_.get(), + ClusterID::Nil()); gcs_resource_manager_ = std::make_shared( io_service_, cluster_resource_scheduler_->GetClusterResourceManager(), @@ -66,7 +68,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { [this](const rpc::Address &addr) { return raylet_clients_[addr.port()]; }); scheduler_ = std::make_shared( io_service_, - gcs_table_storage_, + *gcs_table_storage_, *gcs_node_manager_, *cluster_resource_scheduler_, *raylet_client_pool_); diff --git a/src/ray/gcs/gcs_server/test/gcs_worker_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_worker_manager_test.cc index 37d6a67b7b0d..1aa7c543fe0b 100644 --- a/src/ray/gcs/gcs_server/test/gcs_worker_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_worker_manager_test.cc @@ -46,7 +46,7 @@ class GcsWorkerManagerTest : public Test { io_service_.run(); }); worker_manager_ = - std::make_shared(gcs_table_storage_, gcs_publisher_); + std::make_shared(*gcs_table_storage_, *gcs_publisher_); } void TearDown() override {