
Commit

Merge remote-tracking branch 'apache/main' into alamb/better_scalar_api
alamb committed Dec 20, 2023
2 parents a7f2903 + b925b78 commit d2fe8e7
Showing 14 changed files with 1,171 additions and 496 deletions.
16 changes: 16 additions & 0 deletions datafusion/common/src/dfschema.rs
@@ -347,6 +347,22 @@ impl DFSchema {
.collect()
}

/// Find the indices of all fields with the given qualifier
pub fn fields_indices_with_qualified(
&self,
qualifier: &TableReference,
) -> Vec<usize> {
self.fields
.iter()
.enumerate()
.filter_map(|(idx, field)| {
field
.qualifier()
.and_then(|q| q.eq(qualifier).then_some(idx))
})
.collect()
}

/// Find all fields that match the given name
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
self.fields
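The new `fields_indices_with_qualified` helper collects the positions of every field carrying a given table qualifier in a single pass, combining `enumerate`, `filter_map`, and `bool::then_some`. Below is a minimal, self-contained sketch of the same pattern; the `Schema`/`Field` stand-ins and the "t1"/"t2" qualifiers are illustrative only, not the DataFusion `DFSchema`/`DFField` API.

// Stand-in types for illustration; not the DataFusion API.
#[derive(Debug)]
struct Field {
    qualifier: Option<String>,
    name: String,
}

struct Schema {
    fields: Vec<Field>,
}

impl Schema {
    /// Return the indices of all fields whose qualifier equals `qualifier`,
    /// mirroring the enumerate + filter_map + then_some pattern above.
    fn fields_indices_with_qualified(&self, qualifier: &str) -> Vec<usize> {
        self.fields
            .iter()
            .enumerate()
            .filter_map(|(idx, field)| {
                field
                    .qualifier
                    .as_deref()
                    .and_then(|q| (q == qualifier).then_some(idx))
            })
            .collect()
    }
}

fn main() {
    let schema = Schema {
        fields: vec![
            Field { qualifier: Some("t1".into()), name: "a".into() },
            Field { qualifier: Some("t2".into()), name: "b".into() },
            Field { qualifier: Some("t1".into()), name: "c".into() },
            Field { qualifier: None, name: "d".into() },
        ],
    };
    // Only the fields qualified with "t1" are reported: indices 0 and 2.
    let indices = schema.fields_indices_with_qualified("t1");
    assert_eq!(indices, vec![0, 2]);
    println!(
        "t1 columns: {:?}",
        indices.iter().map(|&i| &schema.fields[i].name).collect::<Vec<_>>()
    );
}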
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/stream.rs
@@ -64,7 +64,8 @@ impl TableProviderFactory for StreamTableFactory {
.with_encoding(encoding)
.with_order(cmd.order_exprs.clone())
.with_header(cmd.has_header)
.with_batch_size(state.config().batch_size());
.with_batch_size(state.config().batch_size())
.with_constraints(cmd.constraints.clone());

Ok(Arc::new(StreamTable(Arc::new(config))))
}
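The one-line fix above forwards the constraints from the CREATE TABLE command into the stream table's configuration; before it, the consuming builder chain simply never received them. Below is a minimal sketch of why each attribute must be threaded through explicitly in this style of builder; the `Config` and `Constraints` types and the "PRIMARY KEY (a)" value are hypothetical, not the DataFusion `StreamConfig` API.

// Hypothetical, self-contained builder; not the DataFusion API.
#[derive(Debug, Default, Clone, PartialEq)]
struct Constraints(Vec<String>);

#[derive(Debug, Default)]
struct Config {
    header: bool,
    batch_size: usize,
    constraints: Constraints,
}

impl Config {
    // Each with_* method consumes and returns the config, so any command
    // attribute that is not explicitly forwarded is silently dropped.
    fn with_header(mut self, header: bool) -> Self {
        self.header = header;
        self
    }
    fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
    fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }
}

fn main() {
    let cmd_constraints = Constraints(vec!["PRIMARY KEY (a)".to_string()]);
    // Mirrors the fix: chain with_constraints so the resulting table keeps
    // the constraints declared on the CREATE TABLE command.
    let config = Config::default()
        .with_header(true)
        .with_batch_size(8192)
        .with_constraints(cmd_constraints.clone());
    assert_eq!(config.constraints, cmd_constraints);
    println!("{config:?}");
}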
92 changes: 61 additions & 31 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -60,8 +60,8 @@ use crate::physical_plan::{
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};

use datafusion_physical_plan::repartition::RepartitionExec;

use itertools::izip;

/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
@@ -769,14 +769,16 @@ mod tests {
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{csv_exec_sorted, stream_exec_ordered};
use crate::test::{csv_exec_ordered, csv_exec_sorted, stream_exec_ordered};

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::{col, Column, NotExpr};

use rstest::rstest;

fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32, true);
let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -2140,12 +2142,19 @@ mod tests {
Ok(())
}

#[rstest]
#[tokio::test]
async fn test_with_lost_ordering_unbounded() -> Result<()> {
async fn test_with_lost_ordering_unbounded_bounded(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
// create an unbounded source
let source = stream_exec_ordered(&schema, sort_exprs);
// create either a bounded or an unbounded source
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_ordered(&schema, sort_exprs)
};
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
@@ -2154,50 +2163,71 @@
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);

let expected_input = [
// Expected inputs unbounded and bounded
let expected_input_unbounded = vec![
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
let expected_optimized = [
let expected_input_bounded = vec![
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];

// Expected unbounded result (same for with and without flag)
let expected_optimized_unbounded = vec![
"SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}

#[tokio::test]
async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
// create an unbounded source
let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
Partitioning::Hash(vec![col("c", &schema).unwrap()], 10),
)?) as _;
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);

let expected_input = ["SortExec: expr=[a@0 ASC]",
// Expected bounded results with and without flag
let expected_optimized_bounded = vec![
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];
let expected_optimized = [
let expected_optimized_bounded_parallelize_sort = vec![
"SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
" SortExec: expr=[a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, false);
let (expected_input, expected_optimized, expected_optimized_sort_parallelize) =
if source_unbounded {
(
expected_input_unbounded,
expected_optimized_unbounded.clone(),
expected_optimized_unbounded,
)
} else {
(
expected_input_bounded,
expected_optimized_bounded,
expected_optimized_bounded_parallelize_sort,
)
};
assert_optimized!(
expected_input,
expected_optimized,
physical_plan.clone(),
false
);
assert_optimized!(
expected_input,
expected_optimized_sort_parallelize,
physical_plan,
true
);
Ok(())
}

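In the hunk above, the two separate unbounded/bounded tests are folded into one rstest-parameterized test: `#[values(false, true)]` on `source_unbounded` expands into two generated test cases, each executed under `#[tokio::test]`, and the matching expected plans are selected before calling `assert_optimized!` once with sort parallelization off and once with it on. Below is a minimal, self-contained sketch of the rstest + tokio pattern; the toy `plan_description` function and the crate versions in the comments are illustrative, not DataFusion code.

// Dev-dependencies assumed for this sketch (versions illustrative):
// rstest = "0.18"
// tokio = { version = "1", features = ["macros", "rt"] }

use rstest::rstest;

// Toy stand-in for the optimizer under test: pretend an unbounded source can
// keep its ordering while a bounded one ends up under a SortExec.
async fn plan_description(source_unbounded: bool) -> &'static str {
    if source_unbounded {
        "SortPreservingMergeExec"
    } else {
        "SortExec"
    }
}

// `#[values(false, true)]` expands into one test case per value, mirroring how
// the single parameterized test replaces the two hand-written tests above.
#[rstest]
#[tokio::test]
async fn test_plan_depends_on_boundedness(
    #[values(false, true)] source_unbounded: bool,
) {
    let expected = if source_unbounded {
        "SortPreservingMergeExec"
    } else {
        "SortExec"
    };
    assert_eq!(plan_description(source_unbounded).await, expected);
}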
