diff --git a/crates/curp/src/server/storage/wal/codec.rs b/crates/curp/src/server/storage/wal/codec.rs index d3edda801..41c2a6575 100644 --- a/crates/curp/src/server/storage/wal/codec.rs +++ b/crates/curp/src/server/storage/wal/codec.rs @@ -5,10 +5,10 @@ use curp_external_api::LogIndex; use serde::{de::DeserializeOwned, Serialize}; use sha2::{Digest, Sha256}; use thiserror::Error; -use tokio_util::codec::{Decoder, Encoder}; use super::{ error::{CorruptType, WALError}, + framed::{Decoder, Encoder}, util::{get_checksum, validate_data}, }; use crate::log_entry::LogEntry; @@ -104,18 +104,13 @@ where { type Error = io::Error; - fn encode( - &mut self, - frames: Vec>, - dst: &mut bytes::BytesMut, - ) -> Result<(), Self::Error> { - let frames_bytes: Vec<_> = frames.into_iter().flat_map(|f| f.encode()).collect(); - let commit_frame = CommitFrame::new_from_data(&frames_bytes); + /// Encodes a frame + fn encode(&mut self, frames: Vec>) -> Result, Self::Error> { + let mut frame_data: Vec<_> = frames.into_iter().flat_map(|f| f.encode()).collect(); + let commit_frame = CommitFrame::new_from_data(&frame_data); + frame_data.extend_from_slice(&commit_frame.encode()); - dst.extend(frames_bytes); - dst.extend(commit_frame.encode()); - - Ok(()) + Ok(frame_data) } } @@ -127,30 +122,34 @@ where type Error = WALError; - fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { - loop { - if let Some((frame, len)) = WALFrame::::decode(src)? { - let decoded_bytes = src.split_to(len); - match frame { - WALFrame::Data(data) => { - self.frames.push(data); - self.hasher.update(decoded_bytes); - } - WALFrame::Commit(commit) => { - let frames_bytes: Vec<_> = - self.frames.iter().flat_map(DataFrame::encode).collect(); - let checksum = self.hasher.clone().finalize(); - self.hasher.reset(); - if commit.validate(&checksum) { - return Ok(Some(self.frames.drain(..).collect())); - } - return Err(WALError::Corrupted(CorruptType::Checksum)); + #[allow(clippy::arithmetic_side_effects)] // the arithmetic only used as slice indices + fn decode(&mut self, src: &[u8]) -> Result<(Self::Item, usize), Self::Error> { + let mut current = 0; + while current < src.len() { + let next = src.get(current..).ok_or(WALError::UnexpectedEof)?; + let Some((frame, len)) = WALFrame::::decode(next)? else { + return Err(WALError::UnexpectedEof); + }; + let decoded_bytes = src + .get(current..current + len) + .ok_or(WALError::UnexpectedEof)?; + current += len; + match frame { + WALFrame::Data(data) => { + self.frames.push(data); + self.hasher.update(decoded_bytes); + } + WALFrame::Commit(commit) => { + let checksum = self.hasher.clone().finalize(); + self.hasher.reset(); + if commit.validate(&checksum) { + return Ok((self.frames.drain(..).collect(), current)); } + return Err(WALError::Corrupted(CorruptType::Checksum)); } - } else { - return Ok(None); } } + Err(WALError::UnexpectedEof) } } @@ -191,7 +190,7 @@ where .unwrap_or_else(|_| unreachable!("this conversion will always succeed")); let frame_type = header[0]; match frame_type { - INVALID => Err(WALError::MaybeEnded), + INVALID => Err(WALError::UnexpectedEof), ENTRY => Self::decode_entry(header, &src[8..]), SEAL => Self::decode_seal_index(header), COMMIT => Self::decode_commit(&src[8..]), @@ -323,25 +322,19 @@ mod tests { #[tokio::test] async fn frame_encode_decode_is_ok() { - let file = TokioFile::from(tempfile().unwrap()); - let mut framed = Framed::new(file, WAL::::new()); + let mut codec = WAL::::new(); let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty); let data_frame = DataFrame::Entry(entry.clone()); let seal_frame = DataFrame::::SealIndex(1); - framed.send(vec![data_frame]).await.unwrap(); - framed.send(vec![seal_frame]).await.unwrap(); - framed.get_mut().flush().await; - - let mut file = framed.into_inner(); - file.seek(io::SeekFrom::Start(0)).await.unwrap(); - let mut framed = Framed::new(file, WAL::::new()); + let mut encoded = codec.encode(vec![data_frame]).unwrap(); + encoded.extend_from_slice(&codec.encode(vec![seal_frame]).unwrap()); - let data_frame_get = &framed.next().await.unwrap().unwrap()[0]; - let seal_frame_get = &framed.next().await.unwrap().unwrap()[0]; - let DataFrame::Entry(ref entry_get) = *data_frame_get else { + let (data_frame_get, len) = codec.decode(&encoded).unwrap(); + let (seal_frame_get, _) = codec.decode(&encoded[len..]).unwrap(); + let DataFrame::Entry(ref entry_get) = data_frame_get[0] else { panic!("frame should be type: DataFrame::Entry"); }; - let DataFrame::SealIndex(ref index) = *seal_frame_get else { + let DataFrame::SealIndex(ref index) = seal_frame_get[0] else { panic!("frame should be type: DataFrame::Entry"); }; @@ -351,46 +344,30 @@ mod tests { #[tokio::test] async fn frame_zero_write_will_be_detected() { - let file = TokioFile::from(tempfile().unwrap()); - let mut framed = Framed::new(file, WAL::::new()); + let mut codec = WAL::::new(); let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty); let data_frame = DataFrame::Entry(entry.clone()); - framed.send(vec![data_frame]).await.unwrap(); - framed.get_mut().flush().await; - - let mut file = framed.into_inner(); - /// zero the first byte, it will reach a success state, - /// all following data will be truncated - file.seek(io::SeekFrom::Start(0)).await.unwrap(); - file.write_u8(0).await; - - file.seek(io::SeekFrom::Start(0)).await.unwrap(); - - let mut framed = Framed::new(file, WAL::::new()); + let seal_frame = DataFrame::::SealIndex(1); + let mut encoded = codec.encode(vec![data_frame]).unwrap(); + encoded[0] = 0; - let err = framed.next().await.unwrap().unwrap_err(); - assert!(matches!(err, WALError::MaybeEnded), "error {err} not match"); + let err = codec.decode(&encoded).unwrap_err(); + assert!( + matches!(err, WALError::UnexpectedEof), + "error {err} not match" + ); } #[tokio::test] async fn frame_corrupt_will_be_detected() { - let file = TokioFile::from(tempfile().unwrap()); - let mut framed = Framed::new(file, WAL::::new()); + let mut codec = WAL::::new(); let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty); let data_frame = DataFrame::Entry(entry.clone()); - framed.send(vec![data_frame]).await.unwrap(); - framed.get_mut().flush().await; - - let mut file = framed.into_inner(); - /// This will cause a failure state - file.seek(io::SeekFrom::Start(1)).await.unwrap(); - file.write_u8(0).await; - - file.seek(io::SeekFrom::Start(0)).await.unwrap(); - - let mut framed = Framed::new(file, WAL::::new()); + let seal_frame = DataFrame::::SealIndex(1); + let mut encoded = codec.encode(vec![data_frame]).unwrap(); + encoded[1] = 0; - let err = framed.next().await.unwrap().unwrap_err(); + let err = codec.decode(&encoded).unwrap_err(); assert!( matches!(err, WALError::Corrupted(_)), "error {err} not match" diff --git a/crates/curp/src/server/storage/wal/error.rs b/crates/curp/src/server/storage/wal/error.rs index bc705a3ef..34a3259dc 100644 --- a/crates/curp/src/server/storage/wal/error.rs +++ b/crates/curp/src/server/storage/wal/error.rs @@ -5,12 +5,9 @@ use thiserror::Error; /// Errors of the `WALStorage` #[derive(Debug, Error)] pub(crate) enum WALError { - /// The WAL segment might reach on end - /// - /// NOTE: This exists because we cannot tell the difference between a corrupted WAL - /// and a normally ended WAL, as the segment files are all preallocated with zeros + /// Unexpected end of file of the WAL #[error("WAL ended")] - MaybeEnded, + UnexpectedEof, /// The WAL corrupt error #[error("WAL corrupted: {0}")] Corrupted(CorruptType), diff --git a/crates/curp/src/server/storage/wal/framed.rs b/crates/curp/src/server/storage/wal/framed.rs new file mode 100644 index 000000000..d5ea00e19 --- /dev/null +++ b/crates/curp/src/server/storage/wal/framed.rs @@ -0,0 +1,22 @@ +use std::io; + +/// Decoding of frames via buffers. +pub(super) trait Decoder { + /// The type of decoded frames. + type Item; + + /// The type of unrecoverable frame decoding errors. + type Error: From; + + /// Attempts to decode a frame from the provided buffer of bytes. + fn decode(&mut self, src: &[u8]) -> Result<(Self::Item, usize), Self::Error>; +} + +/// Trait of helper objects to write out messages as bytes +pub(super) trait Encoder { + /// The type of encoding errors. + type Error: From; + + /// Encodes a frame + fn encode(&mut self, item: Item) -> Result, Self::Error>; +} diff --git a/crates/curp/src/server/storage/wal/mod.rs b/crates/curp/src/server/storage/wal/mod.rs index c393c2b0d..4a97f2195 100644 --- a/crates/curp/src/server/storage/wal/mod.rs +++ b/crates/curp/src/server/storage/wal/mod.rs @@ -15,6 +15,9 @@ mod segment; /// File utils mod util; +/// Framed traits +mod framed; + /// The magic of the WAL file const WAL_MAGIC: u32 = 0xd86e_0be2; diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index b5af7b57f..f40a9eb44 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -29,7 +29,7 @@ pub(super) struct FilePipeline { /// The size of the temp file file_size: u64, /// The file receive stream - file_stream: RecvStream<'static, LockedFile>, + file_stream: flume::IntoIter, /// Stopped flag stopped: Arc, } @@ -97,7 +97,7 @@ impl FilePipeline { Ok(Self { dir, file_size, - file_stream: file_rx.into_stream(), + file_stream: file_rx.into_iter(), stopped, }) } @@ -136,18 +136,14 @@ impl Drop for FilePipeline { } } -impl Stream for FilePipeline { +impl Iterator for FilePipeline { type Item = io::Result; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + fn next(&mut self) -> Option { if self.stopped.load(Ordering::Relaxed) { - return Poll::Ready(None); + return None; } - - self.file_stream.poll_next_unpin(cx).map(|opt| opt.map(Ok)) + self.file_stream.next().map(Ok) } } @@ -175,11 +171,11 @@ mod tests { let file = file.into_std(); assert_eq!(file.metadata().unwrap().len(), file_size,); }; - let file0 = pipeline.next().await.unwrap().unwrap(); + let file0 = pipeline.next().unwrap().unwrap(); check_size(file0); - let file1 = pipeline.next().await.unwrap().unwrap(); + let file1 = pipeline.next().unwrap().unwrap(); check_size(file1); pipeline.stop(); - assert!(pipeline.next().await.is_none()); + assert!(pipeline.next().is_none()); } } diff --git a/crates/curp/src/server/storage/wal/segment.rs b/crates/curp/src/server/storage/wal/segment.rs index 525ef3aa5..6e78f4999 100644 --- a/crates/curp/src/server/storage/wal/segment.rs +++ b/crates/curp/src/server/storage/wal/segment.rs @@ -1,20 +1,26 @@ -use std::{io, iter, pin::Pin, sync::Arc, task::Poll}; +use std::{ + fs::File, + io::{self, Read, Write}, + iter, + pin::Pin, + sync::Arc, + task::Poll, +}; use clippy_utilities::{NumericCast, OverflowArithmetic}; use curp_external_api::LogIndex; use futures::{ready, FutureExt, SinkExt}; use serde::{de::DeserializeOwned, Serialize}; use tokio::{ - fs::File as TokioFile, io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt}, sync::Mutex, }; use tokio_stream::StreamExt; -use tokio_util::codec::Framed; use super::{ codec::{DataFrame, WAL}, error::{CorruptType, WALError}, + framed::{Decoder, Encoder}, util::{get_checksum, parse_u64, validate_data, LockedFile}, WAL_FILE_EXT, WAL_MAGIC, WAL_VERSION, }; @@ -33,101 +39,66 @@ pub(super) struct WALSegment { /// The soft size limit of this segment size_limit: u64, /// The opened file of this segment - file: TokioFile, + file: File, /// The file size of the segment size: u64, /// The highest index of the segment seal_index: LogIndex, - /// The IO state of the file - io_state: IOState, -} - -/// The IO state of the file -#[derive(Clone, Copy, Debug, Default)] -pub(super) enum IOState { - /// The initial state that haven't written any data or fsynced - #[default] - Fsynced, - /// Already wrote some data, but haven't flushed yet - Written, - /// Already flushed, but haven't called fsync yet - Flushed, - /// Shutdowned - Shutdown, - /// The IO has failed on this file - Errored, } impl WALSegment { /// Creates a new `WALSegment` - pub(super) async fn create( + pub(super) fn create( tmp_file: LockedFile, base_index: LogIndex, segment_id: u64, size_limit: u64, ) -> io::Result { let segment_name = Self::segment_name(segment_id, base_index); - let mut lfile = tmp_file.rename(segment_name)?; - let mut tokio_file = lfile.into_async(); - tokio_file - .write_all(&Self::gen_header(base_index, segment_id)) - .await?; - tokio_file.flush().await?; - tokio_file.sync_all().await?; + let mut locked_file = tmp_file.rename(segment_name)?; + let mut file = locked_file.into_std(); + file.write_all(&Self::gen_header(base_index, segment_id))?; + file.flush()?; + file.sync_data()?; Ok(Self { base_index, segment_id, size_limit, - file: tokio_file, + file, size: WAL_HEADER_SIZE.numeric_cast(), // For convenience we set it to largest u64 value that represent not sealed seal_index: u64::MAX, - io_state: IOState::default(), }) } /// Open an existing WAL segment file - pub(super) async fn open(mut lfile: LockedFile, size_limit: u64) -> Result { - let mut tokio_file = lfile.into_async(); - let size = tokio_file.metadata().await?.len(); + pub(super) fn open(mut locked_file: LockedFile, size_limit: u64) -> Result { + let mut file = locked_file.into_std(); + let size = file.metadata()?.len(); let mut buf = vec![0; WAL_HEADER_SIZE]; - let _ignore = tokio_file.read_exact(&mut buf).await?; + file.read_exact(&mut buf)?; let (base_index, segment_id) = Self::parse_header(&buf)?; Ok(Self { base_index, segment_id, size_limit, - file: tokio_file, + file, size, // Index 0 means the seal_index hasn't been read yet seal_index: 0, - io_state: IOState::default(), }) } /// Recover log entries from a `WALSegment` - pub(super) async fn recover_segment_logs( + pub(super) fn recover_segment_logs( &mut self, ) -> Result>, WALError> where C: Serialize + DeserializeOwned + 'static, { - let mut self_framed = Framed::new(self, WAL::::new()); - let mut frame_batches = vec![]; - while let Some(result) = self_framed.next().await { - match result { - Ok(f) => frame_batches.push(f), - Err(e) => { - /// If the segment file reaches on end, stop reading - if matches!(e, WALError::MaybeEnded) { - break; - } - return Err(e); - } - } - } + let frame_batches = self.read_all(WAL::::new())?; // The highest_index of this segment let mut highest_index = u64::MAX; // We get the last frame batch to check it's type @@ -141,7 +112,7 @@ impl WALSegment { } // Update seal index - self_framed.get_mut().update_seal_index(highest_index); + self.update_seal_index(highest_index); // Get log entries that index is no larger than `highest_index` Ok(frame_batches.into_iter().flatten().filter_map(move |f| { @@ -156,23 +127,45 @@ impl WALSegment { /// Seal the current segment /// /// After the seal, the log index in this segment should be less than `next_index` - pub(super) async fn seal(&mut self, next_index: LogIndex) -> io::Result<()> { - let mut framed = Framed::new(self, WAL::::new()); - framed.send(vec![DataFrame::SealIndex(next_index)]).await?; - framed.flush().await?; - framed.get_mut().sync_all().await?; - framed.get_mut().update_seal_index(next_index); + pub(super) fn seal(&mut self, next_index: LogIndex) -> io::Result<()> { + self.write_sync(vec![DataFrame::SealIndex(next_index)], WAL::::new())?; + self.update_seal_index(next_index); Ok(()) } - /// Syncs the file of this segment - pub(super) async fn sync_all(&mut self) -> io::Result<()> { - self.file.sync_all().await?; - self.io_state.fsynced(); + /// Writes an item to the segment + pub(super) fn write_sync(&mut self, item: Item, mut encoder: U) -> io::Result<()> + where + U: Encoder, + { + let encoded_bytes = encoder.encode(item)?; + self.file.write_all(&encoded_bytes)?; + self.size = self.size.overflow_add(encoded_bytes.len().numeric_cast()); + self.file.flush()?; + self.file.sync_data()?; Ok(()) } + /// Read all items from the segment + #[allow(clippy::indexing_slicing)] + #[allow(clippy::arithmetic_side_effects)] // only used for slice indices + fn read_all(&mut self, mut decoder: U) -> Result, WALError> + where + U: Decoder, + { + let mut buf = Vec::new(); + let _ignore = self.file.read_to_end(&mut buf)?; + let mut pos = 0; + let mut entries = Vec::new(); + while pos < buf.len() { + let (item, n) = decoder.decode(&buf[pos..])?; + entries.push(item); + pos += n; + } + Ok(entries) + } + /// Updates the seal index pub(super) fn update_seal_index(&mut self, index: LogIndex) { self.seal_index = self.seal_index.max(index); @@ -205,11 +198,6 @@ impl WALSegment { self.seal_index < self.base_index } - /// Gets the io state of this segment - pub(super) fn io_state(&self) -> IOState { - self.io_state - } - /// Gets the file name of the WAL segment pub(super) fn segment_name(segment_id: u64, log_index: u64) -> String { format!("{segment_id:016x}-{log_index:016x}{WAL_FILE_EXT}") @@ -275,68 +263,6 @@ impl WALSegment { } } -impl AsyncWrite for WALSegment { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - match ready!(Pin::new(&mut self.file).poll_write(cx, buf)) { - Ok(len) => { - self.io_state.written(); - self.size = self.size.overflow_add(len.numeric_cast()); - Poll::Ready(Ok(len)) - } - Err(e) => { - self.io_state.errored(); - Poll::Ready(Err(e)) - } - } - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - match ready!(Pin::new(&mut self.file).poll_flush(cx)) { - Ok(()) => { - self.io_state.flushed(); - Poll::Ready(Ok(())) - } - Err(e) => { - self.io_state.errored(); - Poll::Ready(Err(e)) - } - } - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - match ready!(Pin::new(&mut self.file).poll_shutdown(cx)) { - Ok(()) => { - self.io_state.shutdowned(); - Poll::Ready(Ok(())) - } - Err(e) => { - self.io_state.errored(); - Poll::Ready(Err(e)) - } - } - } -} - -impl AsyncRead for WALSegment { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.file).poll_read(cx, buf) - } -} - impl PartialEq for WALSegment { fn eq(&self, other: &Self) -> bool { self.segment_id.eq(&other.segment_id) @@ -357,57 +283,6 @@ impl Ord for WALSegment { } } -impl IOState { - /// Mutate the state to `IOState::Written` - /// - /// # Panics - /// - /// This method panics if the state is not `IOState::Written` or `IOState::Fsynced` - fn written(&mut self) { - assert!( - matches!(*self, IOState::Written | IOState::Fsynced), - "current state is {self:?}" - ); - *self = IOState::Written; - } - - /// Mutate the state to `IOState::Flushed` - /// - /// # Panics - /// - /// This method panics if the state is not `IOState::Flushed` or `IOState::Written` - fn flushed(&mut self) { - assert!( - matches!(*self, IOState::Flushed | IOState::Written), - "current state is {self:?}" - ); - *self = IOState::Flushed; - } - - /// Mutate the state to `IOState::Written` - /// - /// # Panics - /// - /// This method panics if the state is not `IOState::Fsynced` or `IOState::Flushed` - fn fsynced(&mut self) { - assert!( - matches!(*self, IOState::Fsynced | IOState::Flushed), - "current state is {self:?}" - ); - *self = IOState::Fsynced; - } - - /// Mutate the state to `IOState::Errored` - fn errored(&mut self) { - *self = IOState::Errored; - } - - /// Mutate the state to `IOState::Shutdowned` - fn shutdowned(&mut self) { - *self = IOState::Shutdown; - } -} - #[cfg(test)] mod tests { use std::{path::PathBuf, time::Duration}; @@ -417,40 +292,6 @@ mod tests { use super::*; use crate::log_entry::EntryData; - #[tokio::test] - async fn segment_state_transition_is_correct() { - let expect_state = |segment: &WALSegment, state: IOState| { - assert!( - matches!(segment.io_state(), state), - "expect {state:?}, current state: {:?}", - segment.io_state() - ); - }; - - let temp_dir = tempfile::tempdir().unwrap(); - let mut file_path = PathBuf::from(temp_dir.path()); - file_path.push("0.tmp"); - let lfile = LockedFile::open_rw(&file_path).unwrap(); - let mut segment = WALSegment::create(lfile, 1, 0, 512).await.unwrap(); - - expect_state(&segment, IOState::Fsynced); - - segment.write_u64(1).await.unwrap(); - expect_state(&segment, IOState::Written); - segment.write_u64(2).await.unwrap(); - expect_state(&segment, IOState::Written); - - segment.flush().await; - expect_state(&segment, IOState::Flushed); - segment.flush().await; - expect_state(&segment, IOState::Flushed); - - segment.sync_all().await; - expect_state(&segment, IOState::Fsynced); - segment.sync_all().await; - expect_state(&segment, IOState::Fsynced); - } - #[test] fn gen_parse_header_is_correct() { fn corrupt(mut header: Vec, pos: usize) -> Vec { @@ -471,8 +312,8 @@ mod tests { } } - #[tokio::test] - async fn segment_seal_is_ok() { + #[test] + fn segment_seal_is_ok() { const BASE_INDEX: u64 = 17; const SEGMENT_ID: u64 = 2; const SIZE_LIMIT: u64 = 5; @@ -483,17 +324,15 @@ mod tests { let mut wal_path = dir.path().to_path_buf(); wal_path.push(segment_name); let file = LockedFile::open_rw(&tmp_path).unwrap(); - let mut segment = WALSegment::create(file, BASE_INDEX, SEGMENT_ID, SIZE_LIMIT) - .await - .unwrap(); - segment.seal::<()>(20).await.unwrap(); - segment.seal::<()>(30).await.unwrap(); - segment.seal::<()>(40).await.unwrap(); + let mut segment = WALSegment::create(file, BASE_INDEX, SEGMENT_ID, SIZE_LIMIT).unwrap(); + segment.seal::<()>(20).unwrap(); + segment.seal::<()>(30).unwrap(); + segment.seal::<()>(40).unwrap(); drop(segment); let file = LockedFile::open_rw(wal_path).unwrap(); - let mut segment = WALSegment::open(file, SIZE_LIMIT).await.unwrap(); - let _ignore = segment.recover_segment_logs::<()>().await.unwrap(); + let mut segment = WALSegment::open(file, SIZE_LIMIT).unwrap(); + let _ignore = segment.recover_segment_logs::<()>().unwrap(); assert_eq!(segment.seal_index, 40); } @@ -509,11 +348,8 @@ mod tests { let mut wal_path = dir.path().to_path_buf(); wal_path.push(segment_name); let file = LockedFile::open_rw(&tmp_path).unwrap(); - let mut segment = WALSegment::create(file, BASE_INDEX, SEGMENT_ID, SIZE_LIMIT) - .await - .unwrap(); + let mut segment = WALSegment::create(file, BASE_INDEX, SEGMENT_ID, SIZE_LIMIT).unwrap(); - let mut seg_framed = Framed::new(&mut segment, WAL::::new()); let frames: Vec<_> = (0..100) .map(|i| { DataFrame::Entry(LogEntry::new( @@ -524,17 +360,14 @@ mod tests { )) }) .collect(); - seg_framed.send(frames.clone()).await.unwrap(); - seg_framed.flush().await.unwrap(); - seg_framed.get_mut().sync_all().await.unwrap(); + segment.write_sync(frames.clone(), WAL::new()); drop(segment); let file = LockedFile::open_rw(wal_path).unwrap(); - let mut segment = WALSegment::open(file, SIZE_LIMIT).await.unwrap(); + let mut segment = WALSegment::open(file, SIZE_LIMIT).unwrap(); let recovered: Vec<_> = segment .recover_segment_logs::() - .await .unwrap() .map(|e| DataFrame::Entry(e)) .collect(); diff --git a/crates/curp/src/server/storage/wal/util.rs b/crates/curp/src/server/storage/wal/util.rs index d5ad61492..080990990 100644 --- a/crates/curp/src/server/storage/wal/util.rs +++ b/crates/curp/src/server/storage/wal/util.rs @@ -70,11 +70,6 @@ impl LockedFile { .unwrap_or_else(|| unreachable!("File should always exist after creation")) } - /// Converts self to tokio file - pub(super) fn into_async(self) -> TokioFile { - TokioFile::from_std(self.into_std()) - } - /// Gets the file wrapped inside an `Option` fn file(&mut self) -> &mut StdFile { self.file