diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 8647b5df90be..c4c2a82d1c4e 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -711,13 +711,14 @@ impl ParquetSink { .iter() .map(|(s, _)| s) .collect(); - Arc::new(Schema::new( + Arc::new(Schema::new_with_metadata( schema .fields() .iter() .filter(|f| !partition_names.contains(&f.name())) .map(|f| (**f).clone()) .collect::>(), + schema.metadata().clone(), )) } else { self.config.output_schema().clone() diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 1e9f06c20b47..ea2e098ef14e 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -719,10 +719,16 @@ impl ListingTable { builder.push(Field::new(part_col_name, part_col_type.clone(), false)); } + let table_schema = Arc::new( + builder + .finish() + .with_metadata(file_schema.metadata().clone()), + ); + let table = Self { table_paths: config.table_paths, file_schema, - table_schema: Arc::new(builder.finish()), + table_schema, options, definition: None, collected_statistics: Arc::new(DefaultFileStatisticsCache::default()), diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 2c438e8b0e78..415ea62b3bb3 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -248,9 +248,10 @@ impl FileScanConfig { column_statistics: table_cols_stats, }; - let projected_schema = Arc::new( - Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()), - ); + let projected_schema = Arc::new(Schema::new_with_metadata( + table_fields, + self.file_schema.metadata().clone(), + )); let projected_output_ordering = get_projected_output_ordering(self, &projected_schema); @@ -281,7 +282,12 @@ impl FileScanConfig { fields.map_or_else( || Arc::clone(&self.file_schema), - |f| Arc::new(Schema::new(f).with_metadata(self.file_schema.metadata.clone())), + |f| { + Arc::new(Schema::new_with_metadata( + f, + self.file_schema.metadata.clone(), + )) + }, ) } diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs index fdf3381758a4..131b8c354ce7 100644 --- a/datafusion/core/src/datasource/schema_adapter.rs +++ b/datafusion/core/src/datasource/schema_adapter.rs @@ -304,7 +304,8 @@ impl SchemaMapper for SchemaMapping { // Necessary to handle empty batches let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - let schema = Arc::new(Schema::new(fields)); + let schema = + Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())); let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; Ok(record_batch) } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 918ebccbeb70..cc1fe5fac9fe 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1014,14 +1014,21 @@ impl DefaultPhysicalPlanner { }) .collect(); + let metadata: HashMap<_, _> = left_df_schema + .metadata() + .clone() + .into_iter() + .chain(right_df_schema.metadata().clone()) + .collect(); + // Construct intermediate schemas used for filtering data and // convert logical expression to physical according to filter schema let filter_df_schema = DFSchema::new_with_metadata( filter_df_fields, - HashMap::new(), + metadata.clone(), )?; let filter_schema = - Schema::new_with_metadata(filter_fields, HashMap::new()); + Schema::new_with_metadata(filter_fields, metadata); let filter_expr = create_physical_expr( expr, &filter_df_schema, diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 21304068a8ab..fa7de80026c8 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1308,8 +1308,12 @@ pub fn build_join_schema( join_type, left.fields().len(), ); - let mut metadata = left.metadata().clone(); - metadata.extend(right.metadata().clone()); + let metadata = left + .metadata() + .clone() + .into_iter() + .chain(right.metadata().clone()) + .collect(); let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?; dfschema.with_functional_dependencies(func_dependencies) } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 86562daf6909..bb5496c0f799 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -437,7 +437,10 @@ pub fn expand_qualified_wildcard( return plan_err!("Invalid qualifier {qualifier}"); } - let qualified_schema = Arc::new(Schema::new(fields_with_qualified)); + let qualified_schema = Arc::new(Schema::new_with_metadata( + fields_with_qualified, + schema.metadata().clone(), + )); let qualified_dfschema = DFSchema::try_from_qualified_schema(qualifier.clone(), &qualified_schema)? .with_functional_dependencies(projected_func_dependencies)?; diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index c520e4271416..17a32a67c743 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -701,7 +701,13 @@ pub fn build_join_schema( .unzip(), }; - (fields.finish(), column_indices) + let metadata = left + .metadata() + .clone() + .into_iter() + .chain(right.metadata().clone()) + .collect(); + (fields.finish().with_metadata(metadata), column_indices) } /// A [`OnceAsync`] can be used to run an async closure once, with subsequent calls diff --git a/datafusion/physical-plan/src/windows/utils.rs b/datafusion/physical-plan/src/windows/utils.rs index 3cf92daae0fb..13332ea82fa1 100644 --- a/datafusion/physical-plan/src/windows/utils.rs +++ b/datafusion/physical-plan/src/windows/utils.rs @@ -31,5 +31,7 @@ pub(crate) fn create_schema( for expr in window_expr { builder.push(expr.field()?); } - Ok(builder.finish()) + Ok(builder + .finish() + .with_metadata(input_schema.metadata().clone())) }