From cdde1143fa10e27da6eb6b22721acb46b0d41367 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Tue, 4 Jun 2024 18:12:26 +0800 Subject: [PATCH] refactor: inline size into LogEntry Signed-off-by: Phoeniix Zhao --- crates/curp/src/log_entry.rs | 11 +- crates/curp/src/server/cmd_worker/mod.rs | 183 ++++++++++-------- crates/curp/src/server/raw_curp/log.rs | 170 ++++++++++------ crates/curp/src/server/raw_curp/tests.rs | 6 +- crates/curp/src/server/storage/db.rs | 6 +- crates/curp/src/server/storage/wal/codec.rs | 6 +- crates/curp/src/server/storage/wal/segment.rs | 18 +- 7 files changed, 240 insertions(+), 160 deletions(-) diff --git a/crates/curp/src/log_entry.rs b/crates/curp/src/log_entry.rs index 0cb890332..eedd23a97 100644 --- a/crates/curp/src/log_entry.rs +++ b/crates/curp/src/log_entry.rs @@ -23,6 +23,8 @@ pub struct LogEntry { pub(crate) propose_id: ProposeId, /// Entry data pub(crate) entry_data: EntryData, + /// entry size + pub(crate) size: u64, } /// Entry data of a `LogEntry` @@ -78,13 +80,16 @@ where term: u64, propose_id: ProposeId, entry_data: impl Into>, - ) -> Self { - Self { + ) -> Result { + let mut entry = Self { term, index, propose_id, entry_data: entry_data.into(), - } + size: 0, + }; + entry.size = bincode::serialized_size(&entry)?; + Ok(entry) } /// Get the inflight id of this log entry diff --git a/crates/curp/src/server/cmd_worker/mod.rs b/crates/curp/src/server/cmd_worker/mod.rs index f97be228c..269792e11 100644 --- a/crates/curp/src/server/cmd_worker/mod.rs +++ b/crates/curp/src/server/cmd_worker/mod.rs @@ -455,12 +455,9 @@ mod tests { done_tx, ); - let entry = Arc::new(LogEntry::new( - 1, - 1, - ProposeId(0, 0), - Arc::new(TestCommand::default()), - )); + let entry = Arc::new( + LogEntry::new(1, 1, ProposeId(0, 0), Arc::new(TestCommand::default())).unwrap(), + ); ce_event_tx.send_sp_exe(Arc::clone(&entry)); assert_eq!(er_rx.recv().await.unwrap().1.values, Vec::::new()); @@ -499,12 +496,15 @@ mod tests { ); let begin = Instant::now(); - let entry = Arc::new(LogEntry::new( - 1, - 1, - ProposeId(0, 0), - Arc::new(TestCommand::default().set_exe_dur(Duration::from_secs(1))), - )); + let entry = Arc::new( + LogEntry::new( + 1, + 1, + ProposeId(0, 0), + Arc::new(TestCommand::default().set_exe_dur(Duration::from_secs(1))), + ) + .unwrap(), + ); ce_event_tx.send_sp_exe(Arc::clone(&entry)); @@ -546,16 +546,19 @@ mod tests { done_tx, ); - let entry = Arc::new(LogEntry::new( - 1, - 1, - ProposeId(0, 0), - Arc::new( - TestCommand::default() - .set_exe_dur(Duration::from_secs(1)) - .set_exe_should_fail(), - ), - )); + let entry = Arc::new( + LogEntry::new( + 1, + 1, + ProposeId(0, 0), + Arc::new( + TestCommand::default() + .set_exe_dur(Duration::from_secs(1)) + .set_exe_should_fail(), + ), + ) + .unwrap(), + ); ce_event_tx.send_sp_exe(Arc::clone(&entry)); @@ -598,12 +601,9 @@ mod tests { done_tx, ); - let entry = Arc::new(LogEntry::new( - 1, - 1, - ProposeId(0, 0), - Arc::new(TestCommand::default()), - )); + let entry = Arc::new( + LogEntry::new(1, 1, ProposeId(0, 0), Arc::new(TestCommand::default())).unwrap(), + ); ce_event_tx.send_after_sync(entry); @@ -640,12 +640,15 @@ mod tests { done_tx, ); - let entry = Arc::new(LogEntry::new( - 1, - 1, - ProposeId(0, 0), - Arc::new(TestCommand::default().set_exe_should_fail()), - )); + let entry = Arc::new( + LogEntry::new( + 1, + 1, + ProposeId(0, 0), + Arc::new(TestCommand::default().set_exe_should_fail()), + ) + .unwrap(), + ); ce_event_tx.send_after_sync(entry); @@ -685,18 +688,24 @@ mod tests { done_tx, ); - let entry1 = 
Arc::new(LogEntry::new( - 1, - 1, - ProposeId(0, 0), - Arc::new(TestCommand::new_put(vec![1], 1)), - )); - let entry2 = Arc::new(LogEntry::new( - 2, - 1, - ProposeId(0, 1), - Arc::new(TestCommand::new_get(vec![1])), - )); + let entry1 = Arc::new( + LogEntry::new( + 1, + 1, + ProposeId(0, 0), + Arc::new(TestCommand::new_put(vec![1], 1)), + ) + .unwrap(), + ); + let entry2 = Arc::new( + LogEntry::new( + 2, + 1, + ProposeId(0, 1), + Arc::new(TestCommand::new_get(vec![1])), + ) + .unwrap(), + ); ce_event_tx.send_sp_exe(Arc::clone(&entry1)); ce_event_tx.send_sp_exe(Arc::clone(&entry2)); @@ -747,18 +756,24 @@ mod tests { done_tx, ); - let entry1 = Arc::new(LogEntry::new( - 1, - 1, - ProposeId(0, 0), - Arc::new(TestCommand::new_put(vec![1], 1).set_as_dur(Duration::from_millis(50))), - )); - let entry2 = Arc::new(LogEntry::new( - 2, - 1, - ProposeId(0, 1), - Arc::new(TestCommand::new_get(vec![1])), - )); + let entry1 = Arc::new( + LogEntry::new( + 1, + 1, + ProposeId(0, 0), + Arc::new(TestCommand::new_put(vec![1], 1).set_as_dur(Duration::from_millis(50))), + ) + .unwrap(), + ); + let entry2 = Arc::new( + LogEntry::new( + 2, + 1, + ProposeId(0, 1), + Arc::new(TestCommand::new_get(vec![1])), + ) + .unwrap(), + ); ce_event_tx.send_sp_exe(Arc::clone(&entry1)); ce_event_tx.send_sp_exe(Arc::clone(&entry2)); @@ -766,12 +781,15 @@ mod tests { ce_event_tx.send_reset(None); - let entry3 = Arc::new(LogEntry::new( - 3, - 1, - ProposeId(0, 2), - Arc::new(TestCommand::new_get(vec![1])), - )); + let entry3 = Arc::new( + LogEntry::new( + 3, + 1, + ProposeId(0, 2), + Arc::new(TestCommand::new_get(vec![1])), + ) + .unwrap(), + ); ce_event_tx.send_after_sync(entry3); @@ -813,23 +831,21 @@ mod tests { s2_id, 0, 0, - vec![LogEntry::new( - 1, - 1, - ProposeId(0, 0), - Arc::new(TestCommand::default()), - )], + vec![LogEntry::new(1, 1, ProposeId(0, 0), Arc::new(TestCommand::default())).unwrap()], 0, ) .unwrap(); start_cmd_workers(Arc::clone(&ce1), Arc::new(curp), task_rx, done_tx); - let entry = Arc::new(LogEntry::new( - 1, - 1, - ProposeId(0, 1), - Arc::new(TestCommand::new_put(vec![1], 1).set_exe_dur(Duration::from_millis(50))), - )); + let entry = Arc::new( + LogEntry::new( + 1, + 1, + ProposeId(0, 1), + Arc::new(TestCommand::new_put(vec![1], 1).set_exe_dur(Duration::from_millis(50))), + ) + .unwrap(), + ); ce_event_tx.send_after_sync(entry); @@ -866,12 +882,15 @@ mod tests { ce_event_tx.send_reset(Some(snapshot)).await.unwrap(); - let entry = Arc::new(LogEntry::new( - 1, - 1, - ProposeId(0, 2), - Arc::new(TestCommand::new_get(vec![1])), - )); + let entry = Arc::new( + LogEntry::new( + 1, + 1, + ProposeId(0, 2), + Arc::new(TestCommand::new_get(vec![1])), + ) + .unwrap(), + ); ce_event_tx.send_after_sync(entry); assert_eq!(er_rx.recv().await.unwrap().1.revisions, vec![1]); task_manager1.shutdown(true).await; diff --git a/crates/curp/src/server/raw_curp/log.rs b/crates/curp/src/server/raw_curp/log.rs index 62a133729..060c6d6c7 100644 --- a/crates/curp/src/server/raw_curp/log.rs +++ b/crates/curp/src/server/raw_curp/log.rs @@ -1,7 +1,7 @@ #![allow(clippy::arithmetic_side_effects)] // u64 is large enough and won't overflow use std::{ - cmp::min, + cmp::{min, Ordering}, collections::{HashMap, HashSet, VecDeque}, fmt::Debug, ops::{Bound, Range, RangeBounds, RangeInclusive}, @@ -9,7 +9,6 @@ use std::{ vec, }; -use bincode::serialized_size; use clippy_utilities::NumericCast; use itertools::Itertools; use tokio::sync::mpsc; @@ -66,8 +65,8 @@ impl From> for LogRange { /// For the leader, there should never be a gap between 
snapshot and entries
 ///
 /// Examples:
-/// This example will describe the relationship among `entry_size`, `batch_end`, `batch_limit`, `first_idx_in_cur_batch` and `cur_batch_size`
-/// if `batch_limit` = 11 and `entry_size` = vec![1,5,6,2,3,4,2], then the relationship between `entry_size` and `batch_end` looks like:
+/// This example describes the relationship among `batch_end`, `batch_limit`, `first_idx_in_cur_batch` and `cur_batch_size`:
+/// if `batch_limit` = 11 and the entry sizes are vec![1,5,6,2,3,4,2], then the relationship between the entry sizes and `batch_end` looks like:
 /// -------------------------------------------
 /// `entry_size[i]`| 1 | 5 | 6 | 2 | 3 | 4 | 2 |
 /// ---------------+---------------------------+
@@ -86,8 +85,6 @@ pub(super) struct Log {
     /// A VecDeque to store log entries, it will be serialized and persisted
     /// Note that the logical index in `LogEntry` is different from physical index
     entries: VecDeque>>,
-    /// entry size of each item in entries
-    entry_size: VecDeque,
     /// Each element `batch_end[i]` represents the right inclusive bound of a log batch whose size is less than or equal to `batch_limit`
     /// if you want to fetch a batch which begins at the index `i`, you can fetch it directly by `i..=batch_end[i]`
     batch_end: VecDeque,
@@ -150,27 +147,61 @@ impl Log {
     /// `batch_end` will keep len elem
     fn truncate(&mut self, len: usize) {
         self.entries.truncate(len);
-        self.entry_size.truncate(len);
         self.batch_end.truncate(len);
         self.first_idx_in_cur_batch = min(self.first_idx_in_cur_batch, len);
-
+        let last_index = len - 1;
         #[allow(clippy::indexing_slicing)]
-        while self.li_to_pi(self.batch_end[self.first_idx_in_cur_batch - 1]) >= len {
-            self.batch_end[self.first_idx_in_cur_batch - 1] = 0;
-            self.first_idx_in_cur_batch -= 1;
+        // it's safe since we check `self.first_idx_in_cur_batch` at the beginning of each iteration
+        loop {
+            if self.first_idx_in_cur_batch == 0 {
+                break;
+            }
+            let end = self.li_to_pi(self.batch_end[self.first_idx_in_cur_batch - 1]);
+            match end.cmp(&last_index) {
+                // All the `batch_end[i]` larger than `len - 1` should be reset to zero
+                Ordering::Greater => {
+                    self.batch_end[self.first_idx_in_cur_batch - 1] = 0;
+                    self.first_idx_in_cur_batch -= 1;
+                }
+                Ordering::Equal => {
+                    // when `end == last_index`, we compare the sum of sizes over `get_range_by_batch(self.first_idx_in_cur_batch - 1)` with `batch_limit`:
+                    // Less: it should be a part of the current batch, so we reset the relevant element in `batch_end`
+                    // Equal: it shouldn't be a part of the current batch; we terminate the loop when this happens
+                    // Greater: this case can never happen
+                    let real_batch_size: u64 = self
+                        .entries
+                        .range(self.get_range_by_batch(self.first_idx_in_cur_batch - 1))
+                        .map(|e| e.size)
+                        .sum();
+                    if real_batch_size < self.batch_limit {
+                        self.batch_end[self.first_idx_in_cur_batch - 1] = 0;
+                        self.first_idx_in_cur_batch -= 1;
+                    } else {
+                        break;
+                    }
+                }
+                Ordering::Less => {
+                    break;
+                }
+            }
+        }
+
+        // recalculate the `cur_batch_size`
+        self.cur_batch_size = 0;
+        for entry in self.entries.iter().skip(self.first_idx_in_cur_batch) {
+            self.cur_batch_size += entry.size;
+        }
     }
 
     /// push a log entry into the back of queue
-    fn push_back(&mut self, entry: Arc>) -> Result<(), bincode::Error> {
-        let entry_size = serialized_size(&entry)?;
+    fn push_back(&mut self, entry: Arc>) {
+        let entry_size = entry.size;
         if entry_size > self.batch_limit {
             warn!("entry_size of an entry > batch_limit, which may be too small.",);
         }
         self.entries.push_back(entry);
-        self.entry_size.push_back(entry_size);
         self.batch_end.push_back(0); // placeholder
         self.cur_batch_size += entry_size;
@@ -190,35 +221,24 @@ impl Log {
             } else {
                self.entries[self.entries.len() - 2].index
            };
-            self.cur_batch_size -= self.entry_size[self.first_idx_in_cur_batch];
+            self.cur_batch_size -= self.entries[self.first_idx_in_cur_batch].size;
            self.first_idx_in_cur_batch += 1;
        }
-        Ok(())
    }

    /// pop a log entry from the front of queue
    fn pop_front(&mut self) -> Option>> {
-        if self.entries.front().is_some() {
-            let front_size = *self
-                .entry_size
-                .front()
-                .unwrap_or_else(|| unreachable!("The entry_size cannot be empty"));
-
+        if let Some(entry) = self.entries.pop_front() {
            if self.first_idx_in_cur_batch == 0 {
-                self.cur_batch_size -= front_size;
+                self.cur_batch_size -= entry.size;
            } else {
                self.first_idx_in_cur_batch -= 1;
            }
-
            let _ = self
                .batch_end
                .pop_front()
                .unwrap_or_else(|| unreachable!("The batch_end cannot be empty"));
-            let _ = self
-                .entry_size
-                .pop_front()
-                .unwrap_or_else(|| unreachable!("The pop_front cannot be empty"));
-            self.entries.pop_front()
+            Some(entry)
        } else {
            None
        }
    }

    fn restore(&mut self, entries: Vec>) {
        self.batch_end = VecDeque::with_capacity(entries.capacity());
        self.entries = VecDeque::with_capacity(entries.capacity());
-        self.entry_size = VecDeque::with_capacity(entries.capacity());
        self.cur_batch_size = 0;
        self.first_idx_in_cur_batch = 0;

        for entry in entries {
-            let _unuse = self.push_back(Arc::from(entry));
+            self.push_back(Arc::from(entry));
        }
    }

    /// clear whole log entries
    fn clear(&mut self) {
        self.entries.clear();
-        self.entry_size.clear();
        self.batch_end.clear();
        self.cur_batch_size = 0;
        self.first_idx_in_cur_batch = 0;
@@ -254,7 +272,7 @@
            return LogRange::Range(self.batch_end.len()..self.batch_end.len());
        }

-        if self.entry_size[left] == self.batch_limit {
+        if self.entries[left].size == self.batch_limit {
            return LogRange::RangeInclusive(left..=left);
        }

@@ -295,7 +313,6 @@
    ) -> Self {
        Self {
            entries: VecDeque::with_capacity(entries_cap),
-            entry_size: VecDeque::with_capacity(entries_cap),
            batch_end: VecDeque::with_capacity(entries_cap),
            batch_limit,
            first_idx_in_cur_batch: 0,
@@ -391,8 +408,7 @@
                conf_changes.push(Arc::clone(&entry));
            }
            #[allow(clippy::expect_used)] // It's safe to expect here.
- self.push_back(Arc::clone(&entry)) - .expect("log entry {entry:?} cannot be serialized"); + self.push_back(Arc::clone(&entry)); self.send_persist(entry); } @@ -426,8 +442,8 @@ impl Log { entry: impl Into>, ) -> Result>, bincode::Error> { let index = self.last_log_index() + 1; - let entry = Arc::new(LogEntry::new(index, term, propose_id, entry)); - self.push_back(Arc::clone(&entry))?; + let entry = Arc::new(LogEntry::new(index, term, propose_id, entry)?); + self.push_back(Arc::clone(&entry)); self.send_persist(Arc::clone(&entry)); Ok(entry) } @@ -437,7 +453,7 @@ impl Log { pub(super) fn has_next_batch(&self, li: u64) -> bool { let left = self.li_to_pi(li); if let Some(&batch_end) = self.batch_end.get(left) { - batch_end != 0 || self.entry_size[left] == self.batch_limit + batch_end != 0 || self.entries[left].size == self.batch_limit } else { false } @@ -542,7 +558,6 @@ impl Log { self.cur_batch_size = 0; self.first_idx_in_cur_batch = 0; self.batch_end.clear(); - self.entry_size.clear(); let prev_entries = self.entries.clone(); self.entries.clear(); @@ -579,8 +594,8 @@ mod tests { Log::::new(log_tx, default_batch_max_size(), default_log_entries_cap()); let result = log.try_append_entries( vec![ - LogEntry::new(1, 1, ProposeId(0, 0), Arc::new(TestCommand::default())), - LogEntry::new(2, 1, ProposeId(0, 1), Arc::new(TestCommand::default())), + LogEntry::new(1, 1, ProposeId(0, 0), Arc::new(TestCommand::default())).unwrap(), + LogEntry::new(2, 1, ProposeId(0, 1), Arc::new(TestCommand::default())).unwrap(), ], 0, 0, @@ -601,9 +616,9 @@ mod tests { Log::::new(log_tx, default_batch_max_size(), default_log_entries_cap()); let result = log.try_append_entries( vec![ - LogEntry::new(1, 1, ProposeId(0, 1), Arc::new(TestCommand::default())), - LogEntry::new(2, 1, ProposeId(0, 2), Arc::new(TestCommand::default())), - LogEntry::new(3, 1, ProposeId(0, 3), Arc::new(TestCommand::default())), + LogEntry::new(1, 1, ProposeId(0, 1), Arc::new(TestCommand::default())).unwrap(), + LogEntry::new(2, 1, ProposeId(0, 2), Arc::new(TestCommand::default())).unwrap(), + LogEntry::new(3, 1, ProposeId(0, 3), Arc::new(TestCommand::default())).unwrap(), ], 0, 0, @@ -612,8 +627,8 @@ mod tests { let result = log.try_append_entries( vec![ - LogEntry::new(2, 2, ProposeId(0, 4), Arc::new(TestCommand::default())), - LogEntry::new(3, 2, ProposeId(0, 5), Arc::new(TestCommand::default())), + LogEntry::new(2, 2, ProposeId(0, 4), Arc::new(TestCommand::default())).unwrap(), + LogEntry::new(3, 2, ProposeId(0, 5), Arc::new(TestCommand::default())).unwrap(), ], 1, 1, @@ -629,12 +644,7 @@ mod tests { let mut log = Log::::new(log_tx, default_batch_max_size(), default_log_entries_cap()); let result = log.try_append_entries( - vec![LogEntry::new( - 1, - 1, - ProposeId(0, 0), - Arc::new(TestCommand::default()), - )], + vec![LogEntry::new(1, 1, ProposeId(0, 0), Arc::new(TestCommand::default())).unwrap()], 0, 0, ); @@ -642,8 +652,8 @@ mod tests { let result = log.try_append_entries( vec![ - LogEntry::new(4, 2, ProposeId(0, 1), Arc::new(TestCommand::default())), - LogEntry::new(5, 2, ProposeId(0, 2), Arc::new(TestCommand::default())), + LogEntry::new(4, 2, ProposeId(0, 1), Arc::new(TestCommand::default())).unwrap(), + LogEntry::new(5, 2, ProposeId(0, 2), Arc::new(TestCommand::default())).unwrap(), ], 3, 1, @@ -652,8 +662,8 @@ mod tests { let result = log.try_append_entries( vec![ - LogEntry::new(2, 2, ProposeId(0, 3), Arc::new(TestCommand::default())), - LogEntry::new(3, 2, ProposeId(0, 4), Arc::new(TestCommand::default())), + LogEntry::new(2, 2, 
ProposeId(0, 3), Arc::new(TestCommand::default())).unwrap(), + LogEntry::new(3, 2, ProposeId(0, 4), Arc::new(TestCommand::default())).unwrap(), ], 1, 2, @@ -674,7 +684,7 @@ mod tests { .enumerate() .map(|(idx, cmd)| log.push(1, ProposeId(0, idx.numeric_cast()), cmd).unwrap()) .collect::>(); - let log_entry_size = log.entry_size[0]; + let log_entry_size = log.entries[0].size; log.set_batch_limit(3 * log_entry_size - 1); let bound_1 = log.get_range_by_batch(3); @@ -755,6 +765,7 @@ mod tests { ProposeId(0, idx.numeric_cast()), cmd, ) + .unwrap() }) .collect::>>(); let (tx, _rx) = mpsc::unbounded_channel(); @@ -764,7 +775,6 @@ mod tests { log.restore_entries(entries); assert_eq!(log.entries.len(), 10); assert_eq!(log.batch_end.len(), 10); - assert_eq!(log.entry_size.len(), 10); } #[test] @@ -783,7 +793,6 @@ mod tests { assert_eq!(log.entries.front().unwrap().index, 13); assert_eq!(log.batch_end.len(), 18); assert!(log.entries.len() == log.batch_end.len()); - assert!(log.entry_size.len() == log.entries.len()); } #[test] @@ -794,7 +803,7 @@ mod tests { log.push(0, ProposeId(0, i), Arc::new(TestCommand::default())) .unwrap(); } - let log_entry_size = log.entry_size[0]; + let log_entry_size = log.entries[0].size; log.set_batch_limit(2 * log_entry_size); log.last_as = 22; log.last_exe = 22; @@ -825,4 +834,43 @@ mod tests { ); assert!(log.has_next_batch(15)); } + + #[test] + fn batch_info_should_update_correctly_after_truncated() { + let (log_tx, _log_rx) = mpsc::unbounded_channel(); + let mut log = Log::::new(log_tx, 11, 10); + let mock_entries_sizes = vec![1, 5, 6, 2, 3, 4, 5]; + for i in 0..7 { + let index = log.last_log_index() + 1; + let mut entry = + LogEntry::new(index, 0, ProposeId(0, i), Arc::new(TestCommand::default())).unwrap(); + entry.size = mock_entries_sizes[i as usize]; + let entry_arc = Arc::new(entry); + log.push_back(Arc::clone(&entry_arc)); + log.send_persist(entry_arc); + } + assert_eq!(log.cur_batch_size, 9); + assert_eq!(log.first_idx_in_cur_batch, 5); + + // case 1. truncate len > first_idx_in_cur_batch + // after truncate, the `entries` should be [1, 5, 6, 2, 3, 4], the `batch_end` should be [2, 3, 5, 0, 0, 0] + log.truncate(6); + assert_eq!(log.first_idx_in_cur_batch, 3); + assert_eq!(log.cur_batch_size, 9); + assert_eq!(log.batch_end, VecDeque::from(vec![2, 3, 5, 0, 0, 0])); + + // case 2. truncate len = first_idx_in_cur_batch + // after truncate, the `entries` should be [1, 5, 6, 2, 3], the `batch_end` should be [2, 3, 5, 0, 0] + log.truncate(5); + assert_eq!(log.first_idx_in_cur_batch, 3); + assert_eq!(log.cur_batch_size, 5); + assert_eq!(log.batch_end, VecDeque::from(vec![2, 3, 5, 0, 0])); + + // case 3. 
truncate len < first_idx_in_cur_batch + // after truncate, the `entries` should be [1, 5], the `batch_end` should be [0, 0] + log.truncate(2); + assert_eq!(log.first_idx_in_cur_batch, 0); + assert_eq!(log.cur_batch_size, 6); + assert_eq!(log.batch_end, VecDeque::from(vec![0, 0])); + } } diff --git a/crates/curp/src/server/raw_curp/tests.rs b/crates/curp/src/server/raw_curp/tests.rs index 5e3896c37..49a02d1ab 100644 --- a/crates/curp/src/server/raw_curp/tests.rs +++ b/crates/curp/src/server/raw_curp/tests.rs @@ -397,7 +397,8 @@ fn handle_ae_will_reject_wrong_log() { 1, ProposeId(TEST_CLIENT_ID, 0), Arc::new(TestCommand::default()), - )], + ) + .unwrap()], 0, ); assert_eq!(result, Err((1, 1))); @@ -568,7 +569,8 @@ fn handle_vote_will_reject_outdated_candidate() { 1, ProposeId(TEST_CLIENT_ID, 0), Arc::new(TestCommand::default()), - )], + ) + .unwrap()], 0, ); assert!(result.is_ok()); diff --git a/crates/curp/src/server/storage/db.rs b/crates/curp/src/server/storage/db.rs index 0c8433080..893f234f8 100644 --- a/crates/curp/src/server/storage/db.rs +++ b/crates/curp/src/server/storage/db.rs @@ -200,9 +200,9 @@ mod tests { let s = DB::::open(&storage_cfg)?; s.flush_voted_for(1, 222).await?; s.flush_voted_for(3, 111).await?; - let entry0 = LogEntry::new(1, 3, ProposeId(1, 1), Arc::new(TestCommand::default())); - let entry1 = LogEntry::new(2, 3, ProposeId(1, 2), Arc::new(TestCommand::default())); - let entry2 = LogEntry::new(3, 3, ProposeId(1, 3), Arc::new(TestCommand::default())); + let entry0 = LogEntry::new(1, 3, ProposeId(1, 1), Arc::new(TestCommand::default()))?; + let entry1 = LogEntry::new(2, 3, ProposeId(1, 2), Arc::new(TestCommand::default()))?; + let entry2 = LogEntry::new(3, 3, ProposeId(1, 3), Arc::new(TestCommand::default()))?; s.put_log_entry(&entry0).await?; s.put_log_entry(&entry1).await?; s.put_log_entry(&entry2).await?; diff --git a/crates/curp/src/server/storage/wal/codec.rs b/crates/curp/src/server/storage/wal/codec.rs index b891ccbd1..92840e13a 100644 --- a/crates/curp/src/server/storage/wal/codec.rs +++ b/crates/curp/src/server/storage/wal/codec.rs @@ -321,7 +321,7 @@ mod tests { #[tokio::test] async fn frame_encode_decode_is_ok() { let mut codec = WAL::::new(); - let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty); + let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty).unwrap(); let data_frame = DataFrame::Entry(entry.clone()); let seal_frame = DataFrame::::SealIndex(1); let mut encoded = codec.encode(vec![data_frame]).unwrap(); @@ -343,7 +343,7 @@ mod tests { #[tokio::test] async fn frame_zero_write_will_be_detected() { let mut codec = WAL::::new(); - let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty); + let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty).unwrap(); let data_frame = DataFrame::Entry(entry.clone()); let seal_frame = DataFrame::::SealIndex(1); let mut encoded = codec.encode(vec![data_frame]).unwrap(); @@ -356,7 +356,7 @@ mod tests { #[tokio::test] async fn frame_corrupt_will_be_detected() { let mut codec = WAL::::new(); - let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty); + let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty).unwrap(); let data_frame = DataFrame::Entry(entry.clone()); let seal_frame = DataFrame::::SealIndex(1); let mut encoded = codec.encode(vec![data_frame]).unwrap(); diff --git a/crates/curp/src/server/storage/wal/segment.rs b/crates/curp/src/server/storage/wal/segment.rs index 217d1f066..11035c7c8 100644 --- 
a/crates/curp/src/server/storage/wal/segment.rs +++ b/crates/curp/src/server/storage/wal/segment.rs @@ -369,12 +369,18 @@ mod tests { let frames: Vec<_> = (0..100) .map(|i| { - DataFrame::Entry(LogEntry::new( - i, - 1, - crate::rpc::ProposeId(0, 0), - EntryData::Command(Arc::new(TestCommand::new_put(vec![i as u32], i as u32))), - )) + DataFrame::Entry( + LogEntry::new( + i, + 1, + crate::rpc::ProposeId(0, 0), + EntryData::Command(Arc::new(TestCommand::new_put( + vec![i as u32], + i as u32, + ))), + ) + .unwrap(), + ) }) .collect(); segment.write_sync(frames.clone(), WAL::new());
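
The pattern behind this refactor, as the `LogEntry::new` change above shows, is to measure an entry once with `bincode::serialized_size` at construction time and carry the result in the entry itself, so `Log` can drop its parallel `entry_size` queue and read `entry.size` wherever it used to index that queue. Below is a minimal, self-contained sketch of that pattern; `SketchEntry` and `SketchData` are hypothetical names (not the crate's real types), and it assumes `serde` with the `derive` feature plus `bincode` 1.x as dependencies.

use serde::Serialize;

#[derive(Serialize)]
enum SketchData {
    Command(Vec<u8>),
    Empty,
}

#[derive(Serialize)]
struct SketchEntry {
    term: u64,
    index: u64,
    data: SketchData,
    /// Serialized size of the whole entry, filled in by `new`.
    size: u64,
}

impl SketchEntry {
    fn new(term: u64, index: u64, data: SketchData) -> Result<Self, bincode::Error> {
        let mut entry = Self { term, index, data, size: 0 };
        // Measure with the placeholder `size: 0`, mirroring the `LogEntry::new`
        // change in the patch: a `u64` occupies a fixed width under bincode 1.x's
        // default (fixint) options, so the value measured here already equals the
        // entry's final serialized size.
        entry.size = bincode::serialized_size(&entry)?;
        Ok(entry)
    }
}

fn main() -> Result<(), bincode::Error> {
    let entry = SketchEntry::new(1, 1, SketchData::Command(vec![1, 2, 3]))?;
    // Batching code can now read `entry.size` directly, e.g. summing sizes
    // against a `batch_limit`, instead of consulting a side table.
    println!("serialized size = {}", entry.size);
    Ok(())
}

With the size inlined, batching code such as `push_back` and `truncate` above can sum `entry.size` values directly against `batch_limit`, instead of maintaining a second `VecDeque` that has to stay in lockstep with `entries`.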