feat: write memtable in parallel #5456
base: main
Changes from 5 commits
a3c0164
1825fee
1198e3f
588d24c
e35fb76
bd251b9
@@ -197,23 +197,38 @@ impl RegionWriteCtx {
     }

     /// Consumes mutations and writes them into mutable memtable.
-    pub(crate) fn write_memtable(&mut self) {
+    pub(crate) async fn write_memtable(&mut self) {
         debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());

         if self.failed {
             return;
         }

-        let mutable = &self.version.memtables.mutable;
-        // Takes mutations from the wal entry.
-        let mutations = mem::take(&mut self.wal_entry.mutations);
-        for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) {
-            // Write mutation to the memtable.
-            let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
-                continue;
-            };
-            if let Err(e) = mutable.write(&kvs) {
-                notify.err = Some(Arc::new(e));
+        let mutable = self.version.memtables.mutable.clone();
+        let mutations = mem::take(&mut self.wal_entry.mutations)
+            .into_iter()
+            .enumerate()
+            .filter_map(|(i, mutation)| {
+                let kvs = KeyValues::new(&self.version.metadata, mutation)?;
+                Some((i, kvs))
+            })
+            .collect::<Vec<_>>();
+
+        if mutations.len() == 1 {
+            if let Err(err) = mutable.write(&mutations[0].1) {
+                self.notifiers[mutations[0].0].err = Some(Arc::new(err));
+            }
+        } else {
+            let write_tasks = mutations.into_iter().map(|(i, kvs)| {
+                let mutable = mutable.clone();
+                // use tokio runtime to schedule tasks.
+                common_runtime::spawn_blocking_global(move || (i, mutable.write(&kvs)))
+            });
+            for result in futures::future::join_all(write_tasks).await {
join_all is ordered, maybe using

According to its documentation, the ordered version is more performant. Let me give it a try: https://docs.rs/futures/latest/futures/future/fn.join_all.html#see-also

By "performant" it means compared to itself:
pub struct JoinAll<F>
where
F: Future,
{
kind: JoinAllKind<F>,
}
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
pub(crate) const SMALL: usize = 30;
enum JoinAllKind<F>
where
F: Future,
{
Small {
elems: Pin<Box<[MaybeDone<F>]>>,
},
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
Big {
fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,
},
}
So it's ordered even with fewer than 30 futures, but do we need ordering here, or is unordered good enough?
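
To illustrate the ordering question above: when every result carries its own index, as the `(i, mutable.write(&kvs))` tuples in this diff do, the consumer can route errors correctly regardless of completion order. The following is a minimal sketch using only the standard library, with plain threads and a channel standing in for the runtime's blocking tasks. The names `write` and `write_unordered` and the "odd inputs fail" rule are hypothetical and are not part of the PR.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a memtable write: odd inputs fail.
fn write(kvs: u32) -> Result<(), String> {
    if kvs % 2 == 0 {
        Ok(())
    } else {
        Err(format!("cannot write {}", kvs))
    }
}

/// Runs every write on its own thread and records failures by index.
/// Results are consumed in completion order, not submission order.
fn write_unordered(batch: Vec<u32>) -> Vec<Option<String>> {
    let mut errors: Vec<Option<String>> = vec![None; batch.len()];
    let (tx, rx) = mpsc::channel();

    for (i, kvs) in batch.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // Tag the result with its index so the receiver can route it.
            let _ = tx.send((i, write(kvs)));
        });
    }
    // Drop the original sender so the receive loop ends once all workers finish.
    drop(tx);

    for (i, result) in rx {
        if let Err(err) = result {
            errors[i] = Some(err);
        }
    }
    errors
}
```

Because each error lands in the slot named by its index, the returned vector is the same whichever worker finishes first, which suggests an unordered join would be sufficient for this loop.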
+                let (i, result) = result.unwrap();

waynexia marked this conversation as resolved.

+                if let Err(err) = result {
+                    self.notifiers[i].err = Some(Arc::new(err));
+                }
             }
         }
Can we use something like rayon instead of spawning futures for work that is inherently not async?
rayon can't reuse the existing tokio-managed threads, and I'm hesitant to create another group of runtimes... All of these tasks go into spawn_blocking, which isn't async, so I think that's fine.
BTW, I did find a crate that combines rayon and tokio, https://github.com/andybarron/tokio-rayon, but its last update was 4 years ago 🤣
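
For reference, the shape of the change under discussion (write a single mutation inline, otherwise fan the blocking writes out to worker threads and collect per-item errors) can be sketched without any async runtime. This is only an approximation under stated assumptions: `Memtable`, its `write` method, and `write_all` are hypothetical, and `std::thread::scope` stands in for the blocking pool that the PR reaches through `common_runtime::spawn_blocking_global`.

```rust
use std::thread;

/// Hypothetical stand-in for a memtable: rejects empty batches.
struct Memtable;

impl Memtable {
    fn write(&self, kvs: &[u8]) -> Result<(), String> {
        if kvs.is_empty() {
            Err("empty batch".to_string())
        } else {
            Ok(())
        }
    }
}

/// Writes each batch and returns one error slot per input batch.
fn write_all(memtable: &Memtable, batches: &[Vec<u8>]) -> Vec<Option<String>> {
    // Fast path: a single batch is written inline, with no thread handoff.
    if batches.len() == 1 {
        return vec![memtable.write(&batches[0]).err()];
    }

    // Otherwise fan out, one scoped thread per batch, and join in submission order.
    thread::scope(|s| {
        let handles: Vec<_> = batches
            .iter()
            .map(|kvs| s.spawn(move || memtable.write(kvs)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("writer thread panicked").err())
            .collect()
    })
}
```

Spawning one thread per batch is a simplification for the sketch; a shared pool, which is what `spawn_blocking` provides, avoids creating threads on every call.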