Skip to content

Commit

Permalink
fix: remove the reapplication of log entries when leader retires
Browse files Browse the repository at this point in the history
Closes: xline-kv#331

Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds authored and Phoenix500526 committed Jul 19, 2023
1 parent 3960b1b commit e03c4f0
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
16 changes: 2 additions & 14 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,22 +1014,10 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
fn leader_retires(&self) {
debug!("leader {} retires", self.id());

// when a leader retires, it should wipe up speculatively executed cmds by resetting and re-executing
let _ig = self.ctx.cmd_tx.send_reset(None);

let mut cb_w = self.ctx.cb.write();
cb_w.clear();

let log_r = self.log.read();
for i in 1..=log_r.commit_index {
let entry = log_r.get(i).unwrap_or_else(|| {
unreachable!(
"system corrupted, apply log[{i}] when we only have {} log entries",
log_r.last_log_index()
)
});
self.ctx.cmd_tx.send_after_sync(Arc::clone(&entry.cmd), i);
}
let mut ucp_l = self.ctx.ucp.lock();
ucp_l.clear();
}
}

Expand Down
57 changes: 55 additions & 2 deletions curp/src/server/raw_curp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::{sync::oneshot, time::sleep};
use tracing_test::traced_test;
use utils::config::{
default_candidate_timeout_ticks, default_follower_timeout_ticks, default_heartbeat_interval,
CurpConfigBuilder,
};

use super::*;
Expand Down Expand Up @@ -36,20 +37,26 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
let cmd_board = Arc::new(RwLock::new(CommandBoard::new()));
let spec_pool = Arc::new(Mutex::new(SpeculativePool::new()));
let uncommitted_pool = Arc::new(Mutex::new(UncommittedPool::new()));
let (log_tx, _log_rx) = mpsc::unbounded_channel();
let (log_tx, log_rx) = mpsc::unbounded_channel();
// prevent the channel from being closed
std::mem::forget(log_rx);
let sync_events = cluster_info
.peers_id()
.iter()
.map(|id| (id.clone(), Arc::new(Event::new())))
.collect();
let curp_config = CurpConfigBuilder::default()
.log_entries_cap(10)
.build()
.unwrap();

Self::new(
cluster_info,
true,
cmd_board,
spec_pool,
uncommitted_pool,
Arc::new(CurpConfig::default()),
Arc::new(curp_config),
Arc::new(exe_tx),
sync_events,
log_tx,
Expand Down Expand Up @@ -521,6 +528,52 @@ fn recover_from_spec_pools_will_pick_the_correct_cmds() {
});
}

/*************** tests for leader retires **************/

/// To ensure #331 is fixed
#[traced_test]
#[test]
fn leader_retires_after_log_compact_will_succeed() {
let curp = RawCurp::new_test(
3,
MockCEEventTxApi::<TestCommand>::default(),
mock_role_change(),
);
let mut log_w = curp.log.write();
for _ in 1..=20 {
let cmd = Arc::new(TestCommand::default());
log_w.push_cmd(0, cmd).unwrap();
}
log_w.last_as = 20;
log_w.last_exe = 20;
log_w.commit_index = 20;
log_w.compact();
drop(log_w);

curp.leader_retires();
}

#[traced_test]
#[test]
fn leader_retires_should_cleanup() {
let curp = {
let mut exe_tx = MockCEEventTxApi::<TestCommand>::default();
exe_tx.expect_send_sp_exe().returning(|_, _| {});
RawCurp::new_test(3, exe_tx, mock_role_change())
};

let _ignore = curp.handle_propose(Arc::new(TestCommand::new_put(vec![1], 0)));
let _ignore = curp.handle_propose(Arc::new(TestCommand::new_get(vec![1])));

curp.leader_retires();

let cb_r = curp.ctx.cb.read();
assert!(cb_r.er_buffer.is_empty(), "er buffer should be empty");
assert!(cb_r.asr_buffer.is_empty(), "asr buffer should be empty");
let ucp_l = curp.ctx.ucp.lock();
assert!(ucp_l.is_empty(), "ucp should be empty");
}

/*************** tests for other small functions **************/

#[traced_test]
Expand Down

0 comments on commit e03c4f0

Please sign in to comment.