Skip to content

Commit

Permalink
refactor: avoid new thread for writeout_start
Browse files Browse the repository at this point in the history
Within the `rollback` module, `SyncController::writeout_start` does
not need to be spawned within another thread.
  • Loading branch information
gabriele-0201 authored and pepyakin committed Feb 12, 2025
1 parent 59fa2be commit d4236e7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 29 deletions.
27 changes: 5 additions & 22 deletions nomt/src/rollback/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,6 @@ impl Rollback {
pub struct SyncController {
rollback: Rollback,
writeout_data: Option<WriteoutData>,
// The channel to send the result of the writeout task. Option is to allow `take`.
writeout_result_tx: Option<Sender<TaskResult<WriteoutData>>>,
// The channel to receive the result of the writeout task.
writeout_result_rx: Receiver<TaskResult<WriteoutData>>,
// The channel to send the result of the post meta task. Option is to allow `take`.
post_meta_result_tx: Option<Sender<TaskResult<std::io::Result<()>>>>,
// The channel to receive the result of the post meta task.
Expand All @@ -305,13 +301,10 @@ pub struct SyncController {

impl SyncController {
fn new(rollback: Rollback) -> Self {
let (writeout_result_tx, writeout_result_rx) = crossbeam_channel::bounded(1);
let (post_meta_result_tx, post_meta_result_rx) = crossbeam_channel::bounded(1);
Self {
rollback,
writeout_data: None,
writeout_result_tx: Some(writeout_result_tx),
writeout_result_rx,
post_meta_result_tx: Some(post_meta_result_tx),
post_meta_result_rx,
}
Expand All @@ -320,22 +313,12 @@ impl SyncController {
/// Begins the sync process.
///
/// This function doesn't block.
pub fn begin_sync(&mut self) {
let tp = self.rollback.shared.sync_tp.clone();
let rollback = self.rollback.clone();
// UNWRAP: safe because begin_sync is called only once.
let writeout_result_tx = self.writeout_result_tx.take().unwrap();
spawn_task(&tp, move || rollback.writeout_start(), writeout_result_tx);
}

/// Wait for the rollback writeout to complete. Returns the new rollback live range
/// `(start_live, end_live)`.
///
/// This should be called by the sync thread. Blocking.
pub fn wait_pre_meta(&mut self) -> (u64, u64) {
let wd_result = join_task(&self.writeout_result_rx);
let res = (wd_result.rollback_start_live, wd_result.rollback_end_live);
self.writeout_data.replace(wd_result);
/// Returns the new rollback live range `(start_live, end_live)`.
pub fn begin_sync(&mut self) -> (u64, u64) {
let wa = self.rollback.writeout_start();
let res = (wa.rollback_start_live, wa.rollback_end_live);
self.writeout_data.replace(wa);
res
}

Expand Down
11 changes: 4 additions & 7 deletions nomt/src/store/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,14 @@ impl Sync {

bitbox_sync.begin_sync(sync_seqn, page_cache, updated_pages);
beatree_sync.begin_sync(value_tx);
if let Some(ref mut rollback) = rollback_sync {
rollback.begin_sync();
}

bitbox_sync.wait_pre_meta()?;
let beatree_meta_wd = beatree_sync.wait_pre_meta()?;
let (rollback_start_live, rollback_end_live) = match rollback_sync {
Some(ref mut rollback) => rollback.wait_pre_meta(),
Some(ref mut rollback) => rollback.begin_sync(),
None => (0, 0),
};

bitbox_sync.wait_pre_meta()?;
let beatree_meta_wd = beatree_sync.wait_pre_meta()?;

if let Some(PanicOnSyncMode::PostWal) = self.panic_on_sync {
panic!("panic_on_sync is true (post-wal)")
}
Expand Down

0 comments on commit d4236e7

Please sign in to comment.