Skip to content

Commit

Permalink
Removing async from mrecordlog. (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Mar 1, 2024
1 parent bc6a998 commit 2c593d3
Show file tree
Hide file tree
Showing 22 changed files with 511 additions and 665 deletions.
10 changes: 1 addition & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,15 @@ rust-version = "1.68" # 1.67 contains an UB we would tri
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1"
bytes = "1"
crc32fast = "1.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = [
"fs",
"io-std",
"io-util",
"macros",
"rt-multi-thread",
] }
tracing = "0.1.37"

[dev-dependencies]
criterion = { version = "0.4", features = ["async_tokio"] }
criterion = "0.4"
futures = "0.3"
proptest = "1"
rand = "0.8"
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ It is possible to truncate each of the queues individually.

```rust
pub struct MultiRecordLog {
pub async fn create_queue(&mut self, queue: &str) -> Result<(), CreateQueueError>;
pub async fn delete_queue(&mut self, queue: &str) -> Result<(), DeleteQueueError>;
pub fn create_queue(&mut self, queue: &str) -> Result<(), CreateQueueError>;
pub fn delete_queue(&mut self, queue: &str) -> Result<(), DeleteQueueError>;
pub fn queue_exists(&self, queue: &str) -> bool;
pub fn list_queues(&self) -> impl Iterator<Item = &str>;
pub async fn append_record(
pub fn append_record(
&mut self,
queue: &str,
position_opt: Option<u64>,
payload: &[u8],
);
pub async fn truncate(&mut self, queue: &str, position: u64) -> Result<(), TruncateError>;
pub fn truncate(&mut self, queue: &str, position: u64) -> Result<(), TruncateError>;
pub fn range<R>(
&self,
queue: &str,
Expand Down
14 changes: 4 additions & 10 deletions benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use mrecordlog::MultiRecordLog;

async fn bench_single_size(size: usize, count: usize, loop_count: usize) {
fn bench_single_size(size: usize, count: usize, loop_count: usize) {
let tempdir = tempfile::tempdir().unwrap();
let mut record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
record_log.create_queue("q1").await.unwrap();
let mut record_log = MultiRecordLog::open(tempdir.path()).unwrap();
record_log.create_queue("q1").unwrap();

let record = vec![0; size];

for _ in 0..loop_count {
record_log
.append_records("q1", None, std::iter::repeat(&record[..]).take(count))
.await
.unwrap();
}
}
Expand All @@ -38,12 +37,7 @@ fn insert_throughput(c: &mut Criterion) {
),
&(record_size, record_count, loop_count),
|b, (record_size, record_count, loop_count)| {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
b.to_async(runtime)
.iter(|| bench_single_size(*record_size, *record_count, *loop_count));
b.iter(|| bench_single_size(*record_size, *record_count, *loop_count));
},
);
}
Expand Down
18 changes: 6 additions & 12 deletions src/block_read_write.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
use std::io;

use async_trait::async_trait;

pub const BLOCK_NUM_BYTES: usize = 32_768;

#[async_trait]
pub trait BlockRead {
/// Loads the next block.
/// If `Ok(true)` is returned, the new block is available through
/// `.block()`.
///
/// If `Ok(false)` is returned, the end of the `BlockReader`
/// has been reached and the content of `block()` could be anything.
async fn next_block(&mut self) -> io::Result<bool>;
fn next_block(&mut self) -> io::Result<bool>;

/// A `BlockReader` is always positioned on a specific block.
///
Expand All @@ -25,12 +22,11 @@ pub trait BlockRead {
fn block(&self) -> &[u8; BLOCK_NUM_BYTES];
}

#[async_trait]
pub trait BlockWrite {
/// Must panic if buf is larger than `num_bytes_remaining_in_block`.
async fn write(&mut self, buf: &[u8]) -> io::Result<()>;
fn write(&mut self, buf: &[u8]) -> io::Result<()>;
/// The semantics of flush may depend on the implementation.
async fn flush(&mut self) -> io::Result<()>;
fn flush(&mut self) -> io::Result<()>;
/// Number of bytes that can be added in the block.
fn num_bytes_remaining_in_block(&self) -> usize;
}
Expand All @@ -50,9 +46,8 @@ impl<'a> From<&'a [u8]> for ArrayReader<'a> {
}
}

#[async_trait]
impl<'a> BlockRead for ArrayReader<'a> {
async fn next_block(&mut self) -> io::Result<bool> {
fn next_block(&mut self) -> io::Result<bool> {
if self.data.len() < BLOCK_NUM_BYTES {
return Ok(false);
}
Expand Down Expand Up @@ -83,9 +78,8 @@ impl From<VecBlockWriter> for Vec<u8> {
}
}

#[async_trait]
impl BlockWrite for VecBlockWriter {
async fn write(&mut self, buf: &[u8]) -> io::Result<()> {
fn write(&mut self, buf: &[u8]) -> io::Result<()> {
assert!(buf.len() <= self.num_bytes_remaining_in_block());
if self.cursor + buf.len() > self.buffer.len() {
let new_len = ceil_to_block((self.cursor + buf.len()) * 2 + 1);
Expand All @@ -96,7 +90,7 @@ impl BlockWrite for VecBlockWriter {
Ok(())
}

async fn flush(&mut self) -> io::Result<()> {
fn flush(&mut self) -> io::Result<()> {
Ok(())
}

Expand Down
14 changes: 7 additions & 7 deletions src/frame/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ impl<R: BlockRead + Unpin> FrameReader<R> {
crate::BLOCK_NUM_BYTES - self.cursor
}

async fn go_to_next_block_if_necessary(&mut self) -> Result<(), ReadFrameError> {
fn go_to_next_block_if_necessary(&mut self) -> Result<(), ReadFrameError> {
let num_bytes_to_end_of_block = self.num_bytes_to_end_of_block();
let need_to_skip_block = self.block_corrupted || num_bytes_to_end_of_block < HEADER_LEN;
if !need_to_skip_block {
return Ok(());
}
if !self.reader.next_block().await? {
if !self.reader.next_block()? {
return Err(ReadFrameError::NotAvailable);
}

Expand All @@ -79,8 +79,8 @@ impl<R: BlockRead + Unpin> FrameReader<R> {
}

// Reads the next frame.
pub async fn read_frame(&mut self) -> Result<(FrameType, &[u8]), ReadFrameError> {
self.go_to_next_block_if_necessary().await?;
pub fn read_frame(&mut self) -> Result<(FrameType, &[u8]), ReadFrameError> {
self.go_to_next_block_if_necessary()?;
let header = self.get_frame_header()?;
self.cursor += HEADER_LEN;
if self.cursor + header.len() > BLOCK_NUM_BYTES {
Expand All @@ -105,9 +105,9 @@ impl<R: BlockRead + Unpin> FrameReader<R> {
}

impl FrameReader<RollingReader> {
pub async fn into_writer(self) -> io::Result<FrameWriter<RollingWriter>> {
let mut rolling_writer: RollingWriter = self.reader.into_writer().await?;
rolling_writer.forward(self.cursor).await?;
pub fn into_writer(self) -> io::Result<FrameWriter<RollingWriter>> {
let mut rolling_writer: RollingWriter = self.reader.into_writer()?;
rolling_writer.forward(self.cursor)?;
Ok(FrameWriter::create(rolling_writer))
}
}
70 changes: 30 additions & 40 deletions src/frame/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,121 +5,111 @@ use crate::frame::header::{FrameType, HEADER_LEN};
use crate::frame::{FrameReader, FrameWriter, ReadFrameError};
use crate::BLOCK_NUM_BYTES;

#[tokio::test]
async fn test_frame_simple() {
#[test]
fn test_frame_simple() {
let block_writer = {
let wrt: VecBlockWriter = VecBlockWriter::default();
let mut frame_writer = FrameWriter::create(wrt);
frame_writer
.write_frame(FrameType::First, &b"abc"[..])
.await
.unwrap();
frame_writer
.write_frame(FrameType::Middle, &b"de"[..])
.await
.unwrap();
frame_writer
.write_frame(FrameType::Last, &b"fgh"[..])
.await
.unwrap();
frame_writer.flush().await.unwrap();
frame_writer.flush().unwrap();
frame_writer.into_writer()
};
let buffer: Vec<u8> = block_writer.into();
let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..]));
let read_frame_res = frame_reader.read_frame().await;
let read_frame_res = frame_reader.read_frame();
assert_eq!(read_frame_res.unwrap(), (FrameType::First, &b"abc"[..]));
assert_eq!(
frame_reader.read_frame().await.unwrap(),
frame_reader.read_frame().unwrap(),
(FrameType::Middle, &b"de"[..])
);
assert_eq!(
frame_reader.read_frame().await.unwrap(),
frame_reader.read_frame().unwrap(),
(FrameType::Last, &b"fgh"[..])
);
assert!(matches!(
frame_reader.read_frame().await.unwrap_err(),
frame_reader.read_frame().unwrap_err(),
ReadFrameError::NotAvailable
));
}

#[tokio::test]
async fn test_frame_corruption_in_payload() -> io::Result<()> {
#[test]
fn test_frame_corruption_in_payload() -> io::Result<()> {
let mut buf: Vec<u8> = {
let mut frame_writer = FrameWriter::create(VecBlockWriter::default());
frame_writer
.write_frame(FrameType::First, &b"abc"[..])
.await?;
frame_writer.flush().await?;
frame_writer
.write_frame(FrameType::Middle, &b"de"[..])
.await?;
frame_writer.flush().await?;
frame_writer.write_frame(FrameType::First, &b"abc"[..])?;
frame_writer.flush()?;
frame_writer.write_frame(FrameType::Middle, &b"de"[..])?;
frame_writer.flush()?;
frame_writer.into_writer().into()
};
buf[8] = 0u8;
let mut frame_reader = FrameReader::open(ArrayReader::from(&buf[..]));
assert!(matches!(
frame_reader.read_frame().await,
frame_reader.read_frame(),
Err(ReadFrameError::Corruption)
));
assert!(matches!(
frame_reader.read_frame().await,
frame_reader.read_frame(),
Ok((FrameType::Middle, b"de"))
));
Ok(())
}

async fn repeat_empty_frame_util(repeat: usize) -> Vec<u8> {
fn repeat_empty_frame_util(repeat: usize) -> Vec<u8> {
let mut frame_writer = FrameWriter::create(VecBlockWriter::default());
for _ in 0..repeat {
frame_writer
.write_frame(FrameType::Full, &b""[..])
.await
.unwrap();
frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap();
}
frame_writer.flush().await.unwrap();
frame_writer.flush().unwrap();
frame_writer.into_writer().into()
}

#[tokio::test]
async fn test_simple_multiple_blocks() -> io::Result<()> {
#[test]
fn test_simple_multiple_blocks() -> io::Result<()> {
let num_frames = 1 + BLOCK_NUM_BYTES / HEADER_LEN;
let buffer = repeat_empty_frame_util(num_frames).await;
let buffer = repeat_empty_frame_util(num_frames);
let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..]));
for _ in 0..num_frames {
let read_frame_res = frame_reader.read_frame().await;
let read_frame_res = frame_reader.read_frame();
assert!(matches!(read_frame_res, Ok((FrameType::Full, &[]))));
}
assert!(matches!(
frame_reader.read_frame().await,
frame_reader.read_frame(),
Err(ReadFrameError::NotAvailable)
));
Ok(())
}

#[tokio::test]
async fn test_multiple_blocks_corruption_on_length() -> io::Result<()> {
#[test]
fn test_multiple_blocks_corruption_on_length() -> io::Result<()> {
// We end up with 4681 frames on the first block.
// 1 frame on the second block
let num_frames = 1 + crate::BLOCK_NUM_BYTES / HEADER_LEN;
let mut buffer = repeat_empty_frame_util(num_frames).await;
let mut buffer = repeat_empty_frame_util(num_frames);
buffer[2000 * HEADER_LEN + 5] = 255u8;
let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..]));
for _ in 0..2000 {
let read_frame_res = frame_reader.read_frame().await;
let read_frame_res = frame_reader.read_frame();
assert!(matches!(read_frame_res, Ok((FrameType::Full, &[]))));
}
assert!(matches!(
frame_reader.read_frame().await,
frame_reader.read_frame(),
Err(ReadFrameError::Corruption)
));
assert!(matches!(
frame_reader.read_frame().await,
frame_reader.read_frame(),
Ok((FrameType::Full, &[]))
));
assert!(matches!(
frame_reader.read_frame().await,
frame_reader.read_frame(),
Err(ReadFrameError::NotAvailable)
));
Ok(())
Expand Down
11 changes: 5 additions & 6 deletions src/frame/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,18 @@ impl<W: BlockWrite + Unpin> FrameWriter<W> {
/// Writes a frame. The payload has to be smaller than the
/// remaining space in the frame, as defined
/// by `max_writable_frame_length`.
pub async fn write_frame(&mut self, frame_type: FrameType, payload: &[u8]) -> io::Result<()> {
pub fn write_frame(&mut self, frame_type: FrameType, payload: &[u8]) -> io::Result<()> {
let num_bytes_remaining_in_block = self.wrt.num_bytes_remaining_in_block();
if num_bytes_remaining_in_block < HEADER_LEN {
let zero_bytes = [0u8; HEADER_LEN];
self.wrt
.write(&zero_bytes[..num_bytes_remaining_in_block])
.await?;
.write(&zero_bytes[..num_bytes_remaining_in_block])?;
}
let record_len = HEADER_LEN + payload.len();
let (buffer_header, buffer_record) = self.buffer[..record_len].split_at_mut(HEADER_LEN);
buffer_record.copy_from_slice(payload);
Header::for_payload(frame_type, payload).serialize(buffer_header);
self.wrt.write(&self.buffer[..record_len]).await?;
self.wrt.write(&self.buffer[..record_len])?;
Ok(())
}

Expand All @@ -42,8 +41,8 @@ impl<W: BlockWrite + Unpin> FrameWriter<W> {
/// When writing to a file, this performs a syscall and
/// the OS will be in charge of eventually writing the data
/// to disk, but this is not sufficient to ensure durability.
pub async fn flush(&mut self) -> io::Result<()> {
self.wrt.flush().await
pub fn flush(&mut self) -> io::Result<()> {
self.wrt.flush()
}

/// Returns the maximum amount of bytes that can be written.
Expand Down
Loading

0 comments on commit 2c593d3

Please sign in to comment.