-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Introduces Postable for InternalKVInterface. #48584
base: master
Are you sure you want to change the base?
[core] Introduces Postable for InternalKVInterface. #48584
Conversation
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the plan on testing / figuring out race conditions that could've been missed?
@@ -139,7 +140,11 @@ class IOContextProvider { | |||
instrumented_io_context &GetIOContext() const { | |||
constexpr int index = Policy::template GetDedicatedIOContextIndex<T>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return size_t to avoid cast
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to allow -1 to indicate "use default thread". do you think std::optional<size_t>
is the way to go?
src/ray/common/asio/dispatchable.h
Outdated
if (func_) { | ||
RAY_CHECK(io_context_ != nullptr); | ||
io_context_->dispatch( | ||
[func = func_, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if func_ has a capture, you'd be copying everything in the capture here
Moving the func also means you can't dispatch twice. Should set func_ to nullptr after dispatch to assure it isn't called again.
Also should it be a RAY_CHECK(_func) here instead as well, since if dispatch gets called without a valid function we won't get any fatal logs or anything, it'll just fail silently
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's safe to say that a Dispatchable can only be dispatched once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if we could document it clearly whether repeated invocation is expected / encouraged / acceptable.
src/ray/common/asio/dispatchable.h
Outdated
if (func_) { | ||
RAY_CHECK(io_context_ != nullptr); | ||
io_context_->dispatch( | ||
[func = func_, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's safe to say that a Dispatchable can only be dispatched once?
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Hi @jjyao @dentiny @dayshah, I updated this PR with the new Postable design. Sketch (how to review):
It looks like a lot of code but many of changes in the Managers are extra indentation from a wrapping block: |
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
insane type gymnastics, generally lgtm
Signed-off-by: Ruiyang Wang <[email protected]>
…InternalKVInterface Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
InternalKVInterface is used by various GCS managers to talk to the internal KV Redis, and we did not pay too much attention in the thread safety issues. It was fine because everyone was on a same main thread & io context. But now we want to harden GCS by making managers thread safe.
To achieve this, introduce a new type
Postable<R(Args...)>
. It's just a std::function + an ioctx.Methods:
Post
: post the function to the held ioctx.OnInvocation
: adds a callback, invoked onPost
. does not take any args and returns void.TransformArg
: apply a mapper to the input argument, into a new Postable.Rebind
: apply a function converter to allow more generic transformations, with the same ioctx.The new stack:
std::function<R(T)>
Postable<R(T)>
which is a function + an ioctxstd::function<R(T)>
Postable<R(T)>
which is a function + an ioctxSo a timeline of a call to internal kv may look like this:
request_context->Run();
into internal kv ioctxPostable
.If it's not redis but InMemoryStoreClient, it's like this
std::function<R(T)>
Dispatchable<R(T)>
which is a function + an ioctx