From 8575331599bdc2989810fd7b62b81ef40fb09044 Mon Sep 17 00:00:00 2001
From: CookiePieWw
Date: Sat, 14 Dec 2024 01:14:38 +0800
Subject: [PATCH 1/4] feat: introduce Buffer for non-continuous bytes

---
 Cargo.lock                                    |   1 +
 src/common/base/Cargo.toml                    |   1 +
 src/common/base/src/buffer.rs                 | 812 ++++++++++++++++++
 src/common/base/src/lib.rs                    |   1 +
 src/index/src/inverted_index/format/reader.rs |   3 +-
 .../src/inverted_index/format/reader/blob.rs  |   6 +-
 src/mito2/src/cache/index.rs                  |  90 +-
 7 files changed, 853 insertions(+), 61 deletions(-)
 create mode 100644 src/common/base/src/buffer.rs

diff --git a/Cargo.lock b/Cargo.lock
index b60615c8e54c..a5c9386dc735 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1918,6 +1918,7 @@ dependencies = [
  "futures",
  "paste",
  "pin-project",
+ "rand",
  "serde",
  "snafu 0.8.5",
  "tokio",
diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml
index 465599974dae..2d35ad5d31ad 100644
--- a/src/common/base/Cargo.toml
+++ b/src/common/base/Cargo.toml
@@ -17,6 +17,7 @@ common-macro.workspace = true
 futures.workspace = true
 paste = "1.0"
 pin-project.workspace = true
+rand.workspace = true
 serde = { version = "1.0", features = ["derive"] }
 snafu.workspace = true
 tokio.workspace = true
diff --git a/src/common/base/src/buffer.rs b/src/common/base/src/buffer.rs
new file mode 100644
index 000000000000..050c1b0e6c93
--- /dev/null
+++ b/src/common/base/src/buffer.rs
@@ -0,0 +1,812 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copied from https://github.com/apache/opendal/blob/eaf6c1846f725b2bd31d049fab267e3799a3ea27/core/src/types/buffer.rs
+
+use std::collections::VecDeque;
+use std::convert::Infallible;
+use std::fmt::{Debug, Formatter};
+use std::io::IoSlice;
+use std::mem;
+use std::ops::{Bound, RangeBounds};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use futures::Stream;
+
+/// Buffer is a wrapper of contiguous `Bytes` and non-contiguous `[Bytes]`.
+///
+/// We designed `Buffer` to allow the underlying storage to return non-contiguous bytes. For
+/// example, HTTP-based storage like S3 can produce non-contiguous bytes from a stream.
+///
+/// ## Features
+///
+/// - [`Buffer`] can be used as [`Buf`], [`Iterator`] or [`Stream`] directly.
+/// - [`Buffer`] is cheap to clone like [`Bytes`]: cloning only updates a reference count, with no allocation.
+/// - [`Buffer`] is vectored write friendly: you can convert it to [`IoSlice`] for vectored writes.
+///
+/// ## Examples
+///
+/// ### As `Buf`
+///
+/// `Buffer` implements the `Buf` trait:
+///
+/// ```rust
+/// use bytes::Buf;
+/// use opendal::Buffer;
+/// use serde_json;
+///
+/// fn test(mut buf: Buffer) -> Vec<String> {
+///     serde_json::from_reader(buf.reader()).unwrap()
+/// }
+/// ```
+///
+/// ### As Bytes `Iterator`
+///
+/// `Buffer` implements the `Iterator<Item = Bytes>` trait:
+///
+/// ```rust
+/// use bytes::Bytes;
+/// use opendal::Buffer;
+///
+/// fn test(mut buf: Buffer) -> Vec<Bytes> {
+///     buf.into_iter().collect()
+/// }
+/// ```
+///
+/// ### As Bytes `Stream`
+///
+/// `Buffer` implements the `Stream<Item = Result<Bytes, Infallible>>` trait:
+///
+/// ```rust
+/// use bytes::Bytes;
+/// use futures::TryStreamExt;
+/// use opendal::Buffer;
+///
+/// async fn test(mut buf: Buffer) -> Vec<Bytes> {
+///     buf.into_iter().try_collect().await.unwrap()
+/// }
+/// ```
+///
+/// ### As one contiguous Bytes
+///
+/// `Buffer` can be made contiguous by transforming it into `Bytes` or `Vec<u8>`.
+/// Please keep in mind that this operation involves a new allocation and a copy of the
+/// bytes, and we can't reuse the same memory region anymore.
+///
+/// ```rust
+/// use bytes::Bytes;
+/// use opendal::Buffer;
+///
+/// fn test_to_vec(buf: Buffer) -> Vec<u8> {
+///     buf.to_vec()
+/// }
+///
+/// fn test_to_bytes(buf: Buffer) -> Bytes {
+///     buf.to_bytes()
+/// }
+/// ```
+#[derive(Clone)]
+pub struct Buffer(Inner);
+
+#[derive(Clone)]
+enum Inner {
+    Contiguous(Bytes),
+    NonContiguous {
+        parts: Arc<[Bytes]>,
+        size: usize,
+        idx: usize,
+        offset: usize,
+    },
+}
+
+impl Debug for Buffer {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut b = f.debug_struct("Buffer");
+
+        match &self.0 {
+            Inner::Contiguous(bs) => {
+                b.field("type", &"contiguous");
+                b.field("size", &bs.len());
+            }
+            Inner::NonContiguous {
+                parts,
+                size,
+                idx,
+                offset,
+            } => {
+                b.field("type", &"non_contiguous");
+                b.field("parts", &parts);
+                b.field("size", &size);
+                b.field("idx", &idx);
+                b.field("offset", &offset);
+            }
+        }
+        b.finish_non_exhaustive()
+    }
+}
+
+impl Default for Buffer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Buffer {
+    /// Create a new empty buffer.
+    ///
+    /// This operation is const and no allocation will be performed.
+    #[inline]
+    pub const fn new() -> Self {
+        Self(Inner::Contiguous(Bytes::new()))
+    }
+
+    /// Get the length of the buffer.
+    #[inline]
+    pub fn len(&self) -> usize {
+        match &self.0 {
+            Inner::Contiguous(b) => b.remaining(),
+            Inner::NonContiguous { size, .. } => *size,
+        }
+    }
+
+    /// Check if buffer is empty.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Number of [`Bytes`] in [`Buffer`].
+    ///
+    /// For a contiguous buffer, it's always 1. For a non-contiguous buffer, it's the number of
+    /// `Bytes` parts that still hold readable data.
+    pub fn count(&self) -> usize {
+        match &self.0 {
+            Inner::Contiguous(_) => 1,
+            Inner::NonContiguous {
+                parts,
+                idx,
+                size,
+                offset,
+            } => {
+                parts
+                    .iter()
+                    .skip(*idx)
+                    .fold((0, size + offset), |(count, size), bytes| {
+                        if size == 0 {
+                            (count, 0)
+                        } else {
+                            (count + 1, size.saturating_sub(bytes.len()))
+                        }
+                    })
+                    .0
+            }
+        }
+    }
+
+    /// Get current [`Bytes`].
+    pub fn current(&self) -> Bytes {
+        match &self.0 {
+            Inner::Contiguous(inner) => inner.clone(),
+            Inner::NonContiguous {
+                parts,
+                idx,
+                offset,
+                size,
+            } => {
+                let chunk = &parts[*idx];
+                let n = (chunk.len() - *offset).min(*size);
+                chunk.slice(*offset..*offset + n)
+            }
+        }
+    }
+
+    /// Shortens the buffer, keeping the first `len` bytes and dropping the rest.
+    ///
+    /// If `len` is greater than the buffer's current length, this has no effect.
+    #[inline]
+    pub fn truncate(&mut self, len: usize) {
+        match &mut self.0 {
+            Inner::Contiguous(bs) => bs.truncate(len),
+            Inner::NonContiguous { size, .. } => {
+                *size = (*size).min(len);
+            }
+        }
+    }
+
+    /// Returns a slice of self for the provided range.
+    ///
+    /// This will increment the reference count for the underlying memory and return a new Buffer
+    /// handle set to the slice.
+    ///
+    /// This operation is O(1).
+    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
+        let len = self.len();
+
+        let begin = match range.start_bound() {
+            Bound::Included(&n) => n,
+            Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
+            Bound::Unbounded => 0,
+        };
+
+        let end = match range.end_bound() {
+            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
+            Bound::Excluded(&n) => n,
+            Bound::Unbounded => len,
+        };
+
+        assert!(
+            begin <= end,
+            "range start must not be greater than end: {:?} <= {:?}",
+            begin,
+            end,
+        );
+        assert!(
+            end <= len,
+            "range end out of bounds: {:?} <= {:?}",
+            end,
+            len,
+        );
+
+        if end == begin {
+            return Buffer::new();
+        }
+
+        let mut ret = self.clone();
+        ret.truncate(end);
+        ret.advance(begin);
+        ret
+    }
+
+    /// Combine all bytes together into one single [`Bytes`].
+    ///
+    /// This operation is zero copy if the underlying bytes are contiguous.
+    /// Otherwise, it will copy all bytes into one single [`Bytes`].
+    /// Please use APIs from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
+    #[inline]
+    pub fn to_bytes(&self) -> Bytes {
+        match &self.0 {
+            Inner::Contiguous(bytes) => bytes.clone(),
+            Inner::NonContiguous {
+                parts,
+                size,
+                idx: _,
+                offset,
+            } => {
+                if parts.len() == 1 {
+                    parts[0].slice(*offset..(*offset + *size))
+                } else {
+                    let mut ret = BytesMut::with_capacity(self.len());
+                    ret.put(self.clone());
+                    ret.freeze()
+                }
+            }
+        }
+    }
+
+    /// Combine all bytes together into one single [`Vec<u8>`].
+    ///
+    /// This operation is not zero copy, it will copy all bytes into one single [`Vec<u8>`].
+    /// Please use APIs from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
+    #[inline]
+    pub fn to_vec(&self) -> Vec<u8> {
+        let mut ret = Vec::with_capacity(self.len());
+        ret.put(self.clone());
+        ret
+    }
+
+    /// Convert buffer into a slice of [`IoSlice`] for vectored write.
+    #[inline]
+    pub fn to_io_slice(&self) -> Vec<IoSlice<'_>> {
+        match &self.0 {
+            Inner::Contiguous(bs) => vec![IoSlice::new(bs.chunk())],
+            Inner::NonContiguous {
+                parts, idx, offset, ..
+            } => {
+                let mut ret = Vec::with_capacity(parts.len() - *idx);
+                let mut new_offset = *offset;
+                for part in parts.iter().skip(*idx) {
+                    ret.push(IoSlice::new(&part[new_offset..]));
+                    new_offset = 0;
+                }
+                ret
+            }
+        }
+    }
+}
+
+impl From<Vec<u8>> for Buffer {
+    #[inline]
+    fn from(bs: Vec<u8>) -> Self {
+        Self(Inner::Contiguous(bs.into()))
+    }
+}
+
+impl From<Bytes> for Buffer {
+    #[inline]
+    fn from(bs: Bytes) -> Self {
+        Self(Inner::Contiguous(bs))
+    }
+}
+
+impl From<String> for Buffer {
+    #[inline]
+    fn from(s: String) -> Self {
+        Self(Inner::Contiguous(Bytes::from(s)))
+    }
+}
+
+impl From<&'static [u8]> for Buffer {
+    #[inline]
+    fn from(s: &'static [u8]) -> Self {
+        Self(Inner::Contiguous(Bytes::from_static(s)))
+    }
+}
+
+impl From<&'static str> for Buffer {
+    #[inline]
+    fn from(s: &'static str) -> Self {
+        Self(Inner::Contiguous(Bytes::from_static(s.as_bytes())))
+    }
+}
+
+impl FromIterator<u8> for Buffer {
+    #[inline]
+    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
+        Self(Inner::Contiguous(Bytes::from_iter(iter)))
+    }
+}
+
+impl From<VecDeque<Bytes>> for Buffer {
+    #[inline]
+    fn from(bs: VecDeque<Bytes>) -> Self {
+        let size = bs.iter().map(Bytes::len).sum();
+        Self(Inner::NonContiguous {
+            parts: Vec::from(bs).into(),
+            size,
+            idx: 0,
+            offset: 0,
+        })
+    }
+}
+
+impl From<Vec<Bytes>> for Buffer {
+    #[inline]
+    fn from(bs: Vec<Bytes>) -> Self {
+        let size = bs.iter().map(Bytes::len).sum();
+        Self(Inner::NonContiguous {
+            parts: bs.into(),
+            size,
+            idx: 0,
+            offset: 0,
+        })
+    }
+}
+
+impl From<Arc<[Bytes]>> for Buffer {
+    #[inline]
+    fn from(bs: Arc<[Bytes]>) -> Self {
+        let size = bs.iter().map(Bytes::len).sum();
+        Self(Inner::NonContiguous {
+            parts: bs,
+            size,
+            idx: 0,
+            offset: 0,
+        })
+    }
+}
+
+impl FromIterator<Bytes> for Buffer {
+    #[inline]
+    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
+        let mut size = 0;
+        let bs = iter.into_iter().inspect(|v| size += v.len());
+        // This operation only needs one allocation from iterator to `Arc<[Bytes]>` instead
+        // of iterator -> `Vec<Bytes>` -> `Arc<[Bytes]>`.
+        let parts = Arc::from_iter(bs);
+        Self(Inner::NonContiguous {
+            parts,
+            size,
+            idx: 0,
+            offset: 0,
+        })
+    }
+}
+
+impl Buf for Buffer {
+    #[inline]
+    fn remaining(&self) -> usize {
+        self.len()
+    }
+
+    #[inline]
+    fn chunk(&self) -> &[u8] {
+        match &self.0 {
+            Inner::Contiguous(b) => b.chunk(),
+            Inner::NonContiguous {
+                parts,
+                size,
+                idx,
+                offset,
+            } => {
+                if *size == 0 {
+                    return &[];
+                }
+
+                let chunk = &parts[*idx];
+                let n = (chunk.len() - *offset).min(*size);
+                &parts[*idx][*offset..*offset + n]
+            }
+        }
+    }
+
+    #[inline]
+    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
+        match &self.0 {
+            Inner::Contiguous(b) => {
+                if dst.is_empty() {
+                    return 0;
+                }
+
+                dst[0] = IoSlice::new(b.chunk());
+                1
+            }
+            Inner::NonContiguous {
+                parts, idx, offset, ..
+            } => {
+                if dst.is_empty() {
+                    return 0;
+                }
+
+                let mut new_offset = *offset;
+                parts
+                    .iter()
+                    .skip(*idx)
+                    .zip(dst.iter_mut())
+                    .map(|(part, dst)| {
+                        *dst = IoSlice::new(&part[new_offset..]);
+                        new_offset = 0;
+                    })
+                    .count()
+            }
+        }
+    }
+
+    #[inline]
+    fn advance(&mut self, cnt: usize) {
+        match &mut self.0 {
+            Inner::Contiguous(b) => b.advance(cnt),
+            Inner::NonContiguous {
+                parts,
+                size,
+                idx,
+                offset,
+            } => {
+                assert!(
+                    cnt <= *size,
+                    "cannot advance past {cnt} bytes, only {size} bytes left"
+                );
+
+                let mut new_idx = *idx;
+                let mut new_offset = *offset;
+                let mut remaining_cnt = cnt;
+                while remaining_cnt > 0 {
+                    let part_len = parts[new_idx].len();
+                    let remaining_in_part = part_len - new_offset;
+
+                    if remaining_cnt < remaining_in_part {
+                        new_offset += remaining_cnt;
+                        break;
+                    }
+
+                    remaining_cnt -= remaining_in_part;
+                    new_idx += 1;
+                    new_offset = 0;
+                }
+
+                *idx = new_idx;
+                *offset = new_offset;
+                *size -= cnt;
+            }
+        }
+    }
+}
+
+impl Iterator for Buffer {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Bytes> {
+        match &mut self.0 {
+            Inner::Contiguous(bs) => {
+                if bs.is_empty() {
+                    None
+                } else {
+                    Some(mem::take(bs))
+                }
+            }
+            Inner::NonContiguous {
+                parts,
+                size,
+                idx,
+                offset,
+            } => {
+                if *size == 0 {
+                    return None;
+                }
+
+                let chunk = &parts[*idx];
+                let n = (chunk.len() - *offset).min(*size);
+                let buf = chunk.slice(*offset..*offset + n);
+                *size -= n;
+                *offset += n;
+
+                if *offset == chunk.len() {
+                    *idx += 1;
+                    *offset = 0;
+                }
+
+                Some(buf)
+            }
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        match &self.0 {
+            Inner::Contiguous(bs) => {
+                if bs.is_empty() {
+                    (0, Some(0))
+                } else {
+                    (1, Some(1))
+                }
+            }
+            Inner::NonContiguous { parts, idx, .. } => {
+                let remaining = parts.len().saturating_sub(*idx);
+                (remaining, Some(remaining))
+            }
+        }
+    }
+}
+
+impl Stream for Buffer {
+    type Item = Result<Bytes, Infallible>;
+
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Poll::Ready(self.get_mut().next().map(Ok))
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        Iterator::size_hint(self)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use rand::Rng;
+
+    use super::*;
+
+    const EMPTY_SLICE: &[u8] = &[];
+
+    #[test]
+    fn test_contiguous_buffer() {
+        let mut buf = Buffer::new();
+
+        assert_eq!(buf.remaining(), 0);
+        assert_eq!(buf.chunk(), EMPTY_SLICE);
+        assert_eq!(buf.next(), None);
+    }
+
+    #[test]
+    fn test_empty_non_contiguous_buffer() {
+        let mut buf = Buffer::from(vec![Bytes::new()]);
+
+        assert_eq!(buf.remaining(), 0);
+        assert_eq!(buf.chunk(), EMPTY_SLICE);
+        assert_eq!(buf.next(), None);
+    }
+
+    #[test]
+    fn test_non_contiguous_buffer_with_empty_chunks() {
+        let mut buf = Buffer::from(vec![Bytes::from("a")]);
+
+        assert_eq!(buf.remaining(), 1);
+        assert_eq!(buf.chunk(), b"a");
+
+        buf.advance(1);
+
+        assert_eq!(buf.remaining(), 0);
+        assert_eq!(buf.chunk(), EMPTY_SLICE);
+    }
+
+    #[test]
+    fn test_non_contiguous_buffer_with_next() {
+        let mut buf = Buffer::from(vec![Bytes::from("a")]);
+
+        assert_eq!(buf.remaining(), 1);
+        assert_eq!(buf.chunk(), b"a");
+
+        let bs = buf.next();
+
+        assert_eq!(bs, Some(Bytes::from("a")));
+        assert_eq!(buf.remaining(), 0);
+        assert_eq!(buf.chunk(), EMPTY_SLICE);
+    }
+
+    #[test]
+    fn test_buffer_advance() {
+        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);
+
+        assert_eq!(buf.remaining(), 3);
+        assert_eq!(buf.chunk(), b"a");
+
+        buf.advance(1);
+
+        assert_eq!(buf.remaining(), 2);
+        assert_eq!(buf.chunk(), b"b");
+
+        buf.advance(1);
+
+        assert_eq!(buf.remaining(), 1);
+        assert_eq!(buf.chunk(), b"c");
+
+        buf.advance(1);
+
+        assert_eq!(buf.remaining(), 0);
+        assert_eq!(buf.chunk(), EMPTY_SLICE);
+
+        buf.advance(0);
+
+        assert_eq!(buf.remaining(), 0);
+        assert_eq!(buf.chunk(), EMPTY_SLICE);
+    }
+
+    #[test]
+    fn test_buffer_truncate() {
+        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);
+
+        assert_eq!(buf.remaining(), 3);
+        assert_eq!(buf.chunk(), b"a");
+
+        buf.truncate(100);
+
+        assert_eq!(buf.remaining(), 3);
+        assert_eq!(buf.chunk(), b"a");
+
+        buf.truncate(2);
+
+        assert_eq!(buf.remaining(), 2);
+        assert_eq!(buf.chunk(), b"a");
+
+        buf.truncate(0);
+
+        assert_eq!(buf.remaining(), 0);
+        assert_eq!(buf.chunk(), EMPTY_SLICE);
+    }
+
+    /// This setup will return
+    ///
+    /// - A buffer
+    /// - Total size of this buffer.
+    /// - Total content of this buffer.
+    fn setup_buffer() -> (Buffer, usize, Bytes) {
+        let mut rng = rand::thread_rng();
+
+        let bs = (0..100)
+            .map(|_| {
+                let len = rng.gen_range(1..100);
+                let mut buf = vec![0; len];
+                rng.fill(&mut buf[..]);
+                Bytes::from(buf)
+            })
+            .collect::<Vec<_>>();
+
+        let total_size = bs.iter().map(|b| b.len()).sum::<usize>();
+        let total_content = bs.iter().flatten().copied().collect::<Bytes>();
+        let buf = Buffer::from(bs);
+
+        (buf, total_size, total_content)
+    }
+
+    #[test]
+    fn fuzz_buffer_advance() {
+        let mut rng = rand::thread_rng();
+
+        let (mut buf, total_size, total_content) = setup_buffer();
+        assert_eq!(buf.remaining(), total_size);
+        assert_eq!(buf.to_bytes(), total_content);
+
+        let mut cur = 0;
+        // Loop at most 10000 times.
+        let mut times = 10000;
+        while !buf.is_empty() && times > 0 {
+            times -= 1;
+
+            let cnt = rng.gen_range(0..total_size - cur);
+            cur += cnt;
+            buf.advance(cnt);
+
+            assert_eq!(buf.remaining(), total_size - cur);
+            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
+        }
+    }
+
+    #[test]
+    fn fuzz_buffer_iter() {
+        let mut rng = rand::thread_rng();
+
+        let (mut buf, total_size, total_content) = setup_buffer();
+        assert_eq!(buf.remaining(), total_size);
+        assert_eq!(buf.to_bytes(), total_content);
+
+        let mut cur = 0;
+        while buf.is_empty() {
+            let cnt = rng.gen_range(0..total_size - cur);
+            cur += cnt;
+            buf.advance(cnt);
+
+            // Before next
+            assert_eq!(buf.remaining(), total_size - cur);
+            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
+
+            if let Some(bs) = buf.next() {
+                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
+                cur += bs.len();
+            }
+
+            // After next
+            assert_eq!(buf.remaining(), total_size - cur);
+            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
+        }
+    }
+
+    #[test]
+    fn fuzz_buffer_truncate() {
+        let mut rng = rand::thread_rng();
+
+        let (mut buf, total_size, total_content) = setup_buffer();
+        assert_eq!(buf.remaining(), total_size);
+        assert_eq!(buf.to_bytes(), total_content);
+
+        let mut cur = 0;
+        while buf.is_empty() {
+            let cnt = rng.gen_range(0..total_size - cur);
+            cur += cnt;
+            buf.advance(cnt);
+
+            // Before truncate
+            assert_eq!(buf.remaining(), total_size - cur);
+            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
+
+            let truncate_size = rng.gen_range(0..total_size - cur);
+            buf.truncate(truncate_size);
+
+            // After truncate
+            assert_eq!(buf.remaining(), truncate_size);
+            assert_eq!(
+                buf.to_bytes(),
+                total_content.slice(cur..cur + truncate_size)
+            );
+
+            // Try next after truncate
+            if let Some(bs) = buf.next() {
+                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
+                cur += bs.len();
+            }
+
+            // After next
+            assert_eq!(buf.remaining(), total_size - cur);
+            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
+        }
+    }
+}
diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs
index 62a801d9462d..69da24d2b61b 100644
--- a/src/common/base/src/lib.rs
+++ b/src/common/base/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 pub mod bit_vec;
+pub mod buffer;
 pub mod bytes;
 pub mod plugins;
 pub mod range_read;
diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs
index 904681d5f40a..21e5487d1e42 100644
--- a/src/index/src/inverted_index/format/reader.rs
+++ b/src/index/src/inverted_index/format/reader.rs
@@ -16,6 +16,7 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use common_base::BitVec;
 use greptime_proto::v1::index::InvertedIndexMetas;
 use snafu::ResultExt;
@@ -35,7 +36,7 @@ pub trait InvertedIndexReader: Send {
     async fn range_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>>;
 
     /// Reads the bytes in the given ranges.
-    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>>;
+    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>>;
 
     /// Retrieves metadata of all inverted indices stored within the blob.
     async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>>;
diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs
index 371655d535f3..fcaa63773d93 100644
--- a/src/index/src/inverted_index/format/reader/blob.rs
+++ b/src/index/src/inverted_index/format/reader/blob.rs
@@ -16,6 +16,7 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use common_base::range_read::RangeReader;
 use greptime_proto::v1::index::InvertedIndexMetas;
 use snafu::{ensure, ResultExt};
@@ -60,9 +61,8 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
         Ok(buf.into())
     }
 
-    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>> {
-        let bufs = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
-        Ok(bufs.into_iter().map(|buf| buf.into()).collect())
+    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
+        self.source.read_vec(ranges).await.context(CommonIoSnafu)
    }
 
     async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>> {
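The reader change above is what makes the page cache in the next diff practical: `read_vec` now hands back `Bytes` instead of `Vec<u8>`, so a page can sit in the cache and be returned to callers without copying. A minimal illustration of that property, not part of the patch and using only the `bytes` crate:

```rust
use bytes::Bytes;

fn main() {
    let page = Bytes::from(vec![7u8; 4096]);

    // Cloning is O(1): both handles point at the same allocation.
    let cached = page.clone();
    assert_eq!(cached.as_ptr(), page.as_ptr());

    // Slicing is zero-copy as well: the sub-range shares the buffer.
    let head = page.slice(..16);
    assert_eq!(head.as_ptr(), page.as_ptr());
    assert_eq!(head.len(), 16);
}
```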
diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs
index e25fb22dcbf5..c6625be4f295 100644
--- a/src/mito2/src/cache/index.rs
+++ b/src/mito2/src/cache/index.rs
@@ -17,6 +17,8 @@ use std::sync::Arc;
 
 use api::v1::index::InvertedIndexMetas;
 use async_trait::async_trait;
+use bytes::Bytes;
+use common_base::buffer::Buffer;
 use common_base::BitVec;
 use index::inverted_index::error::DecodeFstSnafu;
 use index::inverted_index::format::reader::InvertedIndexReader;
@@ -70,7 +72,7 @@ where
         }
         // TODO: Can be replaced by a non-contiguous structure like opendal::Buffer.
         let mut data = Vec::with_capacity(keys.len());
-        data.resize(keys.len(), Arc::new(Vec::new()));
+        data.resize(keys.len(), Bytes::new());
         let mut cache_miss_range = vec![];
         let mut cache_miss_idx = vec![];
         let last_index = keys.len() - 1;
@@ -97,24 +99,20 @@ where
         if !cache_miss_range.is_empty() {
             let pages = self.inner.read_vec(&cache_miss_range).await?;
             for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
-                let page = Arc::new(page);
+                let page = page;
                 let key = keys[i].clone();
                 data[i] = page.clone();
                 self.cache.put_index(key, page.clone());
             }
         }
-        let mut result = Vec::with_capacity(size as usize);
-        data.iter().enumerate().for_each(|(i, page)| {
-            let range = if i == 0 {
-                IndexDataPageKey::calculate_first_page_range(offset, size, self.cache.page_size)
-            } else if i == last_index {
-                IndexDataPageKey::calculate_last_page_range(offset, size, self.cache.page_size)
-            } else {
-                0..self.cache.page_size as usize
-            };
-            result.extend_from_slice(&page[range]);
-        });
-        Ok(result)
+        let buffer = Buffer::from_iter(data.into_iter());
+        Ok(buffer
+            .slice(IndexDataPageKey::calculate_range(
+                offset,
+                size,
+                self.cache.page_size,
+            ))
+            .to_vec())
     }
 }
@@ -131,7 +129,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
     async fn read_vec(
         &mut self,
         ranges: &[Range<u64>],
-    ) -> index::inverted_index::error::Result<Vec<Vec<u8>>> {
+    ) -> index::inverted_index::error::Result<Vec<Bytes>> {
         self.inner.read_vec(ranges).await
     }
 
@@ -190,31 +188,14 @@ impl IndexDataPageKey {
         (end_page + 1 - start_page) as u32
     }
 
-    /// Computes the byte range in the first page based on the offset and size.
-    /// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the first page range is 1000..4096.
-    fn calculate_first_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
+    /// Computes the byte range based on the offset and size.
+    /// For example, if offset is 5000 and size is 5000 with PAGE_SIZE of 4096, the range is 904..5904.
+    fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
         let start = (offset % page_size) as usize;
-        let end = if size > page_size as u32 - start as u32 {
-            page_size as usize
-        } else {
-            start + size as usize
-        };
+        let end = start + size as usize;
         start..end
     }
 
-    /// Computes the byte range in the last page based on the offset and size.
-    /// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the last page range is 0..1904.
-    fn calculate_last_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
-        let offset = offset as usize;
-        let size = size as usize;
-        let page_size = page_size as usize;
-        if (offset + size) % page_size == 0 {
-            0..page_size
-        } else {
-            0..((offset + size) % page_size)
-        }
-    }
-
     /// Generates a vector of IndexKey instances for the pages that a given offset and size span.
     fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec<IndexDataPageKey> {
         let start_page = Self::calculate_page_id(offset, page_size);
@@ -234,7 +215,7 @@ pub struct InvertedIndexCache {
     /// Cache for inverted index metadata
     index_metadata: moka::sync::Cache<IndexMetadataKey, Arc<InvertedIndexMetas>>,
     /// Cache for inverted index content.
-    index: moka::sync::Cache<IndexDataPageKey, Arc<Vec<u8>>>,
+    index: moka::sync::Cache<IndexDataPageKey, Bytes>,
     // Page size for index content.
     page_size: u64,
 }
@@ -284,11 +265,11 @@ impl InvertedIndexCache {
         self.index_metadata.insert(key, metadata)
     }
 
-    pub fn get_index(&self, key: &IndexDataPageKey) -> Option<Arc<Vec<u8>>> {
+    pub fn get_index(&self, key: &IndexDataPageKey) -> Option<Bytes> {
         self.index.get(key)
     }
 
-    pub fn put_index(&self, key: IndexDataPageKey, value: Arc<Vec<u8>>) {
+    pub fn put_index(&self, key: IndexDataPageKey, value: Bytes) {
         CACHE_BYTES
             .with_label_values(&[INDEX_CONTENT_TYPE])
             .add(index_content_weight(&key, &value).into());
@@ -302,7 +283,7 @@ fn index_metadata_weight(k: &IndexMetadataKey, v: &Arc<InvertedIndexMetas>) -> u
 }
 
 /// Calculates weight for index content.
-fn index_content_weight(k: &IndexDataPageKey, v: &Arc<Vec<u8>>) -> u32 {
+fn index_content_weight(k: &IndexDataPageKey, v: &Bytes) -> u32 {
     (k.file_id.as_bytes().len() + v.len()) as u32
 }
@@ -331,6 +312,9 @@ mod test {
     use crate::sst::index::store::InstrumentedStore;
     use crate::test_util::TestEnv;
 
+    // Repeat times for the following little fuzz tests.
+    const FUZZ_REPEAT_TIMES: usize = 100;
+
     // Fuzz test for index data page key
     #[test]
     fn fuzz_index_calculation() {
@@ -340,7 +324,7 @@ mod test {
         rng.fill_bytes(&mut data);
 
         let file_id = FileId::random();
-        for _ in 0..100 {
+        for _ in 0..FUZZ_REPEAT_TIMES {
             let offset = rng.gen_range(0..data.len() as u64);
             let size = rng.gen_range(0..data.len() as u32 - offset as u32);
             let page_size: usize = rng.gen_range(1..1024);
@@ -349,32 +333,24 @@ mod test {
                 IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64);
             let page_num = indexes.len();
             let mut read = Vec::with_capacity(size as usize);
-            let last_index = indexes.len() - 1;
-            for (i, key) in indexes.into_iter().enumerate() {
+            for key in indexes.into_iter() {
                 let start = key.page_id as usize * page_size;
                 let page = if start + page_size < data.len() {
                     &data[start..start + page_size]
                 } else {
                     &data[start..]
                 };
-                let range = if i == 0 {
-                    // first page range
-                    IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64)
-                } else if i == last_index {
-                    // last page range. when the first page is the last page, the range is not used.
-                    IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64)
-                } else {
-                    0..page_size
-                };
-                read.extend_from_slice(&page[range]);
+                read.extend_from_slice(page);
             }
             let expected_range = offset as usize..(offset + size as u64) as usize;
+            let read =
+                read[IndexDataPageKey::calculate_range(offset, size, page_size as u64)].to_vec();
             if read != data.get(expected_range).unwrap() {
                 panic!(
-                    "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nfirst page range: {:?}, last page range: {:?}, page num: {}",
-                    offset, size, page_size, read.len(), size as usize,
-                    IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64),
-                    IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64), page_num
+                    "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
+                    offset, size, page_size, read.len(), size as usize,
+                    IndexDataPageKey::calculate_range(offset, size, page_size as u64),
+                    page_num
                 );
             }
         }
@@ -519,7 +495,7 @@ mod test {
 
         // fuzz test
         let mut rng = rand::thread_rng();
-        for _ in 0..100 {
+        for _ in 0..FUZZ_REPEAT_TIMES {
             let offset = rng.gen_range(0..file_size);
             let size = rng.gen_range(0..file_size as u32 - offset as u32);
             let expected = cached_reader.range_read(offset, size).await.unwrap();

From c5b72fddf0aa30367a8fe1b7f5b682a9d75ee368 Mon Sep 17 00:00:00 2001
From: Yohan Wal <1035325592@qq.com>
Date: Tue, 17 Dec 2024 14:55:11 +0800
Subject: [PATCH 2/4] Update src/mito2/src/cache/index.rs

Co-authored-by: Weny Xu
---
 src/mito2/src/cache/index.rs | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs
index c6625be4f295..cd7c9e30a4a9 100644
--- a/src/mito2/src/cache/index.rs
+++ b/src/mito2/src/cache/index.rs
@@ -188,8 +188,13 @@ impl IndexDataPageKey {
         (end_page + 1 - start_page) as u32
     }
 
-    /// Computes the byte range based on the offset and size.
-    /// For example, if offset is 5000 and size is 5000 with PAGE_SIZE of 4096, the range is 904..5904.
+    /// Calculates the byte range for data retrieval based on the specified offset and size.
+    ///
+    /// This function determines the starting and ending byte positions required for reading data.
+    /// For example, with an offset of 5000 and a size of 5000, using a PAGE_SIZE of 4096,
+    /// the resulting byte range will be 904..5904. This indicates that:
+    /// - The reader will first access fixed-size pages [4096, 8192) and [8192, 12288).
+    /// - To read the range [5000..10000), it only needs to fetch bytes within the range [904, 5904) across two pages.
     fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
         let start = (offset % page_size) as usize;
         let end = start + size as usize;

From 5000f47bd47c9e7f5113d7402d5307e63a0d25a3 Mon Sep 17 00:00:00 2001
From: CookiePieWw
Date: Tue, 17 Dec 2024 16:40:25 +0800
Subject: [PATCH 3/4] chore: apply review comments

---
 src/mito2/src/cache/index.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs
index cd7c9e30a4a9..ad844dfea839 100644
--- a/src/mito2/src/cache/index.rs
+++ b/src/mito2/src/cache/index.rs
@@ -70,15 +70,14 @@ where
         if keys.is_empty() {
             return Ok(Vec::new());
         }
-        // TODO: Can be replaced by a non-contiguous structure like opendal::Buffer.
         let mut data = Vec::with_capacity(keys.len());
         data.resize(keys.len(), Bytes::new());
         let mut cache_miss_range = vec![];
         let mut cache_miss_idx = vec![];
         let last_index = keys.len() - 1;
         // TODO: Avoid copy as much as possible.
-        for (i, index) in keys.clone().into_iter().enumerate() {
-            match self.cache.get_index(&index) {
+        for (i, index) in keys.iter().enumerate() {
+            match self.cache.get_index(index) {
                 Some(page) => {
                     CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
                     data[i] = page;
@@ -97,7 +98,6 @@ where
         if !cache_miss_range.is_empty() {
             let pages = self.inner.read_vec(&cache_miss_range).await?;
             for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
-                let page = page;
                 let key = keys[i].clone();
                 data[i] = page.clone();
                 self.cache.put_index(key, page.clone());

From fd058d4c7c5acebb82ffdfde100ded96ac2e8fc2 Mon Sep 17 00:00:00 2001
From: CookiePieWw
Date: Wed, 18 Dec 2024 11:24:27 +0800
Subject: [PATCH 4/4] refactor: use opendal::Buffer

---
 src/common/base/src/buffer.rs | 812 ----------------------------------
 src/common/base/src/lib.rs    |   1 -
 src/mito2/src/cache/index.rs  |   2 +-
 3 files changed, 1 insertion(+), 814 deletions(-)
 delete mode 100644 src/common/base/src/buffer.rs

diff --git a/src/common/base/src/buffer.rs b/src/common/base/src/buffer.rs
deleted file mode 100644
index 050c1b0e6c93..000000000000
--- a/src/common/base/src/buffer.rs
+++ /dev/null
@@ -1,812 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Copied from https://github.com/apache/opendal/blob/eaf6c1846f725b2bd31d049fab267e3799a3ea27/core/src/types/buffer.rs
-
-use std::collections::VecDeque;
-use std::convert::Infallible;
-use std::fmt::{Debug, Formatter};
-use std::io::IoSlice;
-use std::mem;
-use std::ops::{Bound, RangeBounds};
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-
-use bytes::{Buf, BufMut, Bytes, BytesMut};
-use futures::Stream;
-
-/// Buffer is a wrapper of contiguous `Bytes` and non-contiguous `[Bytes]`.
-///
-/// We designed `Buffer` to allow the underlying storage to return non-contiguous bytes. For
-/// example, HTTP-based storage like S3 can produce non-contiguous bytes from a stream.
-///
-/// ## Features
-///
-/// - [`Buffer`] can be used as [`Buf`], [`Iterator`] or [`Stream`] directly.
-/// - [`Buffer`] is cheap to clone like [`Bytes`]: cloning only updates a reference count, with no allocation.
-/// - [`Buffer`] is vectored write friendly: you can convert it to [`IoSlice`] for vectored writes.
-///
-/// ## Examples
-///
-/// ### As `Buf`
-///
-/// `Buffer` implements the `Buf` trait:
-///
-/// ```rust
-/// use bytes::Buf;
-/// use opendal::Buffer;
-/// use serde_json;
-///
-/// fn test(mut buf: Buffer) -> Vec<String> {
-///     serde_json::from_reader(buf.reader()).unwrap()
-/// }
-/// ```
-///
-/// ### As Bytes `Iterator`
-///
-/// `Buffer` implements the `Iterator<Item = Bytes>` trait:
-///
-/// ```rust
-/// use bytes::Bytes;
-/// use opendal::Buffer;
-///
-/// fn test(mut buf: Buffer) -> Vec<Bytes> {
-///     buf.into_iter().collect()
-/// }
-/// ```
-///
-/// ### As Bytes `Stream`
-///
-/// `Buffer` implements the `Stream<Item = Result<Bytes, Infallible>>` trait:
-///
-/// ```rust
-/// use bytes::Bytes;
-/// use futures::TryStreamExt;
-/// use opendal::Buffer;
-///
-/// async fn test(mut buf: Buffer) -> Vec<Bytes> {
-///     buf.into_iter().try_collect().await.unwrap()
-/// }
-/// ```
-///
-/// ### As one contiguous Bytes
-///
-/// `Buffer` can be made contiguous by transforming it into `Bytes` or `Vec<u8>`.
-/// Please keep in mind that this operation involves a new allocation and a copy of the
-/// bytes, and we can't reuse the same memory region anymore.
-///
-/// ```rust
-/// use bytes::Bytes;
-/// use opendal::Buffer;
-///
-/// fn test_to_vec(buf: Buffer) -> Vec<u8> {
-///     buf.to_vec()
-/// }
-///
-/// fn test_to_bytes(buf: Buffer) -> Bytes {
-///     buf.to_bytes()
-/// }
-/// ```
-#[derive(Clone)]
-pub struct Buffer(Inner);
-
-#[derive(Clone)]
-enum Inner {
-    Contiguous(Bytes),
-    NonContiguous {
-        parts: Arc<[Bytes]>,
-        size: usize,
-        idx: usize,
-        offset: usize,
-    },
-}
-
-impl Debug for Buffer {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let mut b = f.debug_struct("Buffer");
-
-        match &self.0 {
-            Inner::Contiguous(bs) => {
-                b.field("type", &"contiguous");
-                b.field("size", &bs.len());
-            }
-            Inner::NonContiguous {
-                parts,
-                size,
-                idx,
-                offset,
-            } => {
-                b.field("type", &"non_contiguous");
-                b.field("parts", &parts);
-                b.field("size", &size);
-                b.field("idx", &idx);
-                b.field("offset", &offset);
-            }
-        }
-        b.finish_non_exhaustive()
-    }
-}
-
-impl Default for Buffer {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl Buffer {
-    /// Create a new empty buffer.
-    ///
-    /// This operation is const and no allocation will be performed.
-    #[inline]
-    pub const fn new() -> Self {
-        Self(Inner::Contiguous(Bytes::new()))
-    }
-
-    /// Get the length of the buffer.
-    #[inline]
-    pub fn len(&self) -> usize {
-        match &self.0 {
-            Inner::Contiguous(b) => b.remaining(),
-            Inner::NonContiguous { size, .. } => *size,
-        }
-    }
-
-    /// Check if buffer is empty.
-    #[inline]
-    pub fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    /// Number of [`Bytes`] in [`Buffer`].
-    ///
-    /// For a contiguous buffer, it's always 1. For a non-contiguous buffer, it's the number of
-    /// `Bytes` parts that still hold readable data.
-    pub fn count(&self) -> usize {
-        match &self.0 {
-            Inner::Contiguous(_) => 1,
-            Inner::NonContiguous {
-                parts,
-                idx,
-                size,
-                offset,
-            } => {
-                parts
-                    .iter()
-                    .skip(*idx)
-                    .fold((0, size + offset), |(count, size), bytes| {
-                        if size == 0 {
-                            (count, 0)
-                        } else {
-                            (count + 1, size.saturating_sub(bytes.len()))
-                        }
-                    })
-                    .0
-            }
-        }
-    }
-
-    /// Get current [`Bytes`].
-    pub fn current(&self) -> Bytes {
-        match &self.0 {
-            Inner::Contiguous(inner) => inner.clone(),
-            Inner::NonContiguous {
-                parts,
-                idx,
-                offset,
-                size,
-            } => {
-                let chunk = &parts[*idx];
-                let n = (chunk.len() - *offset).min(*size);
-                chunk.slice(*offset..*offset + n)
-            }
-        }
-    }
-
-    /// Shortens the buffer, keeping the first `len` bytes and dropping the rest.
-    ///
-    /// If `len` is greater than the buffer's current length, this has no effect.
-    #[inline]
-    pub fn truncate(&mut self, len: usize) {
-        match &mut self.0 {
-            Inner::Contiguous(bs) => bs.truncate(len),
-            Inner::NonContiguous { size, .. } => {
-                *size = (*size).min(len);
-            }
-        }
-    }
-
-    /// Returns a slice of self for the provided range.
-    ///
-    /// This will increment the reference count for the underlying memory and return a new Buffer
-    /// handle set to the slice.
-    ///
-    /// This operation is O(1).
-    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
-        let len = self.len();
-
-        let begin = match range.start_bound() {
-            Bound::Included(&n) => n,
-            Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
-            Bound::Unbounded => 0,
-        };
-
-        let end = match range.end_bound() {
-            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
-            Bound::Excluded(&n) => n,
-            Bound::Unbounded => len,
-        };
-
-        assert!(
-            begin <= end,
-            "range start must not be greater than end: {:?} <= {:?}",
-            begin,
-            end,
-        );
-        assert!(
-            end <= len,
-            "range end out of bounds: {:?} <= {:?}",
-            end,
-            len,
-        );
-
-        if end == begin {
-            return Buffer::new();
-        }
-
-        let mut ret = self.clone();
-        ret.truncate(end);
-        ret.advance(begin);
-        ret
-    }
-
-    /// Combine all bytes together into one single [`Bytes`].
-    ///
-    /// This operation is zero copy if the underlying bytes are contiguous.
-    /// Otherwise, it will copy all bytes into one single [`Bytes`].
-    /// Please use APIs from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
-    #[inline]
-    pub fn to_bytes(&self) -> Bytes {
-        match &self.0 {
-            Inner::Contiguous(bytes) => bytes.clone(),
-            Inner::NonContiguous {
-                parts,
-                size,
-                idx: _,
-                offset,
-            } => {
-                if parts.len() == 1 {
-                    parts[0].slice(*offset..(*offset + *size))
-                } else {
-                    let mut ret = BytesMut::with_capacity(self.len());
-                    ret.put(self.clone());
-                    ret.freeze()
-                }
-            }
-        }
-    }
-
-    /// Combine all bytes together into one single [`Vec<u8>`].
-    ///
-    /// This operation is not zero copy, it will copy all bytes into one single [`Vec<u8>`].
-    /// Please use APIs from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
-    #[inline]
-    pub fn to_vec(&self) -> Vec<u8> {
-        let mut ret = Vec::with_capacity(self.len());
-        ret.put(self.clone());
-        ret
-    }
-
-    /// Convert buffer into a slice of [`IoSlice`] for vectored write.
-    #[inline]
-    pub fn to_io_slice(&self) -> Vec<IoSlice<'_>> {
-        match &self.0 {
-            Inner::Contiguous(bs) => vec![IoSlice::new(bs.chunk())],
-            Inner::NonContiguous {
-                parts, idx, offset, ..
-            } => {
-                let mut ret = Vec::with_capacity(parts.len() - *idx);
-                let mut new_offset = *offset;
-                for part in parts.iter().skip(*idx) {
-                    ret.push(IoSlice::new(&part[new_offset..]));
-                    new_offset = 0;
-                }
-                ret
-            }
-        }
-    }
-}
-
-impl From<Vec<u8>> for Buffer {
-    #[inline]
-    fn from(bs: Vec<u8>) -> Self {
-        Self(Inner::Contiguous(bs.into()))
-    }
-}
-
-impl From<Bytes> for Buffer {
-    #[inline]
-    fn from(bs: Bytes) -> Self {
-        Self(Inner::Contiguous(bs))
-    }
-}
-
-impl From<String> for Buffer {
-    #[inline]
-    fn from(s: String) -> Self {
-        Self(Inner::Contiguous(Bytes::from(s)))
-    }
-}
-
-impl From<&'static [u8]> for Buffer {
-    #[inline]
-    fn from(s: &'static [u8]) -> Self {
-        Self(Inner::Contiguous(Bytes::from_static(s)))
-    }
-}
-
-impl From<&'static str> for Buffer {
-    #[inline]
-    fn from(s: &'static str) -> Self {
-        Self(Inner::Contiguous(Bytes::from_static(s.as_bytes())))
-    }
-}
-
-impl FromIterator<u8> for Buffer {
-    #[inline]
-    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
-        Self(Inner::Contiguous(Bytes::from_iter(iter)))
-    }
-}
-
-impl From<VecDeque<Bytes>> for Buffer {
-    #[inline]
-    fn from(bs: VecDeque<Bytes>) -> Self {
-        let size = bs.iter().map(Bytes::len).sum();
-        Self(Inner::NonContiguous {
-            parts: Vec::from(bs).into(),
-            size,
-            idx: 0,
-            offset: 0,
-        })
-    }
-}
-
-impl From<Vec<Bytes>> for Buffer {
-    #[inline]
-    fn from(bs: Vec<Bytes>) -> Self {
-        let size = bs.iter().map(Bytes::len).sum();
-        Self(Inner::NonContiguous {
-            parts: bs.into(),
-            size,
-            idx: 0,
-            offset: 0,
-        })
-    }
-}
-
-impl From<Arc<[Bytes]>> for Buffer {
-    #[inline]
-    fn from(bs: Arc<[Bytes]>) -> Self {
-        let size = bs.iter().map(Bytes::len).sum();
-        Self(Inner::NonContiguous {
-            parts: bs,
-            size,
-            idx: 0,
-            offset: 0,
-        })
-    }
-}
-
-impl FromIterator<Bytes> for Buffer {
-    #[inline]
-    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
-        let mut size = 0;
-        let bs = iter.into_iter().inspect(|v| size += v.len());
-        // This operation only needs one allocation from iterator to `Arc<[Bytes]>` instead
-        // of iterator -> `Vec<Bytes>` -> `Arc<[Bytes]>`.
-        let parts = Arc::from_iter(bs);
-        Self(Inner::NonContiguous {
-            parts,
-            size,
-            idx: 0,
-            offset: 0,
-        })
-    }
-}
-
-impl Buf for Buffer {
-    #[inline]
-    fn remaining(&self) -> usize {
-        self.len()
-    }
-
-    #[inline]
-    fn chunk(&self) -> &[u8] {
-        match &self.0 {
-            Inner::Contiguous(b) => b.chunk(),
-            Inner::NonContiguous {
-                parts,
-                size,
-                idx,
-                offset,
-            } => {
-                if *size == 0 {
-                    return &[];
-                }
-
-                let chunk = &parts[*idx];
-                let n = (chunk.len() - *offset).min(*size);
-                &parts[*idx][*offset..*offset + n]
-            }
-        }
-    }
-
-    #[inline]
-    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
-        match &self.0 {
-            Inner::Contiguous(b) => {
-                if dst.is_empty() {
-                    return 0;
-                }
-
-                dst[0] = IoSlice::new(b.chunk());
-                1
-            }
-            Inner::NonContiguous {
-                parts, idx, offset, ..
-            } => {
-                if dst.is_empty() {
-                    return 0;
-                }
-
-                let mut new_offset = *offset;
-                parts
-                    .iter()
-                    .skip(*idx)
-                    .zip(dst.iter_mut())
-                    .map(|(part, dst)| {
-                        *dst = IoSlice::new(&part[new_offset..]);
-                        new_offset = 0;
-                    })
-                    .count()
-            }
-        }
-    }
-
-    #[inline]
-    fn advance(&mut self, cnt: usize) {
-        match &mut self.0 {
-            Inner::Contiguous(b) => b.advance(cnt),
-            Inner::NonContiguous {
-                parts,
-                size,
-                idx,
-                offset,
-            } => {
-                assert!(
-                    cnt <= *size,
-                    "cannot advance past {cnt} bytes, only {size} bytes left"
-                );
-
-                let mut new_idx = *idx;
-                let mut new_offset = *offset;
-                let mut remaining_cnt = cnt;
-                while remaining_cnt > 0 {
-                    let part_len = parts[new_idx].len();
-                    let remaining_in_part = part_len - new_offset;
-
-                    if remaining_cnt < remaining_in_part {
-                        new_offset += remaining_cnt;
-                        break;
-                    }
-
-                    remaining_cnt -= remaining_in_part;
-                    new_idx += 1;
-                    new_offset = 0;
-                }
-
-                *idx = new_idx;
-                *offset = new_offset;
-                *size -= cnt;
-            }
-        }
-    }
-}
-
-impl Iterator for Buffer {
-    type Item = Bytes;
-
-    fn next(&mut self) -> Option<Bytes> {
-        match &mut self.0 {
-            Inner::Contiguous(bs) => {
-                if bs.is_empty() {
-                    None
-                } else {
-                    Some(mem::take(bs))
-                }
-            }
-            Inner::NonContiguous {
-                parts,
-                size,
-                idx,
-                offset,
-            } => {
-                if *size == 0 {
-                    return None;
-                }
-
-                let chunk = &parts[*idx];
-                let n = (chunk.len() - *offset).min(*size);
-                let buf = chunk.slice(*offset..*offset + n);
-                *size -= n;
-                *offset += n;
-
-                if *offset == chunk.len() {
-                    *idx += 1;
-                    *offset = 0;
-                }
-
-                Some(buf)
-            }
-        }
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        match &self.0 {
-            Inner::Contiguous(bs) => {
-                if bs.is_empty() {
-                    (0, Some(0))
-                } else {
-                    (1, Some(1))
-                }
-            }
-            Inner::NonContiguous { parts, idx, .. } => {
-                let remaining = parts.len().saturating_sub(*idx);
-                (remaining, Some(remaining))
-            }
-        }
-    }
-}
-
-impl Stream for Buffer {
-    type Item = Result<Bytes, Infallible>;
-
-    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        Poll::Ready(self.get_mut().next().map(Ok))
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        Iterator::size_hint(self)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use rand::Rng;
-
-    use super::*;
-
-    const EMPTY_SLICE: &[u8] = &[];
-
-    #[test]
-    fn test_contiguous_buffer() {
-        let mut buf = Buffer::new();
-
-        assert_eq!(buf.remaining(), 0);
-        assert_eq!(buf.chunk(), EMPTY_SLICE);
-        assert_eq!(buf.next(), None);
-    }
-
-    #[test]
-    fn test_empty_non_contiguous_buffer() {
-        let mut buf = Buffer::from(vec![Bytes::new()]);
-
-        assert_eq!(buf.remaining(), 0);
-        assert_eq!(buf.chunk(), EMPTY_SLICE);
-        assert_eq!(buf.next(), None);
-    }
-
-    #[test]
-    fn test_non_contiguous_buffer_with_empty_chunks() {
-        let mut buf = Buffer::from(vec![Bytes::from("a")]);
-
-        assert_eq!(buf.remaining(), 1);
-        assert_eq!(buf.chunk(), b"a");
-
-        buf.advance(1);
-
-        assert_eq!(buf.remaining(), 0);
-        assert_eq!(buf.chunk(), EMPTY_SLICE);
-    }
-
-    #[test]
-    fn test_non_contiguous_buffer_with_next() {
-        let mut buf = Buffer::from(vec![Bytes::from("a")]);
-
-        assert_eq!(buf.remaining(), 1);
-        assert_eq!(buf.chunk(), b"a");
-
-        let bs = buf.next();
-
-        assert_eq!(bs, Some(Bytes::from("a")));
-        assert_eq!(buf.remaining(), 0);
-        assert_eq!(buf.chunk(), EMPTY_SLICE);
-    }
-
-    #[test]
-    fn test_buffer_advance() {
-        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);
-
-        assert_eq!(buf.remaining(), 3);
-        assert_eq!(buf.chunk(), b"a");
-
-        buf.advance(1);
-
-        assert_eq!(buf.remaining(), 2);
-        assert_eq!(buf.chunk(), b"b");
-
-        buf.advance(1);
-
-        assert_eq!(buf.remaining(), 1);
-        assert_eq!(buf.chunk(), b"c");
-
-        buf.advance(1);
-
-        assert_eq!(buf.remaining(), 0);
-        assert_eq!(buf.chunk(), EMPTY_SLICE);
-
-        buf.advance(0);
-
-        assert_eq!(buf.remaining(), 0);
-        assert_eq!(buf.chunk(), EMPTY_SLICE);
-    }
-
-    #[test]
-    fn test_buffer_truncate() {
-        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);
-
-        assert_eq!(buf.remaining(), 3);
-        assert_eq!(buf.chunk(), b"a");
-
-        buf.truncate(100);
-
-        assert_eq!(buf.remaining(), 3);
-        assert_eq!(buf.chunk(), b"a");
-
-        buf.truncate(2);
-
-        assert_eq!(buf.remaining(), 2);
-        assert_eq!(buf.chunk(), b"a");
-
-        buf.truncate(0);
-
-        assert_eq!(buf.remaining(), 0);
-        assert_eq!(buf.chunk(), EMPTY_SLICE);
-    }
-
-    /// This setup will return
-    ///
-    /// - A buffer
-    /// - Total size of this buffer.
-    /// - Total content of this buffer.
-    fn setup_buffer() -> (Buffer, usize, Bytes) {
-        let mut rng = rand::thread_rng();
-
-        let bs = (0..100)
-            .map(|_| {
-                let len = rng.gen_range(1..100);
-                let mut buf = vec![0; len];
-                rng.fill(&mut buf[..]);
-                Bytes::from(buf)
-            })
-            .collect::<Vec<_>>();
-
-        let total_size = bs.iter().map(|b| b.len()).sum::<usize>();
-        let total_content = bs.iter().flatten().copied().collect::<Bytes>();
-        let buf = Buffer::from(bs);
-
-        (buf, total_size, total_content)
-    }
-
-    #[test]
-    fn fuzz_buffer_advance() {
-        let mut rng = rand::thread_rng();
-
-        let (mut buf, total_size, total_content) = setup_buffer();
-        assert_eq!(buf.remaining(), total_size);
-        assert_eq!(buf.to_bytes(), total_content);
-
-        let mut cur = 0;
-        // Loop at most 10000 times.
-        let mut times = 10000;
-        while !buf.is_empty() && times > 0 {
-            times -= 1;
-
-            let cnt = rng.gen_range(0..total_size - cur);
-            cur += cnt;
-            buf.advance(cnt);
-
-            assert_eq!(buf.remaining(), total_size - cur);
-            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
-        }
-    }
-
-    #[test]
-    fn fuzz_buffer_iter() {
-        let mut rng = rand::thread_rng();
-
-        let (mut buf, total_size, total_content) = setup_buffer();
-        assert_eq!(buf.remaining(), total_size);
-        assert_eq!(buf.to_bytes(), total_content);
-
-        let mut cur = 0;
-        while buf.is_empty() {
-            let cnt = rng.gen_range(0..total_size - cur);
-            cur += cnt;
-            buf.advance(cnt);
-
-            // Before next
-            assert_eq!(buf.remaining(), total_size - cur);
-            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
-
-            if let Some(bs) = buf.next() {
-                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
-                cur += bs.len();
-            }
-
-            // After next
-            assert_eq!(buf.remaining(), total_size - cur);
-            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
-        }
-    }
-
-    #[test]
-    fn fuzz_buffer_truncate() {
-        let mut rng = rand::thread_rng();
-
-        let (mut buf, total_size, total_content) = setup_buffer();
-        assert_eq!(buf.remaining(), total_size);
-        assert_eq!(buf.to_bytes(), total_content);
-
-        let mut cur = 0;
-        while buf.is_empty() {
-            let cnt = rng.gen_range(0..total_size - cur);
-            cur += cnt;
-            buf.advance(cnt);
-
-            // Before truncate
-            assert_eq!(buf.remaining(), total_size - cur);
-            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
-
-            let truncate_size = rng.gen_range(0..total_size - cur);
-            buf.truncate(truncate_size);
-
-            // After truncate
-            assert_eq!(buf.remaining(), truncate_size);
-            assert_eq!(
-                buf.to_bytes(),
-                total_content.slice(cur..cur + truncate_size)
-            );
-
-            // Try next after truncate
-            if let Some(bs) = buf.next() {
-                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
-                cur += bs.len();
-            }
-
-            // After next
-            assert_eq!(buf.remaining(), total_size - cur);
-            assert_eq!(buf.to_bytes(),
total_content.slice(cur..)); - } - } -} diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 69da24d2b61b..62a801d9462d 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. pub mod bit_vec; -pub mod buffer; pub mod bytes; pub mod plugins; pub mod range_read; diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index ad844dfea839..de39ea3784b6 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -18,11 +18,11 @@ use std::sync::Arc; use api::v1::index::InvertedIndexMetas; use async_trait::async_trait; use bytes::Bytes; -use common_base::buffer::Buffer; use common_base::BitVec; use index::inverted_index::error::DecodeFstSnafu; use index::inverted_index::format::reader::InvertedIndexReader; use index::inverted_index::FstMap; +use object_store::Buffer; use prost::Message; use snafu::ResultExt;
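Taken together, the final read path fetches every fixed-size page that a request touches, stitches the pages into one logical buffer, and carves out `calculate_range(offset, size, page_size)` relative to the start of the first page. The sketch below is illustrative only: it mirrors `IndexDataPageKey::calculate_range` from the patch but reimplements the stitching on plain `bytes::Bytes`, whereas the real code goes through `opendal::Buffer` (`Buffer::from_iter(pages).slice(range).to_vec()`):

```rust
use std::ops::Range;

use bytes::Bytes;

/// Mirrors `IndexDataPageKey::calculate_range`: the requested bytes,
/// expressed relative to the start of the first page the range touches.
fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
    let start = (offset % page_size) as usize;
    start..start + size as usize
}

fn main() {
    let page_size: u64 = 4096;
    let (offset, size) = (5000u64, 5000u32);

    // A stand-in for the blob on object storage.
    let file: Vec<u8> = (0..12288u32).map(|i| (i % 251) as u8).collect();

    // Pages [4096, 8192) and [8192, 12288) cover the request [5000, 10000).
    let first_page = offset / page_size * page_size;
    let last_page = (offset + size as u64).div_ceil(page_size) * page_size;
    let pages: Vec<Bytes> = (first_page..last_page)
        .step_by(page_size as usize)
        .map(|start| Bytes::copy_from_slice(&file[start as usize..(start + page_size) as usize]))
        .collect();

    // Stitch the pages together and slice out bytes [904, 5904) within them.
    let stitched: Vec<u8> = pages.iter().flat_map(|p| p.iter().copied()).collect();
    let range = calculate_range(offset, size, page_size);
    assert_eq!(
        &stitched[range],
        &file[offset as usize..(offset + size as u64) as usize]
    );
}
```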