Skip to content

Commit

Permalink
Remove more duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
ozankabak committed Jan 14, 2025
1 parent bac23bf commit 713537d
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 93 deletions.
5 changes: 0 additions & 5 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -307,10 +306,6 @@ impl DataSink for ArrowFileSink {
self
}

fn metrics(&self) -> Option<MetricsSet> {
None
}

fn schema(&self) -> &SchemaRef {
self.config.output_schema()
}
Expand Down
5 changes: 0 additions & 5 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
Expand Down Expand Up @@ -725,10 +724,6 @@ impl DataSink for CsvSink {
self
}

fn metrics(&self) -> Option<MetricsSet> {
None
}

fn schema(&self) -> &SchemaRef {
self.config.output_schema()
}
Expand Down
5 changes: 0 additions & 5 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::ExecutionPlan;

use async_trait::async_trait;
Expand Down Expand Up @@ -372,10 +371,6 @@ impl DataSink for JsonSink {
self
}

fn metrics(&self) -> Option<MetricsSet> {
None
}

fn schema(&self) -> &SchemaRef {
self.config.output_schema()
}
Expand Down
5 changes: 0 additions & 5 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ use datafusion_expr::Expr;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -878,10 +877,6 @@ impl DataSink for ParquetSink {
self
}

fn metrics(&self) -> Option<MetricsSet> {
None
}

fn schema(&self) -> &SchemaRef {
self.config.output_schema()
}
Expand Down
55 changes: 24 additions & 31 deletions datafusion/core/src/datasource/file_format/write/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,48 +93,41 @@ pub(crate) fn start_demuxer_task(
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> (SpawnedTask<Result<()>>, DemuxedStreamReceiver) {
let base_output_path = &config.table_paths[0];
let part_cols = if !config.table_partition_cols.is_empty() {
Some(config.table_partition_cols.clone())
} else {
None
};

let (tx, rx) = mpsc::unbounded_channel();
let context = Arc::clone(context);
let single_file_output =
!base_output_path.is_collection() && base_output_path.file_extension().is_some();
let base_output_path_clone = base_output_path.clone();
let keep_partition_by_columns = config.keep_partition_by_columns;
let file_extension = config.file_extension.clone();
let task = match part_cols {
Some(parts) => {
// There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
// bound this channel without risking a deadlock.
SpawnedTask::spawn(async move {
hive_style_partitions_demuxer(
tx,
data,
context,
parts,
base_output_path_clone,
file_extension,
keep_partition_by_columns,
)
.await
})
}
None => SpawnedTask::spawn(async move {
let base_output_path = config.table_paths[0].clone();
let task = if config.table_partition_cols.is_empty() {
let single_file_output = !base_output_path.is_collection()
&& base_output_path.file_extension().is_some();
SpawnedTask::spawn(async move {
row_count_demuxer(
tx,
data,
context,
base_output_path_clone,
base_output_path,
file_extension,
single_file_output,
)
.await
}),
})
} else {
// There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
// bound this channel without risking a deadlock.
let partition_by = config.table_partition_cols.clone();
let keep_partition_by_columns = config.keep_partition_by_columns;
SpawnedTask::spawn(async move {
hive_style_partitions_demuxer(
tx,
data,
context,
partition_by,
base_output_path,
file_extension,
keep_partition_by_columns,
)
.await
})
};

(task, rx)
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/src/datasource/file_format/write/orchestration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,15 @@ pub(crate) async fn spawn_writer_tasks_and_join(
let writer =
create_writer(compression, &location, Arc::clone(&object_store)).await?;

tx_file_bundle
if tx_file_bundle
.send((rb_stream, Arc::clone(&serializer), writer))
.await
.map_err(|_| {
internal_datafusion_err!(
"Writer receive file bundle channel closed unexpectedly!"
)
})?;
.is_err()
{
internal_datafusion_err!(
"Writer receive file bundle channel closed unexpectedly!"
);
}
}

// Signal to the write coordinator that no more files are coming
Expand All @@ -280,9 +281,8 @@ pub(crate) async fn spawn_writer_tasks_and_join(
r1.map_err(DataFusionError::ExecutionJoin)??;
r2.map_err(DataFusionError::ExecutionJoin)??;

let total_count = rx_row_cnt.await.map_err(|_| {
// Return total row count:
rx_row_cnt.await.map_err(|_| {
internal_datafusion_err!("Did not receive row count from write coordinator")
})?;

Ok(total_count)
})
}
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,17 +1055,17 @@ impl TableProvider for ListingTable {

let order_requirements = if !self.options().file_sort_order.is_empty() {
// Multiple sort orders in outer vec are equivalent, so we pass only the first one
let ordering = self
.try_create_output_ordering()?
.first()
.ok_or(DataFusionError::Internal(
"Expected ListingTable to have a sort order, but none found!".into(),
))?
.clone();
let orderings = self.try_create_output_ordering()?;
let Some(ordering) = orderings.first() else {
return internal_err!(
"Expected ListingTable to have a sort order, but none found!"
);
};
// Converts Vec<Vec<SortExpr>> into type required by execution plan to specify its required input ordering
Some(LexRequirement::new(
ordering
.into_iter()
.cloned()
.map(PhysicalSortRequirement::from)
.collect::<Vec<_>>(),
))
Expand Down
12 changes: 2 additions & 10 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::SortExpr;
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use futures::StreamExt;
Expand Down Expand Up @@ -293,11 +292,8 @@ impl TableProvider for MemTable {
if insert_op != InsertOp::Append {
return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
}
let sink = Arc::new(MemSink::try_new(
self.batches.clone(),
Arc::clone(&self.schema),
)?);
Ok(Arc::new(DataSinkExec::new(input, sink, None)))
let sink = MemSink::try_new(self.batches.clone(), Arc::clone(&self.schema))?;
Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
}

fn get_column_default(&self, column: &str) -> Option<&Expr> {
Expand Down Expand Up @@ -349,10 +345,6 @@ impl DataSink for MemSink {
self
}

fn metrics(&self) -> Option<MetricsSet> {
None
}

fn schema(&self) -> &SchemaRef {
&self.schema
}
Expand Down
12 changes: 3 additions & 9 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ pub trait FileSink: DataSink {
context: &Arc<TaskContext>,
) -> Result<u64> {
let config = self.config();
let object_store = config.get_object_store(context)?;
let object_store = context
.runtime_env()
.object_store(&config.object_store_url)?;
let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
self.spawn_writer_tasks_and_join(
context,
Expand Down Expand Up @@ -139,14 +141,6 @@ impl FileSinkConfig {
pub fn output_schema(&self) -> &SchemaRef {
&self.output_schema
}

/// Get object store from task context
pub fn get_object_store(
&self,
context: &Arc<TaskContext>,
) -> Result<Arc<dyn ObjectStore>> {
context.runtime_env().object_store(&self.object_store_url)
}
}

impl Debug for FileScanConfig {
Expand Down
5 changes: 0 additions & 5 deletions datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
Expand Down Expand Up @@ -412,10 +411,6 @@ impl DataSink for StreamWrite {
self
}

fn metrics(&self) -> Option<MetricsSet> {
None
}

fn schema(&self) -> &SchemaRef {
self.0.source.schema()
}
Expand Down
4 changes: 3 additions & 1 deletion datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync {
/// [DataSink].
///
/// See [ExecutionPlan::metrics()] for more details
fn metrics(&self) -> Option<MetricsSet>;
fn metrics(&self) -> Option<MetricsSet> {
None
}

/// Returns the sink schema
fn schema(&self) -> &SchemaRef;
Expand Down

0 comments on commit 713537d

Please sign in to comment.