Skip to content

Commit

Permalink
Parallelize more source paths
Browse files Browse the repository at this point in the history
When looking for source paths to parallelize, optimizer.walkEntries
examines only the first operator in a sequence.  This prevents the
optimizer from parallelizing a pool scan in the right input of a join
expressed in subquery syntax, such as:

    from left | join (from right | ...) on left_key=right_key

Change walkEntries to examine every operator in a sequence so that almost
all source paths are parallelized.  (The exceptions are paths inside
dag.Over and paths in which no operator follows the source.)
  • Loading branch information
nwt committed Sep 27, 2023
1 parent d938fba commit c9819d3
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
9 changes: 2 additions & 7 deletions compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func walk(seq dag.Seq, over bool, post func(dag.Seq) dag.Seq) dag.Seq {
}

func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, error) {
if len(seq) > 0 {
switch op := seq[0].(type) {
for _, op := range seq {
switch op := op.(type) {
case *dag.Fork:
for k := range op.Paths {
seq, err := walkEntries(op.Paths[k], post)
Expand All @@ -102,7 +102,6 @@ func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, err
}
op.Paths[k] = seq
}
return seq, nil
case *dag.Scatter:
for k := range op.Paths {
seq, err := walkEntries(op.Paths[k], post)
Expand All @@ -111,7 +110,6 @@ func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, err
}
op.Paths[k] = seq
}
return seq, nil
case *dag.Scope:
seq, err := walkEntries(op.Body, post)
if err != nil {
Expand Down Expand Up @@ -253,9 +251,6 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
case *kernel.Reader:
op.Filter = filter
seq = append(dag.Seq{op}, chain...)
case *dag.LakeMetaScan, *dag.PoolMetaScan, *dag.HTTPScan:
default:
return nil, fmt.Errorf("internal error: an entry point to the query is not a source: %T", op)
}
return seq, nil
})
Expand Down
35 changes: 35 additions & 0 deletions compiler/ztests/par-join.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
script: |
export ZED_LAKE=test
zed init -q
zed create -q -orderby ts test
# At the time of writing, the where operator is necessary because a pool scan
# is parallelized only when followed by another operator.
zc -C -P 2 "from test | join (from test | where true) on a=b" | sed -e 's/pool .*/.../'
outputs:
- name: stdout
data: |
lister ...
| slicer
| scatter (
=>
seqscan ...
=>
seqscan ...
)
| merge ts:asc
| fork (
=>
pass
=>
lister ...
| slicer
| scatter (
=>
seqscan ...
=>
seqscan ...
)
| merge ts:asc
)
| join on a=b

0 comments on commit c9819d3

Please sign in to comment.