Skip to content

Commit

Permalink
fix: add ucp recovery after new leader is elected
Browse files Browse the repository at this point in the history
Closes: xline-kv#438

Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds authored and mergify[bot] committed Sep 22, 2023
1 parent 6d05901 commit 32694b0
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
18 changes: 17 additions & 1 deletion curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use super::cmd_worker::CEEventTxApi;
use crate::{
cmd::{Command, ProposeId},
error::{ApplyConfChangeError, ProposeError},
log_entry::LogEntry,
log_entry::{EntryData, LogEntry},
members::{ClusterInfo, Member, ServerId},
role_change::RoleChange,
rpc::{ConfChange, ConfChangeType, IdSet, ReadState},
Expand Down Expand Up @@ -548,6 +548,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {

let prev_last_log_index = log_w.last_log_index();
self.recover_from_spec_pools(&mut st_w, &mut log_w, spec_pools);
self.recover_ucp_from_log(&mut log_w);
let last_log_index = log_w.last_log_index();

self.become_leader(&mut st_w);
Expand Down Expand Up @@ -1009,6 +1010,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
let spec_pools = cst.sps.drain().collect();
let mut log_w = RwLockUpgradableReadGuard::upgrade(log);
self.recover_from_spec_pools(st, &mut log_w, spec_pools);
self.recover_ucp_from_log(&mut log_w);
self.become_leader(st);
None
} else {
Expand Down Expand Up @@ -1140,6 +1142,20 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
}
}

/// Recover the ucp from uncommitted log entries
fn recover_ucp_from_log(&self, log: &mut Log<C>) {
let mut ucp_l = self.ctx.ucp.lock();

for i in log.commit_index + 1..=log.last_log_index() {
let entry = log.get(i).unwrap_or_else(|| {
unreachable!("system corrupted, get a `None` value on log[{i}]")
});
if let EntryData::Command(ref cmd) = entry.entry_data {
let _ignore = ucp_l.insert(cmd.id().clone(), Arc::clone(cmd));
}
}
}

/// Apply new logs
fn apply(&self, log: &mut Log<C>) {
for i in (log.last_as + 1)..=log.commit_index {
Expand Down
31 changes: 31 additions & 0 deletions curp/src/server/raw_curp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,37 @@ fn recover_from_spec_pools_will_pick_the_correct_cmds() {
});
}

#[traced_test]
#[test]
fn recover_ucp_from_logs_will_pick_the_correct_cmds() {
let curp = {
let mut exe_tx = MockCEEventTxApi::<TestCommand>::default();
exe_tx
.expect_send_reset()
.returning(|_| oneshot::channel().1);
Arc::new(RawCurp::new_test(5, exe_tx, mock_role_change()))
};
curp.update_to_term_and_become_follower(&mut *curp.st.write(), 1);

let cmd0 = Arc::new(TestCommand::new_put(vec![1], 1));
let cmd1 = Arc::new(TestCommand::new_put(vec![2], 1));
let cmd2 = Arc::new(TestCommand::new_put(vec![3], 1));
curp.push_cmd(Arc::clone(&cmd0));
curp.push_cmd(Arc::clone(&cmd1));
curp.push_cmd(Arc::clone(&cmd2));
curp.log.map_write(|mut log_w| log_w.commit_index = 1);

curp.recover_ucp_from_log(&mut *curp.log.write());

curp.ctx.ucp.map_lock(|ucp| {
let mut ids: Vec<_> = ucp.values().map(|c| c.id()).collect();
assert_eq!(ids.len(), 2);
ids.sort();
assert_eq!(ids[0], cmd1.id());
assert_eq!(ids[1], cmd2.id());
});
}

/*************** tests for leader retires **************/

/// To ensure #331 is fixed
Expand Down

0 comments on commit 32694b0

Please sign in to comment.