diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 7e3871e6b795..1ecdb0efd2c2 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -309,6 +309,8 @@ config_namespace! { /// Currently experimental pub split_file_groups_by_statistics: bool, default = false + /// Should DataFusion keep the columns used for partition_by in the output RecordBatches + pub keep_partition_by_columns: bool, default = false } } @@ -1294,6 +1296,10 @@ impl TableOptions { return ConfigField::set(self, key, value); } + if prefix == "execution" { + return Ok(()); + } + let Some(e) = self.extensions.0.get_mut(prefix) else { return _config_err!("Could not find config namespace \"{prefix}\""); }; diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 478a11d7e76e..9a3aa2454e27 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -277,6 +277,7 @@ impl DataSink for ArrowFileSink { part_col, self.config.table_paths[0].clone(), "arrow".into(), + self.config.keep_partition_by_columns, ); let mut file_write_tasks: JoinSet> = diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 44c9cc4ec4a9..89e69caffcef 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -626,7 +626,9 @@ impl ParquetSink { /// of hive style partitioning where some columns are removed from the /// underlying files. fn get_writer_schema(&self) -> Arc { - if !self.config.table_partition_cols.is_empty() { + if !self.config.table_partition_cols.is_empty() + && !self.config.keep_partition_by_columns + { let schema = self.config.output_schema(); let partition_names: Vec<_> = self .config @@ -716,6 +718,7 @@ impl DataSink for ParquetSink { part_col, self.config.table_paths[0].clone(), "parquet".into(), + self.config.keep_partition_by_columns, ); let mut file_write_tasks: JoinSet< @@ -1953,6 +1956,7 @@ mod tests { output_schema: schema.clone(), table_partition_cols: vec![], overwrite: true, + keep_partition_by_columns: false, }; let parquet_sink = Arc::new(ParquetSink::new( file_sink_config, @@ -2047,6 +2051,7 @@ mod tests { output_schema: schema.clone(), table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning overwrite: true, + keep_partition_by_columns: false, }; let parquet_sink = Arc::new(ParquetSink::new( file_sink_config, diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs index d82c2471c596..e29c877442cf 100644 --- a/datafusion/core/src/datasource/file_format/write/demux.rs +++ b/datafusion/core/src/datasource/file_format/write/demux.rs @@ -75,6 +75,7 @@ pub(crate) fn start_demuxer_task( partition_by: Option>, base_output_path: ListingTableUrl, file_extension: String, + keep_partition_by_columns: bool, ) -> (SpawnedTask>, DemuxedStreamReceiver) { let (tx, rx) = mpsc::unbounded_channel(); let context = context.clone(); @@ -91,6 +92,7 @@ pub(crate) fn start_demuxer_task( parts, base_output_path, file_extension, + keep_partition_by_columns, ) .await }) @@ -111,7 +113,7 @@ pub(crate) fn start_demuxer_task( (task, rx) } -/// Dynamically partitions input stream to acheive desired maximum rows per file +/// Dynamically partitions input stream to achieve desired maximum rows per file async fn row_count_demuxer( 
mut tx: UnboundedSender<(Path, Receiver)>, mut input: SendableRecordBatchStream, @@ -240,6 +242,7 @@ async fn hive_style_partitions_demuxer( partition_by: Vec<(String, DataType)>, base_output_path: ListingTableUrl, file_extension: String, + keep_partition_by_columns: bool, ) -> Result<()> { let write_id = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16); @@ -298,9 +301,11 @@ async fn hive_style_partitions_demuxer( } }; - // remove partitions columns - let final_batch_to_send = - remove_partition_by_columns(&parted_batch, &partition_by)?; + let final_batch_to_send = if keep_partition_by_columns { + parted_batch + } else { + remove_partition_by_columns(&parted_batch, &partition_by)? + }; // Finally send the partial batch partitioned by distinct value! part_tx.send(final_batch_to_send).await.map_err(|_| { diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index 3ae2122de827..a62b5715aeb3 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -224,6 +224,7 @@ pub(crate) async fn stateless_multipart_put( part_cols, base_output_path.clone(), file_extension, + config.keep_partition_by_columns, ); let rb_buffer_size = &context diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index b2e81b87fa0d..ea4d396a14cb 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -889,6 +889,8 @@ impl TableProvider for ListingTable { .await?; let file_groups = file_list_stream.try_collect::>().await?; + let keep_partition_by_columns = + state.config().options().execution.keep_partition_by_columns; // Sink related option, apart from format let config = FileSinkConfig { @@ -898,6 +900,7 @@ impl TableProvider for ListingTable { output_schema: self.schema(), table_partition_cols: self.options.table_partition_cols.clone(), overwrite, + keep_partition_by_columns, }; let unsorted: Vec> = vec![]; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 720e29e35582..a897895246e3 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -85,6 +85,8 @@ pub struct FileSinkConfig { pub table_partition_cols: Vec<(String, DataType)>, /// Controls whether existing data should be overwritten by this sink pub overwrite: bool, + /// Controls whether partition columns are kept for the file + pub keep_partition_by_columns: bool, } impl FileSinkConfig { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 5b8501baaad8..9dfc62f3a741 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -775,6 +775,16 @@ impl DefaultPhysicalPlanner { .map(|s| (s.to_string(), arrow_schema::DataType::Null)) .collect::>(); + let keep_partition_by_columns = match source_option_tuples + .get("execution.keep_partition_by_columns") + .map(|v| v.trim()) { + None => session_state.config().options().execution.keep_partition_by_columns, + Some("true") => true, + Some("false") => false, + Some(value) => + return Err(DataFusionError::Configuration(format!("provided value for 'execution.keep_partition_by_columns' was not recognized: \"{}\"", value))), + }; + // Set file sink related options let 
config = FileSinkConfig { object_store_url, @@ -783,6 +793,7 @@ impl DefaultPhysicalPlanner { output_schema: Arc::new(schema), table_partition_cols, overwrite: false, + keep_partition_by_columns, }; let sink_format = file_type_to_format(file_type)? diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index f87151efd88b..11a8112650d1 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -279,9 +279,9 @@ impl LogicalPlanBuilder { Ok(Self::from(LogicalPlan::Copy(CopyTo { input: Arc::new(input), output_url, + partition_by, file_type, options, - partition_by, }))) } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index f2594ba10340..004d7320e21b 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -754,6 +754,7 @@ message FileSinkConfig { datafusion_common.Schema output_schema = 4; repeated PartitionColumn table_partition_cols = 5; bool overwrite = 8; + bool keep_partition_by_columns = 9; } message JsonSink { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e8fbe954428a..ebfa783f8561 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -5562,6 +5562,9 @@ impl serde::Serialize for FileSinkConfig { if self.overwrite { len += 1; } + if self.keep_partition_by_columns { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FileSinkConfig", len)?; if !self.object_store_url.is_empty() { struct_ser.serialize_field("objectStoreUrl", &self.object_store_url)?; @@ -5581,6 +5584,9 @@ impl serde::Serialize for FileSinkConfig { if self.overwrite { struct_ser.serialize_field("overwrite", &self.overwrite)?; } + if self.keep_partition_by_columns { + struct_ser.serialize_field("keepPartitionByColumns", &self.keep_partition_by_columns)?; + } struct_ser.end() } } @@ -5602,6 +5608,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { "table_partition_cols", "tablePartitionCols", "overwrite", + "keep_partition_by_columns", + "keepPartitionByColumns", ]; #[allow(clippy::enum_variant_names)] @@ -5612,6 +5620,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { OutputSchema, TablePartitionCols, Overwrite, + KeepPartitionByColumns, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -5639,6 +5648,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { "outputSchema" | "output_schema" => Ok(GeneratedField::OutputSchema), "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols), "overwrite" => Ok(GeneratedField::Overwrite), + "keepPartitionByColumns" | "keep_partition_by_columns" => Ok(GeneratedField::KeepPartitionByColumns), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5664,6 +5674,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { let mut output_schema__ = None; let mut table_partition_cols__ = None; let mut overwrite__ = None; + let mut keep_partition_by_columns__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::ObjectStoreUrl => { @@ -5702,6 +5713,12 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { } overwrite__ = Some(map_.next_value()?); } + GeneratedField::KeepPartitionByColumns => { + if keep_partition_by_columns__.is_some() { + return Err(serde::de::Error::duplicate_field("keepPartitionByColumns")); + } + keep_partition_by_columns__ = Some(map_.next_value()?); + } } } Ok(FileSinkConfig { @@ -5711,6 +5728,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { output_schema: output_schema__, table_partition_cols: table_partition_cols__.unwrap_or_default(), overwrite: overwrite__.unwrap_or_default(), + keep_partition_by_columns: keep_partition_by_columns__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 93bf6c060227..1a3514dbd4f7 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1128,6 +1128,8 @@ pub struct FileSinkConfig { pub table_partition_cols: ::prost::alloc::vec::Vec, #[prost(bool, tag = "8")] pub overwrite: bool, + #[prost(bool, tag = "9")] + pub keep_partition_by_columns: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 7783c1561185..e94bb3b8efcb 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -649,6 +649,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig { output_schema: Arc::new(convert_required!(conf.output_schema)?), table_partition_cols, overwrite: conf.overwrite, + keep_partition_by_columns: conf.keep_partition_by_columns, }) } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 8583900e9fa7..375361261952 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -724,6 +724,7 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig { output_schema: Some(conf.output_schema.as_ref().try_into()?), table_partition_cols, overwrite: conf.overwrite, + keep_partition_by_columns: conf.keep_partition_by_columns, }) } } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 03c72cfc32b1..106247b2d441 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -899,6 +899,7 @@ fn roundtrip_json_sink() -> Result<()> { output_schema: schema.clone(), table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)], overwrite: true, + keep_partition_by_columns: true, }; let data_sink = Arc::new(JsonSink::new( file_sink_config, @@ -934,6 +935,7 @@ fn roundtrip_csv_sink() -> Result<()> { output_schema: schema.clone(), table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)], overwrite: true, + keep_partition_by_columns: true, }; let data_sink = Arc::new(CsvSink::new( file_sink_config, @@ -992,6 +994,7 @@ fn roundtrip_parquet_sink() -> Result<()> { output_schema: schema.clone(), table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)], overwrite: true, + keep_partition_by_columns: true, }; let data_sink = Arc::new(ParquetSink::new( file_sink_config, diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index d2f3f508a316..5da7f7176509 100644 --- 
a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -1475,7 +1475,7 @@ mod tests { fn copy_to_multi_options() -> Result<(), ParserError> { // order of options is preserved let sql = - "COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy)"; + "COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy, 'execution.keep_partition_by_columns' true)"; let expected_options = vec![ ( @@ -1486,6 +1486,10 @@ mod tests { "format.compression".to_string(), Value::SingleQuotedString("snappy".to_string()), ), + ( + "execution.keep_partition_by_columns".to_string(), + Value::SingleQuotedString("true".to_string()), + ), ]; let mut statements = DFParser::parse_sql(sql).unwrap(); diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 518972545a48..6cdb2f959cd8 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -888,6 +888,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } Some(v) => v, }; + if !(&key.contains('.')) { // If config does not belong to any namespace, assume it is // a format option and apply the format prefix for backwards diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 00bcea7ec154..21c34bc25cee 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -166,6 +166,23 @@ physical_plan 01)DataSinkExec: sink=ParquetSink(file_groups=[]) 02)--MemoryExec: partitions=1, partition_sizes=[1] +# Copy to directory as partitioned files with keep_partition_by_columns enabled +query TT +COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1) +OPTIONS (execution.keep_partition_by_columns true); +---- +3 + +# validate generated file contains tables +statement ok +CREATE EXTERNAL TABLE validate_partitioned_parquet4 STORED AS PARQUET +LOCATION 'test_files/scratch/copy/partitioned_table4/column1=1/*.parquet'; + +query TT +select column1, column2 from validate_partitioned_parquet4 order by column1,column2; +---- +1 a + # Copy more files to directory via query query IT COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/copy/table/' STORED AS PARQUET; @@ -589,3 +606,7 @@ COPY (select col2, sum(col1) from source_table # Copy from table with non literal query error DataFusion error: SQL error: ParserError\("Unexpected token \("\) COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102); + +# Copy using execution.keep_partition_by_columns with an invalid value +query error DataFusion error: Invalid or Unsupported Configuration: provided value for 'execution.keep_partition_by_columns' was not recognized: "invalid_value" +COPY source_table to '/tmp/table.parquet' OPTIONS (execution.keep_partition_by_columns invalid_value); \ No newline at end of file diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 3cc837aa8ee9..ee64f772917c 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -173,6 +173,7 @@ datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false datafusion.execution.enable_recursive_ctes true +datafusion.execution.keep_partition_by_columns false 
datafusion.execution.listing_table_ignore_subdirectory true datafusion.execution.max_buffered_batches_per_output_file 2 datafusion.execution.meta_fetch_concurrency 32 @@ -255,6 +256,7 @@ datafusion.execution.batch_size 8192 Default batch size while creating new batch datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs +datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c5f22725e0a3..0f0aa8460448 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -86,6 +86,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). | | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | +| datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. 
| | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md index 42e0c8054c9b..dd016cabbfb7 100644 --- a/docs/source/user-guide/sql/dml.md +++ b/docs/source/user-guide/sql/dml.md @@ -39,7 +39,10 @@ TO 'file_name' clause is not specified, it will be inferred from the file extension if possible. `PARTITIONED BY` specifies the columns to use for partitioning the output files into -separate hive-style directories. +separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed +from the output files. If you want to keep the columns, you should provide the option +`execution.keep_partition_by_columns true`. The `execution.keep_partition_by_columns` flag can also +be enabled through `ExecutionOptions` within `SessionConfig`. The output format is determined by the first match of the following rules: diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md index 3c4790dd0255..6fb4ef215ff1 100644 --- a/docs/source/user-guide/sql/write_options.md +++ b/docs/source/user-guide/sql/write_options.md @@ -70,6 +70,16 @@ In this example, we write the entirety of `source_table` out to a folder of parq ## Available Options +### Execution Specific Options + +The following options are available when executing a `COPY` query. + +| Option | Description | Default Value | +| ----------------------------------- | ---------------------------------------------------------------------------------- | ------------- | +| execution.keep_partition_by_columns | Flag to retain the columns in the output data when using `PARTITIONED BY` queries. | false | + +Note: the `execution.keep_partition_by_columns` flag can also be enabled through `ExecutionOptions` within `SessionConfig`. + ### JSON Format Specific Options The following options are available when writing JSON files. Note: If any unsupported option is specified, an error will be raised and the query will fail.
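For reference, the session-level path mentioned in the updated docs can be exercised with a minimal sketch like the one below (not part of this diff). It assumes the `datafusion` and `tokio` crates; the output directory and sample rows are illustrative.

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Enable the flag for the whole session instead of per statement
    // via COPY ... OPTIONS (execution.keep_partition_by_columns true).
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.keep_partition_by_columns", true);
    let ctx = SessionContext::new_with_config(config);

    // Partitioned write: with the flag enabled, column1 is kept in the
    // written Parquet files as well as in the hive-style directory names.
    ctx.sql(
        "COPY (VALUES ('1', 'a'), ('2', 'b'), ('3', 'c')) \
         TO 'out/partitioned/' STORED AS PARQUET PARTITIONED BY (column1)",
    )
    .await?
    .collect()
    .await?;

    Ok(())
}
```

The per-statement equivalent is the `OPTIONS (execution.keep_partition_by_columns true)` clause exercised in the `copy.slt` test above.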