-
Notifications
You must be signed in to change notification settings - Fork 334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: write memtable in parallel #5456
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Ruihang Xia <[email protected]>
Signed-off-by: Ruihang Xia <[email protected]>
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
Signed-off-by: Ruihang Xia <[email protected]>
Signed-off-by: Ruihang Xia <[email protected]>
if mutations.len() == 1 { | ||
if let Err(err) = mutable.write(&mutations[0].1) { | ||
self.notifiers[mutations[0].0].err = Some(Arc::new(err)); | ||
} | ||
} else { | ||
let write_tasks = mutations.into_iter().map(|(i, kvs)| { | ||
let mutable = mutable.clone(); | ||
// use tokio runtime to schedule tasks. | ||
common_runtime::spawn_blocking_global(move || (i, mutable.write(&kvs))) | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use something like rayon instead of spawn futures that inherently not async?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rayon can't reuse existing tokio managed threads, and I'm hesitant to create another group of runtimes... all those tasks go into spawn_blocking which isn't async, I think that's fine.
BTW I do find a combination of rayon + tokio https://github.com/andybarron/tokio-rayon, but the last update time is 4 years ago 🤣
Signed-off-by: Ruihang Xia <[email protected]>
// use tokio runtime to schedule tasks. | ||
common_runtime::spawn_blocking_global(move || (i, mutable.write(&kvs))) | ||
}); | ||
for result in futures::future::join_all(write_tasks).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
join_all is ordered, maybe using FuturesUnordered
makes more sense since already record the order?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from its document the ordered is more performant, let me have a try https://docs.rs/futures/latest/futures/future/fn.join_all.html#see-also
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from its document the ordered is more performant, let me have a try https://docs.rs/futures/latest/futures/future/fn.join_all.html#see-also
by performant it mean compare to itself, JoinAll
is like this:
pub struct JoinAll<F>
where
F: Future,
{
kind: JoinAllKind<F>,
}
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
pub(crate) const SMALL: usize = 30;
enum JoinAllKind<F>
where
F: Future,
{
Small {
elems: Pin<Box<[MaybeDone<F>]>>,
},
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
Big {
fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,
},
}
So it's ordered even <30 futures, but do we need ordered here or is unordered good enough
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
Write memtable in parallel. No more 1 core sucks 127 cores rest.
Previous ~550k, After:
And the CPU usages are all below 50%. The next step is pipelining the write process to alleviate WAL's influence.
PR Checklist
Please convert it to a draft if some of the following conditions are not met.