-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Round robin polling between tied winners in sort preserving merge #13133
Changes from 45 commits
4926799
666f9fe
449c930
2566bab
6000d87
5cd8733
74d3d9b
e628764
4277eec
625b925
e4f9bfe
3fa2e32
e8da793
d22ba25
aadf69c
4b2a4ac
920fe6a
d80286c
750261a
79a6df0
afdd981
6358843
c9337c6
dbc1037
bc81681
e4970f5
e7abf68
7cb1198
5f4c83e
e418d6c
288e2fe
bca3bde
23d5fc7
2d02aec
010f869
3605f85
18f86e8
8adf14e
8d6c0a6
a18cba8
d2f3a84
905eea7
98135ee
1b418c4
8297f58
b652802
f68fa9b
b63a631
666b3fe
003fce3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,145 @@ | ||||
// Licensed to the Apache Software Foundation (ASF) under one | ||||
// or more contributor license agreements. See the NOTICE file | ||||
// distributed with this work for additional information | ||||
// regarding copyright ownership. The ASF licenses this file | ||||
// to you under the Apache License, Version 2.0 (the | ||||
// "License"); you may not use this file except in compliance | ||||
// with the License. You may obtain a copy of the License at | ||||
// | ||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||
// | ||||
// Unless required by applicable law or agreed to in writing, | ||||
// software distributed under the License is distributed on an | ||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||
// KIND, either express or implied. See the License for the | ||||
// specific language governing permissions and limitations | ||||
// under the License. | ||||
|
||||
use std::sync::Arc; | ||||
|
||||
use arrow::record_batch::RecordBatch; | ||||
use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray}; | ||||
use datafusion_execution::TaskContext; | ||||
use datafusion_physical_expr::expressions::col; | ||||
use datafusion_physical_expr::PhysicalSortExpr; | ||||
use datafusion_physical_plan::memory::MemoryExec; | ||||
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; | ||||
use datafusion_physical_plan::{collect, ExecutionPlan}; | ||||
|
||||
use criterion::async_executor::FuturesExecutor; | ||||
use criterion::{black_box, criterion_group, criterion_main, Criterion}; | ||||
|
||||
fn generate_spm_for_round_robin_tie_breaker( | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are already some SortPreservingMerge benchmarks in datafusion/datafusion/core/benches/sort.rs Line 161 in 223bb02
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we done this in follow on PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have suggested to @jayzhan211 to take the first steps in creating operator-specific benchmarks. I believe there's already a goal for this (I recall an older issue related to it). Perhaps we should extract these benchmarks from core and port them here @alamb ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Operator specific benchmarks sounds good to me -- I think it is very valuable to the have the benchmarks for the same operator together so they can be found / don't get duplicated |
||||
has_same_value: bool, | ||||
enable_round_robin_repartition: bool, | ||||
batch_count: usize, | ||||
partition_count: usize, | ||||
) -> SortPreservingMergeExec { | ||||
let row_size = 256; | ||||
let rb = if has_same_value { | ||||
let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size])); | ||||
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"); row_size])); | ||||
let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![0; row_size])); | ||||
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap() | ||||
} else { | ||||
let v = (0i32..row_size as i32).collect::<Vec<_>>(); | ||||
let a: ArrayRef = Arc::new(Int32Array::from(v)); | ||||
|
||||
// Use alphanumeric characters | ||||
let charset: Vec<char> = | ||||
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" | ||||
.chars() | ||||
.collect(); | ||||
|
||||
let mut strings = Vec::new(); | ||||
for i in 0..256 { | ||||
let mut s = String::new(); | ||||
s.push(charset[i % charset.len()]); | ||||
s.push(charset[(i / charset.len()) % charset.len()]); | ||||
strings.push(Some(s)); | ||||
} | ||||
|
||||
let b: ArrayRef = Arc::new(StringArray::from_iter(strings)); | ||||
|
||||
let v = (0i64..row_size as i64).collect::<Vec<_>>(); | ||||
let c: ArrayRef = Arc::new(Int64Array::from_iter(v)); | ||||
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap() | ||||
}; | ||||
|
||||
let rbs = (0..batch_count).map(|_| rb.clone()).collect::<Vec<_>>(); | ||||
let partitiones = vec![rbs.clone(); partition_count]; | ||||
|
||||
let schema = rb.schema(); | ||||
let sort = vec![ | ||||
PhysicalSortExpr { | ||||
expr: col("b", &schema).unwrap(), | ||||
options: Default::default(), | ||||
}, | ||||
PhysicalSortExpr { | ||||
expr: col("c", &schema).unwrap(), | ||||
options: Default::default(), | ||||
}, | ||||
]; | ||||
|
||||
let exec = MemoryExec::try_new(&partitiones, schema, None).unwrap(); | ||||
SortPreservingMergeExec::new(sort, Arc::new(exec)) | ||||
.with_round_robin_repartition(enable_round_robin_repartition) | ||||
} | ||||
|
||||
fn run_bench( | ||||
c: &mut Criterion, | ||||
has_same_value: bool, | ||||
enable_round_robin_repartition: bool, | ||||
batch_count: usize, | ||||
partition_count: usize, | ||||
description: &str, | ||||
) { | ||||
let task_ctx = TaskContext::default(); | ||||
let task_ctx = Arc::new(task_ctx); | ||||
|
||||
let spm = Arc::new(generate_spm_for_round_robin_tie_breaker( | ||||
has_same_value, | ||||
enable_round_robin_repartition, | ||||
batch_count, | ||||
partition_count, | ||||
)) as Arc<dyn ExecutionPlan>; | ||||
|
||||
c.bench_function(description, |b| { | ||||
b.to_async(FuturesExecutor) | ||||
.iter(|| black_box(collect(Arc::clone(&spm), Arc::clone(&task_ctx)))) | ||||
}); | ||||
} | ||||
|
||||
fn criterion_benchmark(c: &mut Criterion) { | ||||
let params = [ | ||||
(true, false, "low_card_without_tiebreaker"), // low cardinality, no tie breaker | ||||
(true, true, "low_card_with_tiebreaker"), // low cardinality, with tie breaker | ||||
(false, false, "high_card_without_tiebreaker"), // high cardinality, no tie breaker | ||||
(false, true, "high_card_with_tiebreaker"), // high cardinality, with tie breaker | ||||
]; | ||||
|
||||
let batch_counts = [1, 25, 625]; | ||||
let partition_counts = [2, 8, 32]; | ||||
|
||||
for &(has_same_value, enable_round_robin_repartition, cardinality_label) in ¶ms { | ||||
for &batch_count in &batch_counts { | ||||
for &partition_count in &partition_counts { | ||||
let description = format!( | ||||
"{}_batch_count_{}_partition_count_{}", | ||||
cardinality_label, batch_count, partition_count | ||||
); | ||||
run_bench( | ||||
c, | ||||
has_same_value, | ||||
enable_round_robin_repartition, | ||||
batch_count, | ||||
partition_count, | ||||
&description, | ||||
); | ||||
} | ||||
} | ||||
} | ||||
} | ||||
|
||||
// Criterion entry points: register the benchmark group and generate `main`.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we please document what
pool_size_per_consumer
means? Specifically, what happens when the pool size is exceeded for that consumer? I see the docs on
pub fn with_memory_limit_per_consumer(
but I think we should also document the semantics on the pool itself.