Skip to content

Commit

Permalink
feat: add prefetch support to InvertedIndexFooterReader for reduced…
Browse files Browse the repository at this point in the history
… I/O time (#5146)

* feat: add prefetch support to `InvertedIndeFooterReader`

* chore: correct struct name

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored and evenyag committed Dec 20, 2024
1 parent dd4d0a8 commit f24b9d8
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 44 deletions.
16 changes: 15 additions & 1 deletion src/index/src/inverted_index/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ pub enum Error {
location: Location,
},

#[snafu(display("Blob size too small"))]
BlobSizeTooSmall {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid footer payload size"))]
InvalidFooterPayloadSize {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unexpected inverted index footer payload size, max: {max_payload_size}, actual: {actual_payload_size}"))]
UnexpectedFooterPayloadSize {
max_payload_size: u64,
Expand Down Expand Up @@ -220,7 +232,9 @@ impl ErrorExt for Error {
| KeysApplierUnexpectedPredicates { .. }
| CommonIo { .. }
| UnknownIntermediateCodecMagic { .. }
| FstCompile { .. } => StatusCode::Unexpected,
| FstCompile { .. }
| InvalidFooterPayloadSize { .. }
| BlobSizeTooSmall { .. } => StatusCode::Unexpected,

ParseRegex { .. }
| ParseDFA { .. }
Expand Down
6 changes: 4 additions & 2 deletions src/index/src/inverted_index/format/reader/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use common_base::range_read::RangeReader;
use greptime_proto::v1::index::InvertedIndexMetas;
use snafu::{ensure, ResultExt};

use super::footer::DEFAULT_PREFETCH_SIZE;
use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader;
use crate::inverted_index::format::reader::footer::InvertedIndexFooterReader;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::MIN_BLOB_SIZE;

Expand Down Expand Up @@ -72,7 +73,8 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
let blob_size = metadata.content_length;
Self::validate_blob_size(blob_size)?;

let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size);
let mut footer_reader = InvertedIndexFooterReader::new(&mut self.source, blob_size)
.with_prefetch_size(DEFAULT_PREFETCH_SIZE);
footer_reader.metadata().await.map(Arc::new)
}
}
Expand Down
135 changes: 94 additions & 41 deletions src/index/src/inverted_index/format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,88 @@ use prost::Message;
use snafu::{ensure, ResultExt};

use crate::inverted_index::error::{
CommonIoSnafu, DecodeProtoSnafu, Result, UnexpectedFooterPayloadSizeSnafu,
UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu,
BlobSizeTooSmallSnafu, CommonIoSnafu, DecodeProtoSnafu, InvalidFooterPayloadSizeSnafu, Result,
UnexpectedFooterPayloadSizeSnafu, UnexpectedOffsetSizeSnafu,
UnexpectedZeroSegmentRowCountSnafu,
};
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;

/// InvertedIndeFooterReader is for reading the footer section of the blob.
pub struct InvertedIndeFooterReader<R> {
pub const DEFAULT_PREFETCH_SIZE: u64 = 1024; // 1KiB

/// InvertedIndexFooterReader is for reading the footer section of the blob.
pub struct InvertedIndexFooterReader<R> {
source: R,
blob_size: u64,
prefetch_size: Option<u64>,
}

impl<R> InvertedIndeFooterReader<R> {
impl<R> InvertedIndexFooterReader<R> {
pub fn new(source: R, blob_size: u64) -> Self {
Self { source, blob_size }
Self {
source,
blob_size,
prefetch_size: None,
}
}

/// Set the prefetch size for the footer reader.
pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
self.prefetch_size = Some(prefetch_size.max(FOOTER_PAYLOAD_SIZE_SIZE));
self
}

pub fn prefetch_size(&self) -> u64 {
self.prefetch_size.unwrap_or(FOOTER_PAYLOAD_SIZE_SIZE)
}
}

impl<R: RangeReader> InvertedIndeFooterReader<R> {
impl<R: RangeReader> InvertedIndexFooterReader<R> {
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
let payload_size = self.read_payload_size().await?;
let metas = self.read_payload(payload_size).await?;
Ok(metas)
}
ensure!(
self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE,
BlobSizeTooSmallSnafu
);

async fn read_payload_size(&mut self) -> Result<u64> {
let mut size_buf = [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize];
let end = self.blob_size;
let start = end - FOOTER_PAYLOAD_SIZE_SIZE;
self.source
.read_into(start..end, &mut &mut size_buf[..])
let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
let suffix = self
.source
.read(footer_start..self.blob_size)
.await
.context(CommonIoSnafu)?;
let suffix_len = suffix.len();
let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
self.validate_payload_size(length)?;

let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;

// Did not fetch the entire file metadata in the initial read, need to make a second request.
if length > suffix_len as u64 - footer_size {
let metadata_start = self.blob_size - length - footer_size;
let meta = self
.source
.read(metadata_start..self.blob_size - footer_size)
.await
.context(CommonIoSnafu)?;
self.parse_payload(&meta, length)
} else {
let metadata_start = self.blob_size - length - footer_size - footer_start;
let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
self.parse_payload(meta, length)
}
}

let payload_size = u32::from_le_bytes(size_buf) as u64;
self.validate_payload_size(payload_size)?;
fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
let suffix_len = suffix.len();
ensure!(suffix_len >= 4, InvalidFooterPayloadSizeSnafu);
let mut bytes = [0; 4];
bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);

Ok(payload_size)
Ok(bytes)
}

async fn read_payload(&mut self, payload_size: u64) -> Result<InvertedIndexMetas> {
let end = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE;
let start = end - payload_size;
let bytes = self.source.read(start..end).await.context(CommonIoSnafu)?;

let metas = InvertedIndexMetas::decode(&*bytes).context(DecodeProtoSnafu)?;
fn parse_payload(&mut self, bytes: &[u8], payload_size: u64) -> Result<InvertedIndexMetas> {
let metas = InvertedIndexMetas::decode(bytes).context(DecodeProtoSnafu)?;
self.validate_metas(&metas, payload_size)?;

Ok(metas)
}

Expand Down Expand Up @@ -113,9 +148,12 @@ impl<R: RangeReader> InvertedIndeFooterReader<R> {

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

use prost::Message;

use super::*;
use crate::inverted_index::error::Error;

fn create_test_payload(meta: InvertedIndexMeta) -> Vec<u8> {
let mut metas = InvertedIndexMetas {
Expand All @@ -141,14 +179,18 @@ mod tests {

let mut payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let metas = reader.read_payload(payload_size).await.unwrap();
for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] {
let mut reader = InvertedIndexFooterReader::new(&mut payload_buf, blob_size);
if prefetch > 0 {
reader = reader.with_prefetch_size(prefetch);
}

assert_eq!(metas.metas.len(), 1);
let index_meta = &metas.metas.get("test").unwrap();
assert_eq!(index_meta.name, "test");
let metas = reader.metadata().await.unwrap();
assert_eq!(metas.metas.len(), 1);
let index_meta = &metas.metas.get("test").unwrap();
assert_eq!(index_meta.name, "test");
}
}

#[tokio::test]
Expand All @@ -157,14 +199,20 @@ mod tests {
name: "test".to_string(),
..Default::default()
};

let mut payload_buf = create_test_payload(meta);
payload_buf.push(0xff); // Add an extra byte to corrupt the footer
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size_result = reader.read_payload_size().await;
assert!(payload_size_result.is_err());
for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] {
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndexFooterReader::new(&mut payload_buf, blob_size);
if prefetch > 0 {
reader = reader.with_prefetch_size(prefetch);
}

let result = reader.metadata().await;
assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
}
}

#[tokio::test]
Expand All @@ -178,10 +226,15 @@ mod tests {

let mut payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let payload_result = reader.read_payload(payload_size).await;
assert!(payload_result.is_err());
for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] {
let mut reader = InvertedIndexFooterReader::new(&mut payload_buf, blob_size);
if prefetch > 0 {
reader = reader.with_prefetch_size(prefetch);
}

let result = reader.metadata().await;
assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
}
}
}
1 change: 1 addition & 0 deletions src/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#![feature(iter_partition_in_place)]
#![feature(assert_matches)]

pub mod fulltext_index;
pub mod inverted_index;

0 comments on commit f24b9d8

Please sign in to comment.