diff --git a/compiler/kernel/filter.go b/compiler/kernel/filter.go index 51b50a630c..4c1885d148 100644 --- a/compiler/kernel/filter.go +++ b/compiler/kernel/filter.go @@ -24,7 +24,7 @@ func (f *Filter) AsBufferFilter() (*expr.BufferFilter, error) { if f == nil { return nil, nil } - return CompileBufferFilter(f.builder.rctx.Zctx, f.pushdown) + return CompileBufferFilter(f.builder.zctx(), f.pushdown) } type DeleteFilter struct { diff --git a/compiler/kernel/op.go b/compiler/kernel/op.go index 197381fa44..c47d65259a 100644 --- a/compiler/kernel/op.go +++ b/compiler/kernel/op.go @@ -155,7 +155,7 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error) return nil, err } lhs, rhs := splitAssignments(assignments) - cutter := expr.NewCutter(b.rctx.Zctx, lhs, rhs) + cutter := expr.NewCutter(b.zctx(), lhs, rhs) return op.NewApplier(b.rctx, parent, cutter, b.resetters), nil case *dag.Drop: if len(v.Args) == 0 { @@ -169,7 +169,7 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error) } fields = append(fields, field.Path) } - dropper := expr.NewDropper(b.rctx.Zctx, fields) + dropper := expr.NewDropper(b.zctx(), fields) return op.NewApplier(b.rctx, parent, dropper, expr.Resetters{}), nil case *dag.Sort: b.resetResetters() @@ -208,21 +208,21 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error) if err != nil { return nil, fmt.Errorf("compiling filter: %w", err) } - return op.NewApplier(b.rctx, parent, expr.NewFilterApplier(b.rctx.Zctx, f), b.resetters), nil + return op.NewApplier(b.rctx, parent, expr.NewFilterApplier(b.zctx(), f), b.resetters), nil case *dag.Top: b.resetResetters() fields, err := b.compileExprs(v.Args) if err != nil { return nil, fmt.Errorf("compiling top: %w", err) } - return top.New(b.rctx.Zctx, parent, v.Limit, fields, v.Flush, b.resetters), nil + return top.New(b.zctx(), parent, v.Limit, fields, v.Flush, b.resetters), nil case *dag.Put: b.resetResetters() clauses, err := b.compileAssignments(v.Args) if err != nil { return nil, err } - putter := expr.NewPutter(b.rctx.Zctx, clauses) + putter := expr.NewPutter(b.zctx(), clauses) return op.NewApplier(b.rctx, parent, putter, b.resetters), nil case *dag.Rename: b.resetResetters() @@ -230,7 +230,7 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error) if err != nil { return nil, err } - renamer := expr.NewRenamer(b.rctx.Zctx, srcs, dsts) + renamer := expr.NewRenamer(b.zctx(), srcs, dsts) return op.NewApplier(b.rctx, parent, renamer, b.resetters), nil case *dag.Fuse: return fuse.New(b.rctx, parent) @@ -241,7 +241,7 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error) case *dag.Merge: return nil, errors.New("merge: multiple upstream paths required") case *dag.Explode: - typ, err := zson.ParseType(b.rctx.Zctx, v.Type) + typ, err := zson.ParseType(b.zctx(), v.Type) if err != nil { return nil, err } @@ -250,7 +250,7 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error) if err != nil { return nil, err } - return explode.New(b.rctx.Zctx, parent, args, typ, v.As, b.resetters) + return explode.New(b.zctx(), parent, args, typ, v.As, b.resetters) case *dag.Over: return b.compileOver(parent, v) case *dag.Yield: @@ -267,7 +267,7 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error) } return b.compilePoolScan(v) case *dag.PoolMetaScan: - return meta.NewPoolMetaScanner(b.rctx.Context, b.rctx.Zctx, b.source.Lake(), v.ID, v.Meta) + return meta.NewPoolMetaScanner(b.rctx.Context, b.zctx(), b.source.Lake(), v.ID, v.Meta) case *dag.CommitMetaScan: var pruner expr.Evaluator if v.Tap && v.KeyPruner != nil { @@ -277,14 +277,14 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error) return nil, err } } - return meta.NewCommitMetaScanner(b.rctx.Context, b.rctx.Zctx, b.source.Lake(), v.Pool, v.Commit, v.Meta, pruner) + return meta.NewCommitMetaScanner(b.rctx.Context, b.zctx(), b.source.Lake(), v.Pool, v.Commit, v.Meta, pruner) case *dag.LakeMetaScan: - return meta.NewLakeMetaScanner(b.rctx.Context, b.rctx.Zctx, b.source.Lake(), v.Meta) + return meta.NewLakeMetaScanner(b.rctx.Context, b.zctx(), b.source.Lake(), v.Meta) case *dag.HTTPScan: body := strings.NewReader(v.Body) - return b.source.OpenHTTP(b.rctx.Context, b.rctx.Zctx, v.URL, v.Format, v.Method, v.Headers, body, demand.All()) + return b.source.OpenHTTP(b.rctx.Context, b.zctx(), v.URL, v.Format, v.Method, v.Headers, body, demand.All()) case *dag.FileScan: - return b.source.Open(b.rctx.Context, b.rctx.Zctx, v.Path, v.Format, b.PushdownOf(v.Filter), demand.All()) + return b.source.Open(b.rctx.Context, b.zctx(), v.Path, v.Format, b.PushdownOf(v.Filter), demand.All()) case *dag.DefaultScan: pushdown := b.PushdownOf(v.Filter) if len(b.readers) == 1 { diff --git a/compiler/kernel/vop.go b/compiler/kernel/vop.go index e815e68f8e..d32980cade 100644 --- a/compiler/kernel/vop.go +++ b/compiler/kernel/vop.go @@ -78,14 +78,14 @@ func (b *Builder) compileVamLeaf(o dag.Op, parent vector.Puller) (vector.Puller, if err != nil { return nil, err } - return vamop.NewFilter(b.rctx.Zctx, parent, e), nil + return vamop.NewFilter(b.zctx(), parent, e), nil case *dag.Head: return vamop.NewHead(parent, o.Count), nil case *dag.Summarize: if name, ok := optimizer.IsCountByString(o); ok { - return vamop.NewCountByString(b.rctx.Zctx, parent, name), nil + return vamop.NewCountByString(b.zctx(), parent, name), nil } else if name, ok := optimizer.IsSum(o); ok { - return vamop.NewSum(b.rctx.Zctx, parent, name), nil + return vamop.NewSum(b.zctx(), parent, name), nil } else { return nil, fmt.Errorf("internal error: unhandled dag.Summarize: %#v", o) } @@ -112,7 +112,7 @@ func (b *Builder) compileVamLeaf(o dag.Op, parent vector.Puller) (vector.Puller, if err != nil { return nil, err } - return vamop.NewYield(b.rctx.Zctx, parent, exprs), nil + return vamop.NewYield(b.zctx(), parent, exprs), nil case *dag.Output: // XXX Ignore Output op for vectors for now. return parent, nil