Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix more instances of schema missing metadata #13068

Merged
merged 1 commit on Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,13 +711,14 @@ impl ParquetSink {
.iter()
.map(|(s, _)| s)
.collect();
Arc::new(Schema::new(
Arc::new(Schema::new_with_metadata(
schema
.fields()
.iter()
.filter(|f| !partition_names.contains(&f.name()))
.map(|f| (**f).clone())
.collect::<Vec<_>>(),
schema.metadata().clone(),
))
} else {
self.config.output_schema().clone()
Expand Down
8 changes: 7 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,10 +719,16 @@ impl ListingTable {
builder.push(Field::new(part_col_name, part_col_type.clone(), false));
}

let table_schema = Arc::new(
builder
.finish()
.with_metadata(file_schema.metadata().clone()),
);

let table = Self {
table_paths: config.table_paths,
file_schema,
table_schema: Arc::new(builder.finish()),
table_schema,
options,
definition: None,
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
Expand Down
14 changes: 10 additions & 4 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,10 @@ impl FileScanConfig {
column_statistics: table_cols_stats,
};

let projected_schema = Arc::new(
Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
);
let projected_schema = Arc::new(Schema::new_with_metadata(
table_fields,
self.file_schema.metadata().clone(),
));

let projected_output_ordering =
get_projected_output_ordering(self, &projected_schema);
Expand Down Expand Up @@ -281,7 +282,12 @@ impl FileScanConfig {

fields.map_or_else(
|| Arc::clone(&self.file_schema),
|f| Arc::new(Schema::new(f).with_metadata(self.file_schema.metadata.clone())),
|f| {
Arc::new(Schema::new_with_metadata(
f,
self.file_schema.metadata.clone(),
))
},
)
}

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ impl SchemaMapper for SchemaMapping {
// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

let schema = Arc::new(Schema::new(fields));
let schema =
Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
Expand Down
11 changes: 9 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,14 +1014,21 @@ impl DefaultPhysicalPlanner {
})
.collect();

let metadata: HashMap<_, _> = left_df_schema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one we should be able to write a test for -- perhaps a query containing a JOIN in https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/test_files/metadata.slt

.metadata()
.clone()
.into_iter()
.chain(right_df_schema.metadata().clone())
.collect();

// Construct intermediate schemas used for filtering data and
// convert logical expression to physical according to filter schema
let filter_df_schema = DFSchema::new_with_metadata(
filter_df_fields,
HashMap::new(),
metadata.clone(),
)?;
let filter_schema =
Schema::new_with_metadata(filter_fields, HashMap::new());
Schema::new_with_metadata(filter_fields, metadata);
let filter_expr = create_physical_expr(
expr,
&filter_df_schema,
Expand Down
8 changes: 6 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1308,8 +1308,12 @@ pub fn build_join_schema(
join_type,
left.fields().len(),
);
let mut metadata = left.metadata().clone();
metadata.extend(right.metadata().clone());
let metadata = left
.metadata()
.clone()
.into_iter()
.chain(right.metadata().clone())
.collect();
let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
dfschema.with_functional_dependencies(func_dependencies)
}
Expand Down
5 changes: 4 additions & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,10 @@ pub fn expand_qualified_wildcard(
return plan_err!("Invalid qualifier {qualifier}");
}

let qualified_schema = Arc::new(Schema::new(fields_with_qualified));
let qualified_schema = Arc::new(Schema::new_with_metadata(
fields_with_qualified,
schema.metadata().clone(),
));
let qualified_dfschema =
DFSchema::try_from_qualified_schema(qualifier.clone(), &qualified_schema)?
.with_functional_dependencies(projected_func_dependencies)?;
Expand Down
8 changes: 7 additions & 1 deletion datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,13 @@ pub fn build_join_schema(
.unzip(),
};

(fields.finish(), column_indices)
let metadata = left
.metadata()
.clone()
.into_iter()
.chain(right.metadata().clone())
.collect();
(fields.finish().with_metadata(metadata), column_indices)
}

/// A [`OnceAsync`] can be used to run an async closure once, with subsequent calls
Expand Down
4 changes: 3 additions & 1 deletion datafusion/physical-plan/src/windows/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ pub(crate) fn create_schema(
for expr in window_expr {
builder.push(expr.field()?);
}
Ok(builder.finish())
Ok(builder
.finish()
.with_metadata(input_schema.metadata().clone()))
}
Loading