JoinOptimization: Add build side pushdown to probe side #13054

Open · Lordworms wants to merge 1 commit into main from dynamic_join_pushdown
Conversation

Lordworms (Author):
Which issue does this PR close?

Closes #7955 (Push Dynamic Join Predicates into Scan, "Sideways Information Passing")

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

github-actions bot added labels: logical-expr, physical-expr, optimizer, core, common, functions (Oct 22, 2024)
Lordworms (Author) commented Oct 22, 2024:

For a query like

select count(*) from part join lineitem on l_partkey=p_partkey where p_partkey > 10;

with

set datafusion.execution.parquet.pushdown_filters = true;

and with a data scale as shown below:
[screenshot: data scale]

the execution time is about 5x faster.

before:
[screenshot: before]

after:
[screenshot: after]

The script I used to generate the test Parquet files:
[screenshot: generation script]

Unlike DuckDB, DataFusion's default way to execute a join is over multiple partitions, so I added an extra phase to aggregate the build-side information across partitions.

I tested on my M3 Pro Mac, with CPU info as follows:
[screenshot: CPU info]

Lordworms (Author):
@Dandandan Sorry for the late response; I have re-investigated it and formatted the code. Really appreciate your suggestions, thanks a lot.

Dandandan (Contributor):
Really nice @Lordworms, I will have a good look later today.

return Ok(Transformed::yes(Arc::new(new_hash_join)));
}
Ok(Transformed::no(plan))
} else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
Contributor:

I wonder if we could allow this cool optimization on any ExecutionPlan by adding a "with_dynamic_filter" method to the ExecutionPlan trait? Maybe also a "supports_dynamic_filter" to know when to call "with_dynamic_filter" — in other words, the same principle as the existing static filter pushdown setup.
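For illustration, a minimal sketch of what that suggestion could look like. These trait methods do not exist in DataFusion; the names and the DynamicFilterInfo stub are assumptions taken from this PR's discussion, not an actual API:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Stand-in for the PR's filter state built from the join's build side.
struct DynamicFilterInfo;

/// Hypothetical extension trait mirroring the static filter pushdown setup.
trait DynamicFilterPushdownExt: ExecutionPlan {
    /// Whether this operator can accept a dynamic (runtime) filter.
    fn supports_dynamic_filter(&self) -> bool {
        false
    }

    /// Return a copy of this plan with the dynamic filter attached.
    fn with_dynamic_filter(
        &self,
        filter: Arc<DynamicFilterInfo>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}

The optimizer rule could then probe any operator with supports_dynamic_filter instead of downcasting to ParquetExec specifically.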

Contributor:

That makes a lot of sense

Lordworms (Author), Oct 22, 2024:

Sure, I'll refactor this, but can we support ParquetExec first? For other scan executors, I think the way to add a dynamic filter is to put a FilterExec on top, whereas for Parquet we can use its predicate to add filters dynamically.

@@ -711,10 +724,15 @@ impl DisplayAs for ParquetExec {
)
})
.unwrap_or_default();

let dynamic_filter =
format!("dynamic_filter: {:?}", self.dynamic_filters);
Contributor:

I think it's best to only write this when a filter is actually present. It would also be good to implement Display for DynamicFilterInfo.
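For reference, a minimal sketch of such a Display impl; the fields below are assumptions about DynamicFilterInfo's shape, not the PR's actual layout:

use std::fmt;

/// Assumed shape; the PR's DynamicFilterInfo holds more state than this.
struct DynamicFilterInfo {
    /// Join-key columns the filter applies to.
    columns: Vec<String>,
    /// Set once the build side has been fully consumed.
    finalized: bool,
}

impl fmt::Display for DynamicFilterInfo {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if !self.finalized {
            // nothing useful to show before the build side is ready
            return write!(f, "dynamic_filter: pending");
        }
        write!(f, "dynamic_filter: [{}]", self.columns.join(", "))
    }
}

ParquetExec's DisplayAs could then call this and skip the field entirely when no dynamic filter is configured.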

Lordworms (Author):

Sorry about that; this was for debugging. I'll refactor it.

}
}

impl PhysicalOptimizerRule for JoinFilterPushdown {
Contributor:

This is executed at plan time; wouldn't it be reasonable to run it during execution of the HashJoin, after the build side has been loaded?

data_types: Vec<&DataType>,
total_batches: usize,
) -> Result<Self, DataFusionError> {
let (max_accumulators, min_accumulators) = data_types
Dandandan (Contributor), Oct 22, 2024:

I wonder if we can use a logical plan / physical plan instead of manually invoking min/max accumulators? Not sure if that makes sense :)

Lordworms (Author):

I'm not sure either. At first I wanted to use the min/max ScalarFunction, but the ScalarFunction translation from logical_expr to physical_expr is not implemented yet... I guess using accumulators is more intuitive?
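For what it's worth, a minimal sketch of the accumulator-based approach. The import paths are from recent DataFusion releases and may differ; ColumnBounds and its methods are illustrative names, not the PR's code:

use arrow::array::ArrayRef;
use arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_common::ScalarValue;
use datafusion_expr::Accumulator;

/// Running min/max bounds for one join-key column on the build side.
struct ColumnBounds {
    min: MinAccumulator,
    max: MaxAccumulator,
}

impl ColumnBounds {
    fn try_new(data_type: &DataType) -> Result<Self> {
        Ok(Self {
            min: MinAccumulator::try_new(data_type)?,
            max: MaxAccumulator::try_new(data_type)?,
        })
    }

    /// Fold one build-side key column into the running bounds.
    fn update(&mut self, values: &ArrayRef) -> Result<()> {
        self.min.update_batch(std::slice::from_ref(values))?;
        self.max.update_batch(std::slice::from_ref(values))
    }

    /// Final (min, max) once all build-side batches have been seen.
    fn bounds(&mut self) -> Result<(ScalarValue, ScalarValue)> {
        Ok((self.min.evaluate()?, self.max.evaluate()?))
    }
}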

}

inner.batch_count = inner.batch_count.saturating_sub(1);
if records.num_rows() == 0 {
Contributor:

Couldn't the input of a join partition be empty, in which case it would never be finalized?

Lordworms (Author):

I haven't tried that; I'll add a test for it.

Lordworms (Author), Oct 23, 2024:

Yes, it happens sometimes; I haven't found a way to solve it yet.

Lordworms (Author):

I added a new check: if a partition id has already been recorded and it returns an empty batch again, we treat it as an empty partition and return true.
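A sketch of that check as I read it; the struct and names here are assumed for illustration, not the PR's actual code:

use std::collections::HashSet;

/// Tracks build-side progress per partition (illustrative only).
struct PartitionTracker {
    /// Build-side batches still expected.
    batch_count: usize,
    /// Partitions that have already reported an empty batch once.
    seen_empty: HashSet<usize>,
}

impl PartitionTracker {
    /// Returns true when the filter can be finalized, treating a second
    /// empty batch from the same partition as "this partition is empty".
    fn record_batch(&mut self, partition: usize, num_rows: usize) -> bool {
        self.batch_count = self.batch_count.saturating_sub(1);
        if num_rows == 0 && !self.seen_empty.insert(partition) {
            return true;
        }
        self.batch_count == 0
    }
}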

let max_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(max_scalar));
let min_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(min_scalar));

let range_condition: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
Dandandan (Contributor), Oct 22, 2024:

Maybe it makes sense not to create a dynamic filter in certain conditions, e.g. when min is the minimum value of the data type and max is the maximum (the expression would not filter out any rows)?
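Concretely, the check could look something like this sketch (helper name hypothetical; only a few integer types shown):

use datafusion_common::ScalarValue;

/// True when the observed bounds already span the type's full domain,
/// so a `min <= col AND col <= max` predicate cannot remove any rows.
fn bounds_are_trivial(min: &ScalarValue, max: &ScalarValue) -> bool {
    match (min, max) {
        (ScalarValue::Int8(Some(lo)), ScalarValue::Int8(Some(hi))) => {
            *lo == i8::MIN && *hi == i8::MAX
        }
        (ScalarValue::Int32(Some(lo)), ScalarValue::Int32(Some(hi))) => {
            *lo == i32::MIN && *hi == i32::MAX
        }
        (ScalarValue::Int64(Some(lo)), ScalarValue::Int64(Some(hi))) => {
            *lo == i64::MIN && *hi == i64::MAX
        }
        // remaining numeric types would follow the same pattern
        _ => false,
    }
}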

let max_scalar = max_value.clone();
let min_scalar = min_value.clone();

let max_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(max_scalar));
Dandandan (Contributor), Oct 22, 2024:

For readability, would it be better to use a LogicalPlan here and convert it to physical? That might also give some optimizations for free (e.g. expression simplification)...

Lordworms (Author):

Got it.


struct DynamicFilterInfoInner {
max_accumulators: Vec<MaxAccumulator>,
min_accumulators: Vec<MinAccumulator>,
Contributor:

I wonder if we can just use batches: Vec<Vec<Arc<dyn Array>>> here and compute any filters from that?

Contributor:

This makes it pretty easy to support additional filters; for example, adding an InListExpr filter based on this is straightforward (I have some code locally).

Lordworms (Author):

I agree, I'll refactor it; after that maybe I can incorporate your commit.

Contributor:

Cool, after the changes have settled down a bit I can update and share my branch. Adding something like that on top is actually quite easy after making the make_set helper public:

+                let col =
+                    Arc::<datafusion_physical_expr::expressions::Column>::clone(column);
+                let batches: Vec<&dyn Array> = inner.batches[i]
+                    .iter()
+                    .map(|x| x as &dyn Array)
+                    .collect();
+
+                // use inlist rather than min/max, use some threshold to avoid big inputs
+                // TODO: big inputs could be using bloom filter
+                if batches.iter().map(|x|x.len()).sum::<usize>() < 100 {
+                    let batches = concat(batches.as_slice())?;
+                    let set = make_set(&batches)?;
+    
+                    let unique_condition = InListExpr::new(col, vec![], false, Some(set));
+                    condition = Arc::new(BinaryExpr::new(
+                        Arc::new(unique_condition),
                         Operator::And,
-                        range_condition,
-                    ))
-                        as Arc<dyn PhysicalExpr>)),
-                    None => Ok(Some(range_condition)),
+                        condition,
+                    ));
                 }
+                Ok(Some(condition))

Contributor:

Something that would be fun to try is creating a PhysicalExpr from the JoinHashMap - that way we can build a good filter without any overhead.

Lordworms (Author):

I can do that.

Lordworms (Author):

I was trying to do this, but since the hash function is one-way, I can only combine the record batches to recover the original values... I don't see the benefit here. I think one way to reduce the overhead is to just support a max/min filter (as DuckDB does)... If there is a better way, please tell me; I'm getting a little confused here. Thanks.

Dandandan (Contributor), Nov 6, 2024:

So the idea is just to filter out values whose hash isn't present in the hashmap; that way we can filter out most values.

It's fine to focus on min/max first; the upside is that evaluation is fast, but it won't filter out many values if the min/max range on the left side is as large as on the right side (regardless of the number of values being much smaller on the left).
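To make the idea concrete, a sketch of a hash-membership probe filter. create_hashes is the same helper the HashJoinStream snippet below uses, though its import path varies between versions; probe_mask is hypothetical, and the HashSet stands in for a lookup against the PR's actual JoinHashMap:

use std::collections::HashSet;

use ahash::RandomState;
use arrow::array::{ArrayRef, BooleanArray};
use datafusion::error::Result;
use datafusion_common::hash_utils::create_hashes;

/// Boolean mask over a probe batch: true iff the row's join-key hash was
/// seen on the build side. Hash collisions only cause false positives,
/// which the join itself filters out later.
fn probe_mask(
    key_columns: &[ArrayRef],
    random_state: &RandomState,
    build_hashes: &HashSet<u64>,
    num_rows: usize,
) -> Result<BooleanArray> {
    let mut hashes = vec![0u64; num_rows];
    // must use the same RandomState as the build side
    create_hashes(key_columns, random_state, &mut hashes)?;
    Ok(hashes.iter().map(|h| Some(build_hashes.contains(h))).collect())
}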

Lordworms (Author), Nov 6, 2024:

> So the idea is just to filter out values whose hash isn't present in the hashmap; that way we can filter out most values.

I got it. I was thinking about rebuilding the original array using the hashmap and records 😅... I'll add the logic.

> It's fine to focus on min/max first; the upside is that evaluation is fast ...

let data_type = arrays[0].data_type();
match data_type {
DataType::Int8 => process_min_max!(arrays, Int8Array, Int8, i8),
Lordworms (Author):

Currently this supports numeric-type columns (adding a range filter); string types will be supported in the next PR (adding an IN (xx, xx) filter).

Contributor:

String types could support a range filter as well, no?

Lordworms (Author):

I'll add it.

berkaysynnada (Contributor):
Could you update the display of sources to include these pushed-down filters? I believe it would be clearer if we could see them in the output (I'm also curious to see how this impacts the test plans). Additionally, are there any benchmark results you could share with us?

Dandandan (Contributor):
> Could you update the display of sources to include these pushed-down filters? ...

How would you display them in sources? The dynamic filter will only be added during execution, so it will only be available through e.g. ParquetExec after loading the build side.

berkaysynnada (Contributor):
> How would you display them in sources? The dynamic filter will only be added during execution, so it will only be available through e.g. ParquetExec after loading the build side.

Doesn't JoinFilterPushdown: PhysicalOptimizerRule modify the dynamic_filters in ParquetExec during optimizations? My intention was to ensure the rule is functioning correctly and to prevent this feature from being silently broken in the future. Perhaps we could add some unit tests to verify this.

Dandandan (Contributor) commented Oct 24, 2024:

> How would you display them in sources? The dynamic filter will only be added during execution ...

> Doesn't JoinFilterPushdown: PhysicalOptimizerRule modify the dynamic_filters in ParquetExec during optimizations? ...

Yes, that's correct. I just wanted to stress that the actual filter isn't added during the optimization phase, only a "placeholder" that may be filled with filter expressions for particular columns during execution. So the plan only shows that the ParquetExec might be filtered dynamically from the join values on certain columns; the actual range or set of values is not added at that point.

Lordworms (Author) commented Oct 25, 2024:

> How would you display them in sources? The dynamic filter will only be added during execution ...

> Doesn't JoinFilterPushdown: PhysicalOptimizerRule modify the dynamic_filters in ParquetExec during optimizations? ...

Maybe I can show them in 'explain analyze', something like:
[screenshot: explain analyze output]

github-actions bot added the sqllogictest label (Oct 25, 2024)
Lordworms force-pushed the dynamic_join_pushdown branch from 1aeb19a to 2ff52e6 (October 25, 2024)
github-actions bot added the documentation label (Oct 25, 2024)
Lordworms force-pushed the dynamic_join_pushdown branch from 45d70a8 to 85b26d8 (November 7, 2024)
@@ -1406,12 +1403,24 @@ impl HashJoinStream {
self.hashes_buffer.resize(batch.num_rows(), 0);
create_hashes(&keys_values, &self.random_state, &mut self.hashes_buffer)?;

let (filtered_batch, filtered_hashes) =
if let Some(dynamic_filter) = &self.dynamic_filter_info {
dynamic_filter.filter_probe_batch(
Contributor:

Hmm, filter_probe_batch shouldn't be applied as a filter inside the hash join, but rather as a PhysicalExpr (returning boolean) that filters the ParquetExec.

Lordworms (Author), Nov 7, 2024:

> Hmm, filter_probe_batch shouldn't be applied as a filter inside the hash join, but rather as a PhysicalExpr (returning boolean) that filters the ParquetExec.

Sure, I'll refactor that. Apart from that, I found a problem with ParquetExec after rebasing on main (#13298): the pushdown can cost much more than not pushing down at all, so I'm not sure whether it is an 'optimization' or not...

Lordworms (Author):
I re-pushed with an adaptive way to generate the dynamic filter (when the number of unique values exceeds a threshold, generate a range filter; otherwise an in-list filter). Due to #13298 I didn't push the filter all the way down to the Parquet scan; I pushed it down to the file stream instead. I also tried with random data like:

import pandas as pd
import numpy as np

part = pd.DataFrame({
    'p_partkey': np.random.randint(10, 21, size=1000),
    'p_brand': np.random.choice(['Brand#1', 'Brand#2', 'Brand#3'], 1000),
    'p_container': np.random.choice(['SM BOX', 'LG BOX', 'MED BOX'], 1000)
})

lineitem = pd.DataFrame({
    'l_partkey': np.random.randint(1, 1000001, size=100000000),
    'l_quantity': np.random.uniform(1, 50, size=100000000),
    'l_extendedprice': np.random.uniform(100, 10000, size=100000000)
})

# assumed final step (not shown in the original comment): write the
# frames out as parquet so DataFusion can scan them
part.to_parquet('part.parquet', index=False)
lineitem.to_parquet('lineitem.parquet', index=False)

and the result is:
[screenshot: benchmark result]

which only shows about a 40% improvement. I think we could try pushing the filter down to the Parquet scan once the filtered Parquet reading path is optimized.

We also probably need to improve the performance of InList: when I tried 20 elements, it was 5 times slower than the range filter.
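For readers following along, the adaptive scheme described above amounts to something like this sketch (the names and threshold value are illustrative, not the PR's actual constants):

/// Which dynamic filter to build from the build-side join keys.
enum DynamicFilterKind {
    /// min <= col AND col <= max
    Range,
    /// col IN (v1, v2, ...)
    InList,
}

fn select_filter(unique_build_values: usize) -> DynamicFilterKind {
    // assumed cutoff: small key sets get an exact in-list filter, large
    // ones fall back to the cheaper-to-evaluate range filter
    const IN_LIST_THRESHOLD: usize = 100;
    if unique_build_values <= IN_LIST_THRESHOLD {
        DynamicFilterKind::InList
    } else {
        DynamicFilterKind::Range
    }
}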

Lordworms force-pushed the dynamic_join_pushdown branch from e5fd1eb to e5a33d8 (November 14, 2024)
Lordworms (Author) commented Nov 15, 2024:

Not much difference here, since I think this optimization is specialized for particular data distributions (e.g. the build side containing data in a certain range).
[screenshot: benchmark comparison]

Lordworms force-pushed the dynamic_join_pushdown branch from ca627a0 to 8762266 (November 15, 2024)
github-actions bot added the proto label (Nov 15, 2024)