Skip to content

Commit

Permalink
feat(planner): Allowing setting sort order of parquet files without s…
Browse files Browse the repository at this point in the history
…pecifying the schema (#12466)

* fix(planner): Allowing setting sort order of parquet files without specifying the schema
This PR allows for the following SQL query to be passed without a schema

create external table cpu stored as parquet location 'cpu.parquet' with order (time);

closes #7317

* chore: fmt'ing

* fix: fmt

* fix: remove test that checks for error with schema

* Add some more tests

* fix: use !asc

Co-authored-by: Andrew Lamb <[email protected]>

* feat: clean up some testing and modify statement when building order by expr

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
devanbenz and alamb authored Sep 21, 2024
1 parent e1b992a commit 515a64e
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 16 deletions.
31 changes: 26 additions & 5 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Factory for creating ListingTables with default options
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;

Expand All @@ -27,7 +28,7 @@ use crate::datasource::listing::{
use crate::execution::context::SessionState;

use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::{arrow_datafusion_err, DataFusionError};
use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema};
use datafusion_common::{config_datafusion_err, Result};
use datafusion_expr::CreateExternalTable;

Expand Down Expand Up @@ -113,19 +114,39 @@ impl TableProviderFactory for ListingTableFactory {
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(cmd.order_exprs.clone());
.with_table_partition_cols(table_partition_cols);

options
.validate_partitions(session_state, &table_path)
.await?;

let resolved_schema = match provided_schema {
None => options.infer_schema(session_state, &table_path).await?,
// We will need to check the table columns against the schema
// this is done so that we can do an ORDER BY for external table creation
// specifically for parquet file format.
// See: https://github.com/apache/datafusion/issues/7317
None => {
let schema = options.infer_schema(session_state, &table_path).await?;
let df_schema = schema.clone().to_dfschema()?;
let column_refs: HashSet<_> = cmd
.order_exprs
.iter()
.flat_map(|sort| sort.iter())
.flat_map(|s| s.expr.column_refs())
.collect();

for column in &column_refs {
if !df_schema.has_column(column) {
return plan_err!("Column {column} is not in schema");
}
}

schema
}
Some(s) => s,
};
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?
.with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
Expand Down
30 changes: 26 additions & 4 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,11 +1136,33 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema: &DFSchemaRef,
planner_context: &mut PlannerContext,
) -> Result<Vec<Vec<SortExpr>>> {
// Ask user to provide a schema if schema is empty.
if !order_exprs.is_empty() && schema.fields().is_empty() {
return plan_err!(
"Provide a schema before specifying the order while creating a table."
);
let results = order_exprs
.iter()
.map(|lex_order| {
let result = lex_order
.iter()
.map(|order_by_expr| {
let ordered_expr = &order_by_expr.expr;
let ordered_expr = ordered_expr.to_owned();
let ordered_expr = self
.sql_expr_to_logical_expr(
ordered_expr,
schema,
planner_context,
)
.unwrap();
let asc = order_by_expr.asc.unwrap_or(true);
let nulls_first = order_by_expr.nulls_first.unwrap_or(!asc);

SortExpr::new(ordered_expr, asc, nulls_first)
})
.collect::<Vec<SortExpr>>();
result
})
.collect::<Vec<Vec<SortExpr>>>();

return Ok(results);
}

let mut all_results = vec![];
Expand Down
7 changes: 7 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2002,6 +2002,13 @@ fn create_external_table_parquet_no_schema() {
quick_test(sql, expected);
}

#[test]
fn create_external_table_parquet_no_schema_sort_order() {
let sql = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet' WITH ORDER (id)";
let expected = "CreateExternalTable: Bare { table: \"t\" }";
quick_test(sql, expected);
}

#[test]
fn equijoin_explicit_syntax() {
let sql = "SELECT id, order_id \
Expand Down
47 changes: 47 additions & 0 deletions datafusion/sqllogictest/test_files/create_external_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,50 @@ OPTIONS (
format.delimiter '|',
has_header false,
compression gzip);

# Create an external parquet table and infer schema to order by

# query should succeed
statement ok
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id);

## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 ASC]
query TT
EXPLAIN SELECT id FROM t ORDER BY id ASC;
----
logical_plan
01)Sort: t.id ASC NULLS LAST
02)--TableScan: t projection=[id]
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST]

## Test a DESC order and verify that output_ordering is ASC from the previous OBRDER BY
query TT
EXPLAIN SELECT id FROM t ORDER BY id DESC;
----
logical_plan
01)Sort: t.id DESC NULLS FIRST
02)--TableScan: t projection=[id]
physical_plan
01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false]
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST]

statement ok
DROP TABLE t;

# Create table with non default sort order
statement ok
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id DESC NULLS FIRST);

## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 DESC NULLS FIRST]
query TT
EXPLAIN SELECT id FROM t;
----
logical_plan TableScan: t projection=[id]
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 DESC]

statement ok
DROP TABLE t;

# query should fail with bad column
statement error DataFusion error: Error during planning: Column foo is not in schema
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (foo);
7 changes: 0 additions & 7 deletions datafusion/sqllogictest/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -653,13 +653,6 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te
query error DataFusion error: Error during planning: Column a is not in schema
CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';


# Create external table with DDL ordered columns without schema
# When schema is missing the query is expected to fail
query error DataFusion error: Error during planning: Provide a schema before specifying the order while creating a table\.
CREATE EXTERNAL TABLE dt STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';


# Sort with duplicate sort expressions
# Table is sorted multiple times on the same column name and should not fail
statement ok
Expand Down

0 comments on commit 515a64e

Please sign in to comment.