diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 2ae6b70ed1c5..120e27ecf669 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -52,7 +52,6 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
     mut data_rx: Receiver,
     mut serializer: Box,
     mut writer: AbortableWrite>,
-    unbounded_input: bool,
 ) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
     let (tx, mut rx) = mpsc::channel::>>(100);
@@ -71,9 +70,6 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
                         "Unknown error writing to object store".into(),
                     )
                 })?;
-                if unbounded_input {
-                    tokio::task::yield_now().await;
-                }
             }
             Err(_) => {
                 return Err(DataFusionError::Internal(
@@ -140,7 +136,6 @@ type FileWriteBundle = (Receiver, SerializerType, WriterType);
 pub(crate) async fn stateless_serialize_and_write_files(
     mut rx: Receiver,
     tx: tokio::sync::oneshot::Sender,
-    unbounded_input: bool,
 ) -> Result<()> {
     let mut row_count = 0;
     // tracks if any writers encountered an error triggering the need to abort
@@ -153,13 +148,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
     let mut join_set = JoinSet::new();
     while let Some((data_rx, serializer, writer)) = rx.recv().await {
         join_set.spawn(async move {
-            serialize_rb_stream_to_object_store(
-                data_rx,
-                serializer,
-                writer,
-                unbounded_input,
-            )
-            .await
+            serialize_rb_stream_to_object_store(data_rx, serializer, writer).await
         });
     }
     let mut finished_writers = Vec::new();
@@ -241,7 +230,6 @@ pub(crate) async fn stateless_multipart_put(
     let single_file_output = config.single_file_output;
     let base_output_path = &config.table_paths[0];
-    let unbounded_input = config.unbounded_input;
     let part_cols = if !config.table_partition_cols.is_empty() {
         Some(config.table_partition_cols.clone())
     } else {
@@ -266,8 +254,7 @@ pub(crate) async fn stateless_multipart_put(
     let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(rb_buffer_size / 2);
     let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel();
     let write_coordinater_task = tokio::spawn(async move {
-        stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input)
-            .await
+        stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt).await
     });
     while let Some((location, rb_stream)) = file_stream_rx.recv().await {
         let serializer = get_serializer();
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 4c13d9d443ca..21d43dcd56db 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -38,7 +38,7 @@ use crate::datasource::{
     },
     get_statistics_with_limit,
     listing::ListingTableUrl,
-    physical_plan::{is_plan_streaming, FileScanConfig, FileSinkConfig},
+    physical_plan::{FileScanConfig, FileSinkConfig},
     TableProvider, TableType,
 };
 use crate::{
@@ -790,13 +790,6 @@ impl TableProvider for ListingTable {
             file_groups,
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
-            // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT
-            // queries. Thus, we can check if the plan is streaming to ensure file sink input is
-            // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now`
-            // to consume data at the input. When `unbounded_input` flag is `false` (e.g non-streaming data),
-            // all of the data at the input is sink after execution finishes. See discussion for rationale:
-            // https://github.com/apache/arrow-datafusion/pull/7610#issuecomment-1728979918
-            unbounded_input: is_plan_streaming(&input)?,
             single_file_output: self.options.single_file,
             overwrite,
             file_type_writer_options,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 9d1c373aee7c..4a6ebeab09e1 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -69,7 +69,6 @@ use arrow::{
 use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
-use datafusion_physical_plan::ExecutionPlan;
 use log::debug;
 use object_store::path::Path;
@@ -93,8 +92,6 @@ pub struct FileSinkConfig {
     /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory
     /// to which each output partition is written to its own output file.
     pub single_file_output: bool,
-    /// If input is unbounded, tokio tasks need to yield to not block execution forever
-    pub unbounded_input: bool,
     /// Controls whether existing data should be overwritten by this sink
     pub overwrite: bool,
     /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size
@@ -510,21 +507,6 @@ fn get_projected_output_ordering(
     all_orderings
 }
 
-// Get output (un)boundedness information for the given `plan`.
-pub(crate) fn is_plan_streaming(plan: &Arc) -> Result {
-    let result = if plan.children().is_empty() {
-        plan.unbounded_output(&[])
-    } else {
-        let children_unbounded_output = plan
-            .children()
-            .iter()
-            .map(is_plan_streaming)
-            .collect::>>();
-        plan.unbounded_output(&children_unbounded_output?)
-    };
-    result
-}
-
 #[cfg(test)]
 mod tests {
     use arrow_array::cast::AsArray;
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index e5816eb49ebb..31d50be10f70 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -593,7 +593,6 @@ impl DefaultPhysicalPlanner {
                     file_groups: vec![],
                     output_schema: Arc::new(schema),
                     table_partition_cols: vec![],
-                    unbounded_input: false,
                     single_file_output: *single_file_output,
                     overwrite: false,
                     file_type_writer_options
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 76fe449d2fa3..cc802ee95710 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1201,9 +1201,8 @@ message FileSinkConfig {
   Schema output_schema = 4;
   repeated PartitionColumn table_partition_cols = 5;
   bool single_file_output = 7;
-  bool unbounded_input = 8;
-  bool overwrite = 9;
-  FileTypeWriterOptions file_type_writer_options = 10;
+  bool overwrite = 8;
+  FileTypeWriterOptions file_type_writer_options = 9;
 }
 
 message JsonSink {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 0671757ad427..fb3a3ad91d06 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -7500,9 +7500,6 @@ impl serde::Serialize for FileSinkConfig {
         if self.single_file_output {
             len += 1;
         }
-        if self.unbounded_input {
-            len += 1;
-        }
         if self.overwrite {
             len += 1;
         }
@@ -7528,9 +7525,6 @@ impl serde::Serialize for FileSinkConfig {
         if self.single_file_output {
             struct_ser.serialize_field("singleFileOutput", &self.single_file_output)?;
         }
-        if self.unbounded_input {
-            struct_ser.serialize_field("unboundedInput", &self.unbounded_input)?;
-        }
         if self.overwrite {
             struct_ser.serialize_field("overwrite", &self.overwrite)?;
         }
@@ -7559,8 +7553,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             "tablePartitionCols",
             "single_file_output",
             "singleFileOutput",
-            "unbounded_input",
-            "unboundedInput",
             "overwrite",
             "file_type_writer_options",
             "fileTypeWriterOptions",
@@ -7574,7 +7566,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             OutputSchema,
             TablePartitionCols,
             SingleFileOutput,
-            UnboundedInput,
             Overwrite,
             FileTypeWriterOptions,
         }
@@ -7604,7 +7595,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             "outputSchema" | "output_schema" => Ok(GeneratedField::OutputSchema),
                             "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols),
                             "singleFileOutput" | "single_file_output" => Ok(GeneratedField::SingleFileOutput),
-                            "unboundedInput" | "unbounded_input" => Ok(GeneratedField::UnboundedInput),
                             "overwrite" => Ok(GeneratedField::Overwrite),
                             "fileTypeWriterOptions" | "file_type_writer_options" => Ok(GeneratedField::FileTypeWriterOptions),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
@@ -7632,7 +7622,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                 let mut output_schema__ = None;
                 let mut table_partition_cols__ = None;
                 let mut single_file_output__ = None;
-                let mut unbounded_input__ = None;
                 let mut overwrite__ = None;
                 let mut file_type_writer_options__ = None;
                 while let Some(k) = map_.next_key()? {
@@ -7673,12 +7662,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             }
                             single_file_output__ = Some(map_.next_value()?);
                         }
-                        GeneratedField::UnboundedInput => {
-                            if unbounded_input__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("unboundedInput"));
-                            }
-                            unbounded_input__ = Some(map_.next_value()?);
-                        }
                         GeneratedField::Overwrite => {
                             if overwrite__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("overwrite"));
@@ -7700,7 +7683,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                     output_schema: output_schema__,
                     table_partition_cols: table_partition_cols__.unwrap_or_default(),
                     single_file_output: single_file_output__.unwrap_or_default(),
-                    unbounded_input: unbounded_input__.unwrap_or_default(),
                     overwrite: overwrite__.unwrap_or_default(),
                     file_type_writer_options: file_type_writer_options__,
                 })
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 771bd715d3c5..9030e90a24c8 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1635,10 +1635,8 @@ pub struct FileSinkConfig {
     #[prost(bool, tag = "7")]
     pub single_file_output: bool,
     #[prost(bool, tag = "8")]
-    pub unbounded_input: bool,
-    #[prost(bool, tag = "9")]
     pub overwrite: bool,
-    #[prost(message, optional, tag = "10")]
+    #[prost(message, optional, tag = "9")]
     pub file_type_writer_options: ::core::option::Option,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 5c0ef615cacd..65f9f139a87b 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -739,7 +739,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
             output_schema: Arc::new(convert_required!(conf.output_schema)?),
             table_partition_cols,
             single_file_output: conf.single_file_output,
-            unbounded_input: conf.unbounded_input,
             overwrite: conf.overwrite,
             file_type_writer_options: convert_required!(conf.file_type_writer_options)?,
         })
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index ea00b726b9d6..e9cdb34cf1b9 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -846,7 +846,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
             output_schema: Some(conf.output_schema.as_ref().try_into()?),
             table_partition_cols,
             single_file_output: conf.single_file_output,
-            unbounded_input: conf.unbounded_input,
             overwrite: conf.overwrite,
             file_type_writer_options: Some(file_type_writer_options.try_into()?),
         })
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 9a9827f2a090..2eb04ab6cbab 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -733,7 +733,6 @@ fn roundtrip_json_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         single_file_output: true,
-        unbounded_input: false,
         overwrite: true,
         file_type_writer_options: FileTypeWriterOptions::JSON(JsonWriterOptions::new(
            CompressionTypeVariant::UNCOMPRESSED,