diff --git a/compiler/optimizer/optimizer.go b/compiler/optimizer/optimizer.go index 4010e4167d..24f8f13d4c 100644 --- a/compiler/optimizer/optimizer.go +++ b/compiler/optimizer/optimizer.go @@ -92,8 +92,8 @@ func walk(seq dag.Seq, over bool, post func(dag.Seq) dag.Seq) dag.Seq { } func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, error) { - if len(seq) > 0 { - switch op := seq[0].(type) { + for _, op := range seq { + switch op := op.(type) { case *dag.Fork: for k := range op.Paths { seq, err := walkEntries(op.Paths[k], post) @@ -102,7 +102,6 @@ func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, err } op.Paths[k] = seq } - return seq, nil case *dag.Scatter: for k := range op.Paths { seq, err := walkEntries(op.Paths[k], post) @@ -111,7 +110,6 @@ func walkEntries(seq dag.Seq, post func(dag.Seq) (dag.Seq, error)) (dag.Seq, err } op.Paths[k] = seq } - return seq, nil case *dag.Scope: seq, err := walkEntries(op.Body, post) if err != nil { @@ -253,9 +251,6 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) { case *kernel.Reader: op.Filter = filter seq = append(dag.Seq{op}, chain...) - case *dag.LakeMetaScan, *dag.PoolMetaScan, *dag.HTTPScan: - default: - return nil, fmt.Errorf("internal error: an entry point to the query is not a source: %T", op) } return seq, nil }) diff --git a/compiler/ztests/par-join.yaml b/compiler/ztests/par-join.yaml new file mode 100644 index 0000000000..e10e00a73a --- /dev/null +++ b/compiler/ztests/par-join.yaml @@ -0,0 +1,35 @@ +script: | + export ZED_LAKE=test + zed init -q + zed create -q -orderby ts test + # At the time of writing, the where operator is necessary because a pool scan + # is parallelized only when followed by another operator. + zc -C -P 2 "from test | join (from test | where true) on a=b" | sed -e 's/pool .*/.../' + +outputs: + - name: stdout + data: | + lister ... + | slicer + | scatter ( + => + seqscan ... + => + seqscan ... + ) + | merge ts:asc + | fork ( + => + pass + => + lister ... + | slicer + | scatter ( + => + seqscan ... + => + seqscan ... + ) + | merge ts:asc + ) + | join on a=b