Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduces Postable for InternalKVInterface. #48584

Draft
wants to merge 31 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
bcf81c8
dedicated kv ioctx
rynewang Sep 18, 2024
c84fd80
move gcs_table_storage_ back to main service.
rynewang Sep 18, 2024
a021521
Merge branch 'master' into dedicated-kv-ioctx
rynewang Sep 20, 2024
5d8eaee
Merge branch 'master' into dedicated-kv-ioctx
rynewang Sep 20, 2024
36fc808
fix cpp test
rynewang Sep 23, 2024
a87c39d
fix atomics now that we have multiple thread reads...
rynewang Sep 24, 2024
7cd7705
atomics
rynewang Sep 24, 2024
bbf02cd
fix
rynewang Sep 24, 2024
99f7ba9
size_t -> int for proto
rynewang Sep 25, 2024
a1ab6c6
fix atomics in periodical_runner
rynewang Sep 25, 2024
cf3f343
update doc
rynewang Sep 25, 2024
6d006e9
stopped -> shared_ptr<atomic<bool>>
rynewang Sep 26, 2024
110ae3e
rename
rynewang Sep 27, 2024
3d5e7f0
Merge remote-tracking branch 'origin/master' into dedicated-kv-ioctx
rynewang Sep 27, 2024
3461330
fit lint
rynewang Sep 27, 2024
4ade4af
Merge remote-tracking branch 'origin/master' into dedicated-kv-ioctx
rynewang Oct 29, 2024
a607528
type traits and policy for kv
rynewang Oct 29, 2024
698cbe9
remove temp code
rynewang Oct 29, 2024
a5e3d57
fix GetOrConnectRedis
rynewang Nov 4, 2024
265f305
Dispatchable class and InternalKVInterface
rynewang Nov 5, 2024
69a1c48
remove copy-as, only move-as
rynewang Nov 5, 2024
97bc8ea
wip big postable
rynewang Nov 22, 2024
d07ffef
std function based
rynewang Nov 26, 2024
e6c9df9
Postable for all
rynewang Nov 26, 2024
1572f8f
add .h and revert accidental change
rynewang Nov 26, 2024
e447b83
lint
rynewang Nov 26, 2024
5e0a080
add Dispatch() method for redis
rynewang Dec 2, 2024
5ada425
Merge remote-tracking branch 'origin/master' into ioctx-dispatchable-…
rynewang Dec 2, 2024
9b57f07
lint
rynewang Dec 2, 2024
a1eede1
move only Post(), and unit tests
rynewang Dec 3, 2024
418d9d3
lint
rynewang Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5249,7 +5249,7 @@ cdef void async_callback(shared_ptr[CRayObject] obj,
user_callback = <object>user_callback_ptr
user_callback(result)
except Exception:
# Only log the error here because this calllback is called from Cpp
# Only log the error here because this callback is called from Cpp
# and Cython will ignore the exception anyway
logger.exception(f"failed to run async callback (user func)")
finally:
Expand Down
4 changes: 2 additions & 2 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ cdef extern from * namespace "ray::gcs" nogil:
std::make_unique<RedisStoreClient>(std::move(redis_client)));

bool ret_val = false;
cli->Get("session", key, [&](std::optional<std::string> result) {
cli->Get("session", key, {[&](std::optional<std::string> result) {
if (result.has_value()) {
*data = result.value();
ret_val = true;
Expand All @@ -111,7 +111,7 @@ cdef extern from * namespace "ray::gcs" nogil:
<< " from persistent storage.";
ret_val = false;
}
});
}, io_service});
io_service.run_for(std::chrono::milliseconds(1000));

return ret_val;
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def test_http_proxy_return_aribitary_objects(ray_instance):
],
indirect=True,
)
def test_http_proxy_calllback_failures(ray_instance, capsys):
def test_http_proxy_callback_failures(ray_instance, capsys):
"""Test http proxy keeps restarting when callback function fails"""

try:
Expand Down
4 changes: 3 additions & 1 deletion src/mock/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ namespace gcs {
class MockGcsActorManager : public GcsActorManager {
public:
MockGcsActorManager(RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager)
GcsFunctionManager &function_manager,
instrumented_io_context &io_service)
: GcsActorManager(
nullptr,
nullptr,
nullptr,
runtime_env_manager,
function_manager,
[](const ActorID &) {},
io_service,
[](const rpc::Address &) { return nullptr; }) {}

MOCK_METHOD(void,
Expand Down
58 changes: 29 additions & 29 deletions src/mock/ray/gcs/gcs_server/gcs_kv_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,78 +20,78 @@ namespace gcs {

class MockInternalKVInterface : public ray::gcs::InternalKVInterface {
public:
MockInternalKVInterface() {}
explicit MockInternalKVInterface() = default;

MOCK_METHOD(void,
Get,
(const std::string &ns,
const std::string &key,
std::function<void(std::optional<std::string>)> callback),
Postable<void(std::optional<std::string>)> callback),
(override));
MOCK_METHOD(void,
MultiGet,
(const std::string &ns,
const std::vector<std::string> &keys,
Postable<void(std::unordered_map<std::string, std::string>)> callback),
(override));
MOCK_METHOD(
void,
MultiGet,
(const std::string &ns,
const std::vector<std::string> &keys,
std::function<void(std::unordered_map<std::string, std::string>)> callback),
(override));
MOCK_METHOD(void,
Put,
(const std::string &ns,
const std::string &key,
const std::string &value,
bool overwrite,
std::function<void(bool)> callback),
Postable<void(bool)> callback),
(override));
MOCK_METHOD(void,
Del,
(const std::string &ns,
const std::string &key,
bool del_by_prefix,
std::function<void(int64_t)> callback),
Postable<void(int64_t)> callback),
(override));
MOCK_METHOD(void,
Exists,
(const std::string &ns,
const std::string &key,
std::function<void(bool)> callback),
Postable<void(bool)> callback),
(override));
MOCK_METHOD(void,
Keys,
(const std::string &ns,
const std::string &prefix,
std::function<void(std::vector<std::string>)> callback),
Postable<void(std::vector<std::string>)> callback),
(override));
};

// Fake internal KV interface that simply stores keys and values in a C++ map.
// Only supports Put and Get.
// Warning: Naively prepends the namespace to the key, so e.g.
// the (namespace, key) pairs ("a", "bc") and ("ab", "c") will collide which is a bug.

// TODO(ryw): DO NOT SUBMIT. Get and Put used to be sync, now it's async. We need to use
// promise wait.
class FakeInternalKVInterface : public ray::gcs::InternalKVInterface {
public:
FakeInternalKVInterface() {}
explicit FakeInternalKVInterface() = default;

// The C++ map.
std::unordered_map<std::string, std::string> kv_store_ = {};

void Get(const std::string &ns,
const std::string &key,
std::function<void(std::optional<std::string>)> callback) override {
Postable<void(std::optional<std::string>)> callback) override {
std::string full_key = ns + key;
auto it = kv_store_.find(full_key);
if (it == kv_store_.end()) {
callback(std::nullopt);
std::move(callback).Post("FakeInternalKVInterface::Get", std::nullopt);
} else {
callback(it->second);
std::move(callback).Post("FakeInternalKVInterface::Get", it->second);
}
}

void MultiGet(const std::string &ns,
const std::vector<std::string> &keys,
std::function<void(std::unordered_map<std::string, std::string>)>
callback) override {
void MultiGet(
const std::string &ns,
const std::vector<std::string> &keys,
Postable<void(std::unordered_map<std::string, std::string>)> callback) override {
std::unordered_map<std::string, std::string> result;
for (const auto &key : keys) {
std::string full_key = ns + key;
Expand All @@ -100,20 +100,20 @@ class FakeInternalKVInterface : public ray::gcs::InternalKVInterface {
result[key] = it->second;
}
}
callback(result);
std::move(callback).Post("FakeInternalKVInterface::MultiGet", result);
}

void Put(const std::string &ns,
const std::string &key,
const std::string &value,
bool overwrite,
std::function<void(bool)> callback) override {
Postable<void(bool)> callback) override {
std::string full_key = ns + key;
if (kv_store_.find(full_key) != kv_store_.end() && !overwrite) {
callback(false);
std::move(callback).Post("FakeInternalKVInterface::Put", false);
} else {
kv_store_[full_key] = value;
callback(true);
std::move(callback).Post("FakeInternalKVInterface::Put", true);
}
}

Expand All @@ -122,19 +122,19 @@ class FakeInternalKVInterface : public ray::gcs::InternalKVInterface {
(const std::string &ns,
const std::string &key,
bool del_by_prefix,
std::function<void(int64_t)> callback),
Postable<void(int64_t)> callback),
(override));
MOCK_METHOD(void,
Exists,
(const std::string &ns,
const std::string &key,
std::function<void(bool)> callback),
Postable<void(bool)> callback),
(override));
MOCK_METHOD(void,
Keys,
(const std::string &ns,
const std::string &prefix,
std::function<void(std::vector<std::string>)> callback),
Postable<void(std::vector<std::string>)> callback),
(override));
};

Expand Down
3 changes: 2 additions & 1 deletion src/mock/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ namespace gcs {

class MockGcsNodeManager : public GcsNodeManager {
public:
MockGcsNodeManager()
MockGcsNodeManager(instrumented_io_context &io_context)
: GcsNodeManager(/*gcs_publisher=*/nullptr,
/*gcs_table_storage=*/nullptr,
/*raylet_client_pool=*/nullptr,
io_context,
/*cluster_id=*/ClusterID::Nil()) {}
MOCK_METHOD(void,
HandleRegisterNode,
Expand Down
6 changes: 2 additions & 4 deletions src/mock/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ namespace ray {
namespace gcs {
static instrumented_io_context __mock_io_context_;
static ClusterResourceManager __mock_cluster_resource_manager_(__mock_io_context_);
static GcsNodeManager __mock_gcs_node_manager_(nullptr,
nullptr,
nullptr,
ClusterID::Nil());
static GcsNodeManager __mock_gcs_node_manager_(
nullptr, nullptr, nullptr, __mock_io_context_, ClusterID::Nil());

class MockGcsResourceManager : public GcsResourceManager {
public:
Expand Down
55 changes: 24 additions & 31 deletions src/mock/ray/gcs/store_client/in_memory_store_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,65 +22,58 @@ class MockInMemoryStoreClient : public InMemoryStoreClient {
(const std::string &table_name,
const std::string &key,
const std::string &data,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status,
AsyncPutWithIndex,
(const std::string &table_name,
const std::string &key,
const std::string &index_key,
const std::string &data,
const StatusCallback &callback),
bool overwrite,
Postable<void(bool)> callback),
(override));

MOCK_METHOD(Status,
AsyncGet,
(const std::string &table_name,
const std::string &key,
const OptionalItemCallback<std::string> &callback),
(override));
MOCK_METHOD(Status,
AsyncGetByIndex,
(const std::string &table_name,
const std::string &index_key,
(const MapCallback<std::string, std::string> &callback)),
ToPostable<OptionalItemCallback<std::string>> callback),
(override));

MOCK_METHOD(Status,
AsyncGetAll,
(const std::string &table_name,
(const MapCallback<std::string, std::string> &callback)),
Postable<void(absl::flat_hash_map<std::string, std::string>)> callback),
(override));

MOCK_METHOD(Status,
AsyncDelete,
AsyncMultiGet,
(const std::string &table_name,
const std::string &key,
const StatusCallback &callback),
const std::vector<std::string> &keys,
Postable<void(absl::flat_hash_map<std::string, std::string>)> callback),
(override));

MOCK_METHOD(Status,
AsyncDeleteWithIndex,
AsyncDelete,
(const std::string &table_name,
const std::string &key,
const std::string &index_key,
const StatusCallback &callback),
Postable<void(bool)> callback),
(override));

MOCK_METHOD(Status,
AsyncBatchDelete,
(const std::string &table_name,
const std::vector<std::string> &keys,
const StatusCallback &callback),
Postable<void(int64_t)> callback),
(override));

MOCK_METHOD(Status,
AsyncBatchDeleteWithIndex,
AsyncGetKeys,
(const std::string &table_name,
const std::vector<std::string> &keys,
const std::vector<std::string> &index_keys,
const StatusCallback &callback),
const std::string &prefix,
Postable<void(std::vector<std::string>)> callback),
(override));

MOCK_METHOD(Status,
AsyncDeleteByIndex,
AsyncExists,
(const std::string &table_name,
const std::string &index_key,
const StatusCallback &callback),
const std::string &key,
Postable<void(bool)> callback),
(override));

MOCK_METHOD(int, GetNextJobID, (), (override));
};

Expand Down
9 changes: 1 addition & 8 deletions src/mock/ray/gcs/store_client/redis_store_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@ class MockRedisStoreClient : public RedisStoreClient {
const std::string &data,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status,
AsyncPutWithIndex,
(const std::string &table_name,
const std::string &key,
const std::string &index_key,
const std::string &data,
const StatusCallback &callback),
(override));

MOCK_METHOD(Status,
AsyncGet,
(const std::string &table_name,
Expand Down
16 changes: 8 additions & 8 deletions src/mock/ray/gcs/store_client/store_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,50 +23,50 @@ class MockStoreClient : public StoreClient {
const std::string &key,
const std::string &data,
bool overwrite,
std::function<void(bool)> callback),
Postable<void(bool)> callback),
(override));
MOCK_METHOD(Status,
AsyncGet,
(const std::string &table_name,
const std::string &key,
const OptionalItemCallback<std::string> &callback),
ToPostable<OptionalItemCallback<std::string>> callback),
(override));
MOCK_METHOD(Status,
AsyncGetAll,
(const std::string &table_name,
(const MapCallback<std::string, std::string> &callback)),
Postable<void(absl::flat_hash_map<std::string, std::string>)> callback),
(override));
MOCK_METHOD(Status,
AsyncMultiGet,
(const std::string &table_name,
const std::vector<std::string> &key,
(const MapCallback<std::string, std::string> &callback)),
Postable<void(absl::flat_hash_map<std::string, std::string>)> callback),
(override));
MOCK_METHOD(Status,
AsyncDelete,
(const std::string &table_name,
const std::string &key,
std::function<void(bool)> callback),
Postable<void(bool)> callback),
(override));
MOCK_METHOD(Status,
AsyncBatchDelete,
(const std::string &table_name,
const std::vector<std::string> &keys,
std::function<void(int64_t)> callback),
Postable<void(int64_t)> callback),
(override));
MOCK_METHOD(int, GetNextJobID, (), (override));
MOCK_METHOD(Status,
AsyncGetKeys,
(const std::string &table_name,
const std::string &prefix,
std::function<void(std::vector<std::string>)> callback),
Postable<void(std::vector<std::string>)> callback),
(override));

MOCK_METHOD(Status,
AsyncExists,
(const std::string &table_name,
const std::string &key,
std::function<void(bool)> callback),
Postable<void(bool)> callback),
(override));
};

Expand Down
Loading
Loading