Skip to content

Commit

Permalink
feat(puffin): apply range reader (#4928)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Zhenchi <[email protected]>

* feat(puffin): apply range reader

Signed-off-by: Zhenchi <[email protected]>

* refactor: read_vec reduce iteration

Signed-off-by: Zhenchi <[email protected]>

* refactor: simplify rangereader for vec<u8>

Signed-off-by: Zhenchi <[email protected]>

* test: add unit test

Signed-off-by: Zhenchi <[email protected]>

* fix: toml format

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Nov 12, 2024
1 parent 6248a6c commit cbf21e5
Show file tree
Hide file tree
Showing 21 changed files with 499 additions and 302 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/common/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ common-error.workspace = true
common-macro.workspace = true
futures.workspace = true
paste = "1.0"
pin-project.workspace = true
serde = { version = "1.0", features = ["derive"] }
snafu.workspace = true
tokio.workspace = true
zeroize = { version = "1.6", default-features = false, features = ["alloc"] }

[dev-dependencies]
common-test-util.workspace = true
toml.workspace = true
226 changes: 210 additions & 16 deletions src/common/base/src/range_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::io;
use std::ops::Range;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use async_trait::async_trait;
use bytes::{BufMut, Bytes};
use futures::{AsyncReadExt, AsyncSeekExt};
use futures::AsyncRead;
use pin_project::pin_project;
use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _};
use tokio::sync::Mutex;

/// `Metadata` contains the metadata of a source.
pub struct Metadata {
Expand Down Expand Up @@ -61,7 +69,7 @@ pub trait RangeReader: Send + Unpin {
}

#[async_trait]
impl<R: RangeReader + Send + Unpin> RangeReader for &mut R {
impl<R: ?Sized + RangeReader> RangeReader for &mut R {
async fn metadata(&mut self) -> io::Result<Metadata> {
(*self).metadata().await
}
Expand All @@ -80,26 +88,212 @@ impl<R: RangeReader + Send + Unpin> RangeReader for &mut R {
}
}

/// `RangeReaderAdapter` bridges `RangeReader` and `AsyncRead + AsyncSeek`.
pub struct RangeReaderAdapter<R>(pub R);
/// `AsyncReadAdapter` adapts a `RangeReader` to an `AsyncRead`.
#[pin_project]
pub struct AsyncReadAdapter<R> {
/// The inner `RangeReader`.
/// Use `Mutex` to get rid of the borrow checker issue.
inner: Arc<Mutex<R>>,

/// The current position from the view of the reader.
position: u64,

/// The buffer for the read bytes.
buffer: Vec<u8>,

/// The length of the content.
content_length: u64,

/// The future for reading the next bytes.
#[pin]
read_fut: Option<Pin<Box<dyn Future<Output = io::Result<Bytes>> + Send>>>,
}

impl<R: RangeReader + 'static> AsyncReadAdapter<R> {
pub async fn new(inner: R) -> io::Result<Self> {
let mut inner = inner;
let metadata = inner.metadata().await?;
Ok(AsyncReadAdapter {
inner: Arc::new(Mutex::new(inner)),
position: 0,
buffer: Vec::new(),
content_length: metadata.content_length,
read_fut: None,
})
}
}

/// The maximum size per read for the inner reader in `AsyncReadAdapter`.
const MAX_SIZE_PER_READ: usize = 8 * 1024 * 1024; // 8MB

impl<R: RangeReader + 'static> AsyncRead for AsyncReadAdapter<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut this = self.as_mut().project();

if *this.position >= *this.content_length {
return Poll::Ready(Ok(0));
}

if !this.buffer.is_empty() {
let to_read = this.buffer.len().min(buf.len());
buf[..to_read].copy_from_slice(&this.buffer[..to_read]);
this.buffer.drain(..to_read);
*this.position += to_read as u64;
return Poll::Ready(Ok(to_read));
}

if this.read_fut.is_none() {
let size = (*this.content_length - *this.position).min(MAX_SIZE_PER_READ as u64);
let range = *this.position..(*this.position + size);
let inner = this.inner.clone();
let fut = async move {
let mut inner = inner.lock().await;
inner.read(range).await
};

*this.read_fut = Some(Box::pin(fut));
}

match this
.read_fut
.as_mut()
.as_pin_mut()
.expect("checked above")
.poll(cx)
{
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(bytes)) => {
*this.read_fut = None;

if !bytes.is_empty() {
this.buffer.extend_from_slice(&bytes);
self.poll_read(cx, buf)
} else {
Poll::Ready(Ok(0))
}
}
Poll::Ready(Err(e)) => {
*this.read_fut = None;
Poll::Ready(Err(e))
}
}
}
}

/// Implements `RangeReader` for a type that implements `AsyncRead + AsyncSeek`.
///
/// TODO(zhongzc): It's a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`.
/// Until the codebase is fully ported to `RangeReader`, remove this implementation.
#[async_trait]
impl<R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin> RangeReader
for RangeReaderAdapter<R>
{
impl RangeReader for Vec<u8> {
async fn metadata(&mut self) -> io::Result<Metadata> {
let content_length = self.0.seek(io::SeekFrom::End(0)).await?;
Ok(Metadata { content_length })
Ok(Metadata {
content_length: self.len() as u64,
})
}

async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
async fn read(&mut self, mut range: Range<u64>) -> io::Result<Bytes> {
range.end = range.end.min(self.len() as u64);

let bytes = Bytes::copy_from_slice(&self[range.start as usize..range.end as usize]);
Ok(bytes)
}
}

/// `FileReader` is a `RangeReader` for reading a file.
pub struct FileReader {
content_length: u64,
position: u64,
file: tokio::fs::File,
}

impl FileReader {
/// Creates a new `FileReader` for the file at the given path.
pub async fn new(path: impl AsRef<Path>) -> io::Result<Self> {
let file = tokio::fs::File::open(path).await?;
let metadata = file.metadata().await?;
Ok(FileReader {
content_length: metadata.len(),
position: 0,
file,
})
}
}

#[async_trait]
impl RangeReader for FileReader {
async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.content_length,
})
}

async fn read(&mut self, mut range: Range<u64>) -> io::Result<Bytes> {
if range.start != self.position {
self.file.seek(io::SeekFrom::Start(range.start)).await?;
self.position = range.start;
}

range.end = range.end.min(self.content_length);
if range.end <= self.position {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Start of range is out of bounds",
));
}

let mut buf = vec![0; (range.end - range.start) as usize];
self.0.seek(io::SeekFrom::Start(range.start)).await?;
self.0.read_exact(&mut buf).await?;

self.file.read_exact(&mut buf).await?;
self.position = range.end;

Ok(Bytes::from(buf))
}
}

#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_named_temp_file;
use futures::io::AsyncReadExt as _;

use super::*;

#[tokio::test]
async fn test_async_read_adapter() {
let data = b"hello world";
let reader = Vec::from(data);
let mut adapter = AsyncReadAdapter::new(reader).await.unwrap();

let mut buf = Vec::new();
adapter.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, data);
}

#[tokio::test]
async fn test_async_read_adapter_large() {
let data = (0..20 * 1024 * 1024).map(|i| i as u8).collect::<Vec<u8>>();
let mut adapter = AsyncReadAdapter::new(data.clone()).await.unwrap();

let mut buf = Vec::new();
adapter.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, data);
}

#[tokio::test]
async fn test_file_reader() {
let file = create_named_temp_file();
let path = file.path();
let data = b"hello world";
tokio::fs::write(path, data).await.unwrap();

let mut reader = FileReader::new(path).await.unwrap();
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.content_length, data.len() as u64);

let bytes = reader.read(0..metadata.content_length).await.unwrap();
assert_eq!(&*bytes, data);

let bytes = reader.read(0..5).await.unwrap();
assert_eq!(&*bytes, &data[..5]);
}
}
11 changes: 3 additions & 8 deletions src/index/src/inverted_index/format/reader/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
#[cfg(test)]
mod tests {
use common_base::bit_vec::prelude::*;
use common_base::range_read::RangeReaderAdapter;
use fst::MapBuilder;
use futures::io::Cursor;
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
use prost::Message;

Expand Down Expand Up @@ -163,8 +161,7 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_metadata() {
let blob = create_inverted_index_blob();
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);
let mut blob_reader = InvertedIndexBlobReader::new(blob);

let metas = blob_reader.metadata().await.unwrap();
assert_eq!(metas.metas.len(), 2);
Expand All @@ -191,8 +188,7 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_fst() {
let blob = create_inverted_index_blob();
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);
let mut blob_reader = InvertedIndexBlobReader::new(blob);

let metas = blob_reader.metadata().await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
Expand Down Expand Up @@ -224,8 +220,7 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_bitmap() {
let blob = create_inverted_index_blob();
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);
let mut blob_reader = InvertedIndexBlobReader::new(blob);

let metas = blob_reader.metadata().await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
Expand Down
15 changes: 5 additions & 10 deletions src/index/src/inverted_index/format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ impl<R: RangeReader> InvertedIndeFooterReader<R> {

#[cfg(test)]
mod tests {
use common_base::range_read::RangeReaderAdapter;
use futures::io::Cursor;
use prost::Message;

use super::*;
Expand All @@ -141,10 +139,9 @@ mod tests {
..Default::default()
};

let payload_buf = create_test_payload(meta);
let mut payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let metas = reader.read_payload(payload_size).await.unwrap();
Expand All @@ -164,8 +161,7 @@ mod tests {
let mut payload_buf = create_test_payload(meta);
payload_buf.push(0xff); // Add an extra byte to corrupt the footer
let blob_size = payload_buf.len() as u64;
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size_result = reader.read_payload_size().await;
assert!(payload_size_result.is_err());
Expand All @@ -180,10 +176,9 @@ mod tests {
..Default::default()
};

let payload_buf = create_test_payload(meta);
let mut payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let payload_result = reader.read_payload(payload_size).await;
Expand Down
Loading

0 comments on commit cbf21e5

Please sign in to comment.