Skip to content

Commit

Permalink
Parallelize more source paths (#4785)
Browse files Browse the repository at this point in the history
When looking for source paths to parallelize, optimizer.walkEntries
examines only the first operator in a sequence. This prevents the
optimizer from parallelizing a pool scan in the right input of a join
expressed in subquery syntax, like

    from left | join (from right | ...) on left_key=right_key

Change walkEntries to examine all operators in a sequence so that almost
all source paths are parallelized. (The exceptions are paths inside
dag.Over and paths containing no operator after the source.)
  • Loading branch information
nwt authored Sep 28, 2023
1 parent 7c1aaa2 commit f662ee1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
9 changes: 2 additions & 7 deletions compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func walk(seq dag.Seq, over bool, post func(dag.Seq) dag.Seq) dag.Seq {
}

func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, error) {
if len(seq) > 0 {
switch op := seq[0].(type) {
for _, op := range seq {
switch op := op.(type) {
case *dag.Fork:
for k := range op.Paths {
seq, err := walkEntries(op.Paths[k], post)
Expand All @@ -102,7 +102,6 @@ func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, err
}
op.Paths[k] = seq
}
return seq, nil
case *dag.Scatter:
for k := range op.Paths {
seq, err := walkEntries(op.Paths[k], post)
Expand All @@ -111,7 +110,6 @@ func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, err
}
op.Paths[k] = seq
}
return seq, nil
case *dag.Scope:
seq, err := walkEntries(op.Body, post)
if err != nil {
Expand Down Expand Up @@ -253,9 +251,6 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
case *kernel.Reader:
op.Filter = filter
seq = append(dag.Seq{op}, chain...)
case *dag.LakeMetaScan, *dag.PoolMetaScan, *dag.HTTPScan:
default:
return nil, fmt.Errorf("internal error: an entry point to the query is not a source: %T", op)
}
return seq, nil
})
Expand Down
35 changes: 35 additions & 0 deletions compiler/ztests/par-join.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
script: |
export ZED_LAKE=test
zed init -q
zed create -q -orderby ts test
# At the time of writing, the where operator is necessary because a pool scan
# is parallelized only when followed by another operator.
zc -C -P 2 "from test | join (from test | where true) on a=b" | sed -e 's/pool .*/.../'
outputs:
- name: stdout
data: |
lister ...
| slicer
| scatter (
=>
seqscan ...
=>
seqscan ...
)
| merge ts:asc
| fork (
=>
pass
=>
lister ...
| slicer
| scatter (
=>
seqscan ...
=>
seqscan ...
)
| merge ts:asc
)
| join on a=b

0 comments on commit f662ee1

Please sign in to comment.