Skip to content

Commit

Permalink
cache the whole runtime env
Browse files Browse the repository at this point in the history
Signed-off-by: hjiang <[email protected]>
  • Loading branch information
dentiny committed Dec 3, 2024
1 parent e83469c commit b52df48
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 66 deletions.
71 changes: 23 additions & 48 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
task_execution_service_work_(task_execution_service_),
exiting_detail_(std::nullopt),
pid_(getpid()),
runtime_env_pb_serialization_cache_(kDefaultSerializationCacheCap),
runtime_env_json_serialization_cache_(kDefaultSerializationCacheCap) {
// Notify that core worker is initialized.
auto initialzed_scope_guard = absl::MakeCleanup([this] {
Expand Down Expand Up @@ -2163,55 +2162,31 @@ json CoreWorker::OverrideRuntimeEnv(const json &child, std::shared_ptr<json> par
return result_runtime_env;
}

std::shared_ptr<rpc::RuntimeEnvInfo> CoreWorker::GetCachedPbRuntimeEnvOrParse(
// TODO(hjiang): The current implementation is not ideal, since it acquires a global lock
// for all operations; this is acceptable for now because no heavyweight operation is
// involved (the overall scheduling overhead is on the order of single-digit
// milliseconds). A better solution would be for the LRU cache to natively support
// sharding and a `GetOrCreate` API.
std::shared_ptr<rpc::RuntimeEnvInfo> CoreWorker::OverrideTaskOrActorRuntimeEnvInfo(
const std::string &serialized_runtime_env_info) const {
{
std::lock_guard lck(runtime_env_serialization_mutex_);
auto runtime_info =
runtime_env_pb_serialization_cache_.Get(serialized_runtime_env_info);
if (runtime_info != nullptr) {
return runtime_info;
}
}
auto pb_runtime_env_info = std::make_shared<rpc::RuntimeEnvInfo>();
RAY_CHECK(google::protobuf::util::JsonStringToMessage(serialized_runtime_env_info,
pb_runtime_env_info.get())
.ok());
{
std::lock_guard lck(runtime_env_serialization_mutex_);
runtime_env_pb_serialization_cache_.Put(serialized_runtime_env_info,
pb_runtime_env_info);
}
return pb_runtime_env_info;
}

std::shared_ptr<nlohmann::json> CoreWorker::GetCachedJsonRuntimeEnvOrParse(
const std::string &serialized_runtime_env) const {
{
std::lock_guard lck(runtime_env_serialization_mutex_);
auto runtime_info = runtime_env_json_serialization_cache_.Get(serialized_runtime_env);
if (runtime_info != nullptr) {
return runtime_info;
}
}
auto parsed_json = std::make_shared<json>();
*parsed_json = json::parse(serialized_runtime_env);
{
std::lock_guard lck(runtime_env_serialization_mutex_);
runtime_env_json_serialization_cache_.Put(serialized_runtime_env, parsed_json);
std::lock_guard lck(job_runtime_env_serialization_mutex_);
if (auto cached_runtime_env_info =
runtime_env_json_serialization_cache_.Get(serialized_runtime_env_info);
cached_runtime_env_info != nullptr) {
return cached_runtime_env_info;
}
return parsed_json;
}

std::shared_ptr<rpc::RuntimeEnvInfo> CoreWorker::OverrideTaskOrActorRuntimeEnvInfo(
const std::string &serialized_runtime_env_info) const {
// TODO(Catch-Bull,SongGuyang): task runtime env does not support the field
// eager_install yet; we will overwrite the field eager_install once it does.
std::shared_ptr<json> parent = nullptr;
std::shared_ptr<json> parent = job_runtime_env_;
std::shared_ptr<rpc::RuntimeEnvInfo> parent_runtime_env_info = nullptr;
auto runtime_env_info = std::make_shared<rpc::RuntimeEnvInfo>();
std::shared_ptr<rpc::RuntimeEnvInfo> runtime_env_info = nullptr;
runtime_env_info.reset(new rpc::RuntimeEnvInfo());

if (!IsRuntimeEnvInfoEmpty(serialized_runtime_env_info)) {
runtime_env_info = GetCachedPbRuntimeEnvOrParse(serialized_runtime_env_info);
RAY_CHECK(google::protobuf::util::JsonStringToMessage(serialized_runtime_env_info,
runtime_env_info.get())
.ok());
}

if (options_.worker_type == WorkerType::DRIVER) {
Expand All @@ -2222,8 +2197,8 @@ std::shared_ptr<rpc::RuntimeEnvInfo> CoreWorker::OverrideTaskOrActorRuntimeEnvIn

auto job_serialized_runtime_env =
worker_context_.GetCurrentJobConfig().runtime_env_info().serialized_runtime_env();
if (!IsRuntimeEnvEmpty(job_serialized_runtime_env)) {
parent = GetCachedJsonRuntimeEnvOrParse(job_serialized_runtime_env);
if (!IsRuntimeEnvEmpty(job_serialized_runtime_env) && parent != nullptr) {
parent = std::make_shared<json>(json::parse(job_serialized_runtime_env));
}
parent_runtime_env_info = std::make_shared<rpc::RuntimeEnvInfo>(
worker_context_.GetCurrentJobConfig().runtime_env_info());
Expand All @@ -2237,9 +2212,9 @@ std::shared_ptr<rpc::RuntimeEnvInfo> CoreWorker::OverrideTaskOrActorRuntimeEnvIn
if (!parent) {
return runtime_env_info;
}
auto child_runtime_env =
GetCachedJsonRuntimeEnvOrParse(runtime_env_info->serialized_runtime_env());
auto override_runtime_env = OverrideRuntimeEnv(*child_runtime_env, parent);
std::string serialized_runtime_env = runtime_env_info->serialized_runtime_env();
json child_runtime_env = json::parse(serialized_runtime_env);
auto override_runtime_env = OverrideRuntimeEnv(child_runtime_env, parent);
auto serialized_override_runtime_env = override_runtime_env.dump();
runtime_env_info->set_serialized_runtime_env(serialized_override_runtime_env);
if (runtime_env_info->uris().working_dir_uri().empty() &&
Expand Down
31 changes: 13 additions & 18 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#pragma once

#include <memory>
#include <mutex>

#include "absl/base/optimization.h"
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
Expand Down Expand Up @@ -1655,13 +1658,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Sends AnnounceWorkerPort to the GCS. Called in ctor and also in ConnectToRaylet.
void ConnectToRayletInternal();

/// Get json deserialized runtime env in cache; parse and fill in if uncached.
std::shared_ptr<nlohmann::json> GetCachedJsonRuntimeEnvOrParse(
const std::string &serialized_runtime_env_info) const;

/// Get protobuf deserialized runtime env in cache; parse and fill in if uncached.
std::shared_ptr<rpc::RuntimeEnvInfo> GetCachedPbRuntimeEnvOrParse(
const std::string &serialized_runtime_env) const;
// Fallback for when GetAsync cannot directly get the requested object.
void PlasmaCallback(SetResultCallback success,
std::shared_ptr<RayObject> ray_object,
ObjectID object_id,
void *py_future);

/// Shared state of the worker. Includes process-level and thread-level state.
/// TODO(edoakes): we should move process-level state into this class and make
Expand Down Expand Up @@ -1839,12 +1840,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
absl::flat_hash_map<ObjectID, std::vector<std::function<void(void)>>>
async_plasma_callbacks_ ABSL_GUARDED_BY(plasma_mutex_);

// Fallback for when GetAsync cannot directly get the requested object.
void PlasmaCallback(SetResultCallback success,
std::shared_ptr<RayObject> ray_object,
ObjectID object_id,
void *py_future);

/// The detail reason why the core worker has exited.
/// If this value is set, it means the exit process has begun.
std::optional<std::string> exiting_detail_ ABSL_GUARDED_BY(mutex_);
Expand Down Expand Up @@ -1873,12 +1868,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
absl::flat_hash_set<ObjectID> deleted_generator_ids_;

/// Serialized runtime env info is cached.
mutable std::mutex runtime_env_serialization_mutex_;
/// Maps serialized runtime env to **immutable** deserialized protobuf.
mutable std::mutex job_runtime_env_serialization_mutex_;
/// For a job, [job_runtime_env_] never changes. It is non-`nullptr` if cached,
/// otherwise `nullptr`.
mutable std::shared_ptr<nlohmann::json> job_runtime_env_;
/// Maps serialized runtime env info to **immutable** deserialized protobuf.
mutable utils::container::SharedLruCache<std::string, rpc::RuntimeEnvInfo>
runtime_env_pb_serialization_cache_;
/// Maps serialized runtime env to **immutable** deserialized json.
mutable utils::container::SharedLruCache<std::string, nlohmann::json>
runtime_env_json_serialization_cache_;
};

Expand Down

0 comments on commit b52df48

Please sign in to comment.