Skip to content

Commit

Permalink
Merge branch 'main' into use-kernel.Builder.zctx
Browse files Browse the repository at this point in the history
  • Loading branch information
nwt committed Sep 18, 2024
2 parents ed09733 + a9e605d commit 0abd3df
Show file tree
Hide file tree
Showing 26 changed files with 327 additions and 61 deletions.
32 changes: 20 additions & 12 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,18 +226,9 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
return op.NewApplier(b.rctx, parent, putter, b.resetters), nil
case *dag.Rename:
b.resetResetters()
var srcs, dsts []*expr.Lval
for _, a := range v.Args {
src, err := b.compileLval(a.RHS)
if err != nil {
return nil, err
}
dst, err := b.compileLval(a.LHS)
if err != nil {
return nil, err
}
srcs = append(srcs, src)
dsts = append(dsts, dst)
srcs, dsts, err := b.compileAssignmentsToLvals(v.Args)
if err != nil {
return nil, err
}
renamer := expr.NewRenamer(b.zctx(), srcs, dsts)
return op.NewApplier(b.rctx, parent, renamer, b.resetters), nil
Expand Down Expand Up @@ -447,6 +438,23 @@ func (b *Builder) compileAssignments(assignments []dag.Assignment) ([]expr.Assig
return keys, nil
}

// compileAssignmentsToLvals compiles both sides of every assignment into an
// lval. The first result holds the compiled right-hand sides (the sources)
// and the second holds the compiled left-hand sides (the destinations);
// entries at the same position belong to the same assignment. Compilation
// stops at the first error, in which case both slices are nil.
func (b *Builder) compileAssignmentsToLvals(assignments []dag.Assignment) ([]*expr.Lval, []*expr.Lval, error) {
	var (
		sources      []*expr.Lval
		destinations []*expr.Lval
	)
	for i := range assignments {
		// The right-hand side is compiled before the left-hand side so
		// that an error on the source is reported first.
		rhs, err := b.compileLval(assignments[i].RHS)
		if err != nil {
			return nil, nil, err
		}
		lhs, err := b.compileLval(assignments[i].LHS)
		if err != nil {
			return nil, nil, err
		}
		sources, destinations = append(sources, rhs), append(destinations, lhs)
	}
	return sources, destinations, nil
}

func splitAssignments(assignments []expr.Assignment) ([]*expr.Lval, []expr.Evaluator) {
n := len(assignments)
lhs := make([]*expr.Lval, 0, n)
Expand Down
16 changes: 14 additions & 2 deletions compiler/kernel/vexpr.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (b *Builder) compileVamExpr(e dag.Expr) (vamexpr.Evaluator, error) {
return vamexpr.NewDottedExpr(b.zctx(), field.Path(e.Path)), nil
case *dag.Dot:
return b.compileVamDotExpr(e)
case *dag.IndexExpr:
return b.compileVamIndexExpr(e)
case *dag.UnaryExpr:
return b.compileVamUnary(*e)
case *dag.BinaryExpr:
Expand Down Expand Up @@ -101,8 +103,6 @@ func (b *Builder) compileVamBinary(e *dag.BinaryExpr) (vamexpr.Evaluator, error)
return vamexpr.NewCompare(b.zctx(), lhs, rhs, op), nil
case "+", "-", "*", "/", "%":
return vamexpr.NewArith(b.zctx(), lhs, rhs, op), nil
//case "[":
// return vamexpr.NewIndexExpr(b.zctx(), lhs, rhs), nil
default:
return nil, fmt.Errorf("invalid binary operator %s", op)
}
Expand Down Expand Up @@ -132,6 +132,18 @@ func (b *Builder) compileVamDotExpr(dot *dag.Dot) (vamexpr.Evaluator, error) {
return vamexpr.NewDotExpr(b.zctx(), record, dot.RHS), nil
}

// compileVamIndexExpr compiles a DAG index expression for the vector runtime.
// The indexed expression is compiled first, then the index expression, and
// the two resulting evaluators are combined with vamexpr.NewIndexExpr. A
// compile error from either operand is returned unchanged.
func (b *Builder) compileVamIndexExpr(idx *dag.IndexExpr) (vamexpr.Evaluator, error) {
	container, err := b.compileVamExpr(idx.Expr)
	if err != nil {
		return nil, err
	}
	var index vamexpr.Evaluator
	if index, err = b.compileVamExpr(idx.Index); err != nil {
		return nil, err
	}
	return vamexpr.NewIndexExpr(b.zctx(), container, index), nil
}

func (b *Builder) compileVamExprs(in []dag.Expr) ([]vamexpr.Evaluator, error) {
var exprs []vamexpr.Evaluator
for _, e := range in {
Expand Down
39 changes: 27 additions & 12 deletions compiler/kernel/vop.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/compiler/optimizer"
"github.com/brimdata/zed/runtime/vam/expr"
vamexpr "github.com/brimdata/zed/runtime/vam/expr"
vamop "github.com/brimdata/zed/runtime/vam/op"
"github.com/brimdata/zed/vector"
"github.com/brimdata/zed/zbuf"
Expand Down Expand Up @@ -66,6 +67,12 @@ func (b *Builder) compileVamScan(scan *dag.SeqScan, parent zbuf.Puller) (vector.

func (b *Builder) compileVamLeaf(o dag.Op, parent vector.Puller) (vector.Puller, error) {
switch o := o.(type) {
case *dag.Cut:
e, err := b.compileVamAssignmentsToRecordExpression(nil, o.Args)
if err != nil {
return nil, err
}
return vamop.NewYield(b.zctx(), parent, []expr.Evaluator{e}), nil
case *dag.Filter:
e, err := b.compileVamExpr(o.Expr)
if err != nil {
Expand All @@ -85,7 +92,21 @@ func (b *Builder) compileVamLeaf(o dag.Op, parent vector.Puller) (vector.Puller,
case *dag.Pass:
return parent, nil
case *dag.Put:
return b.compileVamPut(o, parent)
initial := []dag.RecordElem{
&dag.Spread{Kind: "Spread", Expr: &dag.This{Kind: "This"}},
}
e, err := b.compileVamAssignmentsToRecordExpression(initial, o.Args)
if err != nil {
return nil, err
}
return vamop.NewYield(b.zctx(), parent, []expr.Evaluator{expr.NewPutter(b.zctx(), e)}), nil
case *dag.Rename:
srcs, dsts, err := b.compileAssignmentsToLvals(o.Args)
if err != nil {
return nil, err
}
renamer := expr.NewRenamer(b.zctx(), srcs, dsts)
return vamop.NewYield(b.zctx(), parent, []expr.Evaluator{renamer}), nil
case *dag.Yield:
exprs, err := b.compileVamExprs(o.Exprs)
if err != nil {
Expand All @@ -100,22 +121,16 @@ func (b *Builder) compileVamLeaf(o dag.Op, parent vector.Puller) (vector.Puller,
}
}

func (b *Builder) compileVamPut(put *dag.Put, parent vector.Puller) (vector.Puller, error) {
elems := []dag.RecordElem{
&dag.Spread{Kind: "Spread", Expr: &dag.This{Kind: "This"}},
}
for _, a := range put.Args {
func (b *Builder) compileVamAssignmentsToRecordExpression(initial []dag.RecordElem, assignments []dag.Assignment) (vamexpr.Evaluator, error) {
elems := initial
for _, a := range assignments {
lhs, ok := a.LHS.(*dag.This)
if !ok {
return nil, fmt.Errorf("internal error: dynamic field name not yet supported in vector runtime: %#v", a.LHS)
return nil, fmt.Errorf("internal error: dynamic field name not supported in vector runtime: %#v", a.LHS)
}
elems = append(elems, newDagRecordExprForPath(lhs.Path, a.RHS).Elems...)
}
e, err := b.compileVamRecordExpr(&dag.RecordExpr{Kind: "RecordExpr", Elems: elems})
if err != nil {
return nil, err
}
return vamop.NewYield(b.zctx(), parent, []expr.Evaluator{expr.NewPutter(b.zctx(), e)}), nil
return b.compileVamRecordExpr(&dag.RecordExpr{Kind: "RecordExpr", Elems: elems})
}

func newDagRecordExprForPath(path []string, expr dag.Expr) *dag.RecordExpr {
Expand Down
5 changes: 4 additions & 1 deletion runtime/sam/expr/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,10 @@ func (i *Index) Eval(ectx Context, this zed.Value) zed.Value {
func indexVector(zctx *zed.Context, ectx Context, inner zed.Type, vector zcode.Bytes, index zed.Value) zed.Value {
id := index.Type().ID()
if !zed.IsInteger(id) {
return zctx.WrapError("array index is not an integer", index)
return zctx.WrapError("index is not an integer", index)
}
if index.IsNull() {
return zctx.Missing()
}
var idx int
if zed.IsSigned(id) {
Expand Down
16 changes: 12 additions & 4 deletions runtime/sam/expr/renamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,20 @@ func NewRenamer(zctx *zed.Context, srcs, dsts []*Lval) *Renamer {
}

// Eval applies the rename to this. It delegates to EvalToValAndError and, when
// that reports an error, returns the error's message wrapped around this as an
// error value instead.
func (r *Renamer) Eval(ectx Context, this zed.Value) zed.Value {
	renamed, err := r.EvalToValAndError(ectx, this)
	if err == nil {
		return renamed
	}
	return r.zctx.WrapError(err.Error(), this)
}

func (r *Renamer) EvalToValAndError(ectx Context, this zed.Value) (zed.Value, error) {
if !zed.IsRecordType(this.Type()) {
return this
return this, nil
}
srcs, dsts, err := r.evalFields(ectx, this)
if err != nil {
return r.zctx.WrapError(fmt.Sprintf("rename: %s", err), this)
return zed.Null, fmt.Errorf("rename: %w", err)
}
id := this.Type().ID()
m, ok := r.typeMap[id]
Expand All @@ -48,11 +56,11 @@ func (r *Renamer) Eval(ectx Context, this zed.Value) zed.Value {
var err error
typ, err = r.computeType(zed.TypeRecordOf(this.Type()), srcs, dsts)
if err != nil {
return r.zctx.WrapError(fmt.Sprintf("rename: %s", err), this)
return zed.Null, fmt.Errorf("rename: %w", err)
}
m[string(r.fieldsStr)] = typ
}
return zed.NewValue(typ, this.Bytes())
return zed.NewValue(typ, this.Bytes()), nil
}

func CheckRenameField(src, dst field.Path) error {
Expand Down
7 changes: 0 additions & 7 deletions runtime/sam/expr/ztests/negative-index.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion runtime/vam/expr/function/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (f *Fields) Call(args ...vector.Any) vector.Any {
inner := vector.NewArray(f.innerTyp, inOffs, s, nil)
out := vector.NewArray(f.outerTyp, outOffs, inner, nil)
if len(errs) > 0 {
return mix(val.Len(), out, errs, vector.NewStringError(f.zctx, "missing", uint32(len(errs))))
return vector.Combine(out, errs, vector.NewStringError(f.zctx, "missing", uint32(len(errs))))
}
return out
default:
Expand Down
22 changes: 0 additions & 22 deletions runtime/vam/expr/function/function.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package function

import (
"slices"

"github.com/brimdata/zed"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/runtime/sam/expr/function"
Expand Down Expand Up @@ -59,23 +57,3 @@ func underAll(args []vector.Any) []vector.Any {
}
return args
}

// mix builds a variant vector from vec plus one extra value vector, add.
//
// If vec is already a variant, its value vectors are reused and its tags are
// copied, extended to desiredLen entries when there are fewer. Otherwise vec
// becomes the single existing value vector and desiredLen zero tags are
// created. add is appended as the last value vector, and the tag of every
// slot listed in index is set to point at it.
func mix(desiredLen uint32, vec vector.Any, index []uint32, add vector.Any) vector.Any {
	var (
		tags []uint32
		vecs []vector.Any
	)
	switch v := vec.(type) {
	case *vector.Variant:
		vecs = v.Values
		// Work on a copy so the input variant's tags are not modified.
		tags = slices.Clone(v.Tags)
		if want := int(desiredLen); len(tags) < want {
			tags = slices.Grow(tags, want)[:desiredLen]
		}
	default:
		vecs = []vector.Any{vec}
		tags = make([]uint32, desiredLen)
	}
	// add will occupy the position right after the existing value vectors.
	addTag := uint32(len(vecs))
	for _, slot := range index {
		tags[slot] = addTag
	}
	return vector.NewVariant(tags, append(vecs, add))
}
96 changes: 96 additions & 0 deletions runtime/vam/expr/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package expr

import (
"github.com/brimdata/zed"
"github.com/brimdata/zed/vector"
)

// Index represents an index operator "container[index]" where container is
// either an array or set (with index type integer), or a record
// (with index type string), or a map (with any index type).
//
// NOTE(review): the eval method currently panics for map containers, so maps
// are not actually handled by this implementation yet.
type Index struct {
// zctx is passed to the helpers that build missing and wrapped-error vectors.
zctx *zed.Context
// container evaluates to the vector being indexed.
container Evaluator
// index evaluates to the vector of index values.
index Evaluator
}

// NewIndexExpr returns an Evaluator for "container[index]", where container
// and index are the evaluators for the indexed value and the index value
// respectively.
func NewIndexExpr(zctx *zed.Context, container, index Evaluator) Evaluator {
	return &Index{
		zctx:      zctx,
		container: container,
		index:     index,
	}
}

// Eval evaluates the index expression against this by running i.eval through
// vector.Apply.
// NOTE(review): the leading true argument is interpreted by vector.Apply; its
// meaning is not visible from here — confirm against that function.
func (i *Index) Eval(this vector.Any) vector.Any {
return vector.Apply(true, i.eval, this)
}

// eval evaluates the container and index expressions against args[0] and
// dispatches on the container's underlying vector kind: arrays and sets are
// indexed by integer, records by field name. Map containers are not
// supported and cause a panic. Any other container yields a vector of
// missing values with the same length as the input.
func (i *Index) eval(args ...vector.Any) vector.Any {
	input := args[0]
	containerVec := i.container.Eval(input)
	indexVec := i.index.Eval(input)
	switch c := vector.Under(containerVec).(type) {
	case *vector.Record:
		return indexRecord(i.zctx, c, indexVec)
	case *vector.Array:
		return indexArrayOrSet(i.zctx, c.Offsets, c.Values, indexVec, c.Nulls)
	case *vector.Set:
		return indexArrayOrSet(i.zctx, c.Offsets, c.Values, indexVec, c.Nulls)
	case *vector.Map:
		panic("vector index operations on maps not supported")
	}
	return vector.NewMissing(i.zctx, input.Len())
}

// indexArrayOrSet selects one element per slot from an array or set vector.
//
// offsets delimits each slot's elements inside vals: slot k covers
// vals[offsets[k]:offsets[k+1]]. index supplies the zero-based position to
// pick in each slot; a negative position counts back from the end of the
// slot. nulls marks null container slots.
//
// If index is not of an integer type, the result is index wrapped in an
// "index is not an integer" error. Otherwise each slot yields the selected
// element, or a missing value when the container slot is null, the index is
// null, or the position falls outside the slot.
func indexArrayOrSet(zctx *zed.Context, offsets []uint32, vals, index vector.Any, nulls *vector.Bool) vector.Any {
	if !zed.IsInteger(index.Type().ID()) {
		return vector.NewWrappedError(zctx, "index is not an integer", index)
	}
	index = promoteToSigned(index)
	var missing, selected []uint32
	// Every entry but the last in offsets is the start of a slot.
	starts := offsets[:len(offsets)-1]
	for slot := range starts {
		pos, posNull := vector.IntValue(index, uint32(slot))
		if nulls.Value(uint32(slot)) || posNull {
			missing = append(missing, uint32(slot))
			continue
		}
		size := int64(offsets[slot+1]) - int64(starts[slot])
		if pos < 0 {
			// Negative positions are relative to the end of the slot.
			pos += size
		}
		if pos < 0 || pos >= size {
			missing = append(missing, uint32(slot))
			continue
		}
		selected = append(selected, starts[slot]+uint32(pos))
	}
	picked := vector.Deunion(vector.NewView(selected, vals))
	if len(missing) == 0 {
		return picked
	}
	return vector.Combine(picked, missing, vector.NewMissing(zctx, uint32(len(missing))))
}

// indexRecord looks up, for every slot of record, the field named by the
// string at the same slot of index.
//
// If index is not of string type, the result is index wrapped in a "record
// index is not a string" error. Otherwise the result is a variant with one
// value vector per record field (a view of the slots that selected that
// field) followed by a vector of missing values for the slots whose name
// matched no field.
func indexRecord(zctx *zed.Context, record *vector.Record, index vector.Any) vector.Any {
	if index.Type().ID() != zed.IDString {
		return vector.NewWrappedError(zctx, "record index is not a string", index)
	}
	nfields := len(record.Typ.Fields)
	length := record.Len()
	tags := make([]uint32, length)
	slotsByField := make([][]uint32, nfields)
	var nmissing uint32
	for slot := uint32(0); slot < length; slot++ {
		// The null flag is deliberately dropped here: whatever string
		// StringValue yields for a null slot is looked up like any other
		// name. NOTE(review): confirm that this ends up as missing.
		name, _ := vector.StringValue(index, slot)
		if k, found := record.Typ.IndexOfField(name); found {
			tags[slot] = uint32(k)
			slotsByField[k] = append(slotsByField[k], slot)
		} else {
			// Tag nfields refers to the trailing vector of missing values.
			tags[slot] = uint32(nfields)
			nmissing++
		}
	}
	vecs := make([]vector.Any, nfields+1)
	vecs[nfields] = vector.NewMissing(zctx, nmissing)
	for k, fieldVec := range record.Fields {
		vecs[k] = vector.NewView(slotsByField[k], fieldVec)
	}
	return vector.NewVariant(tags, vecs)
}
35 changes: 35 additions & 0 deletions runtime/vam/expr/renamer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package expr

import (
"github.com/brimdata/zed"
"github.com/brimdata/zed/runtime/sam/expr"
"github.com/brimdata/zed/vector"
)

// Renamer renames one or more fields in a record. See [expr.Renamer], on which
// it relies, for more detail.
type Renamer struct {
// zctx is used to build the wrapped-error vector returned when a rename fails.
zctx *zed.Context
// renamer is the runtime/sam/expr renamer that computes the renamed record type.
renamer *expr.Renamer
}

// NewRenamer returns a Renamer for the vector runtime that renames the fields
// in srcs to the corresponding fields in dsts. It wraps a renamer created by
// expr.NewRenamer with the same arguments.
func NewRenamer(zctx *zed.Context, srcs, dsts []*expr.Lval) *Renamer {
	return &Renamer{
		zctx:    zctx,
		renamer: expr.NewRenamer(zctx, srcs, dsts),
	}
}

// Eval applies the rename to vec by running r.eval through vector.Apply.
// NOTE(review): the leading false argument is interpreted by vector.Apply; its
// meaning is not visible from here — confirm against that function.
func (r *Renamer) Eval(vec vector.Any) vector.Any {
return vector.Apply(false, r.eval, vec)
}

// eval renames the fields of vecs[0]. A vector whose underlying form is not a
// record is returned untouched. For a record, the wrapped renamer is run once
// on a value that carries only the vector's type (nil context, nil bytes);
// just the type of its result is used, and a new record vector with that type
// is built over the original fields, length and nulls. If the renamer reports
// an error, the input is returned wrapped in that error.
func (r *Renamer) eval(vecs ...vector.Any) vector.Any {
	in := vecs[0]
	rec, isRecord := vector.Under(in).(*vector.Record)
	if !isRecord {
		return in
	}
	probe := zed.NewValue(in.Type(), nil)
	renamed, err := r.renamer.EvalToValAndError(nil, probe)
	if err != nil {
		return vector.NewWrappedError(r.zctx, err.Error(), in)
	}
	typ := renamed.Type().(*zed.TypeRecord)
	return vector.NewRecord(typ, rec.Fields, rec.Len(), rec.Nulls)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: yield this[0], this[1]

vector: true

input: |
[0((int64,string)),"hi"((int64,string))]
|[0((int64,string)),"hi"((int64,string))]|
Expand Down
Loading

0 comments on commit 0abd3df

Please sign in to comment.