From 8575331599bdc2989810fd7b62b81ef40fb09044 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Sat, 14 Dec 2024 01:14:38 +0800 Subject: [PATCH] feat: introduce Buffer for non-continuous bytes --- Cargo.lock | 1 + src/common/base/Cargo.toml | 1 + src/common/base/src/buffer.rs | 812 ++++++++++++++++++ src/common/base/src/lib.rs | 1 + src/index/src/inverted_index/format/reader.rs | 3 +- .../src/inverted_index/format/reader/blob.rs | 6 +- src/mito2/src/cache/index.rs | 90 +- 7 files changed, 853 insertions(+), 61 deletions(-) create mode 100644 src/common/base/src/buffer.rs diff --git a/Cargo.lock b/Cargo.lock index b60615c8e54c..a5c9386dc735 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1918,6 +1918,7 @@ dependencies = [ "futures", "paste", "pin-project", + "rand", "serde", "snafu 0.8.5", "tokio", diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index 465599974dae..2d35ad5d31ad 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -17,6 +17,7 @@ common-macro.workspace = true futures.workspace = true paste = "1.0" pin-project.workspace = true +rand.workspace = true serde = { version = "1.0", features = ["derive"] } snafu.workspace = true tokio.workspace = true diff --git a/src/common/base/src/buffer.rs b/src/common/base/src/buffer.rs new file mode 100644 index 000000000000..050c1b0e6c93 --- /dev/null +++ b/src/common/base/src/buffer.rs @@ -0,0 +1,812 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copid from https://github.com/apache/opendal/blob/eaf6c1846f725b2bd31d049fab267e3799a3ea27/core/src/types/buffer.rs + +use std::collections::VecDeque; +use std::convert::Infallible; +use std::fmt::{Debug, Formatter}; +use std::io::IoSlice; +use std::mem; +use std::ops::{Bound, RangeBounds}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use futures::Stream; + +/// Buffer is a wrapper of contiguous `Bytes` and non-contiguous `[Bytes]`. +/// +/// We designed buffer to allow underlying storage to return non-contiguous bytes. For example, +/// http based storage like s3 could generate non-contiguous bytes by stream. +/// +/// ## Features +/// +/// - [`Buffer`] can be used as [`Buf`], [`Iterator`], [`Stream`] directly. +/// - [`Buffer`] is cheap to clone like [`Bytes`], only update reference count, no allocation. +/// - [`Buffer`] is vectorized write friendly, you can convert it to [`IoSlice`] for vectored write. 
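+///
+/// For example (a minimal sketch), a non-contiguous buffer can be assembled from
+/// several `Bytes` and cloned without copying any data:
+///
+/// ```rust
+/// use bytes::{Buf, Bytes};
+/// use common_base::buffer::Buffer;
+///
+/// let buf = Buffer::from(vec![Bytes::from("hello"), Bytes::from("world")]);
+/// assert_eq!(buf.len(), 10);
+///
+/// // A clone shares the same underlying parts; only reference counts are updated.
+/// let view = buf.clone();
+/// assert_eq!(view.chunk(), b"hello");
+/// ```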
+/// +/// ## Examples +/// +/// ### As `Buf` +/// +/// `Buffer` implements `Buf` trait: +/// +/// ```rust +/// use bytes::Buf; +/// use opendal::Buffer; +/// use serde_json; +/// +/// fn test(mut buf: Buffer) -> Vec { +/// serde_json::from_reader(buf.reader()).unwrap() +/// } +/// ``` +/// +/// ### As Bytes `Iterator` +/// +/// `Buffer` implements `Iterator` trait: +/// +/// ```rust +/// use bytes::Bytes; +/// use opendal::Buffer; +/// +/// fn test(mut buf: Buffer) -> Vec { +/// buf.into_iter().collect() +/// } +/// ``` +/// +/// ### As Bytes `Stream` +/// +/// `Buffer` implements `Stream>` trait: +/// +/// ```rust +/// use bytes::Bytes; +/// use futures::TryStreamExt; +/// use opendal::Buffer; +/// +/// async fn test(mut buf: Buffer) -> Vec { +/// buf.into_iter().try_collect().await.unwrap() +/// } +/// ``` +/// +/// ### As one contiguous Bytes +/// +/// `Buffer` can make contiguous by transform into `Bytes` or `Vec`. +/// Please keep in mind that this operation involves new allocation and bytes copy, and we can't +/// reuse the same memory region anymore. +/// +/// ```rust +/// use bytes::Bytes; +/// use opendal::Buffer; +/// +/// fn test_to_vec(buf: Buffer) -> Vec { +/// buf.to_vec() +/// } +/// +/// fn test_to_bytes(buf: Buffer) -> Bytes { +/// buf.to_bytes() +/// } +/// ``` +#[derive(Clone)] +pub struct Buffer(Inner); + +#[derive(Clone)] +enum Inner { + Contiguous(Bytes), + NonContiguous { + parts: Arc<[Bytes]>, + size: usize, + idx: usize, + offset: usize, + }, +} + +impl Debug for Buffer { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut b = f.debug_struct("Buffer"); + + match &self.0 { + Inner::Contiguous(bs) => { + b.field("type", &"contiguous"); + b.field("size", &bs.len()); + } + Inner::NonContiguous { + parts, + size, + idx, + offset, + } => { + b.field("type", &"non_contiguous"); + b.field("parts", &parts); + b.field("size", &size); + b.field("idx", &idx); + b.field("offset", &offset); + } + } + b.finish_non_exhaustive() + } +} + +impl Default for Buffer { + fn default() -> Self { + Self::new() + } +} +impl Buffer { + /// Create a new empty buffer. + /// + /// This operation is const and no allocation will be performed. + #[inline] + pub const fn new() -> Self { + Self(Inner::Contiguous(Bytes::new())) + } + + /// Get the length of the buffer. + #[inline] + pub fn len(&self) -> usize { + match &self.0 { + Inner::Contiguous(b) => b.remaining(), + Inner::NonContiguous { size, .. } => *size, + } + } + + /// Check if buffer is empty. + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Number of [`Bytes`] in [`Buffer`]. + /// + /// For contiguous buffer, it's always 1. For non-contiguous buffer, it's number of bytes + /// available for use. + pub fn count(&self) -> usize { + match &self.0 { + Inner::Contiguous(_) => 1, + Inner::NonContiguous { + parts, + idx, + size, + offset, + } => { + parts + .iter() + .skip(*idx) + .fold((0, size + offset), |(count, size), bytes| { + if size == 0 { + (count, 0) + } else { + (count + 1, size.saturating_sub(bytes.len())) + } + }) + .0 + } + } + } + + /// Get current [`Bytes`]. + pub fn current(&self) -> Bytes { + match &self.0 { + Inner::Contiguous(inner) => inner.clone(), + Inner::NonContiguous { + parts, + idx, + offset, + size, + } => { + let chunk = &parts[*idx]; + let n = (chunk.len() - *offset).min(*size); + chunk.slice(*offset..*offset + n) + } + } + } + + /// Shortens the buffer, keeping the first `len` bytes and dropping the rest. 
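+    ///
+    /// A minimal sketch of the effect on a non-contiguous buffer:
+    ///
+    /// ```rust
+    /// use bytes::Bytes;
+    /// use common_base::buffer::Buffer;
+    ///
+    /// let mut buf = Buffer::from(vec![Bytes::from("ab"), Bytes::from("cd")]);
+    /// buf.truncate(3);
+    /// assert_eq!(buf.to_vec(), b"abc".to_vec());
+    /// ```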
+ /// + /// If `len` is greater than the buffer’s current length, this has no effect. + #[inline] + pub fn truncate(&mut self, len: usize) { + match &mut self.0 { + Inner::Contiguous(bs) => bs.truncate(len), + Inner::NonContiguous { size, .. } => { + *size = (*size).min(len); + } + } + } + + /// Returns a slice of self for the provided range. + /// + /// This will increment the reference count for the underlying memory and return a new Buffer handle set to the slice. + /// + /// This operation is O(1). + pub fn slice(&self, range: impl RangeBounds) -> Self { + let len = self.len(); + + let begin = match range.start_bound() { + Bound::Included(&n) => n, + Bound::Excluded(&n) => n.checked_add(1).expect("out of range"), + Bound::Unbounded => 0, + }; + + let end = match range.end_bound() { + Bound::Included(&n) => n.checked_add(1).expect("out of range"), + Bound::Excluded(&n) => n, + Bound::Unbounded => len, + }; + + assert!( + begin <= end, + "range start must not be greater than end: {:?} <= {:?}", + begin, + end, + ); + assert!( + end <= len, + "range end out of bounds: {:?} <= {:?}", + end, + len, + ); + + if end == begin { + return Buffer::new(); + } + + let mut ret = self.clone(); + ret.truncate(end); + ret.advance(begin); + ret + } + + /// Combine all bytes together into one single [`Bytes`]. + /// + /// This operation is zero copy if the underlying bytes are contiguous. + /// Otherwise, it will copy all bytes into one single [`Bytes`]. + /// Please use API from [`Buf`], [`Iterator`] or [`Stream`] whenever possible. + #[inline] + pub fn to_bytes(&self) -> Bytes { + match &self.0 { + Inner::Contiguous(bytes) => bytes.clone(), + Inner::NonContiguous { + parts, + size, + idx: _, + offset, + } => { + if parts.len() == 1 { + parts[0].slice(*offset..(*offset + *size)) + } else { + let mut ret = BytesMut::with_capacity(self.len()); + ret.put(self.clone()); + ret.freeze() + } + } + } + } + + /// Combine all bytes together into one single [`Vec`]. + /// + /// This operation is not zero copy, it will copy all bytes into one single [`Vec`]. + /// Please use API from [`Buf`], [`Iterator`] or [`Stream`] whenever possible. + #[inline] + pub fn to_vec(&self) -> Vec { + let mut ret = Vec::with_capacity(self.len()); + ret.put(self.clone()); + ret + } + + /// Convert buffer into a slice of [`IoSlice`] for vectored write. + #[inline] + pub fn to_io_slice(&self) -> Vec> { + match &self.0 { + Inner::Contiguous(bs) => vec![IoSlice::new(bs.chunk())], + Inner::NonContiguous { + parts, idx, offset, .. 
+ } => { + let mut ret = Vec::with_capacity(parts.len() - *idx); + let mut new_offset = *offset; + for part in parts.iter().skip(*idx) { + ret.push(IoSlice::new(&part[new_offset..])); + new_offset = 0; + } + ret + } + } + } +} + +impl From> for Buffer { + #[inline] + fn from(bs: Vec) -> Self { + Self(Inner::Contiguous(bs.into())) + } +} + +impl From for Buffer { + #[inline] + fn from(bs: Bytes) -> Self { + Self(Inner::Contiguous(bs)) + } +} + +impl From for Buffer { + #[inline] + fn from(s: String) -> Self { + Self(Inner::Contiguous(Bytes::from(s))) + } +} + +impl From<&'static [u8]> for Buffer { + #[inline] + fn from(s: &'static [u8]) -> Self { + Self(Inner::Contiguous(Bytes::from_static(s))) + } +} + +impl From<&'static str> for Buffer { + #[inline] + fn from(s: &'static str) -> Self { + Self(Inner::Contiguous(Bytes::from_static(s.as_bytes()))) + } +} + +impl FromIterator for Buffer { + #[inline] + fn from_iter>(iter: T) -> Self { + Self(Inner::Contiguous(Bytes::from_iter(iter))) + } +} + +impl From> for Buffer { + #[inline] + fn from(bs: VecDeque) -> Self { + let size = bs.iter().map(Bytes::len).sum(); + Self(Inner::NonContiguous { + parts: Vec::from(bs).into(), + size, + idx: 0, + offset: 0, + }) + } +} + +impl From> for Buffer { + #[inline] + fn from(bs: Vec) -> Self { + let size = bs.iter().map(Bytes::len).sum(); + Self(Inner::NonContiguous { + parts: bs.into(), + size, + idx: 0, + offset: 0, + }) + } +} + +impl From> for Buffer { + #[inline] + fn from(bs: Arc<[Bytes]>) -> Self { + let size = bs.iter().map(Bytes::len).sum(); + Self(Inner::NonContiguous { + parts: bs, + size, + idx: 0, + offset: 0, + }) + } +} + +impl FromIterator for Buffer { + #[inline] + fn from_iter>(iter: T) -> Self { + let mut size = 0; + let bs = iter.into_iter().inspect(|v| size += v.len()); + // This operation only needs one allocation from iterator to `Arc<[Bytes]>` instead + // of iterator -> `Vec` -> `Arc<[Bytes]>`. + let parts = Arc::from_iter(bs); + Self(Inner::NonContiguous { + parts, + size, + idx: 0, + offset: 0, + }) + } +} + +impl Buf for Buffer { + #[inline] + fn remaining(&self) -> usize { + self.len() + } + + #[inline] + fn chunk(&self) -> &[u8] { + match &self.0 { + Inner::Contiguous(b) => b.chunk(), + Inner::NonContiguous { + parts, + size, + idx, + offset, + } => { + if *size == 0 { + return &[]; + } + + let chunk = &parts[*idx]; + let n = (chunk.len() - *offset).min(*size); + &parts[*idx][*offset..*offset + n] + } + } + } + + #[inline] + fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize { + match &self.0 { + Inner::Contiguous(b) => { + if dst.is_empty() { + return 0; + } + + dst[0] = IoSlice::new(b.chunk()); + 1 + } + Inner::NonContiguous { + parts, idx, offset, .. 
+ } => { + if dst.is_empty() { + return 0; + } + + let mut new_offset = *offset; + parts + .iter() + .skip(*idx) + .zip(dst.iter_mut()) + .map(|(part, dst)| { + *dst = IoSlice::new(&part[new_offset..]); + new_offset = 0; + }) + .count() + } + } + } + + #[inline] + fn advance(&mut self, cnt: usize) { + match &mut self.0 { + Inner::Contiguous(b) => b.advance(cnt), + Inner::NonContiguous { + parts, + size, + idx, + offset, + } => { + assert!( + cnt <= *size, + "cannot advance past {cnt} bytes, only {size} bytes left" + ); + + let mut new_idx = *idx; + let mut new_offset = *offset; + let mut remaining_cnt = cnt; + while remaining_cnt > 0 { + let part_len = parts[new_idx].len(); + let remaining_in_part = part_len - new_offset; + + if remaining_cnt < remaining_in_part { + new_offset += remaining_cnt; + break; + } + + remaining_cnt -= remaining_in_part; + new_idx += 1; + new_offset = 0; + } + + *idx = new_idx; + *offset = new_offset; + *size -= cnt; + } + } + } +} + +impl Iterator for Buffer { + type Item = Bytes; + + fn next(&mut self) -> Option { + match &mut self.0 { + Inner::Contiguous(bs) => { + if bs.is_empty() { + None + } else { + Some(mem::take(bs)) + } + } + Inner::NonContiguous { + parts, + size, + idx, + offset, + } => { + if *size == 0 { + return None; + } + + let chunk = &parts[*idx]; + let n = (chunk.len() - *offset).min(*size); + let buf = chunk.slice(*offset..*offset + n); + *size -= n; + *offset += n; + + if *offset == chunk.len() { + *idx += 1; + *offset = 0; + } + + Some(buf) + } + } + } + + fn size_hint(&self) -> (usize, Option) { + match &self.0 { + Inner::Contiguous(bs) => { + if bs.is_empty() { + (0, Some(0)) + } else { + (1, Some(1)) + } + } + Inner::NonContiguous { parts, idx, .. } => { + let remaining = parts.len().saturating_sub(*idx); + (remaining, Some(remaining)) + } + } + } +} + +impl Stream for Buffer { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(self.get_mut().next().map(Ok)) + } + + fn size_hint(&self) -> (usize, Option) { + Iterator::size_hint(self) + } +} + +#[cfg(test)] +mod tests { + use rand::Rng; + + use super::*; + + const EMPTY_SLICE: &[u8] = &[]; + + #[test] + fn test_contiguous_buffer() { + let mut buf = Buffer::new(); + + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), EMPTY_SLICE); + assert_eq!(buf.next(), None); + } + + #[test] + fn test_empty_non_contiguous_buffer() { + let mut buf = Buffer::from(vec![Bytes::new()]); + + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), EMPTY_SLICE); + assert_eq!(buf.next(), None); + } + + #[test] + fn test_non_contiguous_buffer_with_empty_chunks() { + let mut buf = Buffer::from(vec![Bytes::from("a")]); + + assert_eq!(buf.remaining(), 1); + assert_eq!(buf.chunk(), b"a"); + + buf.advance(1); + + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), EMPTY_SLICE); + } + + #[test] + fn test_non_contiguous_buffer_with_next() { + let mut buf = Buffer::from(vec![Bytes::from("a")]); + + assert_eq!(buf.remaining(), 1); + assert_eq!(buf.chunk(), b"a"); + + let bs = buf.next(); + + assert_eq!(bs, Some(Bytes::from("a"))); + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), EMPTY_SLICE); + } + + #[test] + fn test_buffer_advance() { + let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]); + + assert_eq!(buf.remaining(), 3); + assert_eq!(buf.chunk(), b"a"); + + buf.advance(1); + + assert_eq!(buf.remaining(), 2); + assert_eq!(buf.chunk(), b"b"); + + buf.advance(1); + + assert_eq!(buf.remaining(), 1); + 
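+        // Two one-byte chunks have been consumed; only "c" remains.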
assert_eq!(buf.chunk(), b"c"); + + buf.advance(1); + + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), EMPTY_SLICE); + + buf.advance(0); + + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), EMPTY_SLICE); + } + + #[test] + fn test_buffer_truncate() { + let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]); + + assert_eq!(buf.remaining(), 3); + assert_eq!(buf.chunk(), b"a"); + + buf.truncate(100); + + assert_eq!(buf.remaining(), 3); + assert_eq!(buf.chunk(), b"a"); + + buf.truncate(2); + + assert_eq!(buf.remaining(), 2); + assert_eq!(buf.chunk(), b"a"); + + buf.truncate(0); + + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), EMPTY_SLICE); + } + + /// This setup will return + /// + /// - A buffer + /// - Total size of this buffer. + /// - Total content of this buffer. + fn setup_buffer() -> (Buffer, usize, Bytes) { + let mut rng = rand::thread_rng(); + + let bs = (0..100) + .map(|_| { + let len = rng.gen_range(1..100); + let mut buf = vec![0; len]; + rng.fill(&mut buf[..]); + Bytes::from(buf) + }) + .collect::>(); + + let total_size = bs.iter().map(|b| b.len()).sum::(); + let total_content = bs.iter().flatten().copied().collect::(); + let buf = Buffer::from(bs); + + (buf, total_size, total_content) + } + + #[test] + fn fuzz_buffer_advance() { + let mut rng = rand::thread_rng(); + + let (mut buf, total_size, total_content) = setup_buffer(); + assert_eq!(buf.remaining(), total_size); + assert_eq!(buf.to_bytes(), total_content); + + let mut cur = 0; + // Loop at most 10000 times. + let mut times = 10000; + while !buf.is_empty() && times > 0 { + times -= 1; + + let cnt = rng.gen_range(0..total_size - cur); + cur += cnt; + buf.advance(cnt); + + assert_eq!(buf.remaining(), total_size - cur); + assert_eq!(buf.to_bytes(), total_content.slice(cur..)); + } + } + + #[test] + fn fuzz_buffer_iter() { + let mut rng = rand::thread_rng(); + + let (mut buf, total_size, total_content) = setup_buffer(); + assert_eq!(buf.remaining(), total_size); + assert_eq!(buf.to_bytes(), total_content); + + let mut cur = 0; + while buf.is_empty() { + let cnt = rng.gen_range(0..total_size - cur); + cur += cnt; + buf.advance(cnt); + + // Before next + assert_eq!(buf.remaining(), total_size - cur); + assert_eq!(buf.to_bytes(), total_content.slice(cur..)); + + if let Some(bs) = buf.next() { + assert_eq!(bs, total_content.slice(cur..cur + bs.len())); + cur += bs.len(); + } + + // After next + assert_eq!(buf.remaining(), total_size - cur); + assert_eq!(buf.to_bytes(), total_content.slice(cur..)); + } + } + + #[test] + fn fuzz_buffer_truncate() { + let mut rng = rand::thread_rng(); + + let (mut buf, total_size, total_content) = setup_buffer(); + assert_eq!(buf.remaining(), total_size); + assert_eq!(buf.to_bytes(), total_content); + + let mut cur = 0; + while buf.is_empty() { + let cnt = rng.gen_range(0..total_size - cur); + cur += cnt; + buf.advance(cnt); + + // Before truncate + assert_eq!(buf.remaining(), total_size - cur); + assert_eq!(buf.to_bytes(), total_content.slice(cur..)); + + let truncate_size = rng.gen_range(0..total_size - cur); + buf.truncate(truncate_size); + + // After truncate + assert_eq!(buf.remaining(), truncate_size); + assert_eq!( + buf.to_bytes(), + total_content.slice(cur..cur + truncate_size) + ); + + // Try next after truncate + if let Some(bs) = buf.next() { + assert_eq!(bs, total_content.slice(cur..cur + bs.len())); + cur += bs.len(); + } + + // After next + assert_eq!(buf.remaining(), total_size - cur); + assert_eq!(buf.to_bytes(), 
total_content.slice(cur..)); + } + } +} diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 62a801d9462d..69da24d2b61b 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod bit_vec; +pub mod buffer; pub mod bytes; pub mod plugins; pub mod range_read; diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 904681d5f40a..21e5487d1e42 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -16,6 +16,7 @@ use std::ops::Range; use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use common_base::BitVec; use greptime_proto::v1::index::InvertedIndexMetas; use snafu::ResultExt; @@ -35,7 +36,7 @@ pub trait InvertedIndexReader: Send { async fn range_read(&mut self, offset: u64, size: u32) -> Result>; /// Reads the bytes in the given ranges. - async fn read_vec(&mut self, ranges: &[Range]) -> Result>>; + async fn read_vec(&mut self, ranges: &[Range]) -> Result>; /// Retrieves metadata of all inverted indices stored within the blob. async fn metadata(&mut self) -> Result>; diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index 371655d535f3..fcaa63773d93 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -16,6 +16,7 @@ use std::ops::Range; use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use common_base::range_read::RangeReader; use greptime_proto::v1::index::InvertedIndexMetas; use snafu::{ensure, ResultExt}; @@ -60,9 +61,8 @@ impl InvertedIndexReader for InvertedIndexBlobReader { Ok(buf.into()) } - async fn read_vec(&mut self, ranges: &[Range]) -> Result>> { - let bufs = self.source.read_vec(ranges).await.context(CommonIoSnafu)?; - Ok(bufs.into_iter().map(|buf| buf.into()).collect()) + async fn read_vec(&mut self, ranges: &[Range]) -> Result> { + self.source.read_vec(ranges).await.context(CommonIoSnafu) } async fn metadata(&mut self) -> Result> { diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index e25fb22dcbf5..c6625be4f295 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -17,6 +17,8 @@ use std::sync::Arc; use api::v1::index::InvertedIndexMetas; use async_trait::async_trait; +use bytes::Bytes; +use common_base::buffer::Buffer; use common_base::BitVec; use index::inverted_index::error::DecodeFstSnafu; use index::inverted_index::format::reader::InvertedIndexReader; @@ -70,7 +72,7 @@ where } // TODO: Can be replaced by an uncontinuous structure like opendal::Buffer. 
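        // The requested range may span several cached pages: misses are fetched with a
        // single vectored read and cached per page, then all pages are stitched together
        // as a `Buffer` so that only the requested byte range is copied out at the end.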
let mut data = Vec::with_capacity(keys.len()); - data.resize(keys.len(), Arc::new(Vec::new())); + data.resize(keys.len(), Bytes::new()); let mut cache_miss_range = vec![]; let mut cache_miss_idx = vec![]; let last_index = keys.len() - 1; @@ -97,24 +99,20 @@ where if !cache_miss_range.is_empty() { let pages = self.inner.read_vec(&cache_miss_range).await?; for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) { - let page = Arc::new(page); + let page = page; let key = keys[i].clone(); data[i] = page.clone(); self.cache.put_index(key, page.clone()); } } - let mut result = Vec::with_capacity(size as usize); - data.iter().enumerate().for_each(|(i, page)| { - let range = if i == 0 { - IndexDataPageKey::calculate_first_page_range(offset, size, self.cache.page_size) - } else if i == last_index { - IndexDataPageKey::calculate_last_page_range(offset, size, self.cache.page_size) - } else { - 0..self.cache.page_size as usize - }; - result.extend_from_slice(&page[range]); - }); - Ok(result) + let buffer = Buffer::from_iter(data.into_iter()); + Ok(buffer + .slice(IndexDataPageKey::calculate_range( + offset, + size, + self.cache.page_size, + )) + .to_vec()) } } @@ -131,7 +129,7 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead async fn read_vec( &mut self, ranges: &[Range], - ) -> index::inverted_index::error::Result>> { + ) -> index::inverted_index::error::Result> { self.inner.read_vec(ranges).await } @@ -190,31 +188,14 @@ impl IndexDataPageKey { (end_page + 1 - start_page) as u32 } - /// Computes the byte range in the first page based on the offset and size. - /// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the first page range is 1000..4096. - fn calculate_first_page_range(offset: u64, size: u32, page_size: u64) -> Range { + /// Computes the byte range based on the offset and size. + /// For example, if offset is 5000 and size is 5000 with PAGE_SIZE of 4096, the range is 904..5904. + fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range { let start = (offset % page_size) as usize; - let end = if size > page_size as u32 - start as u32 { - page_size as usize - } else { - start + size as usize - }; + let end = start + size as usize; start..end } - /// Computes the byte range in the last page based on the offset and size. - /// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the last page range is 0..1904. - fn calculate_last_page_range(offset: u64, size: u32, page_size: u64) -> Range { - let offset = offset as usize; - let size = size as usize; - let page_size = page_size as usize; - if (offset + size) % page_size == 0 { - 0..page_size - } else { - 0..((offset + size) % page_size) - } - } - /// Generates a vector of IndexKey instances for the pages that a given offset and size span. fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec { let start_page = Self::calculate_page_id(offset, page_size); @@ -234,7 +215,7 @@ pub struct InvertedIndexCache { /// Cache for inverted index metadata index_metadata: moka::sync::Cache>, /// Cache for inverted index content. - index: moka::sync::Cache>>, + index: moka::sync::Cache, // Page size for index content. 
page_size: u64, } @@ -284,11 +265,11 @@ impl InvertedIndexCache { self.index_metadata.insert(key, metadata) } - pub fn get_index(&self, key: &IndexDataPageKey) -> Option>> { + pub fn get_index(&self, key: &IndexDataPageKey) -> Option { self.index.get(key) } - pub fn put_index(&self, key: IndexDataPageKey, value: Arc>) { + pub fn put_index(&self, key: IndexDataPageKey, value: Bytes) { CACHE_BYTES .with_label_values(&[INDEX_CONTENT_TYPE]) .add(index_content_weight(&key, &value).into()); @@ -302,7 +283,7 @@ fn index_metadata_weight(k: &IndexMetadataKey, v: &Arc) -> u } /// Calculates weight for index content. -fn index_content_weight(k: &IndexDataPageKey, v: &Arc>) -> u32 { +fn index_content_weight(k: &IndexDataPageKey, v: &Bytes) -> u32 { (k.file_id.as_bytes().len() + v.len()) as u32 } @@ -331,6 +312,9 @@ mod test { use crate::sst::index::store::InstrumentedStore; use crate::test_util::TestEnv; + // Repeat times for following little fuzz tests. + const FUZZ_REPEAT_TIMES: usize = 100; + // Fuzz test for index data page key #[test] fn fuzz_index_calculation() { @@ -340,7 +324,7 @@ mod test { rng.fill_bytes(&mut data); let file_id = FileId::random(); - for _ in 0..100 { + for _ in 0..FUZZ_REPEAT_TIMES { let offset = rng.gen_range(0..data.len() as u64); let size = rng.gen_range(0..data.len() as u32 - offset as u32); let page_size: usize = rng.gen_range(1..1024); @@ -349,32 +333,24 @@ mod test { IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64); let page_num = indexes.len(); let mut read = Vec::with_capacity(size as usize); - let last_index = indexes.len() - 1; - for (i, key) in indexes.into_iter().enumerate() { + for key in indexes.into_iter() { let start = key.page_id as usize * page_size; let page = if start + page_size < data.len() { &data[start..start + page_size] } else { &data[start..] }; - let range = if i == 0 { - // first page range - IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64) - } else if i == last_index { - // last page range. when the first page is the last page, the range is not used. - IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64) - } else { - 0..page_size - }; - read.extend_from_slice(&page[range]); + read.extend_from_slice(page); } let expected_range = offset as usize..(offset + size as u64 as u64) as usize; + let read = + read[IndexDataPageKey::calculate_range(offset, size, page_size as u64)].to_vec(); if read != data.get(expected_range).unwrap() { panic!( - "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nfirst page range: {:?}, last page range: {:?}, page num: {}", + "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}", offset, size, page_size, read.len(), size as usize, - IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64), - IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64), page_num + IndexDataPageKey::calculate_range(offset, size, page_size as u64), + page_num ); } } @@ -519,7 +495,7 @@ mod test { // fuzz test let mut rng = rand::thread_rng(); - for _ in 0..100 { + for _ in 0..FUZZ_REPEAT_TIMES { let offset = rng.gen_range(0..file_size); let size = rng.gen_range(0..file_size as u32 - offset as u32); let expected = cached_reader.range_read(offset, size).await.unwrap();
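As a worked example of the new single-range logic, here is a standalone sketch that only reuses the arithmetic of `IndexDataPageKey::calculate_page_id` / `calculate_range` and the `Buffer` type added above; the concrete numbers match the doc comment of `calculate_range` (offset 5000, size 5000, page size 4096):

```rust
use bytes::Bytes;
use common_base::buffer::Buffer;

fn main() {
    let (offset, size, page_size) = (5000u64, 5000u32, 4096u64);

    // Pages touched by the read (same math as IndexDataPageKey::calculate_page_id
    // and calculate_page_count): pages 1 and 2.
    let first_page = offset / page_size;
    let last_page = (offset + size as u64 - 1) / page_size;
    assert_eq!((first_page, last_page), (1, 2));

    // Byte range of the request inside the concatenated pages
    // (IndexDataPageKey::calculate_range): 904..5904.
    let start = (offset % page_size) as usize;
    let end = start + size as usize;
    assert_eq!((start, end), (904, 5904));

    // The fetched pages are stitched together lazily as a Buffer; only the
    // requested range is copied out at the end.
    let pages = vec![Bytes::from(vec![0u8; 4096]), Bytes::from(vec![1u8; 4096])];
    let bytes = Buffer::from(pages).slice(start..end).to_vec();
    assert_eq!(bytes.len(), size as usize);
}
```

Compared with the previous first/last/middle-page special cases, the range computation now lives in a single helper, and cached pages are shared as `Bytes` instead of `Arc<Vec<u8>>`.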