From c9819d3d69dc5e001d5b835a704b4ba08d7db1a1 Mon Sep 17 00:00:00 2001 From: Noah Treuhaft Date: Wed, 27 Sep 2023 09:04:12 -0400 Subject: [PATCH] Parallelize more source paths When looking for source paths to parallelize, optimizer.walkEntries examines only the first operator in a sequence. This prevents the optimizer from parallelizing a pool scan in the right input of a join expressed in subquery syntax, like from left | join (from right | ...) on left_key=right_key Change walkEntries to examine all operators in a sequence so almost all source paths are parallelized. (The exceptions are paths inside dag.Over and paths containing no operator after the source.) --- compiler/optimizer/optimizer.go | 9 ++------- compiler/ztests/par-join.yaml | 35 +++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 7 deletions(-) create mode 100644 compiler/ztests/par-join.yaml diff --git a/compiler/optimizer/optimizer.go b/compiler/optimizer/optimizer.go index 4010e4167d..24f8f13d4c 100644 --- a/compiler/optimizer/optimizer.go +++ b/compiler/optimizer/optimizer.go @@ -92,8 +92,8 @@ func walk(seq dag.Seq, over bool, post func(dag.Seq) dag.Seq) dag.Seq { } func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, error) { - if len(seq) > 0 { - switch op := seq[0].(type) { + for _, op := range seq { + switch op := op.(type) { case *dag.Fork: for k := range op.Paths { seq, err := walkEntries(op.Paths[k], post) @@ -102,7 +102,6 @@ func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, err } op.Paths[k] = seq } - return seq, nil case *dag.Scatter: for k := range op.Paths { seq, err := walkEntries(op.Paths[k], post) @@ -111,7 +110,6 @@ func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, err } op.Paths[k] = seq } - return seq, nil case *dag.Scope: seq, err := walkEntries(op.Body, post) if err != nil { @@ -253,9 +251,6 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) { case 
*kernel.Reader: op.Filter = filter seq = append(dag.Seq{op}, chain...) - case *dag.LakeMetaScan, *dag.PoolMetaScan, *dag.HTTPScan: - default: - return nil, fmt.Errorf("internal error: an entry point to the query is not a source: %T", op) } return seq, nil }) diff --git a/compiler/ztests/par-join.yaml b/compiler/ztests/par-join.yaml new file mode 100644 index 0000000000..e10e00a73a --- /dev/null +++ b/compiler/ztests/par-join.yaml @@ -0,0 +1,35 @@ +script: | + export ZED_LAKE=test + zed init -q + zed create -q -orderby ts test + # At the time of writing, the where operator is necessary because a pool scan + # is parallelized only when followed by another operator. + zc -C -P 2 "from test | join (from test | where true) on a=b" | sed -e 's/pool .*/.../' + +outputs: + - name: stdout + data: | + lister ... + | slicer + | scatter ( + => + seqscan ... + => + seqscan ... + ) + | merge ts:asc + | fork ( + => + pass + => + lister ... + | slicer + | scatter ( + => + seqscan ... + => + seqscan ... + ) + | merge ts:asc + ) + | join on a=b