From 371bcdb374d4c6a04d08983b0278aa43d18657db Mon Sep 17 00:00:00 2001
From: QuenKar <47681251+QuenKar@users.noreply.github.com>
Date: Tue, 19 Dec 2023 14:39:46 +0800
Subject: [PATCH 1/4] feat: concurrent fetch ranges

---
 src/mito2/src/sst/parquet/reader.rs    | 12 ++--
 src/mito2/src/sst/parquet/row_group.rs | 89 ++++++++++++++++++++++++--
 2 files changed, 90 insertions(+), 11 deletions(-)

diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 45e36786d41d..0882ef82c7e3 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -22,7 +22,7 @@ use async_trait::async_trait;
 use common_telemetry::debug;
 use common_time::range::TimestampRange;
 use datatypes::arrow::record_batch::RecordBatch;
-use object_store::{ObjectStore, Reader};
+use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
@@ -178,7 +178,7 @@ impl ParquetReaderBuilder {
             file_handle: self.file_handle.clone(),
             file_path,
             parquet_meta,
-            file_reader: reader,
+            object_store: self.object_store.clone(),
             projection: projection_mask,
             field_levels,
             cache_manager: self.cache_manager.clone(),
@@ -285,8 +285,8 @@ struct RowGroupReaderBuilder {
     file_path: String,
     /// Metadata of the parquet file.
     parquet_meta: Arc<ParquetMetaData>,
-    /// Reader to get data.
-    file_reader: BufReader<Reader>,
+    /// Object store as an Operator.
+    object_store: ObjectStore,
     /// Projection mask.
     projection: ProjectionMask,
     /// Field levels to read.
@@ -309,10 +309,12 @@ impl RowGroupReaderBuilder {
             &self.parquet_meta,
             row_group_idx,
             self.cache_manager.clone(),
+            &self.file_path,
+            self.object_store.clone(),
         );
         // Fetches data into memory.
         row_group
-            .fetch(&mut self.file_reader, &self.projection, None)
+            .fetch(&self.projection, None)
             .await
             .context(ReadParquetSnafu {
                 path: &self.file_path,
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index 827db8999ae8..9c83d9826922 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -14,11 +14,12 @@
 
 //! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).
 
+use std::ops::Range;
 use std::sync::Arc;
 
 use bytes::{Buf, Bytes};
+use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
-use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::ProjectionMask;
 use parquet::column::page::{PageIterator, PageReader};
 use parquet::errors::{ParquetError, Result};
@@ -46,6 +47,9 @@ pub struct InMemoryRowGroup<'a> {
     ///
     /// `column_cached_pages.len()` equals to `column_chunks.len()`.
     column_cached_pages: Vec<Option<Arc<PageValue>>>,
+    file_path: &'a str,
+    /// Object store.
+    object_store: ObjectStore,
 }
 
 impl<'a> InMemoryRowGroup<'a> {
@@ -59,6 +63,8 @@ impl<'a> InMemoryRowGroup<'a> {
         parquet_meta: &'a ParquetMetaData,
         row_group_idx: usize,
         cache_manager: Option<CacheManagerRef>,
+        file_path: &'a str,
+        object_store: ObjectStore,
     ) -> Self {
         let metadata = parquet_meta.row_group(row_group_idx);
         // `page_locations` is always `None` if we don't set
@@ -78,13 +84,14 @@ impl<'a> InMemoryRowGroup<'a> {
             row_group_idx,
             cache_manager,
             column_cached_pages: vec![None; metadata.columns().len()],
+            file_path,
+            object_store,
         }
     }
 
     /// Fetches the necessary column data into memory
-    pub async fn fetch<T: AsyncFileReader + Send>(
+    pub async fn fetch(
         &mut self,
-        input: &mut T,
         projection: &ProjectionMask,
         selection: Option<&RowSelection>,
     ) -> Result<()> {
@@ -93,7 +100,7 @@ impl<'a> InMemoryRowGroup<'a> {
             // `RowSelection`
             let mut page_start_offsets: Vec<Vec<usize>> = vec![];
 
-            let fetch_ranges = self
+            let fetch_ranges: Vec<_> = self
                 .column_chunks
                 .iter()
                 .zip(self.metadata.columns())
@@ -119,8 +126,8 @@ impl<'a> InMemoryRowGroup<'a> {
                     ranges
                 })
                 .collect();
+            let mut chunk_data = self.get_byte_ranges(&fetch_ranges).await?.into_iter();
 
-            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
             let mut page_start_offsets = page_start_offsets.into_iter();
 
             for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
@@ -165,7 +172,7 @@ impl<'a> InMemoryRowGroup<'a> {
                 return Ok(());
             }
 
-            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut chunk_data = self.get_byte_ranges(&fetch_ranges).await?.into_iter();
 
             for (idx, (chunk, cached_pages)) in self
                 .column_chunks
@@ -189,6 +196,60 @@ impl<'a> InMemoryRowGroup<'a> {
         Ok(())
     }
 
+    /// Fetches data from object store.
+    /// If the object store supports blocking, use sequential blocking read.
+    /// Otherwise, use concurrent read.
+    async fn get_byte_ranges(&self, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
+        let ranges: Vec<_> = ranges
+            .iter()
+            .map(|range| range.start as u64..range.end as u64)
+            .collect();
+        if self.object_store.info().full_capability().blocking {
+            self.get_ranges_seq(ranges).await
+        } else {
+            self.get_ranges_concurrent(ranges).await
+        }
+    }
+
+    /// Fetches data from object store sequentially.
+    async fn get_ranges_seq(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
+        let block_object_store = self.object_store.blocking();
+        let file_path = self.file_path.to_string();
+
+        let f = move || -> Result<Vec<Bytes>> {
+            ranges
+                .into_iter()
+                .map(|range| {
+                    let data = block_object_store
+                        .read_with(&file_path)
+                        .range(range)
+                        .call()
+                        .map_err(|e| ParquetError::External(Box::new(e)))?;
+                    Ok::<_, ParquetError>(Bytes::from(data))
+                })
+                .collect::<Result<Vec<_>>>()
+        };
+
+        maybe_spawn_blocking(f).await
+    }
+
+    /// Fetches data from object store concurrently.
+    async fn get_ranges_concurrent(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
+        let mut handles = Vec::with_capacity(ranges.len());
+        for range in ranges {
+            let future_read = self.object_store.read_with(self.file_path);
+            handles.push(async move {
+                let data = future_read
+                    .range(range.start..range.end)
+                    .await
+                    .map_err(|e| ParquetError::External(Box::new(e)))?;
+                Ok::<_, ParquetError>(Bytes::from(data))
+            });
+        }
+        let results = futures::future::try_join_all(handles).await?;
+        Ok(results)
+    }
+
     /// Fetches pages for columns if cache is enabled.
     fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
         self.column_chunks
@@ -267,6 +328,22 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> {
     }
 }
 
+// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
+/// Takes a function and spawns it to a tokio blocking pool if available.
+pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
+where
+    F: FnOnce() -> Result<T> + Send + 'static,
+    T: Send + 'static,
+{
+    match tokio::runtime::Handle::try_current() {
+        Ok(runtime) => runtime
+            .spawn_blocking(f)
+            .await
+            .map_err(|e| ParquetError::External(Box::new(e)))?,
+        Err(_) => f(),
+    }
+}
+
 /// An in-memory column chunk
 #[derive(Clone)]
 enum ColumnChunkData {
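Patch 1 replaces the single buffered file reader with a capability check on the opendal operator: backends that expose a blocking API take the sequential path, all others take the concurrent path, and the sequential path funnels its closure through `maybe_spawn_blocking`, ported from arrow-rs' object_store. The snippet below is a minimal, self-contained sketch of that pattern, not code from the patch: it assumes tokio with the `rt-multi-thread` and `macros` features, `heavy_io` is a hypothetical stand-in for the blocking opendal read, and errors are simplified to `std::io::Error` rather than `ParquetError`.

```rust
use std::io::{Error, ErrorKind, Result};

// Hypothetical stand-in for `blocking_store.read_with(path).range(..).call()`.
fn heavy_io() -> Result<Vec<u8>> {
    Ok(vec![0u8; 1024])
}

/// Runs `f` on tokio's blocking pool when a runtime is available,
/// otherwise runs it inline (the arrow-rs `maybe_spawn_blocking` pattern).
async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
where
    F: FnOnce() -> Result<T> + Send + 'static,
    T: Send + 'static,
{
    match tokio::runtime::Handle::try_current() {
        // Inside a runtime: keep blocking I/O off the async worker threads.
        Ok(handle) => handle
            .spawn_blocking(f)
            .await
            .map_err(|e| Error::new(ErrorKind::Other, e))?,
        // No runtime: just call the closure directly.
        Err(_) => f(),
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let bytes = maybe_spawn_blocking(heavy_io).await?;
    println!("read {} bytes", bytes.len());
    Ok(())
}
```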
From 780a28dd51625c84ae71468b9e03909ec9590fac Mon Sep 17 00:00:00 2001
From: QuenKar <47681251+QuenKar@users.noreply.github.com>
Date: Wed, 20 Dec 2023 16:55:40 +0800
Subject: [PATCH 2/4] chore: cr comment

---
 src/mito2/src/sst/parquet/row_group.rs | 21 ++++-----------------
 1 file changed, 4 insertions(+), 17 deletions(-)

diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index 9c83d9826922..b249c33b8a7c 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -230,11 +230,14 @@ impl<'a> InMemoryRowGroup<'a> {
                 .collect::<Result<Vec<_>>>()
         };
 
-        maybe_spawn_blocking(f).await
+        common_runtime::spawn_blocking_read(f)
+            .await
+            .map_err(|e| ParquetError::External(Box::new(e)))?
     }
 
     /// Fetches data from object store concurrently.
     async fn get_ranges_concurrent(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
+        // TODO(QuenKar): may merge small ranges to a bigger range to optimize.
         let mut handles = Vec::with_capacity(ranges.len());
         for range in ranges {
             let future_read = self.object_store.read_with(self.file_path);
@@ -328,22 +331,6 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> {
     }
 }
 
-// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
-/// Takes a function and spawns it to a tokio blocking pool if available.
-pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
-where
-    F: FnOnce() -> Result<T> + Send + 'static,
-    T: Send + 'static,
-{
-    match tokio::runtime::Handle::try_current() {
-        Ok(runtime) => runtime
-            .spawn_blocking(f)
-            .await
-            .map_err(|e| ParquetError::External(Box::new(e)))?,
-        Err(_) => f(),
-    }
-}
-
 /// An in-memory column chunk
 #[derive(Clone)]
 enum ColumnChunkData {
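Patch 2 answers review feedback: the blocking closure now goes to `common_runtime::spawn_blocking_read`, which by its name routes the work to a dedicated blocking-read runtime rather than whatever tokio runtime happens to be current, and a TODO records the range-merging optimization. The concurrent path is unchanged: it builds one future per byte range and drives them all with `futures::future::try_join_all`, which polls the reads concurrently and yields results in input order, the order the caller relies on when zipping chunks back onto column indexes. A minimal sketch of that behavior, assuming only `tokio` (with the `time` feature) and `futures`; `fetch_one` is a hypothetical stand-in for `object_store.read_with(path).range(..)`:

```rust
use std::io;
use std::ops::Range;
use std::time::Duration;

// Hypothetical stand-in for one opendal ranged read.
async fn fetch_one(range: Range<u64>) -> io::Result<Vec<u8>> {
    // Pretend the read spends some time in I/O.
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok(vec![0u8; (range.end - range.start) as usize])
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let ranges = vec![0..4096, 4096..8192, 8192..12288];
    // All three reads are in flight at once (roughly 10 ms total here,
    // not 30 ms), and `try_join_all` returns the chunks in input order.
    let chunks = futures::future::try_join_all(ranges.into_iter().map(fetch_one)).await?;
    assert_eq!(chunks.iter().map(Vec::len).sum::<usize>(), 12288);
    Ok(())
}
```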
From b1a7658a3494f934bf045991353387745fb1134d Mon Sep 17 00:00:00 2001
From: QuenKar <47681251+QuenKar@users.noreply.github.com>
Date: Thu, 21 Dec 2023 14:44:03 +0800
Subject: [PATCH 3/4] chore: cr comment

---
 src/mito2/src/sst/parquet/row_group.rs | 155 ++++++++++++++++----------
 1 file changed, 96 insertions(+), 59 deletions(-)

diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index b249c33b8a7c..564893b89dfc 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -126,7 +126,13 @@ impl<'a> InMemoryRowGroup<'a> {
                     ranges
                 })
                 .collect();
-            let mut chunk_data = self.get_byte_ranges(&fetch_ranges).await?.into_iter();
+            let mut chunk_data = fetch_byte_ranges(
+                self.file_path.to_string(),
+                self.object_store.clone(),
+                fetch_ranges,
+            )
+            .await?
+            .into_iter();
 
             let mut page_start_offsets = page_start_offsets.into_iter();
 
@@ -172,7 +178,13 @@ impl<'a> InMemoryRowGroup<'a> {
                 return Ok(());
             }
 
-            let mut chunk_data = self.get_byte_ranges(&fetch_ranges).await?.into_iter();
+            let mut chunk_data = fetch_byte_ranges(
+                self.file_path.to_string(),
+                self.object_store.clone(),
+                fetch_ranges,
+            )
+            .await?
+            .into_iter();
 
             for (idx, (chunk, cached_pages)) in self
                 .column_chunks
@@ -196,63 +208,6 @@ impl<'a> InMemoryRowGroup<'a> {
         Ok(())
     }
 
-    /// Fetches data from object store.
-    /// If the object store supports blocking, use sequential blocking read.
-    /// Otherwise, use concurrent read.
-    async fn get_byte_ranges(&self, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
-        let ranges: Vec<_> = ranges
-            .iter()
-            .map(|range| range.start as u64..range.end as u64)
-            .collect();
-        if self.object_store.info().full_capability().blocking {
-            self.get_ranges_seq(ranges).await
-        } else {
-            self.get_ranges_concurrent(ranges).await
-        }
-    }
-
-    /// Fetches data from object store sequentially.
-    async fn get_ranges_seq(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
-        let block_object_store = self.object_store.blocking();
-        let file_path = self.file_path.to_string();
-
-        let f = move || -> Result<Vec<Bytes>> {
-            ranges
-                .into_iter()
-                .map(|range| {
-                    let data = block_object_store
-                        .read_with(&file_path)
-                        .range(range)
-                        .call()
-                        .map_err(|e| ParquetError::External(Box::new(e)))?;
-                    Ok::<_, ParquetError>(Bytes::from(data))
-                })
-                .collect::<Result<Vec<_>>>()
-        };
-
-        common_runtime::spawn_blocking_read(f)
-            .await
-            .map_err(|e| ParquetError::External(Box::new(e)))?
-    }
-
-    /// Fetches data from object store concurrently.
-    async fn get_ranges_concurrent(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
-        // TODO(QuenKar): may merge small ranges to a bigger range to optimize.
-        let mut handles = Vec::with_capacity(ranges.len());
-        for range in ranges {
-            let future_read = self.object_store.read_with(self.file_path);
-            handles.push(async move {
-                let data = future_read
-                    .range(range.start..range.end)
-                    .await
-                    .map_err(|e| ParquetError::External(Box::new(e)))?;
-                Ok::<_, ParquetError>(Bytes::from(data))
-            });
-        }
-        let results = futures::future::try_join_all(handles).await?;
-        Ok(results)
-    }
-
     /// Fetches pages for columns if cache is enabled.
     fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
         self.column_chunks
@@ -400,3 +355,85 @@ impl Iterator for ColumnChunkIterator {
 }
 
 impl PageIterator for ColumnChunkIterator {}
+
+/// Fetches data from object store.
+/// If the object store supports blocking, use sequential blocking read.
+/// Otherwise, use concurrent read.
+async fn fetch_byte_ranges(
+    file_path: String,
+    object_store: ObjectStore,
+    ranges: Vec<Range<usize>>,
+) -> Result<Vec<Bytes>> {
+    let ranges: Vec<_> = ranges
+        .iter()
+        .map(|range| range.start as u64..range.end as u64)
+        .collect();
+    if object_store.info().full_capability().blocking {
+        fetch_ranges_seq(file_path, object_store, ranges).await
+    } else {
+        fetch_ranges_concurrent(file_path, object_store, ranges).await
+    }
+}
+
+/// Fetches data from object store sequentially.
+async fn fetch_ranges_seq(
+    file_path: String,
+    object_store: ObjectStore,
+    ranges: Vec<Range<u64>>,
+) -> Result<Vec<Bytes>> {
+    let block_object_store = object_store.blocking();
+
+    let f = move || -> Result<Vec<Bytes>> {
+        ranges
+            .into_iter()
+            .map(|range| {
+                let data = block_object_store
+                    .read_with(&file_path)
+                    .range(range)
+                    .call()
+                    .map_err(|e| ParquetError::External(Box::new(e)))?;
+                Ok::<_, ParquetError>(Bytes::from(data))
+            })
+            .collect::<Result<Vec<_>>>()
+    };
+
+    maybe_spawn_blocking(f).await
+}
+
+/// Fetches data from object store concurrently.
+async fn fetch_ranges_concurrent(
+    file_path: String,
+    object_store: ObjectStore,
+    ranges: Vec<Range<u64>>,
+) -> Result<Vec<Bytes>> {
+    // TODO(QuenKar): may merge small ranges to a bigger range to optimize.
+    let mut handles = Vec::with_capacity(ranges.len());
+    for range in ranges {
+        let future_read = object_store.read_with(&file_path);
+        handles.push(async move {
+            let data = future_read
+                .range(range.start..range.end)
+                .await
+                .map_err(|e| ParquetError::External(Box::new(e)))?;
+            Ok::<_, ParquetError>(Bytes::from(data))
+        });
+    }
+    let results = futures::future::try_join_all(handles).await?;
+    Ok(results)
+}
+
+// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
+/// Takes a function and spawns it to a tokio blocking pool if available.
+pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
+where
+    F: FnOnce() -> Result<T> + Send + 'static,
+    T: Send + 'static,
+{
+    match tokio::runtime::Handle::try_current() {
+        Ok(runtime) => runtime
+            .spawn_blocking(f)
+            .await
+            .map_err(|e| ParquetError::External(Box::new(e)))?,
+        Err(_) => f(),
+    }
+}
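Patch 3 hoists the fetch helpers out of `InMemoryRowGroup` into free functions, reverts to the ported `maybe_spawn_blocking`, and keeps the TODO about merging small ranges. arrow-rs' object_store handles that TODO by coalescing nearby byte ranges before issuing requests, so several column chunks can be served by one larger GET. The sketch below only illustrates that idea; it is not code from this patch, the name `coalesce_ranges` and the `max_gap` threshold are assumptions, and a real version would also have to slice each merged read back into the original per-column ranges.

```rust
use std::ops::Range;

/// Merge byte ranges whose gap is at most `max_gap`, trading a little
/// over-read for fewer round trips to the object store.
fn coalesce_ranges(mut ranges: Vec<Range<u64>>, max_gap: u64) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<u64>> = Vec::with_capacity(ranges.len());
    for r in ranges {
        match merged.last_mut() {
            // Close enough: one bigger read beats an extra request.
            Some(prev) if r.start <= prev.end + max_gap => {
                prev.end = prev.end.max(r.end);
            }
            _ => merged.push(r),
        }
    }
    merged
}

fn main() {
    let merged = coalesce_ranges(vec![0..100, 110..200, 10_000..10_500], 4096);
    assert_eq!(merged, vec![0..200, 10_000..10_500]);
}
```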
From a6265e492f517106b32f0f03820f27dbc2bc0a91 Mon Sep 17 00:00:00 2001
From: QuenKar <47681251+QuenKar@users.noreply.github.com>
Date: Thu, 21 Dec 2023 14:50:44 +0800
Subject: [PATCH 4/4] chore: clippy

---
 src/mito2/src/sst/parquet/row_group.rs | 31 +++++++++++---------------
 1 file changed, 13 insertions(+), 18 deletions(-)

diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index 564893b89dfc..b24413e43f69 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -126,13 +126,10 @@ impl<'a> InMemoryRowGroup<'a> {
                     ranges
                 })
                 .collect();
-            let mut chunk_data = fetch_byte_ranges(
-                self.file_path.to_string(),
-                self.object_store.clone(),
-                fetch_ranges,
-            )
-            .await?
-            .into_iter();
+            let mut chunk_data =
+                fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
+                    .await?
+                    .into_iter();
 
             let mut page_start_offsets = page_start_offsets.into_iter();
 
@@ -178,13 +175,10 @@ impl<'a> InMemoryRowGroup<'a> {
                 return Ok(());
             }
 
-            let mut chunk_data = fetch_byte_ranges(
-                self.file_path.to_string(),
-                self.object_store.clone(),
-                fetch_ranges,
-            )
-            .await?
-            .into_iter();
+            let mut chunk_data =
+                fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
+                    .await?
+                    .into_iter();
 
             for (idx, (chunk, cached_pages)) in self
                 .column_chunks
@@ -360,7 +354,7 @@ impl PageIterator for ColumnChunkIterator {}
 /// If the object store supports blocking, use sequential blocking read.
 /// Otherwise, use concurrent read.
 async fn fetch_byte_ranges(
-    file_path: String,
+    file_path: &str,
     object_store: ObjectStore,
     ranges: Vec<Range<usize>>,
 ) -> Result<Vec<Bytes>> {
@@ -377,11 +371,12 @@ async fn fetch_byte_ranges(
 
 /// Fetches data from object store sequentially.
 async fn fetch_ranges_seq(
-    file_path: String,
+    file_path: &str,
     object_store: ObjectStore,
    ranges: Vec<Range<u64>>,
 ) -> Result<Vec<Bytes>> {
     let block_object_store = object_store.blocking();
+    let file_path = file_path.to_string();
 
     let f = move || -> Result<Vec<Bytes>> {
         ranges
@@ -402,14 +397,14 @@ async fn fetch_ranges_seq(
 
 /// Fetches data from object store concurrently.
 async fn fetch_ranges_concurrent(
-    file_path: String,
+    file_path: &str,
    object_store: ObjectStore,
     ranges: Vec<Range<u64>>,
 ) -> Result<Vec<Bytes>> {
     // TODO(QuenKar): may merge small ranges to a bigger range to optimize.
     let mut handles = Vec::with_capacity(ranges.len());
     for range in ranges {
-        let future_read = object_store.read_with(&file_path);
+        let future_read = object_store.read_with(file_path);
         handles.push(async move {
             let data = future_read
                 .range(range.start..range.end)