Create memory table with target partitions #12905

Open
jayzhan211 opened this issue Oct 13, 2024 · 4 comments
Labels
bug Something isn't working

Comments

@jayzhan211
Contributor

Describe the bug

When we create a table inside slt, it is backed by a MemoryExec (memory table), but the number of partitions always seems to be one. If we want a table with multiple partitions, we need to create multiple tables and union them.

To Reproduce



statement ok
create table t1(a int, b varchar) as values 
(1, 'a'),
(1, 'a'),
(1, 'a');

statement ok
create table t2(a int, b varchar) as values 
(2, 'b'),
(2, 'b'),
(2, 'b');

query TI
With T as
 (select * from t1 UNION ALL select * from t2)
select b, sum(DISTINCT a) from T group by b;
----
a 1
b 2

query TT
explain With T as
 (select * from t1 UNION ALL select * from t2)
select b, sum(DISTINCT a) from T group by b;
----
logical_plan
01)Projection: t.b, sum(alias1) AS sum(DISTINCT t.a)
02)--Aggregate: groupBy=[[t.b]], aggr=[[sum(alias1)]]
03)----Aggregate: groupBy=[[t.b, CAST(t.a AS Int64) AS alias1]], aggr=[[]]
04)------SubqueryAlias: t
05)--------Union
06)----------TableScan: t1 projection=[a, b]
07)----------TableScan: t2 projection=[a, b]
physical_plan
01)ProjectionExec: expr=[b@0 as b, sum(alias1)@1 as sum(DISTINCT t.a)]
02)--AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(alias1)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[sum(alias1)]
06)----------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, alias1@1 as alias1], aggr=[]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------RepartitionExec: partitioning=Hash([b@0, alias1@1], 4), input_partitions=4
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
10)------------------AggregateExec: mode=Partial, gby=[b@1 as b, CAST(a@0 AS Int64) as alias1], aggr=[]
11)--------------------UnionExec
12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
13)----------------------MemoryExec: partitions=1, partition_sizes=[1]

statement ok
set datafusion.execution.target_partitions = 2

statement ok
create table t(a int, b varchar) as values 
(1, 'a'),
(1, 'a'),
(1, 'a'),
(2, 'b'),
(2, 'b'),
(2, 'b');

query TI
select b, sum(DISTINCT a) from T group by b;
----
b 2
a 1

query TT
explain select b, sum(DISTINCT a) from T group by b;
----
logical_plan
01)Projection: t.b, sum(alias1) AS sum(DISTINCT t.a)
02)--Aggregate: groupBy=[[t.b]], aggr=[[sum(alias1)]]
03)----Aggregate: groupBy=[[t.b, CAST(t.a AS Int64) AS alias1]], aggr=[[]]
04)------TableScan: t projection=[a, b]
physical_plan
01)ProjectionExec: expr=[b@0 as b, sum(alias1)@1 as sum(DISTINCT t.a)]
02)--AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(alias1)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2
05)--------AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[sum(alias1)]
06)----------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, alias1@1 as alias1], aggr=[]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------RepartitionExec: partitioning=Hash([b@0, alias1@1], 2), input_partitions=2
09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
10)------------------AggregateExec: mode=Partial, gby=[b@1 as b, CAST(a@0 AS Int64) as alias1], aggr=[]
11)--------------------MemoryExec: partitions=1, partition_sizes=[1]   <- It is still 1

Expected behavior

I hope we can create a table with an arbitrary number of partitions via the setting, instead of building the test data with a union.
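
For reference, the underlying MemTable already supports multiple partitions when constructed from Rust. A minimal sketch (not part of the slt path; the table name and data are just examples mirroring the repro) of registering a two-partition memory table:

use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Sketch: MemTable::try_new takes Vec<Vec<RecordBatch>>, i.e. one
// Vec<RecordBatch> per partition, so a two-partition memory table can be
// built and registered directly.
fn register_two_partition_table(ctx: &SessionContext) -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]));
    let p0_cols: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![1, 1, 1])),
        Arc::new(StringArray::from(vec!["a", "a", "a"])),
    ];
    let p1_cols: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![2, 2, 2])),
        Arc::new(StringArray::from(vec!["b", "b", "b"])),
    ];
    let p0 = RecordBatch::try_new(schema.clone(), p0_cols)?;
    let p1 = RecordBatch::try_new(schema.clone(), p1_cols)?;
    // One inner Vec per partition -> MemoryExec: partitions=2
    let table = MemTable::try_new(schema, vec![vec![p0], vec![p1]])?;
    ctx.register_table("t", Arc::new(table))?;
    Ok(())
}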

Additional context

I think the batch size setting doesn't change the MemoryExec either, but I'm not sure about the intended behaviour.
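
Aside: when reproducing this from Rust rather than slt, both settings can be applied through SessionConfig. A small sketch (the values just mirror the repro above):

use datafusion::prelude::{SessionConfig, SessionContext};

// Equivalent of `set datafusion.execution.target_partitions = 2` plus the
// default batch size, applied programmatically.
fn make_context() -> SessionContext {
    let config = SessionConfig::new()
        .with_target_partitions(2)
        .with_batch_size(8192);
    SessionContext::new_with_config(config)
}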

@jayzhan211 jayzhan211 added the bug Something isn't working label Oct 13, 2024
@demetribu
Contributor

take

@demetribu
Contributor

I tried to implement this in a naive way by adding the following code to async fn create_memory_table(&self, cmd: CreateMemoryTable) -> Result<DataFrame>:

// Read the configured number of target partitions.
let target_partitions = self.state().config_options().execution.target_partitions;

// Wrap the input logical plan in a DataFrame, repartition it round-robin,
// and collect the result as one Vec<RecordBatch> per partition.
let physical = DataFrame::new(self.state(), input);
let batches: Vec<_> = physical
    .repartition(Partitioning::RoundRobinBatch(target_partitions))?
    .collect_partitioned()
    .await?;
But I found that the Repartition added on top of a simple Projection gets removed by the physical optimizer.

PS: When I skip remove_dist_changing_operators, the behavior still looks weird. The number of partitions matches target_partitions, but the data isn't distributed properly: one partition holds all the data and the others are empty.
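
A minimal sketch of how that skew can be checked, assuming the batches value returned by collect_partitioned() above:

// Debug helper: print how many rows ended up in each partition
// returned by collect_partitioned().
for (i, partition) in batches.iter().enumerate() {
    let rows: usize = partition.iter().map(|b| b.num_rows()).sum();
    println!("partition {i}: {rows} rows");
}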

@jayzhan211 Would appreciate any ideas.

@jayzhan211
Contributor Author

jayzhan211 commented Dec 8, 2024

ValuesExec is not partitionable; I'm thinking about returning the batches already split into partitions based on the configuration.
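
A minimal sketch of that idea (not an actual patch; partition_batch is a hypothetical helper): slice the collected rows into target_partitions chunks before building the MemTable, so the table reports multiple partitions even when the source is a single Values batch.

use datafusion::arrow::record_batch::RecordBatch;

// Hypothetical helper: slice one batch into `target_partitions` row chunks,
// producing one Vec<RecordBatch> per partition. Trailing partitions may be
// empty for small inputs.
fn partition_batch(batch: RecordBatch, target_partitions: usize) -> Vec<Vec<RecordBatch>> {
    let n = target_partitions.max(1);
    let rows = batch.num_rows();
    let chunk = ((rows + n - 1) / n).max(1);
    (0..n)
        .map(|i| {
            let start = (i * chunk).min(rows);
            let len = chunk.min(rows - start);
            if len == 0 {
                Vec::new()
            } else {
                vec![batch.slice(start, len)]
            }
        })
        .collect()
}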

@demetribu
Contributor

Unassigning, no longer working on this.
