Skip to content

Commit

Permalink
allow passing in metadata_size_hint on a per-file basis
Browse files Browse the repository at this point in the history
  • Loading branch information
adriangb committed Nov 1, 2024
1 parent 6c5823e commit 569f90e
Show file tree
Hide file tree
Showing 15 changed files with 31 additions and 1 deletion.
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ pub(crate) mod test_util {
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
}]];

let exec = format
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ pub async fn pruned_partition_list<'a>(
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
})
}));

Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub struct PartitionedFile {
pub statistics: Option<Statistics>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
/// The estimated size of the parquet metadata, in bytes
pub metadata_size_hint: Option<usize>,
}

impl PartitionedFile {
Expand All @@ -98,6 +100,7 @@ impl PartitionedFile {
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
}
}

Expand All @@ -115,10 +118,16 @@ impl PartitionedFile {
range: Some(FileRange { start, end }),
statistics: None,
extensions: None,
metadata_size_hint: None,
}
.with_range(start, end)
}

pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
self.metadata_size_hint = Some(metadata_size_hint);
self
}

/// Return a file reference from the given path
pub fn from_path(path: String) -> Result<Self> {
let size = std::fs::metadata(path.clone())?.len();
Expand Down Expand Up @@ -156,6 +165,7 @@ impl From<ObjectMeta> for PartitionedFile {
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,7 @@ mod tests {
.collect::<Vec<_>>(),
}),
extensions: None,
metadata_size_hint: None,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ impl<F: FileOpener> FileStream<F> {
object_meta: part_file.object_meta,
range: part_file.range,
extensions: part_file.extensions,
metadata_size_hint: part_file.metadata_size_hint,
};

Some(
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ pub struct FileMeta {
pub range: Option<FileRange>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
/// Size hint for the metadata of this file
pub metadata_size_hint: Option<usize>,
}

impl FileMeta {
Expand All @@ -262,6 +264,7 @@ impl From<ObjectMeta> for FileMeta {
object_meta,
range: None,
extensions: None,
metadata_size_hint: None,
}
}
}
Expand Down Expand Up @@ -776,6 +779,7 @@ mod tests {
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
}
}
}
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1656,6 +1656,7 @@ mod tests {
range: Some(FileRange { start, end }),
statistics: None,
extensions: None,
metadata_size_hint: None,
}
}

Expand Down Expand Up @@ -1748,6 +1749,7 @@ mod tests {
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
};

let expected_schema = Schema::new(vec![
Expand Down Expand Up @@ -1835,6 +1837,7 @@ mod tests {
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
};

let file_schema = Arc::new(Schema::empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ impl FileOpener for ParquetOpener {
let file_name = file_meta.location().to_string();
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint);

let mut reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
self.partition_index,
file_meta,
self.metadata_size_hint,
metadata_size_hint,
&self.metrics,
)?;

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ mod tests {
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
};

let f1 = Field::new("id", DataType::Int32, true);
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl TestParquetFile {
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
});

let df_schema = self.schema.clone().to_dfschema_ref()?;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/parquet/custom_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
range: None,
statistics: None,
extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
metadata_size_hint: None,
})
.collect();

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
};

let df_schema = schema.clone().to_dfschema().unwrap();
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?,
extensions: None,
metadata_size_hint: None,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/substrait/src/physical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub async fn from_substrait_rel(
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
};

let part_index = file.partition_index as usize;
Expand Down

0 comments on commit 569f90e

Please sign in to comment.