Skip to content

Commit

Permalink
remove unbounded_input option
Browse files Browse the repository at this point in the history
  • Loading branch information
devinjdangelo committed Dec 20, 2023
1 parent b925b78 commit 68eed86
Show file tree
Hide file tree
Showing 10 changed files with 3 additions and 64 deletions.
17 changes: 2 additions & 15 deletions datafusion/core/src/datasource/file_format/write/orchestration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
mut serializer: Box<dyn BatchSerializer>,
mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
unbounded_input: bool,
) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
Expand All @@ -71,9 +70,6 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
"Unknown error writing to object store".into(),
)
})?;
if unbounded_input {
tokio::task::yield_now().await;
}
}
Err(_) => {
return Err(DataFusionError::Internal(
Expand Down Expand Up @@ -140,7 +136,6 @@ type FileWriteBundle = (Receiver<RecordBatch>, SerializerType, WriterType);
pub(crate) async fn stateless_serialize_and_write_files(
mut rx: Receiver<FileWriteBundle>,
tx: tokio::sync::oneshot::Sender<u64>,
unbounded_input: bool,
) -> Result<()> {
let mut row_count = 0;
// tracks if any writers encountered an error triggering the need to abort
Expand All @@ -153,13 +148,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
let mut join_set = JoinSet::new();
while let Some((data_rx, serializer, writer)) = rx.recv().await {
join_set.spawn(async move {
serialize_rb_stream_to_object_store(
data_rx,
serializer,
writer,
unbounded_input,
)
.await
serialize_rb_stream_to_object_store(data_rx, serializer, writer).await
});
}
let mut finished_writers = Vec::new();
Expand Down Expand Up @@ -241,7 +230,6 @@ pub(crate) async fn stateless_multipart_put(

let single_file_output = config.single_file_output;
let base_output_path = &config.table_paths[0];
let unbounded_input = config.unbounded_input;
let part_cols = if !config.table_partition_cols.is_empty() {
Some(config.table_partition_cols.clone())
} else {
Expand All @@ -266,8 +254,7 @@ pub(crate) async fn stateless_multipart_put(
let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(rb_buffer_size / 2);
let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel();
let write_coordinater_task = tokio::spawn(async move {
stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input)
.await
stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt).await
});
while let Some((location, rb_stream)) = file_stream_rx.recv().await {
let serializer = get_serializer();
Expand Down
9 changes: 1 addition & 8 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::datasource::{
},
get_statistics_with_limit,
listing::ListingTableUrl,
physical_plan::{is_plan_streaming, FileScanConfig, FileSinkConfig},
physical_plan::{FileScanConfig, FileSinkConfig},
TableProvider, TableType,
};
use crate::{
Expand Down Expand Up @@ -790,13 +790,6 @@ impl TableProvider for ListingTable {
file_groups,
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
// A plan can produce a finite number of rows even if it has unbounded sources, as in LIMIT
// queries. Thus, we check whether the plan is streaming to determine whether the file sink
// input is unbounded. When the `unbounded_input` flag is `true` for the sink, we occasionally
// call `yield_now` to consume data at the input. When the `unbounded_input` flag is `false`
// (e.g. non-streaming data), all of the data at the input is sunk after execution finishes.
// See discussion for rationale:
// https://github.com/apache/arrow-datafusion/pull/7610#issuecomment-1728979918
unbounded_input: is_plan_streaming(&input)?,
single_file_output: self.options.single_file,
overwrite,
file_type_writer_options,
Expand Down
18 changes: 0 additions & 18 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ use arrow::{
use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::ExecutionPlan;

use log::debug;
use object_store::path::Path;
Expand All @@ -93,8 +92,6 @@ pub struct FileSinkConfig {
/// regardless of input partitioning. Otherwise, each table path is assumed to be a directory
/// to which each output partition is written to its own output file.
pub single_file_output: bool,
/// If input is unbounded, tokio tasks need to yield to not block execution forever
pub unbounded_input: bool,
/// Controls whether existing data should be overwritten by this sink
pub overwrite: bool,
/// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size
Expand Down Expand Up @@ -510,21 +507,6 @@ fn get_projected_output_ordering(
all_orderings
}

/// Get output (un)boundedness information for the given `plan`.
///
/// The answer is computed bottom-up: this function is first applied
/// recursively to every child of `plan`, and the per-child results are then
/// handed to `plan.unbounded_output`, whose return value is returned as-is.
/// A plan without children receives an empty slice.
///
/// # Errors
///
/// Returns the first error produced while evaluating a child, or the error
/// returned by `plan.unbounded_output` itself.
pub(crate) fn is_plan_streaming(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
    // Collecting into `Result<Vec<_>>` short-circuits on the first failing
    // child. With no children this yields an empty vector, so leaf plans need
    // no special case.
    let children_unbounded_output = plan
        .children()
        .iter()
        .map(is_plan_streaming)
        .collect::<Result<Vec<_>>>()?;
    plan.unbounded_output(&children_unbounded_output)
}

#[cfg(test)]
mod tests {
use arrow_array::cast::AsArray;
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,6 @@ impl DefaultPhysicalPlanner {
file_groups: vec![],
output_schema: Arc::new(schema),
table_partition_cols: vec![],
unbounded_input: false,
single_file_output: *single_file_output,
overwrite: false,
file_type_writer_options
Expand Down
1 change: 0 additions & 1 deletion datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,6 @@ message FileSinkConfig {
Schema output_schema = 4;
repeated PartitionColumn table_partition_cols = 5;
bool single_file_output = 7;
bool unbounded_input = 8;
bool overwrite = 9;
FileTypeWriterOptions file_type_writer_options = 10;
}
Expand Down
16 changes: 0 additions & 16 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
output_schema: Arc::new(convert_required!(conf.output_schema)?),
table_partition_cols,
single_file_output: conf.single_file_output,
unbounded_input: conf.unbounded_input,
overwrite: conf.overwrite,
file_type_writer_options: convert_required!(conf.file_type_writer_options)?,
})
Expand Down
1 change: 0 additions & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
output_schema: Some(conf.output_schema.as_ref().try_into()?),
table_partition_cols,
single_file_output: conf.single_file_output,
unbounded_input: conf.unbounded_input,
overwrite: conf.overwrite,
file_type_writer_options: Some(file_type_writer_options.try_into()?),
})
Expand Down
1 change: 0 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,6 @@ fn roundtrip_json_sink() -> Result<()> {
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
single_file_output: true,
unbounded_input: false,
overwrite: true,
file_type_writer_options: FileTypeWriterOptions::JSON(JsonWriterOptions::new(
CompressionTypeVariant::UNCOMPRESSED,
Expand Down

0 comments on commit 68eed86

Please sign in to comment.