Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support fetch ranges in concurrent #2959

Merged
merged 4 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use async_trait::async_trait;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{ObjectStore, Reader};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
Expand Down Expand Up @@ -178,7 +178,7 @@ impl ParquetReaderBuilder {
file_handle: self.file_handle.clone(),
file_path,
parquet_meta,
file_reader: reader,
object_store: self.object_store.clone(),
projection: projection_mask,
field_levels,
cache_manager: self.cache_manager.clone(),
Expand Down Expand Up @@ -285,8 +285,8 @@ struct RowGroupReaderBuilder {
file_path: String,
/// Metadata of the parquet file.
parquet_meta: Arc<ParquetMetaData>,
/// Reader to get data.
file_reader: BufReader<Reader>,
/// Object store as an Operator.
object_store: ObjectStore,
/// Projection mask.
projection: ProjectionMask,
/// Field levels to read.
Expand All @@ -309,10 +309,12 @@ impl RowGroupReaderBuilder {
&self.parquet_meta,
row_group_idx,
self.cache_manager.clone(),
&self.file_path,
self.object_store.clone(),
);
// Fetches data into memory.
row_group
.fetch(&mut self.file_reader, &self.projection, None)
.fetch(&self.projection, None)
.await
.context(ReadParquetSnafu {
path: &self.file_path,
Expand Down
108 changes: 102 additions & 6 deletions src/mito2/src/sst/parquet/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).

use std::ops::Range;
use std::sync::Arc;

use bytes::{Buf, Bytes};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ProjectionMask;
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
Expand Down Expand Up @@ -46,6 +47,9 @@ pub struct InMemoryRowGroup<'a> {
///
/// `column_cached_pages.len()` equals to `column_chunks.len()`.
column_cached_pages: Vec<Option<Arc<PageValue>>>,
file_path: &'a str,
/// Object store.
object_store: ObjectStore,
}

impl<'a> InMemoryRowGroup<'a> {
Expand All @@ -59,6 +63,8 @@ impl<'a> InMemoryRowGroup<'a> {
parquet_meta: &'a ParquetMetaData,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
file_path: &'a str,
evenyag marked this conversation as resolved.
Show resolved Hide resolved
object_store: ObjectStore,
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
) -> Self {
let metadata = parquet_meta.row_group(row_group_idx);
// `page_locations` is always `None` if we don't set
Expand All @@ -78,13 +84,14 @@ impl<'a> InMemoryRowGroup<'a> {
row_group_idx,
cache_manager,
column_cached_pages: vec![None; metadata.columns().len()],
file_path,
object_store,
}
}

/// Fetches the necessary column data into memory
pub async fn fetch<T: AsyncFileReader + Send>(
pub async fn fetch(
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
input: &mut T,
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
Expand All @@ -93,7 +100,7 @@ impl<'a> InMemoryRowGroup<'a> {
// `RowSelection`
let mut page_start_offsets: Vec<Vec<usize>> = vec![];

let fetch_ranges = self
let fetch_ranges: Vec<_> = self
QuenKar marked this conversation as resolved.
Show resolved Hide resolved
.column_chunks
.iter()
.zip(self.metadata.columns())
Expand All @@ -119,8 +126,11 @@ impl<'a> InMemoryRowGroup<'a> {
ranges
})
.collect();
let mut chunk_data =
fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
.await?
.into_iter();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut page_start_offsets = page_start_offsets.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
Expand Down Expand Up @@ -165,7 +175,10 @@ impl<'a> InMemoryRowGroup<'a> {
return Ok(());
}

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut chunk_data =
fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
.await?
.into_iter();

for (idx, (chunk, cached_pages)) in self
.column_chunks
Expand Down Expand Up @@ -336,3 +349,86 @@ impl Iterator for ColumnChunkIterator {
}

// NOTE(review): marker-style impl — `PageIterator`'s requirements appear to be
// satisfied by `ColumnChunkIterator`'s `Iterator` impl above; confirm against
// the parquet crate version in use.
impl PageIterator for ColumnChunkIterator {}

/// Fetches the given byte `ranges` of `file_path` from the object store.
///
/// When the backing store exposes a blocking API, the ranges are read
/// sequentially on a blocking thread; otherwise each range is fetched
/// with a concurrent async read.
async fn fetch_byte_ranges(
    file_path: &str,
    object_store: ObjectStore,
    ranges: Vec<Range<usize>>,
) -> Result<Vec<Bytes>> {
    // The store API takes `u64` offsets, so widen the ranges first.
    let byte_ranges: Vec<Range<u64>> = ranges
        .into_iter()
        .map(|r| r.start as u64..r.end as u64)
        .collect();
    let supports_blocking = object_store.info().full_capability().blocking;
    if supports_blocking {
        fetch_ranges_seq(file_path, object_store, byte_ranges).await
    } else {
        fetch_ranges_concurrent(file_path, object_store, byte_ranges).await
    }
}

/// Reads the given `ranges` of `file_path` one after another through the
/// object store's blocking API, running off the async runtime when possible.
async fn fetch_ranges_seq(
    file_path: &str,
    object_store: ObjectStore,
    ranges: Vec<Range<u64>>,
) -> Result<Vec<Bytes>> {
    let blocking_store = object_store.blocking();
    // Own the path so the closure below can be `Send + 'static`.
    let path = file_path.to_string();

    let read_all = move || -> Result<Vec<Bytes>> {
        let mut buffers = Vec::with_capacity(ranges.len());
        // Stops at the first failed read, like the fallible collect it replaces.
        for range in ranges {
            let data = blocking_store
                .read_with(&path)
                .range(range)
                .call()
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            buffers.push(Bytes::from(data));
        }
        Ok(buffers)
    };

    maybe_spawn_blocking(read_all).await
}

/// Issues one asynchronous read per range and awaits them all at once.
///
/// Results are returned in the same order as the input `ranges`; the first
/// failed read aborts the whole batch.
async fn fetch_ranges_concurrent(
    file_path: &str,
    object_store: ObjectStore,
    ranges: Vec<Range<u64>>,
) -> Result<Vec<Bytes>> {
    // TODO(QuenKar): may merge small ranges to a bigger range to optimize.
    let reads = ranges.into_iter().map(|range| {
        let pending_read = object_store.read_with(file_path);
        async move {
            pending_read
                .range(range)
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))
                .map(Bytes::from)
        }
    });
    futures::future::try_join_all(reads).await
}

// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
/// Runs `f` on tokio's blocking thread pool when a runtime is available,
/// falling back to invoking it inline otherwise.
///
/// A failed join (e.g. the blocking task panicking) surfaces as
/// `ParquetError::External`.
pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
where
    F: FnOnce() -> Result<T> + Send + 'static,
    T: Send + 'static,
{
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle
            .spawn_blocking(f)
            .await
            .map_err(|e| ParquetError::External(Box::new(e)))?
    } else {
        // No runtime on this thread: execute synchronously.
        f()
    }
}
Loading