diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 778950cbf926e..c6851f8db705e 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -60,6 +60,7 @@ cargo run --example csv_sql
 - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
 - [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
 - [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
+- [`parquet_index.rs`](examples/parquet_index.rs): Create an index over several parquet files and use it to speed up queries
 - [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
 - [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
 - ['parquet_exec_visitor.rs'](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs
new file mode 100644
index 0000000000000..b9ea383780b49
--- /dev/null
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -0,0 +1,608 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+    new_null_array, Array, ArrayRef, AsArray, Int32Array, RecordBatch, StringArray,
+    UInt64Array,
+};
+use arrow::datatypes::{Int32Type, UInt64Type};
+use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{DataType, FieldRef, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{
+    FileGroupPartitioner, FileScanConfig, ParquetExec,
+};
+use datafusion::datasource::TableProvider;
+use datafusion::execution::context::SessionState;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::parquet;
+use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::metadata::ColumnChunkMetaData;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::parquet::file::statistics::ValueStatistics;
+use datafusion::parquet::schema::types::SchemaDescriptor;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::*;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::config::TableParquetOptions;
+use datafusion_common::{
+    internal_datafusion_err, DFSchema, DataFusionError, Result, Statistics,
+};
+use datafusion_expr::utils::conjunction;
+use datafusion_expr::TableType;
+use std::any::Any;
+use std::fmt::Display;
+use std::fs;
+use std::fs::{DirEntry, File};
+use std::ops::Range;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use tempfile::TempDir;
+use url::Url;
+
+/// This example demonstrates building an index across multiple Parquet files and
+/// using that index to skip reading ("prune") files that do not contain relevant data.
+///
+/// Note this is a low level example to demonstrate how to build such a custom index.
+/// If you want to read a directory of parquet files as a table, you should probably
+/// use a higher level API such as [`SessionContext::read_parquet`] or
+/// [`ListingTable`] instead.
+///
+/// [`ListingTable`]: datafusion::datasource::listing::ListingTable
+#[tokio::main]
+async fn main() -> Result<()> {
+    // Create some dummy data for this example
+    let data = DemoData::try_new()?;
+
+    // Create the table provider that knows how to read the parquet files and their metadata
+    let provider = IndexTableProvider::try_new(data.path())?;
+
+    // Create a SessionContext that knows how to read from the table provider as the "index_table"
+    let mut config = SessionConfig::new();
+    config.options_mut().catalog.information_schema = true;
+    let ctx = SessionContext::new_with_config(config);
+    ctx.register_table("index_table", Arc::new(provider))?;
+    // Register a `file://` object store provider; without it, queries fail with:
+    //   Error: Internal("No suitable object store found for file://")
+    // TODO: make that error more helpful and add an example of how to register
+    // a local file object store
+    let url = Url::try_from("file://")
+        .map_err(|e| internal_datafusion_err!("can't parse file url: {e}"))?;
+    let object_store = object_store::local::LocalFileSystem::new();
+    ctx.runtime_env()
+        .register_object_store(&url, Arc::new(object_store));
+
+    println!("Tables in the information schema:");
+    ctx.sql("SELECT * FROM information_schema.tables")
+        .await?
+        .show()
+        .await?;
+
+    println!("Schema");
+    ctx.sql("describe index_table").await?.show().await?;
+
+    println!("Data in the index table:");
+    ctx.sql("SELECT file_name, value FROM index_table LIMIT 10")
+        .await?
+        .show()
+        .await?;
+
+    Ok(())
+}
+
+/// DataFusion `TableProvider` that uses an index to decide which Parquet files
+/// (and, eventually, which row groups) to read, using the predicates
+/// and a secondary index
+///
+/// It builds data like:
+/// ```text
+/// data_dir:
+///   file1.parquet
+///   file2.parquet
+/// index:
+///   stored index
+/// ```
+pub struct IndexTableProvider {
+    /// The index of the parquet files in the directory
+    index: ParquetMetadataIndex,
+    /// The files (TODO remove)
+    files: Vec<Vec<PartitionedFile>>,
+}
+
+impl IndexTableProvider {
+    /// Create a new IndexTableProvider
+    pub fn try_new(dir: impl Into<PathBuf>) -> Result<Self> {
+        // Create an index of the parquet files in the directory as we see them.
+        let mut index_builder = ParquetMetadataIndexBuilder::new();
+
+        let dir = dir.into();
+
+        let files = read_dir(&dir)?;
+        for file in &files {
+            index_builder.add_file(&file.path())?;
+        }
+
+        let index = index_builder.build()?;
+
+        println!("Index:\n{index}");
+
+        // todo make a nicer API for this ("partitioned files" from directory)
+        let files = files
+            .iter()
+            .map(|f| {
+                let path = fs::canonicalize(f.path())?;
+                Ok(PartitionedFile::new(
+                    path.display().to_string(),
+                    f.metadata()?.len(),
+                ))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        let files = vec![files];
+
+        Ok(Self { index, files })
+    }
+}
+
+#[async_trait]
+impl TableProvider for IndexTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.index.schema().clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // todo: apply index, etc.
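+        // A fuller implementation would consult `self.index` at this point:
+        // for example, the min/max columns of the index `RecordBatch` could be
+        // evaluated against `filters` (e.g. via DataFusion's `PruningPredicate`)
+        // so that only files whose value ranges might match are scanned.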
+
+        let df_schema = DFSchema::try_from(self.schema())?;
+        // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
+        let predicate = conjunction(filters.to_vec());
+        let predicate = predicate
+            .map(|predicate| state.create_physical_expr(predicate, &df_schema))
+            .transpose()?;
+
+        let object_store_url = ObjectStoreUrl::parse("file://")?;
+
+        let file_groups = FileGroupPartitioner::new()
+            .with_target_partitions(state.config().target_partitions())
+            .repartition_file_groups(&self.files)
+            .unwrap_or_else(|| self.files.clone());
+
+        // for now, simply use ParquetExec
+        // TODO make a builder for FileScanConfig
+        let base_config = FileScanConfig {
+            object_store_url,
+            file_schema: self.schema(),
+            file_groups,
+            statistics: Statistics::new_unknown(self.index.schema()),
+            projection: projection.cloned(),
+            limit,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        };
+
+        let metadata_size_hint = None;
+
+        let table_parquet_options = TableParquetOptions::default();
+
+        // TODO make a builder for parquet exec
+        let exec = ParquetExec::new(
+            base_config,
+            predicate,
+            metadata_size_hint,
+            table_parquet_options,
+        );
+
+        Ok(Arc::new(exec))
+    }
+}
+
+/// Simple in memory index for a set of parquet files
+///
+/// The index is represented as an arrow `RecordBatch` that can be used
+/// directly by the DataFusion [`PruningPredicate`] API
+///
+/// The index looks like
+/// ```text
+/// +---------------+-----------+-----------+------------------+------------------+
+/// | file_name     | file_size | row_count | value_column_min | value_column_max |
+/// +---------------+-----------+-----------+------------------+------------------+
+/// | file1.parquet | 6062      | 100       | 0                | 99               |
+/// | file2.parquet | 6062      | 100       | 100              | 199              |
+/// | file3.parquet | 163310    | 2800      | 200              | 2999             |
+/// +---------------+-----------+-----------+------------------+------------------+
+/// ```
+///
+/// Note a more advanced index would store this information for each row group
+/// within a file
+#[derive(Debug)]
+struct ParquetMetadataIndex {
+    file_schema: SchemaRef,
+    index: RecordBatch,
+}
+
+impl Display for ParquetMetadataIndex {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        writeln!(f, "ParquetMetadataIndex")?;
+        write!(
+            f,
+            "{}",
+            pretty_format_batches(&[self.index.clone()]).unwrap()
+        )
+    }
+}
+
+impl ParquetMetadataIndex {
+    fn schema(&self) -> &SchemaRef {
+        &self.file_schema
+    }
+}
+
+/// Builds [`ParquetMetadataIndex`] from a set of parquet files
+#[derive(Debug, Default)]
+struct ParquetMetadataIndexBuilder {
+    file_schema: Option<SchemaRef>,
+    filenames: Vec<String>,
+    file_sizes: Vec<u64>,
+    row_counts: Vec<u64>,
+    /// Holds the min/max value of the value column
+    value_column_mins: Vec<i32>,
+    value_column_maxs: Vec<i32>,
+}
+
+impl ParquetMetadataIndexBuilder {
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Add a file to the index
+    fn add_file(&mut self, file: &Path) -> Result<()> {
+        let file_name = file
+            .file_name()
+            .ok_or_else(|| internal_datafusion_err!("No filename"))?
+            .to_str()
+            .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?;
+        let file_size = file.metadata()?.len();
+
+        println!("Adding file {file_name}");
+
+        let file = File::open(file).map_err(|e| {
+            DataFusionError::from(e).context(format!("Error opening file {file:?}"))
+        })?;
+
+        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
+
+        // Get the schema of the file. A real system might have to handle the
+        // case where the schema of the file is not the same as the schema of
+        // the other files e.g. using SchemaAdapter.
+        if self.file_schema.is_none() {
+            self.file_schema = Some(reader.schema().clone());
+        }
+
+        // extract the statistics from the file
+        let metadata = reader.metadata();
+
+        // TODO: extract the min/max values for each row group
+        let stats = parquet_stats_to_arrow("value", &reader)?;
+        // our example has no nulls, so this is a sanity check
+        assert_eq!(stats.row_count.null_count(), 0);
+        assert_eq!(stats.min.null_count(), 0);
+        assert_eq!(stats.max.null_count(), 0);
+
+        // compute the total row count, min of the value column and max of the
+        // value column in this file
+        let row_count = stats
+            .row_count
+            .as_primitive::<UInt64Type>()
+            .iter()
+            .flatten()
+            .sum::<u64>();
+        let value_column_min = stats
+            .min
+            .as_primitive::<Int32Type>()
+            .iter()
+            .flatten()
+            .min()
+            .unwrap_or_default();
+        let value_column_max = stats
+            .max
+            .as_primitive::<Int32Type>()
+            .iter()
+            .flatten()
+            .max()
+            .unwrap_or_default();
+
+        // sanity check the statistics
+        assert_eq!(row_count, metadata.file_metadata().num_rows() as u64);
+        self.add_row(
+            file_name,
+            file_size,
+            row_count,
+            value_column_min,
+            value_column_max,
+        );
+        Ok(())
+    }
+
+    /// Add the values for a single row to the in-progress index columns
+    fn add_row(
+        &mut self,
+        file_name: impl Into<String>,
+        file_size: u64,
+        row_count: u64,
+        value_column_min: i32,
+        value_column_max: i32,
+    ) {
+        self.filenames.push(file_name.into());
+        self.file_sizes.push(file_size);
+        self.row_counts.push(row_count);
+        self.value_column_mins.push(value_column_min);
+        self.value_column_maxs.push(value_column_max);
+    }
+
+    /// Build the index from the files added
+    fn build(self) -> Result<ParquetMetadataIndex> {
+        let Some(file_schema) = self.file_schema else {
+            return Err(internal_datafusion_err!("No files added to index"));
+        };
+
+        let index = RecordBatch::try_from_iter(vec![
+            (
+                "file_name",
+                Arc::new(StringArray::from(self.filenames)) as ArrayRef,
+            ),
+            (
+                "file_size",
+                Arc::new(UInt64Array::from(self.file_sizes)) as ArrayRef,
+            ),
+            (
+                "row_count",
+                Arc::new(UInt64Array::from(self.row_counts)) as ArrayRef,
+            ),
+            (
+                "value_column_min",
+                Arc::new(Int32Array::from(self.value_column_mins)) as ArrayRef,
+            ),
+            (
+                "value_column_max",
+                Arc::new(Int32Array::from(self.value_column_maxs)) as ArrayRef,
+            ),
+        ])?;
+
+        Ok(ParquetMetadataIndex { file_schema, index })
+    }
+}
+
+/// TODO use the new
+/// API from https://github.com/apache/datafusion/issues/10453
+pub struct ArrowStatistics {
+    /// min values
+    min: ArrayRef,
+    /// max values
+    max: ArrayRef,
+    /// Row counts (UInt64Array)
+    row_count: ArrayRef,
+    /// Null Counts (UInt64Array)
+    #[allow(dead_code)]
+    null_count: ArrayRef,
+}
+
+/// Extract the min/max values and row counts from the parquet statistics
+/// for the given column, if any
+pub fn parquet_stats_to_arrow(
+    column_name: &str,
+    // todo only take the fields of this we need
+    reader: &ParquetRecordBatchReaderBuilder<File>,
+) -> Result<ArrowStatistics> {
+    let metadata = reader.metadata();
+
+    let row_counts: Vec<u64> = metadata
+        .row_groups()
+        .iter()
+        .map(|rg_metadata| rg_metadata.num_rows() as u64)
+        .collect();
+
+    let (parquet_idx, _) =
+        parquet_column(reader.parquet_schema(), reader.schema(), column_name)
+            .ok_or_else(|| internal_datafusion_err!("Column not found: {column_name}"))?;
+
+    // Now find the min/max values for the column
+    let column_meta: Vec<&ColumnChunkMetaData> = metadata
+        .row_groups()
+        .iter()
+        .map(|rg_metadata| rg_metadata.column(parquet_idx))
+        .collect();
+
+    // handle only option i32 for now
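+    // (The example's `value` column is Int32; supporting other physical types
+    // would require additional `try_as_i32`-style helpers and match arms.)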
+    let mins: Vec<Option<i32>> = column_meta
+        .iter()
+        .map(|column_chunk_meta_data| {
+            let stats = get_stats(column_chunk_meta_data)?;
+            let stats = try_as_i32(stats)?;
+            Some(*stats.min())
+        })
+        .collect();
+
+    let maxes: Vec<Option<i32>> = column_meta
+        .iter()
+        .map(|column_chunk_meta_data| {
+            let stats = get_stats(column_chunk_meta_data)?;
+            let stats = try_as_i32(stats)?;
+            Some(*stats.max())
+        })
+        .collect();
+
+    // column chunk metadata doesn't have the null counts (that is on the page index)
+    let null_count = new_null_array(&DataType::UInt64, column_meta.len());
+
+    Ok(ArrowStatistics {
+        min: Arc::new(Int32Array::from(mins)),
+        max: Arc::new(Int32Array::from(maxes)),
+        row_count: Arc::new(UInt64Array::from(row_counts)),
+        null_count,
+    })
+}
+
+/// Return `Some(stats)` if the min/max values are set, `None` otherwise
+fn get_stats(
+    column_chunk_meta_data: &ColumnChunkMetaData,
+) -> Option<&parquet::file::statistics::Statistics> {
+    let stats = column_chunk_meta_data.statistics()?;
+    if stats.has_min_max_set() {
+        Some(stats)
+    } else {
+        None
+    }
+}
+
+/// Return the statistics as `ValueStatistics<i32>` if the column is i32,
+/// otherwise return None
+fn try_as_i32(
+    statistics: &parquet::file::statistics::Statistics,
+) -> Option<&ValueStatistics<i32>> {
+    if let parquet::file::statistics::Statistics::Int32(statistics) = statistics {
+        Some(statistics)
+    } else {
+        None
+    }
+}
+
+/// COPIED FROM https://github.com/apache/datafusion/blob/465c89f7f16d48b030d4a384733567b91dab88fa/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L153C1-L176C2
+///
+/// TODO Should we move this to the parquet crate?
+///
+/// Looks up the parquet column by name
+///
+/// Returns the parquet column index and the corresponding arrow field
+fn parquet_column<'a>(
+    parquet_schema: &SchemaDescriptor,
+    arrow_schema: &'a Schema,
+    name: &str,
+) -> Option<(usize, &'a FieldRef)> {
+    let (root_idx, field) = arrow_schema.fields.find(name)?;
+    if field.data_type().is_nested() {
+        // Nested fields are not supported and require non-trivial logic
+        // to correctly walk the parquet schema accounting for the
+        // logical type rules
+        //
+        // For example a ListArray could correspond to anything from 1 to 3 levels
+        // in the parquet schema
+        return None;
+    }
+
+    // This could be made more efficient (#TBD)
+    let parquet_idx = (0..parquet_schema.columns().len())
+        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
+    Some((parquet_idx, field))
+}
+
+/// Return a list of the directory entries in the given directory, sorted by name
+fn read_dir(dir: &Path) -> Result<Vec<DirEntry>> {
+    let mut files = dir
+        .read_dir()
+        .map_err(|e| {
+            DataFusionError::from(e).context(format!("Error reading directory {dir:?}"))
+        })?
+        .map(|entry| {
+            entry.map_err(|e| {
+                DataFusionError::from(e)
+                    .context(format!("Error reading directory entry in {dir:?}"))
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;
+    files.sort_by_key(|entry| entry.file_name());
+    Ok(files)
+}
+
+/// Demonstration Data
+///
+/// Makes a directory with three files
+///
+/// * file1.parquet (values 0..100)
+/// * file2.parquet (values 100..200)
+/// * file3.parquet (values 200..3000)
+struct DemoData {
+    tmpdir: TempDir,
+}
+
+impl DemoData {
+    fn try_new() -> Result<Self> {
+        let tmpdir = TempDir::new()?;
+        make_demo_file(tmpdir.path().join("file1.parquet"), 0..100)?;
+        make_demo_file(tmpdir.path().join("file2.parquet"), 100..200)?;
+        make_demo_file(tmpdir.path().join("file3.parquet"), 200..3000)?;
+
+        Ok(Self { tmpdir })
+    }
+
+    fn path(&self) -> PathBuf {
+        self.tmpdir.path().into()
+    }
+}
+
+/// Creates a new parquet file at the specified path.
+///
+/// The file has row groups with at most 10 rows, with values increasing
+/// sequentially over `value_range`, and the following schema:
+///
+/// * file_name: Utf8
+/// * value: Int32
+///
+fn make_demo_file(path: impl AsRef<Path>, value_range: Range<i32>) -> Result<()> {
+    let path = path.as_ref();
+    let file = File::create(path)?;
+    let filename = path
+        .file_name()
+        .ok_or_else(|| internal_datafusion_err!("No filename"))?
+        .to_str()
+        .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?;
+
+    let num_values = value_range.len();
+    let file_names =
+        StringArray::from_iter_values(std::iter::repeat(&filename).take(num_values));
+    let values = Int32Array::from_iter_values(value_range);
+    let batch = RecordBatch::try_from_iter(vec![
+        ("file_name", Arc::new(file_names) as ArrayRef),
+        ("value", Arc::new(values) as ArrayRef),
+    ])?;
+
+    let schema = batch.schema();
+
+    // write the actual values to the file
+    let props = WriterProperties::builder()
+        .set_max_row_group_size(10)
+        .build();
+    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
+    writer.write(&batch)?;
+    writer.finish()?;
+
+    Ok(())
+}
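+
+/// A minimal sketch (not used by the rest of this example) of how the index
+/// could be consulted directly for a simple equality predicate such as
+/// `value = target`: keep only the files whose `[min, max]` range might
+/// contain `target`. A real implementation would evaluate arbitrary
+/// predicates against the index, e.g. with DataFusion's `PruningPredicate`.
+#[allow(dead_code)]
+fn prune_files_for_value(index: &ParquetMetadataIndex, target: i32) -> Vec<String> {
+    let batch = &index.index;
+    // Column order matches the RecordBatch built in ParquetMetadataIndexBuilder::build
+    let file_names = batch.column(0).as_string::<i32>();
+    let mins = batch.column(3).as_primitive::<Int32Type>();
+    let maxes = batch.column(4).as_primitive::<Int32Type>();
+    (0..batch.num_rows())
+        .filter(|&row| mins.value(row) <= target && target <= maxes.value(row))
+        .map(|row| file_names.value(row).to_string())
+        .collect()
+}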