Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduces Postable for InternalKVInterface. #48584

Draft
wants to merge 31 commits into
base: master
Choose a base branch
from

Conversation

rynewang
Copy link
Contributor

@rynewang rynewang commented Nov 5, 2024

InternalKVInterface is used by various GCS managers to talk to the internal KV Redis, and we did not pay too much attention in the thread safety issues. It was fine because everyone was on a same main thread & io context. But now we want to harden GCS by making managers thread safe.

To achieve this, introduce a new type Postable<R(Args...)>. It's just a std::function + an ioctx.

Methods:

  • Post: post the function to the held ioctx.
  • OnInvocation: adds a callback, invoked on Post. does not take any args and returns void.
  • TransformArg: apply a mapper to the input argument, into a new Postable.
  • Rebind: apply a function converter to allow more generic transformations, with the same ioctx.

The new stack:

  • Managers: in their own io_ctx
  • InternalKVInterface (implemented by StoreClientInternalKV)
    • used to take a std::function<R(T)>
    • (New!) take a Postable<R(T)> which is a function + an ioctx
  • StoreClient (RedisStoreClient, InMemoryStoreClient, ObservableStoreClient)
    • used to take a std::function<R(T)>
    • (New!) take a Postable<R(T)> which is a function + an ioctx
  • In the end of the call stack, invoke Postable by Post().

So a timeline of a call to internal kv may look like this:

  1. From a Gcs Manager, calls into RedisContext
  2. RedisContext dispatches a request_context->Run(); into internal kv ioctx
  3. hiredis works in internal kv ioctx
  4. hiredis calls back, dispatches to the Gcs Manager's ioctx, provided in Postable.

If it's not redis but InMemoryStoreClient, it's like this

  • Managers: in their own io_ctx
  • InternalKVInterface (implemented by StoreClientInternalKV)
    • used to take a std::function<R(T)>
    • (New!) take a Dispatchable<R(T)> which is a function + an ioctx
    • calls the underlying StoreClient with dispatched callbacks
  • StoreClient (InMemoryStoreClient): use mutex, then posts to the callback caller's desired ioctx.

rynewang and others added 21 commits September 18, 2024 14:08
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
@rynewang rynewang added the go add ONLY when ready to merge, run all tests label Nov 5, 2024
Copy link
Contributor

@dayshah dayshah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the plan on testing / figuring out race conditions that could've been missed?

@@ -139,7 +140,11 @@ class IOContextProvider {
instrumented_io_context &GetIOContext() const {
constexpr int index = Policy::template GetDedicatedIOContextIndex<T>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return size_t to avoid cast

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to allow -1 to indicate "use default thread". do you think std::optional<size_t> is the way to go?

if (func_) {
RAY_CHECK(io_context_ != nullptr);
io_context_->dispatch(
[func = func_,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if func_ has a capture, you'd be copying everything in the capture here

Moving the func also means you can't dispatch twice. Should set func_ to nullptr after dispatch to assure it isn't called again.

Also should it be a RAY_CHECK(_func) here instead as well, since if dispatch gets called without a valid function we won't get any fatal logs or anything, it'll just fail silently

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's safe to say that a Dispatchable can only be dispatched once?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could document it clearly whether repeated invocation is expected / encouraged / acceptable.

src/ray/common/asio/dispatchable.h Outdated Show resolved Hide resolved
src/ray/common/asio/dispatchable.h Outdated Show resolved Hide resolved
src/ray/common/asio/dispatchable.h Outdated Show resolved Hide resolved
src/ray/common/asio/dispatchable.h Outdated Show resolved Hide resolved
if (func_) {
RAY_CHECK(io_context_ != nullptr);
io_context_->dispatch(
[func = func_,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's safe to say that a Dispatchable can only be dispatched once?

src/ray/common/asio/dispatchable.h Outdated Show resolved Hide resolved
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
@rynewang rynewang changed the title [core] Introduces Dispatchable for InternalKVInterface. [core] Introduces Postable for InternalKVInterface. Nov 26, 2024
@rynewang
Copy link
Contributor Author

Hi @jjyao @dentiny @dayshah, I updated this PR with the new Postable design. Sketch (how to review):

  1. a new function_traits.h to handle std::function, lambdas and all other callables
  2. postable.h defines a Postable. It can Rebind or TransformArg, or OnInvocation those are transformers to the func_ within the same ioctx.
  3. All StoreClient and InternalKVInterface APIs are changed (std::function -> Postable). The middle layers transform the Postables, while the bottom layer (Redis, InMemory) invokes Post().
  4. All callers of InternalKVInterface, that is all GCS*Managers, now pass in io_context_. Many of them used to not have one - they receive one from ctor
  5. GcsServer updated to provide ioctx to those Managers' ctors.

It looks like a lot of code but many of changes in the Managers are extra indentation from a wrapping block: $lambda -> {$lambda, io_context_}.

Signed-off-by: Ruiyang Wang <[email protected]>
Copy link
Contributor

@dayshah dayshah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insane type gymnastics, generally lgtm

src/ray/common/asio/postable.h Outdated Show resolved Hide resolved
src/ray/common/asio/postable.h Outdated Show resolved Hide resolved
src/ray/common/asio/postable.h Outdated Show resolved Hide resolved
src/ray/common/asio/postable.h Show resolved Hide resolved
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants