Implement CardinalityAwareRowConverter while doing streaming merge (#7401)

* Use CardinalityAwareRow converter

* Move CardinalityAwareRowConverter in df

* Try to add cardinality aware row converter in df

* Move CardinalityAwareRowConverter in df

* Move CardinalityAwareRowConverter in df

* Remove unnecessary clone and make wrapper mod private

* Use as_any_dictionary_opt

* Remove unnecessary comments

* Remove done

* Add test for cardinality aware row converter on high card dict

* Add test for cardinality aware row converter on low card dict

* Ignore the test_dict_merge_infinite test

* Remove phantom Arc import

* Remove the infinite stream test

* Update datafusion/core/src/physical_plan/wrapper.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Update convert_rows signature and add empty_rows

* Add comments to the test

* Use Some and take() semantics

* Init with a row converter instance instead of none

* Remove unused variable

* Remove unused imports

* Remove unused imports

* Change GroupValues

* Add comments, run fmt

* Init with an empty row converter

* Use the cardinality aware row converter

* Reconvert the group values

* Rename wrapper to row_converter

* Reconvert the group values

* Convert back to dictionary

* fmt

* A fmt pass

* fix: fmt

* Move the reconversion to dict to just consider group by columns

* Reconvert only the correct cols

* Use assert eq

* clippy

* clippy

* Add comment about the reconversion to dict

* Fix the merge issues

* move data type conversion

* fix

* fix docs

* fix bug

* Improve tests

* simplify

* Use cardinality aware row converter in gby order

* clippy

* Adjust memory test

* Add doc comments about row converter

* remove outdated comment

* Rework partition size calculation to make test clearer

* Increase threshold to 512

* Update row converter tests according to new threshold

* fix clippy

* fix panic

* Adjust constant for test

---------

Co-authored-by: Andrew Lamb <[email protected]>
JayjeetAtGithub and alamb authored Sep 18, 2023
1 parent 44b3318 commit f4c4ee1
Showing 9 changed files with 461 additions and 69 deletions.
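
The converter this commit introduces wraps arrow's `RowConverter` and picks its sort fields from the first batch it sees: dictionary-encoded sort columns whose cardinality exceeds the threshold (raised to 512 above) are converted on their value type rather than having every key interned. A minimal sketch of that idea follows; the struct name, the `as_any_dictionary_opt` check, and the 512 constant come from the commit messages above, while the rest (per-column `SortOptions`, for example, are omitted) is an illustrative assumption rather than the committed code.

// Sketch of the approach under the assumptions stated above; not the
// committed implementation.
use arrow::array::{cast::AsArray, Array, ArrayRef};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;
use arrow::row::{RowConverter, Rows, SortField};

/// Dictionary columns with more distinct values than this are converted
/// on their value type (the commit raises this threshold to 512).
const LOW_CARDINALITY_THRESHOLD: usize = 512;

/// Defers building the inner `RowConverter` until the first batch arrives,
/// so the decision can be based on the observed dictionary cardinality.
struct CardinalityAwareRowConverter {
    inner: Option<RowConverter>,
}

impl CardinalityAwareRowConverter {
    fn new() -> Self {
        Self { inner: None }
    }

    fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        if self.inner.is_none() {
            let fields = columns
                .iter()
                .map(|col| match col.data_type() {
                    // High-cardinality dictionary: sort on the value type so
                    // the row format stores values instead of interning the
                    // whole (large) dictionary.
                    DataType::Dictionary(_, value_type)
                        if col
                            .as_any_dictionary_opt()
                            .map(|d| d.values().len() > LOW_CARDINALITY_THRESHOLD)
                            .unwrap_or(false) =>
                    {
                        SortField::new(value_type.as_ref().clone())
                    }
                    // Everything else (including low-cardinality dictionaries)
                    // keeps its original data type.
                    other => SortField::new(other.clone()),
                })
                .collect();
            self.inner = Some(RowConverter::new(fields)?);
        }
        self.inner.as_mut().unwrap().convert_columns(columns)
    }
}

The diff below then adjusts the memory-limit tests so the dictionary scenarios can be produced either as the original 50-row batches or as single-row batches, depending on whether a test wants to stay under that threshold.
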
29 changes: 15 additions & 14 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

122 changes: 96 additions & 26 deletions datafusion/core/tests/memory_limit.rs
@@ -200,7 +200,8 @@ async fn symmetric_hash_join() {

#[tokio::test]
async fn sort_preserving_merge() {
let partition_size = batches_byte_size(&dict_batches());
let scenario = Scenario::new_dictionary_strings(2);
let partition_size = scenario.partition_size();

TestCase::new()
// This query uses the exact same ordering as the input table
@@ -213,7 +214,7 @@ async fn sort_preserving_merge() {
// provide insufficient memory to merge
.with_memory_limit(partition_size / 2)
// two partitions of data, so a merge is required
.with_scenario(Scenario::DictionaryStrings(2))
.with_scenario(scenario)
.with_expected_plan(
// It is important that this plan only has
// SortPreservingMergeExec (not a Sort which would compete
@@ -238,7 +239,10 @@ async fn sort_preserving_merge() {

#[tokio::test]
async fn sort_spill_reservation() {
let partition_size = batches_byte_size(&dict_batches());
let scenario = Scenario::new_dictionary_strings(1)
// make the batches small enough to avoid triggering CardinalityAwareRowConverter
.with_single_row_batches(true);
let partition_size = scenario.partition_size();

let base_config = SessionConfig::new()
// do not allow the sort to use the 'concat in place' path
@@ -248,30 +252,30 @@ async fn sort_spill_reservation() {
// purposely sorting data that requires non trivial memory to
// sort/merge.
let test = TestCase::new()
// This query uses a different order than the input table to
// force a sort. It also needs to have multiple columns to
// force RowFormat / interner that makes merge require
// substantial memory
// This query uses a different order than the input table to
// force a sort. It also needs to have multiple columns to
// force RowFormat / interner that makes merge require
// substantial memory
.with_query("select * from t ORDER BY a , b DESC")
// enough memory to sort if we don't try to merge it all at once
// enough memory to sort if we don't try to merge it all at once
.with_memory_limit(partition_size)
// use a single partiton so only a sort is needed
.with_scenario(Scenario::DictionaryStrings(1))
// use a single partiton so only a sort is needed
.with_scenario(scenario)
.with_disk_manager_config(DiskManagerConfig::NewOs)
.with_expected_plan(
// It is important that this plan only has a SortExec, not
// also merge, so we can ensure the sort could finish
// given enough merging memory
&[
"+---------------+--------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+--------------------------------------------------------------------------------------------------------+",
"| logical_plan | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST |",
"| | TableScan: t projection=[a, b] |",
"| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] |",
"| | MemoryExec: partitions=1, partition_sizes=[5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
"| | |",
"+---------------+--------------------------------------------------------------------------------------------------------+",
"+---------------+----------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+----------------------------------------------------------------------------------------------------------+",
"| logical_plan | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST |",
"| | TableScan: t projection=[a, b] |",
"| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] |",
"| | MemoryExec: partitions=1, partition_sizes=[245], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
"| | |",
"+---------------+----------------------------------------------------------------------------------------------------------+",
]
);

@@ -471,11 +475,48 @@ enum Scenario {
/// [`StreamingTable`]
AccessLogStreaming,

/// N partitions of of sorted, dictionary encoded strings
DictionaryStrings(usize),
/// N partitions of of sorted, dictionary encoded strings.
DictionaryStrings {
partitions: usize,
/// If true, splits all input batches into 1 row each
single_row_batches: bool,
},
}

impl Scenario {
/// Create a new DictionaryStrings scenario with the number of partitions
fn new_dictionary_strings(partitions: usize) -> Self {
Self::DictionaryStrings {
partitions,
single_row_batches: false,
}
}

/// Should the input be split into 1 row batches?
fn with_single_row_batches(mut self, val: bool) -> Self {
if let Self::DictionaryStrings {
single_row_batches, ..
} = &mut self
{
*single_row_batches = val;
} else {
panic!("Scenario does not support single row batches");
}
self
}

/// return the size, in bytes, of each partition
fn partition_size(&self) -> usize {
if let Self::DictionaryStrings {
single_row_batches, ..
} = self
{
batches_byte_size(&maybe_split_batches(dict_batches(), *single_row_batches))
} else {
panic!("Scenario does not support partition size");
}
}

/// return a TableProvider with data for the test
fn table(&self) -> Arc<dyn TableProvider> {
match self {
@@ -500,11 +541,17 @@ impl Scenario {
.with_infinite_table(true);
Arc::new(table)
}
Self::DictionaryStrings(num_partitions) => {
Self::DictionaryStrings {
partitions,
single_row_batches,
} => {
use datafusion::physical_expr::expressions::col;
let batches: Vec<Vec<_>> = std::iter::repeat(dict_batches())
.take(*num_partitions)
.collect();
let batches: Vec<Vec<_>> = std::iter::repeat(maybe_split_batches(
dict_batches(),
*single_row_batches,
))
.take(*partitions)
.collect();

let schema = batches[0][0].schema();
let options = SortOptions {
Expand Down Expand Up @@ -544,7 +591,7 @@ impl Scenario {
// first
Some(vec![Arc::new(JoinSelection::new())])
}
Self::DictionaryStrings(_) => {
Self::DictionaryStrings { .. } => {
// Use default rules
None
}
@@ -559,6 +606,29 @@ fn access_log_batches() -> Vec<RecordBatch> {
.collect()
}

/// If `one_row_batches` is true, then returns new record batches that
/// are one row in size
fn maybe_split_batches(
batches: Vec<RecordBatch>,
one_row_batches: bool,
) -> Vec<RecordBatch> {
if !one_row_batches {
return batches;
}

batches
.into_iter()
.flat_map(|mut batch| {
let mut batches = vec![];
while batch.num_rows() > 1 {
batches.push(batch.slice(0, 1));
batch = batch.slice(1, batch.num_rows() - 1);
}
batches
})
.collect()
}

static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();

/// Returns 5 sorted string dictionary batches each with 50 rows with
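
The `maybe_split_batches` helper added above is what `with_single_row_batches(true)` ultimately drives. A small, hypothetical check of its behavior, written against the helper as shown in the diff and otherwise using only standard arrow APIs (`Int32Array`, `RecordBatch::try_from_iter`):

// Illustrative only: exercises the `maybe_split_batches` test helper from
// the diff above; everything else here is standard arrow API.
use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;

fn demo_split_batches() {
    let batch = RecordBatch::try_from_iter(vec![(
        "a",
        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
    )])
    .unwrap();

    // With `one_row_batches = false` the input passes through unchanged.
    assert_eq!(maybe_split_batches(vec![batch.clone()], false).len(), 1);

    // With `true`, the single 3-row batch becomes three 1-row batches.
    let split = maybe_split_batches(vec![batch], true);
    assert_eq!(split.len(), 3);
    assert!(split.iter().all(|b| b.num_rows() == 1));
}

This is how `sort_spill_reservation` keeps its batches "small enough to avoid triggering CardinalityAwareRowConverter" (per the comment in the diff), and why its expected `MemoryExec` line now reports `partition_sizes=[245]` instead of `[5]`.
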
1 change: 1 addition & 0 deletions datafusion/physical-plan/Cargo.toml
@@ -60,3 +60,4 @@ tempfile = "3"
#[dev-dependencies]
termtree = "0.4.1"
tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
uuid = { version = "^1.2", features = ["v4"] }
