diff --git a/crates/curp/src/server/raw_curp/log.rs b/crates/curp/src/server/raw_curp/log.rs index 7adfbbfa2..64ea3e8ee 100644 --- a/crates/curp/src/server/raw_curp/log.rs +++ b/crates/curp/src/server/raw_curp/log.rs @@ -12,7 +12,7 @@ use bincode::serialized_size; use clippy_utilities::{NumericCast, OverflowArithmetic}; use itertools::Itertools; use tokio::sync::mpsc; -use tracing::{error, trace, warn}; +use tracing::{error, warn}; use crate::{ cmd::Command, @@ -118,7 +118,6 @@ impl LogEntryVecDeque { /// push a log entry into the back of queue fn push_back(&mut self, entry: Arc>) -> Result<(), bincode::Error> { - #![allow(clippy::indexing_slicing)] let entry_size = serialized_size(&entry)?; if entry_size > self.batch_limit { @@ -128,36 +127,17 @@ impl LogEntryVecDeque { self.entries.push_back(entry); self.entry_size.push_back(entry_size); self.batch_index.push_back(0); // placeholder - - if entry_size > self.batch_limit { - let entry_idx = self.batch_index.len() - 1; - for prev_idx in self.first_entry_at_last_batch..entry_idx { - self.batch_index[prev_idx] = entry_idx - prev_idx; // record offset but not absolute index - } - self.batch_index[entry_idx] = 1; - self.last_batch_size = 0; - self.first_entry_at_last_batch = entry_idx + 1; - return Ok(()); - } - - while self.last_batch_size + entry_size > self.batch_limit - && self.first_entry_at_last_batch < self.entries.len() - { - self.batch_index[self.first_entry_at_last_batch] = - self.entries.len() - 1 - self.first_entry_at_last_batch; // record offset but not absolute index - self.last_batch_size -= self.entry_size[self.first_entry_at_last_batch]; - self.first_entry_at_last_batch += 1; - } - self.last_batch_size += entry_size; - if self.first_entry_at_last_batch >= self.entries.len() { - self.batch_index[self.entries.len() - 1] = 1; - } - - if self.last_batch_size == self.batch_limit { - self.batch_index[self.first_entry_at_last_batch] = - self.entries.len() - self.first_entry_at_last_batch; // record offset but not absolute index + while self.last_batch_size > self.batch_limit { + if let Some(cur_batch_index) = self.batch_index.get_mut(self.first_entry_at_last_batch) + { + *cur_batch_index = self.entries.len() - 1 - self.first_entry_at_last_batch; + if let Some(cur_entry_size) = self.entry_size.get(self.first_entry_at_last_batch) { + self.last_batch_size -= *cur_entry_size; + } + self.first_entry_at_last_batch += 1; + } } Ok(()) @@ -165,9 +145,8 @@ impl LogEntryVecDeque { /// pop a log entry from the front of queue fn pop_front(&mut self) -> Option>> { - #![allow(clippy::indexing_slicing)] if self.entries.front().is_some() { - let front_size = self.entry_size[0]; + let front_size = *self.entry_size.front().unwrap_or_else(|| unreachable!("The entry_size cannot be empty")); if self.first_entry_at_last_batch == 0 { self.last_batch_size -= front_size; @@ -213,16 +192,13 @@ impl LogEntryVecDeque { } /// Get the range [left, right) of the log entry, whose size should be equal or smaller than `batch_limit` + #[allow(clippy::range_plus_one)] fn get_range_by_batch(&self, left: usize) -> Range { #![allow(clippy::indexing_slicing)] if left >= self.batch_index.len() { - trace!( - "left = {}, self.batch_index.len() = {}, self.entries.len() = {}", - left, - self.batch_index.len(), - self.entries.len() - ); left..self.entries.len() + } else if self.entry_size[left] > self.batch_limit { + left..left + 1 } else if self.batch_index[left] == 0 { left..self.entries.len() } else { @@ -240,6 +216,9 @@ impl LogEntryVecDeque { fn has_next_batch(&self, left: usize) -> bool { if let Some(&offset) = self.batch_index.get(left) { offset != 0 + || (self.first_entry_at_last_batch == left + && self.batch_limit == self.last_batch_size) + || left == self.batch_index.len() - 1 } else { false } @@ -252,41 +231,14 @@ impl LogEntryVecDeque { self.batch_limit = batch_limit; self.last_batch_size = 0; self.first_entry_at_last_batch = 0; - self.batch_index.iter_mut().for_each(|val| *val = 0); - - for entry_idx in 0..self.entries.len() { - let entry_size = self.entry_size[entry_idx]; - - if entry_size > self.batch_limit { - for prev_idx in self.first_entry_at_last_batch..entry_idx { - self.batch_index[prev_idx] = entry_idx - prev_idx; // record offset but not absolute index - } - self.batch_index[entry_idx] = 1; - self.last_batch_size = 0; - self.first_entry_at_last_batch = entry_idx + 1; - continue; - } - - while self.last_batch_size + entry_size > self.batch_limit - && self.first_entry_at_last_batch < self.entries.len() - { - self.batch_index[self.first_entry_at_last_batch] = - entry_idx - self.first_entry_at_last_batch; // record offset but not absolute index - self.last_batch_size -= self.entry_size[self.first_entry_at_last_batch]; - self.first_entry_at_last_batch += 1; - } - - self.last_batch_size += entry_size; + self.batch_index.clear(); - if self.first_entry_at_last_batch >= self.entries.len() { - self.batch_index[entry_idx] = 1; - } + let prev_entries = self.entries.clone(); + self.entries.clear(); - if entry_idx == self.entries.len() - 1 && self.last_batch_size == self.batch_limit { - self.batch_index[self.first_entry_at_last_batch] = - self.entries.len() - self.first_entry_at_last_batch; // record offset but not absolute index - } - } + let _unused = prev_entries.into_iter().for_each(|item| { + let _u = self.push_back(item); + }); } }