Skip to content

Commit

Permalink
feat: Conditionally allow to keep partition_by columns when using PAR…
Browse files Browse the repository at this point in the history
…TITIONED BY enhancement (#11107)

* feat: conditionally allow to keep partition_by columns

* feat: add flag to file sink config, add tests

* this commit contains:
 - separate options by prefix 'hive.'
 - add hive_options to CopyTo struct
 - add more documentation
 - add session execution flag to enable feature, false by default

* do not add hive_options to CopyTo

* npx prettier

* fmt

* change prefix to execution. , update override order for condition.

* improve handling of flag, added test for config error

* trying to make CI happier

* prettier

* Update test

* update doc

---------

Co-authored-by: Héctor Veiga Ortiz <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
3 people authored Jun 28, 2024
1 parent 09b3c73 commit 330ece8
Show file tree
Hide file tree
Showing 22 changed files with 110 additions and 8 deletions.
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ config_namespace! {
/// Currently experimental
pub split_file_groups_by_statistics: bool, default = false

/// Should Datafusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false
}
}

Expand Down Expand Up @@ -1294,6 +1296,10 @@ impl TableOptions {
return ConfigField::set(self, key, value);
}

if prefix == "execution" {
return Ok(());
}

let Some(e) = self.extensions.0.get_mut(prefix) else {
return _config_err!("Could not find config namespace \"{prefix}\"");
};
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ impl DataSink for ArrowFileSink {
part_col,
self.config.table_paths[0].clone(),
"arrow".into(),
self.config.keep_partition_by_columns,
);

let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
Expand Down
7 changes: 6 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,9 @@ impl ParquetSink {
/// of hive style partitioning where some columns are removed from the
/// underlying files.
fn get_writer_schema(&self) -> Arc<Schema> {
if !self.config.table_partition_cols.is_empty() {
if !self.config.table_partition_cols.is_empty()
&& !self.config.keep_partition_by_columns
{
let schema = self.config.output_schema();
let partition_names: Vec<_> = self
.config
Expand Down Expand Up @@ -716,6 +718,7 @@ impl DataSink for ParquetSink {
part_col,
self.config.table_paths[0].clone(),
"parquet".into(),
self.config.keep_partition_by_columns,
);

let mut file_write_tasks: JoinSet<
Expand Down Expand Up @@ -1953,6 +1956,7 @@ mod tests {
output_schema: schema.clone(),
table_partition_cols: vec![],
overwrite: true,
keep_partition_by_columns: false,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
Expand Down Expand Up @@ -2047,6 +2051,7 @@ mod tests {
output_schema: schema.clone(),
table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
overwrite: true,
keep_partition_by_columns: false,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
Expand Down
13 changes: 9 additions & 4 deletions datafusion/core/src/datasource/file_format/write/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub(crate) fn start_demuxer_task(
partition_by: Option<Vec<(String, DataType)>>,
base_output_path: ListingTableUrl,
file_extension: String,
keep_partition_by_columns: bool,
) -> (SpawnedTask<Result<()>>, DemuxedStreamReceiver) {
let (tx, rx) = mpsc::unbounded_channel();
let context = context.clone();
Expand All @@ -91,6 +92,7 @@ pub(crate) fn start_demuxer_task(
parts,
base_output_path,
file_extension,
keep_partition_by_columns,
)
.await
})
Expand All @@ -111,7 +113,7 @@ pub(crate) fn start_demuxer_task(
(task, rx)
}

/// Dynamically partitions input stream to acheive desired maximum rows per file
/// Dynamically partitions input stream to achieve desired maximum rows per file
async fn row_count_demuxer(
mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
mut input: SendableRecordBatchStream,
Expand Down Expand Up @@ -240,6 +242,7 @@ async fn hive_style_partitions_demuxer(
partition_by: Vec<(String, DataType)>,
base_output_path: ListingTableUrl,
file_extension: String,
keep_partition_by_columns: bool,
) -> Result<()> {
let write_id =
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
Expand Down Expand Up @@ -298,9 +301,11 @@ async fn hive_style_partitions_demuxer(
}
};

// remove partitions columns
let final_batch_to_send =
remove_partition_by_columns(&parted_batch, &partition_by)?;
let final_batch_to_send = if keep_partition_by_columns {
parted_batch
} else {
remove_partition_by_columns(&parted_batch, &partition_by)?
};

// Finally send the partial batch partitioned by distinct value!
part_tx.send(final_batch_to_send).await.map_err(|_| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ pub(crate) async fn stateless_multipart_put(
part_cols,
base_output_path.clone(),
file_extension,
config.keep_partition_by_columns,
);

let rb_buffer_size = &context
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,8 @@ impl TableProvider for ListingTable {
.await?;

let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
let keep_partition_by_columns =
state.config().options().execution.keep_partition_by_columns;

// Sink related option, apart from format
let config = FileSinkConfig {
Expand All @@ -898,6 +900,7 @@ impl TableProvider for ListingTable {
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
overwrite,
keep_partition_by_columns,
};

let unsorted: Vec<Vec<Expr>> = vec![];
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub struct FileSinkConfig {
pub table_partition_cols: Vec<(String, DataType)>,
/// Controls whether existing data should be overwritten by this sink
pub overwrite: bool,
/// Controls whether partition columns are kept for the file
pub keep_partition_by_columns: bool,
}

impl FileSinkConfig {
Expand Down
11 changes: 11 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,16 @@ impl DefaultPhysicalPlanner {
.map(|s| (s.to_string(), arrow_schema::DataType::Null))
.collect::<Vec<_>>();

let keep_partition_by_columns = match source_option_tuples
.get("execution.keep_partition_by_columns")
.map(|v| v.trim()) {
None => session_state.config().options().execution.keep_partition_by_columns,
Some("true") => true,
Some("false") => false,
Some(value) =>
return Err(DataFusionError::Configuration(format!("provided value for 'execution.keep_partition_by_columns' was not recognized: \"{}\"", value))),
};

// Set file sink related options
let config = FileSinkConfig {
object_store_url,
Expand All @@ -783,6 +793,7 @@ impl DefaultPhysicalPlanner {
output_schema: Arc::new(schema),
table_partition_cols,
overwrite: false,
keep_partition_by_columns,
};

let sink_format = file_type_to_format(file_type)?
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,9 @@ impl LogicalPlanBuilder {
Ok(Self::from(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url,
partition_by,
file_type,
options,
partition_by,
})))
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ message FileSinkConfig {
datafusion_common.Schema output_schema = 4;
repeated PartitionColumn table_partition_cols = 5;
bool overwrite = 8;
bool keep_partition_by_columns = 9;
}

message JsonSink {
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
output_schema: Arc::new(convert_required!(conf.output_schema)?),
table_partition_cols,
overwrite: conf.overwrite,
keep_partition_by_columns: conf.keep_partition_by_columns,
})
}
}
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
output_schema: Some(conf.output_schema.as_ref().try_into()?),
table_partition_cols,
overwrite: conf.overwrite,
keep_partition_by_columns: conf.keep_partition_by_columns,
})
}
}
3 changes: 3 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ fn roundtrip_json_sink() -> Result<()> {
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
overwrite: true,
keep_partition_by_columns: true,
};
let data_sink = Arc::new(JsonSink::new(
file_sink_config,
Expand Down Expand Up @@ -934,6 +935,7 @@ fn roundtrip_csv_sink() -> Result<()> {
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
overwrite: true,
keep_partition_by_columns: true,
};
let data_sink = Arc::new(CsvSink::new(
file_sink_config,
Expand Down Expand Up @@ -992,6 +994,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
overwrite: true,
keep_partition_by_columns: true,
};
let data_sink = Arc::new(ParquetSink::new(
file_sink_config,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,7 @@ mod tests {
fn copy_to_multi_options() -> Result<(), ParserError> {
// order of options is preserved
let sql =
"COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy)";
"COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy, 'execution.keep_partition_by_columns' true)";

let expected_options = vec![
(
Expand All @@ -1486,6 +1486,10 @@ mod tests {
"format.compression".to_string(),
Value::SingleQuotedString("snappy".to_string()),
),
(
"execution.keep_partition_by_columns".to_string(),
Value::SingleQuotedString("true".to_string()),
),
];

let mut statements = DFParser::parse_sql(sql).unwrap();
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
Some(v) => v,
};

if !(&key.contains('.')) {
// If config does not belong to any namespace, assume it is
// a format option and apply the format prefix for backwards
Expand Down
21 changes: 21 additions & 0 deletions datafusion/sqllogictest/test_files/copy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,23 @@ physical_plan
01)DataSinkExec: sink=ParquetSink(file_groups=[])
02)--MemoryExec: partitions=1, partition_sizes=[1]

# Copy to directory as partitioned files with keep_partition_by_columns enabled
query TT
COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)
OPTIONS (execution.keep_partition_by_columns true);
----
3

# validate generated file contains tables
statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet4 STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table4/column1=1/*.parquet';

query TT
select column1, column2 from validate_partitioned_parquet4 order by column1,column2;
----
1 a

# Copy more files to directory via query
query IT
COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/copy/table/' STORED AS PARQUET;
Expand Down Expand Up @@ -589,3 +606,7 @@ COPY (select col2, sum(col1) from source_table
# Copy from table with non literal
query error DataFusion error: SQL error: ParserError\("Unexpected token \("\)
COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102);

# Copy using execution.keep_partition_by_columns with an invalid value
query error DataFusion error: Invalid or Unsupported Configuration: provided value for 'execution.keep_partition_by_columns' was not recognized: "invalid_value"
COPY source_table to '/tmp/table.parquet' OPTIONS (execution.keep_partition_by_columns invalid_value);
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
datafusion.execution.enable_recursive_ctes true
datafusion.execution.keep_partition_by_columns false
datafusion.execution.listing_table_ignore_subdirectory true
datafusion.execution.max_buffered_batches_per_output_file 2
datafusion.execution.meta_fetch_concurrency 32
Expand Down Expand Up @@ -255,6 +256,7 @@ datafusion.execution.batch_size 8192 Default batch size while creating new batch
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
datafusion.execution.keep_partition_by_columns false Should Datafusion keep the columns used for partition_by in the output RecordBatches
datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
Expand Down
Loading

0 comments on commit 330ece8

Please sign in to comment.