Skip to content

Commit

Permalink
feat: implement command tracker
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 18, 2023
1 parent 0aa1195 commit f63df26
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 3 deletions.
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions curp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ itertools = "0.10.3"
madsim = { version = "0.2.22", features = ["rpc", "macros"] }
opentelemetry = "0.18.0"
parking_lot = "0.12.1"
bitvec = "1.0.1"
priority-queue = "1.3.2"
prost = "0.11"
rand = "0.8.5"
Expand Down
2 changes: 1 addition & 1 deletion curp/proto/common
Submodule common updated 0 files
3 changes: 3 additions & 0 deletions curp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,6 @@ mod rpc;

/// Snapshot
mod snapshot;

/// Command tracker shared between client and server
mod tracker;
26 changes: 24 additions & 2 deletions curp/src/server/cmd_board.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use std::{collections::HashMap, sync::Arc};
#![allow(unused)] // TODO remove

use bitvec::vec::BitVec;
use std::{collections::HashMap, collections::HashSet, sync::Arc};

use event_listener::{Event, EventListener};
use indexmap::{IndexMap, IndexSet};
use parking_lot::RwLock;
use utils::parking_lot_lock::RwLockMap;

use crate::cmd::{Command, ProposeId};
use crate::tracker::Tracker;

/// Ref to the cmd board
pub(super) type CmdBoardRef<C> = Arc<RwLock<CommandBoard<C>>>;
Expand All @@ -21,6 +25,8 @@ pub(super) struct CommandBoard<C: Command> {
shutdown_notifier: Event,
/// Store all notifiers for conf change results
conf_notifier: HashMap<ProposeId, Event>,
/// The result trackers track all cmd, this is used for dedup
trackers: HashMap<u64, Tracker>,
/// Store all conf change propose ids
pub(super) conf_buffer: IndexSet<ProposeId>,
/// The cmd has been received before, this is used for dedup
Expand All @@ -38,6 +44,7 @@ impl<C: Command> CommandBoard<C> {
er_notifiers: HashMap::new(),
asr_notifiers: HashMap::new(),
shutdown_notifier: Event::new(),
trackers: HashMap::new(),
sync: IndexSet::new(),
er_buffer: IndexMap::new(),
asr_buffer: IndexMap::new(),
Expand All @@ -46,6 +53,20 @@ impl<C: Command> CommandBoard<C> {
}
}

/// filter duplication, return true if duplicated
pub(super) fn filter_dup(&mut self, client_id: u64, seq_num: u64) -> bool {
let tracker = self
.trackers
.entry(client_id)
.or_insert_with(Tracker::default);
tracker.record(seq_num)
}

/// Remove client result tracker from trackers if it is expired
pub(super) fn client_expired(&mut self, client_id: u64) {
let _ig = self.trackers.remove(&client_id);
}

/// Release notifiers
pub(super) fn release_notifiers(&mut self) {
self.er_notifiers
Expand All @@ -56,10 +77,11 @@ impl<C: Command> CommandBoard<C> {
.for_each(|(_, event)| event.notify(usize::MAX));
}

/// Clear
/// Clear, called when leader retires
pub(super) fn clear(&mut self) {
self.er_buffer.clear();
self.asr_buffer.clear();
self.trackers.clear();
self.release_notifiers();
}

Expand Down
124 changes: 124 additions & 0 deletions curp/src/tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#![allow(unused)] // TODO remove

use bitvec::vec::BitVec;
use std::ops::{AddAssign, Sub};

/// Track sequence number for commands
#[derive(Debug, Default)]
pub(super) struct Tracker {
/// First incomplete seq num, it will be advanced by client
first_incomplete: u64,
/// inflight seq nums proposed by the client, each bit
/// represent the received status starting from `first_incomplete`
inflight: BitVec,
}

impl Tracker {
/// Record a sequence number, return whether it is duplicated
#[allow(clippy::as_conversions)]
#[allow(clippy::cast_possible_truncation)] // TODO: support 32 bits computers?
#[allow(clippy::indexing_slicing)] // it is checked
pub(crate) fn record(&mut self, seq_num: u64) -> bool {
if seq_num < self.first_incomplete {
return true;
}
let gap = seq_num.sub(self.first_incomplete) as usize;
if gap == 0 {
// received the next sequence number, advanced the first_incomplete
// and pop the front of inflight
self.first_incomplete.add_assign(1);
if !self.inflight.is_empty() {
let _ig = self.inflight.remove(0);
}
} else if gap < self.inflight.len() {
// received the sequence number that is recorded in inflight
// check its status to determine whether it is duplicated
if self.inflight[gap] {
return true;
}
// mark it is received
self.inflight.set(gap, true);
} else {
// received the sequence number that exceed inflight, extend
// the inflight and record the inflight[gap] as received
// TODO: set an upper bound of gaps?
self.inflight
.extend(std::iter::repeat(false).take(gap.sub(self.inflight.len())));
self.inflight.push(true);
}
loop {
if self.inflight.is_empty() || !self.inflight[0] {
break;
}
let _ig = self.inflight.remove(0);
self.first_incomplete.add_assign(1);
}
false
}

/// Reset the tracker
pub(crate) fn reset(&mut self) {
self.first_incomplete = 0;
self.inflight.clear();
}

/// Get the first incomplete sequence number
pub(crate) fn first_incomplete(&self) -> u64 {
self.first_incomplete
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_check_not_duplicate_ordered() {
let mut tracker = Tracker::default();
for i in 0..1024 {
assert!(!tracker.record(i));
assert_eq!(tracker.first_incomplete, i + 1);
assert!(tracker.inflight.len() <= 1);
}
}

#[test]
fn test_check_duplicate_ordered() {
let mut tracker = Tracker::default();
for i in 0..512 {
assert!(!tracker.record(i));
assert_eq!(tracker.first_incomplete, i + 1);
assert!(tracker.inflight.len() <= 1);
}
for i in 0..512 {
assert!(tracker.record(i));
}
}

#[test]
fn test_check_duplicate_gap() {
let mut tracker = Tracker::default();
assert!(!tracker.record(0));
assert!(!tracker.record(1));
assert!(!tracker.record(1000));
assert!(!tracker.record(1001));

assert!(tracker.record(0));
assert!(tracker.record(1));
assert!(tracker.record(1000));
assert!(tracker.record(1001));
assert_eq!(tracker.first_incomplete, 2);
}

#[test]
fn test_check_duplicate_clear_inflight() {
let mut tracker = Tracker::default();
for i in (1..256).step_by(2) {
assert!(!tracker.record(i));
}
for i in (0..256).step_by(2) {
assert!(!tracker.record(i));
}
assert_eq!(tracker.inflight.len(), 0);
}
}

0 comments on commit f63df26

Please sign in to comment.