Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Round robin polling between tied winners in sort preserving merge #13133

Merged
merged 50 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
4926799
first draft
jayzhan211 Oct 17, 2024
666f9fe
add data
jayzhan211 Oct 17, 2024
449c930
fix benchmark
jayzhan211 Oct 17, 2024
2566bab
add more bencmark data
jayzhan211 Oct 18, 2024
6000d87
fix benchmark
jayzhan211 Oct 18, 2024
5cd8733
fmt
jayzhan211 Oct 18, 2024
74d3d9b
get max size
jayzhan211 Oct 18, 2024
e628764
add license
jayzhan211 Oct 18, 2024
4277eec
rm code for merge
jayzhan211 Oct 18, 2024
625b925
cleanup
jayzhan211 Oct 18, 2024
e4f9bfe
cleanup
jayzhan211 Oct 18, 2024
3fa2e32
update poll count only we have tie
jayzhan211 Oct 21, 2024
e8da793
upd comment
jayzhan211 Oct 21, 2024
d22ba25
fix logic
jayzhan211 Oct 21, 2024
aadf69c
Merge branch 'rrt-spm' into rrt-spm-for-merge
jayzhan211 Oct 21, 2024
4b2a4ac
configurable
jayzhan211 Oct 21, 2024
920fe6a
fmt
jayzhan211 Oct 21, 2024
d80286c
add mem limit test
jayzhan211 Oct 22, 2024
750261a
rm test
jayzhan211 Oct 22, 2024
79a6df0
escape bracket
jayzhan211 Oct 22, 2024
afdd981
add test
jayzhan211 Oct 22, 2024
6358843
rm per consumer record
jayzhan211 Oct 22, 2024
c9337c6
repartition limit
jayzhan211 Oct 23, 2024
dbc1037
Merge branch 'main' of https://github.com/apache/datafusion into rrt-…
jayzhan211 Oct 24, 2024
bc81681
add benchmark
jayzhan211 Oct 24, 2024
e4970f5
cleanup
jayzhan211 Oct 24, 2024
e7abf68
benchmark with parameter
jayzhan211 Oct 24, 2024
7cb1198
only calculate consumer pool if the limit is set
jayzhan211 Oct 24, 2024
5f4c83e
combine eq and gt
jayzhan211 Oct 25, 2024
e418d6c
review part 1
berkaysynnada Oct 26, 2024
288e2fe
Update merge.rs
berkaysynnada Oct 26, 2024
bca3bde
Merge branch 'main' of https://github.com/apache/datafusion into rrt-…
jayzhan211 Oct 27, 2024
23d5fc7
upd doc
jayzhan211 Oct 27, 2024
2d02aec
no need index comparison
jayzhan211 Oct 28, 2024
010f869
combine handle tie and eq check
jayzhan211 Oct 28, 2024
3605f85
upd doc
jayzhan211 Oct 28, 2024
18f86e8
fmt
jayzhan211 Oct 28, 2024
8adf14e
add more comment
jayzhan211 Oct 28, 2024
8d6c0a6
remove flag
jayzhan211 Oct 28, 2024
a18cba8
upd comment
jayzhan211 Oct 29, 2024
d2f3a84
Revert "remove flag"
jayzhan211 Oct 29, 2024
905eea7
Revert "upd comment"
jayzhan211 Oct 29, 2024
98135ee
add more comment
jayzhan211 Oct 29, 2024
1b418c4
add more comment
jayzhan211 Oct 29, 2024
8297f58
fmt
jayzhan211 Oct 29, 2024
b652802
simpliy mem pool
jayzhan211 Oct 30, 2024
f68fa9b
clippy
jayzhan211 Oct 30, 2024
b63a631
Update merge.rs
berkaysynnada Oct 30, 2024
666b3fe
minor
berkaysynnada Oct 30, 2024
003fce3
add comment
jayzhan211 Oct 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 53 additions & 3 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use hashbrown::HashMap;
use log::debug;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use std::{
num::NonZeroUsize,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
Expand Down Expand Up @@ -58,7 +58,11 @@ impl MemoryPool for UnboundedMemoryPool {
#[derive(Debug)]
pub struct GreedyMemoryPool {
pool_size: usize,
// Pool size limit for each consumer, if one of the consumer exceeds the limit, error is returned
pool_size_per_consumer: HashMap<String, usize>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we please document what pool_size_per_consumer means? Specifically, what happens when the pool size is exceed for that consumer?

I see the docs on pub fn with_memory_limit_per_consumer( but I think we should also document the semantics on the pool itself.

used: AtomicUsize,
// Memory usage for each consumer, used to check aginst `pool_size_per_consumer`
used_per_consumer: RwLock<HashMap<String, AtomicUsize>>,
}

impl GreedyMemoryPool {
Expand All @@ -67,21 +71,67 @@ impl GreedyMemoryPool {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
pool_size,
pool_size_per_consumer: Default::default(),
used: AtomicUsize::new(0),
used_per_consumer: RwLock::new(HashMap::new()),
}
}

pub fn with_pool_size_per_consumer(
mut self,
pool_size_per_consumer: HashMap<String, usize>,
) -> Self {
self.pool_size_per_consumer = pool_size_per_consumer;
self
}
}

impl MemoryPool for GreedyMemoryPool {
fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
fn grow(&self, reservation: &MemoryReservation, additional: usize) {
let consumer_name = reservation.consumer().name();
self.used.fetch_add(additional, Ordering::Relaxed);

let mut used_per_consumer = self.used_per_consumer.write();
let consumer_usage = used_per_consumer
.entry(consumer_name.to_string())
.or_insert_with(|| AtomicUsize::new(0));
consumer_usage.fetch_add(additional, Ordering::Relaxed);
}

fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
let consumer_name = reservation.consumer().name();

self.used.fetch_sub(shrink, Ordering::Relaxed);

let mut used_per_consumer = self.used_per_consumer.write();
let consumer_usage = used_per_consumer
.entry(consumer_name.to_string())
.or_insert_with(|| AtomicUsize::new(0));
consumer_usage.fetch_sub(shrink, Ordering::Relaxed);
}

fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
let consumer_name = reservation.consumer().name();

if let Some(pool_size) = self.pool_size_per_consumer.get(consumer_name) {
let mut used_per_consumer = self.used_per_consumer.write();
let consumer_usage = used_per_consumer
.entry(consumer_name.to_string())
.or_insert_with(|| AtomicUsize::new(0));
consumer_usage
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
let new_used = used + additional;
(new_used <= *pool_size).then_some(new_used)
})
.map_err(|used| {
insufficient_capacity_err(
reservation,
additional,
pool_size.saturating_sub(used),
)
})?;
}

self.used
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
let new_used = used + additional;
Expand Down
18 changes: 18 additions & 0 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::{

use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
use datafusion_common::{DataFusionError, Result};
use hashbrown::HashMap;
use object_store::ObjectStore;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -232,6 +233,23 @@ impl RuntimeEnvBuilder {
)))
}

/// Set memory limit per consumer, if not set, by default is the same as the total pool size
/// For example, if pool size is 4000, repartition is 3000. Total pool size: 4000,
/// RepartitionExec pool size: 3000, SortPreservingMergeExec pool size: 4000
pub fn with_memory_limit_per_consumer(
self,
max_memory: usize,
memory_fraction: f64,
pool_size_per_consumer: HashMap<String, usize>,
) -> Self {
let pool_size = (max_memory as f64 * memory_fraction) as usize;
self.with_memory_pool(Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(pool_size)
.with_pool_size_per_consumer(pool_size_per_consumer),
NonZeroUsize::new(5).unwrap(),
)))
}

/// Use the specified path to create any needed temporary files
pub fn with_temp_file_path(self, path: impl Into<PathBuf>) -> Self {
self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()]))
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ rand = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["async_futures"] }
datafusion-functions-aggregate = { workspace = true }
rstest = { workspace = true }
rstest_reuse = "0.7.0"
Expand All @@ -76,3 +77,7 @@ tokio = { workspace = true, features = [
"fs",
"parking_lot",
] }

[[bench]]
harness = false
name = "spm"
145 changes: 145 additions & 0 deletions datafusion/physical-plan/benches/spm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::memory::MemoryExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{collect, ExecutionPlan};

use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn generate_spm_for_round_robin_tie_breaker(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are already some SortPreservingMerge benchmarks in

fn merge_sorted(partitions: &[Vec<RecordBatch>]) -> Self {
, I recommend consolidating the benchmarks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we done this in follow on PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have suggested to @jayzhan211 to take the first steps in creating operator-specific benchmarks. I believe there's already a goal for this (I recall an older issue related to it). Perhaps we should extract these benchmarks from core and port them here @alamb ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Operator specific benchmarks sounds good to me -- I think it is very valuable to the have the benchmarks for the same operator together so they can be found / don't get duplicated

has_same_value: bool,
enable_round_robin_repartition: bool,
batch_count: usize,
partition_count: usize,
) -> SortPreservingMergeExec {
let row_size = 256;
let rb = if has_same_value {
let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"); row_size]));
let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![0; row_size]));
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
} else {
let v = (0i32..row_size as i32).collect::<Vec<_>>();
let a: ArrayRef = Arc::new(Int32Array::from(v));

// Use alphanumeric characters
let charset: Vec<char> =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
.chars()
.collect();

let mut strings = Vec::new();
for i in 0..256 {
let mut s = String::new();
s.push(charset[i % charset.len()]);
s.push(charset[(i / charset.len()) % charset.len()]);
strings.push(Some(s));
}

let b: ArrayRef = Arc::new(StringArray::from_iter(strings));

let v = (0i64..row_size as i64).collect::<Vec<_>>();
let c: ArrayRef = Arc::new(Int64Array::from_iter(v));
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
};

let rbs = (0..batch_count).map(|_| rb.clone()).collect::<Vec<_>>();
let partitiones = vec![rbs.clone(); partition_count];

let schema = rb.schema();
let sort = vec![
PhysicalSortExpr {
expr: col("b", &schema).unwrap(),
options: Default::default(),
},
PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: Default::default(),
},
];

let exec = MemoryExec::try_new(&partitiones, schema, None).unwrap();
SortPreservingMergeExec::new(sort, Arc::new(exec))
.with_round_robin_repartition(enable_round_robin_repartition)
}

fn run_bench(
c: &mut Criterion,
has_same_value: bool,
enable_round_robin_repartition: bool,
batch_count: usize,
partition_count: usize,
description: &str,
) {
let task_ctx = TaskContext::default();
let task_ctx = Arc::new(task_ctx);

let spm = Arc::new(generate_spm_for_round_robin_tie_breaker(
has_same_value,
enable_round_robin_repartition,
batch_count,
partition_count,
)) as Arc<dyn ExecutionPlan>;

c.bench_function(description, |b| {
b.to_async(FuturesExecutor)
.iter(|| black_box(collect(Arc::clone(&spm), Arc::clone(&task_ctx))))
});
}

fn criterion_benchmark(c: &mut Criterion) {
let params = [
(true, false, "low_card_without_tiebreaker"), // low cardinality, no tie breaker
(true, true, "low_card_with_tiebreaker"), // low cardinality, with tie breaker
(false, false, "high_card_without_tiebreaker"), // high cardinality, no tie breaker
(false, true, "high_card_with_tiebreaker"), // high cardinality, with tie breaker
];

let batch_counts = [1, 25, 625];
let partition_counts = [2, 8, 32];

for &(has_same_value, enable_round_robin_repartition, cardinality_label) in &params {
for &batch_count in &batch_counts {
for &partition_count in &partition_counts {
let description = format!(
"{}_batch_count_{}_partition_count_{}",
cardinality_label, batch_count, partition_count
);
run_bench(
c,
has_same_value,
enable_round_robin_repartition,
batch_count,
partition_count,
&description,
);
}
}
}
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
54 changes: 54 additions & 0 deletions datafusion/physical-plan/src/sorts/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub trait CursorValues {
/// Returns true if `l[l_idx] == r[r_idx]`
fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool;

/// Returns true if `row[idx] == row[idx - 1]`
/// Given `idx` should be greater than 0
fn eq_to_previous(cursor: &Self, idx: usize) -> bool;

/// Returns comparison of `l[l_idx]` and `r[r_idx]`
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering;
}
Expand Down Expand Up @@ -95,6 +99,16 @@ impl<T: CursorValues> Cursor<T> {
self.offset += 1;
t
}

pub fn is_eq_to_prev_one(&self, prev_cursor: Option<&Cursor<T>>) -> bool {
if self.offset > 0 {
self.is_eq_to_prev_row()
} else if let Some(prev_cursor) = prev_cursor {
self.is_eq_to_prev_row_in_prev_batch(prev_cursor)
} else {
false
}
}
}

impl<T: CursorValues> PartialEq for Cursor<T> {
Expand All @@ -103,6 +117,22 @@ impl<T: CursorValues> PartialEq for Cursor<T> {
}
}

impl<T: CursorValues> Cursor<T> {
fn is_eq_to_prev_row(&self) -> bool {
T::eq_to_previous(&self.values, self.offset)
}

fn is_eq_to_prev_row_in_prev_batch(&self, other: &Self) -> bool {
assert_eq!(self.offset, 0);
T::eq(
&self.values,
self.offset,
&other.values,
other.values.len() - 1,
)
}
}

impl<T: CursorValues> Eq for Cursor<T> {}

impl<T: CursorValues> PartialOrd for Cursor<T> {
Expand Down Expand Up @@ -156,6 +186,11 @@ impl CursorValues for RowValues {
l.rows.row(l_idx) == r.rows.row(r_idx)
}

fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
assert!(idx > 0);
cursor.rows.row(idx) == cursor.rows.row(idx - 1)
}

fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
l.rows.row(l_idx).cmp(&r.rows.row(r_idx))
}
Expand Down Expand Up @@ -188,6 +223,11 @@ impl<T: ArrowNativeTypeOp> CursorValues for PrimitiveValues<T> {
l.0[l_idx].is_eq(r.0[r_idx])
}

fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
assert!(idx > 0);
cursor.0[idx].is_eq(cursor.0[idx - 1])
}

fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
l.0[l_idx].compare(r.0[r_idx])
}
Expand Down Expand Up @@ -219,6 +259,11 @@ impl<T: OffsetSizeTrait> CursorValues for ByteArrayValues<T> {
l.value(l_idx) == r.value(r_idx)
}

fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
assert!(idx > 0);
cursor.value(idx) == cursor.value(idx - 1)
}

fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
l.value(l_idx).cmp(r.value(r_idx))
}
Expand Down Expand Up @@ -284,6 +329,15 @@ impl<T: CursorValues> CursorValues for ArrayValues<T> {
}
}

fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
assert!(idx > 0);
match (cursor.is_null(idx), cursor.is_null(idx - 1)) {
(true, true) => true,
(false, false) => T::eq(&cursor.values, idx, &cursor.values, idx - 1),
_ => false,
}
}

fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
match (l.is_null(l_idx), r.is_null(r_idx)) {
(true, true) => Ordering::Equal,
Expand Down
Loading