diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 1e1c5d5424b0..69ca3a0fa3ae 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -365,6 +365,11 @@ config_namespace! {
         /// multiple parquet files with schemas containing compatible types but different metadata
         pub skip_metadata: bool, default = true
 
+        /// (reading) If true, caches the Parquet file-level metadata so it does not need
+        /// to be parsed on every query. This should typically be true when running more than
+        /// one (short) query on a table.
+        pub cache_metadata: bool, default = false
+
         /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
         /// bytes of the parquet file optimistically. If not specified, two reads are required:
         /// One read to fetch the 8-byte parquet footer and
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index 5d553d59da4e..5c25a8e97275 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -168,6 +168,7 @@ impl ParquetOptions {
             enable_page_index: _,
             pruning: _,
             skip_metadata: _,
+            cache_metadata: _,
             metadata_size_hint: _,
             pushdown_filters: _,
             reorder_filters: _,
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 35296b0d7907..e45cb1895cf2 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -52,6 +52,8 @@ use datafusion_common::{
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
 use datafusion_expr::Expr;
 use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
@@ -60,6 +62,7 @@ use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
+use dashmap::DashMap;
 use hashbrown::HashMap;
 use log::debug;
 use object_store::buffered::BufWriter;
@@ -80,7 +83,8 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
 
 use crate::datasource::physical_plan::parquet::{
-    can_expr_be_pushed_down_with_schemas, ParquetExecBuilder,
+    can_expr_be_pushed_down_with_schemas, CachedParquetFileReaderFactory,
+    DefaultParquetFileReaderFactory, ParquetExecBuilder, ParquetFileReaderFactory,
 };
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use futures::{StreamExt, TryStreamExt};
@@ -140,7 +144,9 @@ impl FileFormatFactory for ParquetFormatFactory {
         };
 
         Ok(Arc::new(
-            ParquetFormat::default().with_options(parquet_options),
+            ParquetFormat::default()
+                .with_options(parquet_options)
+                .with_runtime_env(Some(state.runtime_env().clone())),
         ))
     }
 
@@ -171,6 +177,8 @@ impl fmt::Debug for ParquetFormatFactory {
 #[derive(Debug, Default)]
 pub struct ParquetFormat {
     options: TableParquetOptions,
+    runtime_env: Option<Arc<RuntimeEnv>>,
+    reader_factory: DashMap<ObjectStoreUrl, Arc<dyn ParquetFileReaderFactory>>,
 }
 
 impl ParquetFormat {
@@ -182,6 +190,7 @@ impl ParquetFormat {
     /// Activate statistics based row group level pruning
     /// - If `None`, defaults to value on `config_options`
     pub fn with_enable_pruning(mut self, enable: bool) -> Self {
+        self.reader_factory.clear();
         self.options.global.pruning = enable;
         self
     }
@@ -198,6 +207,7 @@ impl ParquetFormat {
     ///
     /// - If `None`, defaults to value on `config_options`
     pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
+        self.reader_factory.clear();
         self.options.global.metadata_size_hint = size_hint;
         self
     }
@@ -213,6 +223,7 @@ impl ParquetFormat {
     ///
     /// - If `None`, defaults to value on `config_options`
     pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
+        self.reader_factory.clear();
         self.options.global.skip_metadata = skip_metadata;
         self
     }
@@ -225,6 +236,7 @@ impl ParquetFormat {
 
     /// Set Parquet options for the ParquetFormat
     pub fn with_options(mut self, options: TableParquetOptions) -> Self {
+        self.reader_factory.clear();
         self.options = options;
         self
     }
@@ -234,6 +246,18 @@ impl ParquetFormat {
         &self.options
     }
 
+    /// Sets the runtime environment used to create readers for tables.
+    pub fn with_runtime_env(mut self, runtime_env: Option<Arc<RuntimeEnv>>) -> Self {
+        self.reader_factory.clear();
+        self.runtime_env = runtime_env;
+        self
+    }
+
+    /// Returns the runtime environment used to create readers, if any.
+    pub fn runtime_env(&self) -> Option<&Arc<RuntimeEnv>> {
+        self.runtime_env.as_ref()
+    }
+
     /// Return `true` if should use view types.
     ///
     /// If this returns true, DataFusion will instruct the parquet reader
@@ -253,9 +277,37 @@ impl ParquetFormat {
     ///
     /// Refer to [`Self::force_view_types`].
     pub fn with_force_view_types(mut self, use_views: bool) -> Self {
+        self.reader_factory.clear();
         self.options.global.schema_force_view_types = use_views;
         self
     }
+
+    /// Returns the current [`ParquetFileReaderFactory`], if a [runtime environment is set](Self::with_runtime_env)
+    ///
+    /// The factory is created on first access and cached per object store URL.
+    pub fn reader_factory(
+        &self,
+        object_store_url: ObjectStoreUrl,
+    ) -> Result<Option<Arc<dyn ParquetFileReaderFactory>>> {
+        let cache_metadata = self.options.global.cache_metadata;
+        self.runtime_env
+            .as_ref()
+            .map(|runtime_env| {
+                let store = runtime_env.object_store(&object_store_url)?;
+                Ok(Arc::clone(
+                    &self
+                        .reader_factory
+                        .entry(object_store_url.clone())
+                        .or_insert_with(|| match cache_metadata {
+                            false => {
+                                Arc::new(DefaultParquetFileReaderFactory::new(store))
+                            }
+                            true => Arc::new(CachedParquetFileReaderFactory::new(store)),
+                        }),
+                ))
+            })
+            .transpose()
+    }
 }
 
 /// Clears all metadata (Schema level and field level) on an iterator
@@ -378,9 +430,14 @@ impl FileFormat for ParquetFormat {
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let object_store_url = conf.object_store_url.clone();
         let mut builder =
             ParquetExecBuilder::new_with_options(conf, self.options.clone());
 
+        if let Some(reader_factory) = self.reader_factory(object_store_url)? {
+            builder = builder.with_parquet_file_reader_factory(reader_factory);
+        }
+
         // If enable pruning then combine the filters to build the predicate.
         // If disable pruning then set the predicate to None, thus readers
         // will not prune data based on the statistics.
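The `create_physical_plan` hook above is where the cached factory is wired into query execution; from the user's side the new behavior is just a config switch. Below is a minimal sketch of enabling it (not part of the diff; it assumes the option surfaces as `datafusion.execution.parquet.cache_metadata`, mirroring the `skip_metadata` key in the same `config_namespace!` block, and uses a placeholder file path plus the `datafusion` and `tokio` crates):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Opt in to caching Parquet footer metadata; the key is assumed to
    // follow the existing `datafusion.execution.parquet.*` naming.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.cache_metadata", true);
    let ctx = SessionContext::new_with_config(config);

    // `data/table.parquet` is a placeholder path.
    ctx.register_parquet("t", "data/table.parquet", ParquetReadOptions::default())
        .await?;

    // The first query parses the footer; repeated (short) queries on `t`
    // reuse the cached `ParquetMetaData` instead of re-parsing it.
    ctx.sql("SELECT count(*) FROM t").await?.show().await?;
    ctx.sql("SELECT count(*) FROM t").await?.show().await?;
    Ok(())
}
```

This matches the doc added in `config.rs`: long-lived sessions issuing many short queries against the same table benefit most, since the factory (and its metadata map) is cached per object store URL for the lifetime of the `ParquetFormat`.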
@@ -1303,8 +1360,6 @@ mod tests {
     use datafusion_common::config::ParquetOptions;
     use datafusion_common::ScalarValue;
     use datafusion_common::ScalarValue::Utf8;
-    use datafusion_execution::object_store::ObjectStoreUrl;
-    use datafusion_execution::runtime_env::RuntimeEnv;
     use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
     use futures::stream::BoxStream;
     use log::error;
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 7d3342db5ccd..b4da6d4c7f4b 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -60,7 +60,10 @@ use crate::datasource::schema_adapter::{
 pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
-pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
+pub use reader::{
+    CachedParquetFileReaderFactory, DefaultParquetFileReaderFactory,
+    ParquetFileReaderFactory,
+};
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use writer::plan_to_parquet;
 
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs
index 8a4ba136fc96..dcc9965680ca 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs
@@ -20,14 +20,16 @@
 use crate::datasource::physical_plan::{FileMeta, ParquetFileMetrics};
 use bytes::Bytes;
+use dashmap::DashMap;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
-use futures::future::BoxFuture;
+use futures::future::{BoxFuture, Either};
+use futures::FutureExt;
 use object_store::ObjectStore;
 use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
 use parquet::file::metadata::ParquetMetaData;
 use std::fmt::Debug;
 use std::ops::Range;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 
 /// Interface for reading parquet files.
 ///
@@ -65,7 +67,8 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
 /// This implementation:
 /// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
 /// 2. Reads the footer and page metadata on demand.
-/// 3. Does not cache metadata or coalesce I/O operations.
+/// 3. Does not cache metadata
+/// 4. Does not coalesce I/O operations.
 #[derive(Debug)]
 pub struct DefaultParquetFileReaderFactory {
     store: Arc<dyn ObjectStore>,
@@ -78,6 +81,34 @@ impl DefaultParquetFileReaderFactory {
     }
 }
 
+/// Caching implementation of [`ParquetFileReaderFactory`]
+///
+/// This implementation:
+/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
+/// 2. Reads the footer and page metadata on demand (but may cache them in the future).
+/// 3. Caches metadata
+/// 4. Does not coalesce I/O operations.
+#[derive(Debug)]
+pub struct CachedParquetFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+    /// The parquet metadata for each file, keyed by the file name
+    /// (e.g. `file1.parquet`).
+    ///
+    /// There are two layers of `Arc`. The outer one allows sharing the lock while a future is
+    /// executing, while the inner one shares the metadata between readers once it is cached.
+    metadata: DashMap<String, Arc<OnceLock<Arc<ParquetMetaData>>>>,
+}
+
+impl CachedParquetFileReaderFactory {
+    /// Create a new `CachedParquetFileReaderFactory`.
+    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            store,
+            metadata: DashMap::new(),
+        }
+    }
+}
+
 /// Implements [`AsyncFileReader`] for a parquet file in object storage.
 ///
 /// This implementation uses the [`ParquetObjectReader`] to read data from the
@@ -86,9 +117,12 @@ impl DefaultParquetFileReaderFactory {
 /// This implementation does not coalesce I/O operations or cache bytes. Such
 /// optimizations can be done either at the object store level or by providing a
 /// custom implementation of [`ParquetFileReaderFactory`].
+///
+/// It will cache file metadata if `metadata_cache` is set.
 pub(crate) struct ParquetFileReader {
     pub file_metrics: ParquetFileMetrics,
     pub inner: ParquetObjectReader,
+    pub metadata_cache: Option<Arc<OnceLock<Arc<ParquetMetaData>>>>,
 }
 
 impl AsyncFileReader for ParquetFileReader {
@@ -115,7 +149,24 @@ impl AsyncFileReader for ParquetFileReader {
     fn get_metadata(
         &mut self,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        self.inner.get_metadata()
+        Box::pin(match &self.metadata_cache {
+            None => Either::Left(self.inner.get_metadata()),
+            Some(metadata_cache) => Either::Right(match metadata_cache.get() {
+                Some(metadata) => {
+                    Either::Left(std::future::ready(Ok(Arc::clone(metadata))))
+                }
+                None => {
+                    let metadata_cache = Arc::clone(metadata_cache);
+                    Either::Right(self.inner.get_metadata().inspect(move |metadata| {
+                        if let Ok(metadata) = metadata {
+                            // TODO: use metadata.try_insert when
+                            // https://github.com/rust-lang/rust/issues/116693 is stabilized
+                            metadata_cache.get_or_init(|| Arc::clone(metadata));
+                        }
+                    }))
+                }
+            }),
+        })
     }
 }
 
@@ -142,6 +193,50 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         Ok(Box::new(ParquetFileReader {
             inner,
             file_metrics,
+            metadata_cache: None,
         }))
     }
 }
+
+impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+        let filename = file_meta
+            .location()
+            .parts()
+            .last()
+            .expect("No path in location")
+            .as_ref()
+            .to_string();
+
+        // TODO: cache metrics?
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+        let object_store = Arc::clone(&self.store);
+        let mut inner = ParquetObjectReader::new(object_store, file_meta.object_meta);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        let metadata = Arc::clone(
+            self.metadata
+                .entry(filename)
+                .or_insert_with(|| Arc::new(OnceLock::new()))
+                .value(),
+        );
+        Ok(Box::new(ParquetFileReader {
+            inner,
+            file_metrics,
+            metadata_cache: Some(metadata),
+        }))
+    }
+}
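The two layers of `Arc` in the cache type are what let `get_metadata` hand the same slot to every reader, even while a fetch is still in flight. Here is a minimal, self-contained sketch of the same `Arc<OnceLock<Arc<T>>>` pattern (illustrative names only, not code from the diff):

```rust
use std::sync::{Arc, OnceLock};

/// Stand-in for the parsed `ParquetMetaData`.
#[derive(Debug)]
struct Metadata(u64);

/// Returns the cached value, computing it with `load` on first use.
fn get_or_load(
    cache: &Arc<OnceLock<Arc<Metadata>>>,
    load: impl FnOnce() -> Metadata,
) -> Arc<Metadata> {
    // The outer `Arc` lets every reader hold the same lock (even before
    // anything is stored); the inner `Arc` shares the parsed value once
    // some reader has stored it.
    Arc::clone(cache.get_or_init(|| Arc::new(load())))
}

fn main() {
    let cache = Arc::new(OnceLock::new());

    let a = get_or_load(&cache, || Metadata(42)); // parses the "footer"
    let b = get_or_load(&cache, || unreachable!("already cached"));

    assert!(Arc::ptr_eq(&a, &b)); // both readers share one copy
    println!("parsed once, shared twice: {a:?}");
}
```

In the async path above, checking `OnceLock::get` first and calling `get_or_init` inside `inspect` means concurrent readers may each fetch the footer, with the first completed result being kept; the `TODO` about `OnceLock::try_insert` (rust-lang/rust#116693) points at the cleaner way to express that once it stabilizes.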