Skip to content

Commit

Permalink
Merge branch 'main' into vector-rename
Browse files Browse the repository at this point in the history
  • Loading branch information
nwt committed Sep 18, 2024
2 parents 3dac867 + 4460054 commit c7bf0b4
Show file tree
Hide file tree
Showing 18 changed files with 241 additions and 45 deletions.
16 changes: 14 additions & 2 deletions compiler/kernel/vexpr.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (b *Builder) compileVamExpr(e dag.Expr) (vamexpr.Evaluator, error) {
return vamexpr.NewDottedExpr(b.zctx(), field.Path(e.Path)), nil
case *dag.Dot:
return b.compileVamDotExpr(e)
case *dag.IndexExpr:
return b.compileVamIndexExpr(e)
case *dag.UnaryExpr:
return b.compileVamUnary(*e)
case *dag.BinaryExpr:
Expand Down Expand Up @@ -101,8 +103,6 @@ func (b *Builder) compileVamBinary(e *dag.BinaryExpr) (vamexpr.Evaluator, error)
return vamexpr.NewCompare(b.zctx(), lhs, rhs, op), nil
case "+", "-", "*", "/", "%":
return vamexpr.NewArith(b.zctx(), lhs, rhs, op), nil
//case "[":
// return vamexpr.NewIndexExpr(b.zctx(), lhs, rhs), nil
default:
return nil, fmt.Errorf("invalid binary operator %s", op)
}
Expand Down Expand Up @@ -132,6 +132,18 @@ func (b *Builder) compileVamDotExpr(dot *dag.Dot) (vamexpr.Evaluator, error) {
return vamexpr.NewDotExpr(b.zctx(), record, dot.RHS), nil
}

// compileVamIndexExpr compiles the index expression idx into a vector
// evaluator. The indexed operand (idx.Expr) is compiled first, then the
// index operand (idx.Index); the first compile error is returned unchanged.
func (b *Builder) compileVamIndexExpr(idx *dag.IndexExpr) (vamexpr.Evaluator, error) {
	container, err := b.compileVamExpr(idx.Expr)
	if err != nil {
		return nil, err
	}
	position, err := b.compileVamExpr(idx.Index)
	if err != nil {
		return nil, err
	}
	return vamexpr.NewIndexExpr(b.zctx(), container, position), nil
}

func (b *Builder) compileVamExprs(in []dag.Expr) ([]vamexpr.Evaluator, error) {
var exprs []vamexpr.Evaluator
for _, e := range in {
Expand Down
32 changes: 20 additions & 12 deletions compiler/kernel/vop.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/compiler/optimizer"
"github.com/brimdata/zed/runtime/vam/expr"
vamexpr "github.com/brimdata/zed/runtime/vam/expr"
vamop "github.com/brimdata/zed/runtime/vam/op"
"github.com/brimdata/zed/vector"
"github.com/brimdata/zed/zbuf"
Expand Down Expand Up @@ -66,6 +67,12 @@ func (b *Builder) compileVamScan(scan *dag.SeqScan, parent zbuf.Puller) (vector.

func (b *Builder) compileVamLeaf(o dag.Op, parent vector.Puller) (vector.Puller, error) {
switch o := o.(type) {
case *dag.Cut:
e, err := b.compileVamAssignmentsToRecordExpression(nil, o.Args)
if err != nil {
return nil, err
}
return vamop.NewYield(b.zctx(), parent, []expr.Evaluator{e}), nil
case *dag.Filter:
e, err := b.compileVamExpr(o.Expr)
if err != nil {
Expand All @@ -85,7 +92,14 @@ func (b *Builder) compileVamLeaf(o dag.Op, parent vector.Puller) (vector.Puller,
case *dag.Pass:
return parent, nil
case *dag.Put:
return b.compileVamPut(o, parent)
initial := []dag.RecordElem{
&dag.Spread{Kind: "Spread", Expr: &dag.This{Kind: "This"}},
}
e, err := b.compileVamAssignmentsToRecordExpression(initial, o.Args)
if err != nil {
return nil, err
}
return vamop.NewYield(b.zctx(), parent, []expr.Evaluator{expr.NewPutter(b.zctx(), e)}), nil
case *dag.Rename:
srcs, dsts, err := b.compileAssignmentsToLvals(o.Args)
if err != nil {
Expand All @@ -107,22 +121,16 @@ func (b *Builder) compileVamLeaf(o dag.Op, parent vector.Puller) (vector.Puller,
}
}

func (b *Builder) compileVamPut(put *dag.Put, parent vector.Puller) (vector.Puller, error) {
elems := []dag.RecordElem{
&dag.Spread{Kind: "Spread", Expr: &dag.This{Kind: "This"}},
}
for _, a := range put.Args {
func (b *Builder) compileVamAssignmentsToRecordExpression(initial []dag.RecordElem, assignments []dag.Assignment) (vamexpr.Evaluator, error) {
elems := initial
for _, a := range assignments {
lhs, ok := a.LHS.(*dag.This)
if !ok {
return nil, fmt.Errorf("internal error: dynamic field name not yet supported in vector runtime: %#v", a.LHS)
return nil, fmt.Errorf("internal error: dynamic field name not supported in vector runtime: %#v", a.LHS)
}
elems = append(elems, newDagRecordExprForPath(lhs.Path, a.RHS).Elems...)
}
e, err := b.compileVamRecordExpr(&dag.RecordExpr{Kind: "RecordExpr", Elems: elems})
if err != nil {
return nil, err
}
return vamop.NewYield(b.rctx.Zctx, parent, []expr.Evaluator{expr.NewPutter(b.zctx(), e)}), nil
return b.compileVamRecordExpr(&dag.RecordExpr{Kind: "RecordExpr", Elems: elems})
}

func newDagRecordExprForPath(path []string, expr dag.Expr) *dag.RecordExpr {
Expand Down
5 changes: 4 additions & 1 deletion runtime/sam/expr/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,10 @@ func (i *Index) Eval(ectx Context, this zed.Value) zed.Value {
func indexVector(zctx *zed.Context, ectx Context, inner zed.Type, vector zcode.Bytes, index zed.Value) zed.Value {
id := index.Type().ID()
if !zed.IsInteger(id) {
return zctx.WrapError("array index is not an integer", index)
return zctx.WrapError("index is not an integer", index)
}
if index.IsNull() {
return zctx.Missing()
}
var idx int
if zed.IsSigned(id) {
Expand Down
7 changes: 0 additions & 7 deletions runtime/sam/expr/ztests/negative-index.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion runtime/vam/expr/function/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (f *Fields) Call(args ...vector.Any) vector.Any {
inner := vector.NewArray(f.innerTyp, inOffs, s, nil)
out := vector.NewArray(f.outerTyp, outOffs, inner, nil)
if len(errs) > 0 {
return mix(val.Len(), out, errs, vector.NewStringError(f.zctx, "missing", uint32(len(errs))))
return vector.Combine(out, errs, vector.NewStringError(f.zctx, "missing", uint32(len(errs))))
}
return out
default:
Expand Down
22 changes: 0 additions & 22 deletions runtime/vam/expr/function/function.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package function

import (
"slices"

"github.com/brimdata/zed"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/runtime/sam/expr/function"
Expand Down Expand Up @@ -59,23 +57,3 @@ func underAll(args []vector.Any) []vector.Any {
}
return args
}

// mix builds a variant vector out of vec plus one extra value vector, add.
//
// When vec is already a *vector.Variant its value vectors are reused and a
// copy of its tags is taken (extended to desiredLen entries if shorter);
// otherwise vec becomes the sole existing value vector and desiredLen
// zero tags are allocated. Every position listed in index is then retagged
// to refer to add, which is appended after the existing value vectors.
func mix(desiredLen uint32, vec vector.Any, index []uint32, add vector.Any) vector.Any {
	var (
		vecs []vector.Any
		tags []uint32
	)
	variant, isVariant := vec.(*vector.Variant)
	if !isVariant {
		vecs = []vector.Any{vec}
		tags = make([]uint32, desiredLen)
	} else {
		vecs = variant.Values
		tags = slices.Clone(variant.Tags)
		if want := int(desiredLen); len(tags) < want {
			tags = slices.Grow(tags, want)[:want]
		}
	}
	// add is appended last, so its tag equals the current number of vectors.
	addTag := uint32(len(vecs))
	for _, pos := range index {
		tags[pos] = addTag
	}
	return vector.NewVariant(tags, append(vecs, add))
}
96 changes: 96 additions & 0 deletions runtime/vam/expr/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package expr

import (
"github.com/brimdata/zed"
"github.com/brimdata/zed/vector"
)

// Index represents an index operator "container[index]" where container is
// either an array or set (with an integer index) or a record (with a
// string index). Indexing a map is not implemented: the evaluator panics
// when the container evaluates to a map. Any other container type yields
// a missing vector.
type Index struct {
// zctx is passed to the vector constructors that build missing and
// wrapped-error results.
zctx *zed.Context
// container evaluates to the value being indexed.
container Evaluator
// index evaluates to the position (array/set) or field name (record).
index Evaluator
}

// NewIndexExpr returns an Evaluator for the expression container[index],
// where container and index are themselves evaluated against the input.
func NewIndexExpr(zctx *zed.Context, container, index Evaluator) Evaluator {
	return &Index{
		zctx:      zctx,
		container: container,
		index:     index,
	}
}

// Eval evaluates the index expression against this by handing i.eval to
// vector.Apply.
// NOTE(review): the meaning of the leading true argument to vector.Apply
// is not visible from here; confirm against vector.Apply's definition.
func (i *Index) Eval(this vector.Any) vector.Any {
return vector.Apply(true, i.eval, this)
}

// eval computes container[index] for the input vector args[0]. The
// container expression is evaluated first, then the index expression, both
// against the same input. Arrays and sets are indexed by integer, records
// by field name; a map container panics, and any other container type
// produces a missing vector as long as the input.
func (i *Index) eval(args ...vector.Any) vector.Any {
	input := args[0]
	containerVec := i.container.Eval(input)
	indexVec := i.index.Eval(input)
	switch c := vector.Under(containerVec).(type) {
	case *vector.Record:
		return indexRecord(i.zctx, c, indexVec)
	case *vector.Array:
		return indexArrayOrSet(i.zctx, c.Offsets, c.Values, indexVec, c.Nulls)
	case *vector.Set:
		return indexArrayOrSet(i.zctx, c.Offsets, c.Values, indexVec, c.Nulls)
	case *vector.Map:
		panic("vector index operations on maps not supported")
	}
	// Not an indexable container type.
	return vector.NewMissing(i.zctx, input.Len())
}

// indexArrayOrSet selects one element per container from an array or set
// vector described by offsets (element boundaries), vals (the flattened
// elements) and nulls (null containers).
//
// A non-integer index vector yields a single wrapped error. Otherwise, for
// each slot, a negative index counts back from the end of that slot's
// container. Slots whose container is null, whose index is null, or whose
// index falls outside the container are reported as missing.
func indexArrayOrSet(zctx *zed.Context, offsets []uint32, vals, index vector.Any, nulls *vector.Bool) vector.Any {
	if !zed.IsInteger(index.Type().ID()) {
		return vector.NewWrappedError(zctx, "index is not an integer", index)
	}
	index = promoteToSigned(index)
	var missingSlots []uint32
	var picked []uint32
	for i := range offsets[:len(offsets)-1] {
		slot := uint32(i)
		idx, idxNull := vector.IntValue(index, slot)
		if nulls.Value(slot) || idxNull {
			missingSlots = append(missingSlots, slot)
			continue
		}
		start := offsets[i]
		size := int64(offsets[i+1]) - int64(start)
		if idx < 0 {
			// Negative indexes are relative to the end of the container.
			idx += size
		}
		if idx < 0 || idx >= size {
			missingSlots = append(missingSlots, slot)
			continue
		}
		picked = append(picked, start+uint32(idx))
	}
	out := vector.Deunion(vector.NewView(picked, vals))
	if len(missingSlots) == 0 {
		return out
	}
	return vector.Combine(out, missingSlots, vector.NewMissing(zctx, uint32(len(missingSlots))))
}

// indexRecord looks up, for every row of record, the field named by the
// corresponding string in index. The result is a variant vector with one
// view per record field (holding the rows that selected that field) plus a
// trailing missing vector for rows whose name matches no field.
//
// A non-string index vector yields a single wrapped error.
func indexRecord(zctx *zed.Context, record *vector.Record, index vector.Any) vector.Any {
	if index.Type().ID() != zed.IDString {
		return vector.NewWrappedError(zctx, "record index is not a string", index)
	}
	rows := record.Len()
	tags := make([]uint32, rows)
	fieldCount := len(record.Typ.Fields)
	// The missing vector is stored after all field views, so its tag is fieldCount.
	missingTag := uint32(fieldCount)
	rowsByField := make([][]uint32, fieldCount)
	var missing uint32
	for row := uint32(0); row < rows; row++ {
		// The null flag is deliberately ignored here.
		// NOTE(review): presumably a null index reads as "" and so lands in
		// the missing branch unless a field is named "" — confirm.
		name, _ := vector.StringValue(index, row)
		if k, found := record.Typ.IndexOfField(name); found {
			tags[row] = uint32(k)
			rowsByField[k] = append(rowsByField[k], row)
		} else {
			tags[row] = missingTag
			missing++
		}
	}
	vecs := make([]vector.Any, fieldCount+1)
	vecs[fieldCount] = vector.NewMissing(zctx, missing)
	for k, fieldVec := range record.Fields {
		vecs[k] = vector.NewView(rowsByField[k], fieldVec)
	}
	return vector.NewVariant(tags, vecs)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: yield this[0], this[1]

vector: true

input: |
[0((int64,string)),"hi"((int64,string))]
|[0((int64,string)),"hi"((int64,string))]|
Expand Down
42 changes: 42 additions & 0 deletions runtime/ztests/expr/index.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
zed: val[idx]

vector: true

input: |
// array
{val:[1,2,3,"foo"],idx:-1}
{val:[1,2,3,"bar"],idx:1(uint8)}
{val:[1,2,3,"foo"],idx:-4}
{val:[1,2,3,"foo"],idx:-5}
{val:null([(int64,string)]),idx:-5}
{val:[1,2,3,"foo"],idx:null(int64)}
{val:[1,2,3,"foo"],idx:"hi"}
// set
{val:|[1,2,3,"foo"]|,idx:-1}
{val:|[1,2,3,"bar"]|,idx:1}
{val:|[1,2,3,"foo"]|,idx:-4}
{val:|[1,2,3,"foo"]|,idx:-5}
{val:|[1,2,3,"foo"]|,idx:"hi"}
// record
{val:{a:"foo",b:"bar"},idx:"a"}
{val:{a:"bar",b:"baz"},idx:"b"}
{val:{a:"foo",b:"bar"},idx:1.}
{val:{a:"bar",b:"baz"},idx:"doesnotexist"}
output: |
"foo"
2
1
error("missing")
error("missing")
error("missing")
error({message:"index is not an integer",on:"hi"})
"foo"
2
1
error("missing")
error({message:"index is not an integer",on:"hi"})
"foo"
"baz"
error({message:"record index is not a string",on:1.})
error("missing")
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: cut foo,bar

vector: true

input: |
{foo:"foo1",bar:"bar1"}
{foo:"foo2",bar:"bar2"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: cut foo

vector: true

input: |
{bar:"bar1"}
{bar:"bar2"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: cut foo

vector: true

input: |
{foo:"foo1",bar:"bar1"}
{foo:"foo2",bar:"bar2"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: cut foo

vector: true

input: |
{bar:"bar1"}
{bar:"bar2"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: cut foo

vector: true

input: &input |
{foo:"foo1"}
{foo:"foo2"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: 'cut this:=a'

vector: true

input: |
{a:1(int32)}
Expand Down
25 changes: 25 additions & 0 deletions vector/any.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,28 @@ type Puller interface {
}

type Builder func(*zcode.Builder) bool

// Combine interleaves the values of vec with those of add and returns the
// result as a variant vector of length vec.Len()+len(index).
//
// index lists the output positions that are taken from add; every other
// output position is taken from vec, in order. If vec is not a *Variant,
// positions from vec get tag 0 and positions from add get tag 1. If vec is
// a *Variant, its value vectors are kept, add is appended as one more value
// vector, and vec's existing tags are consumed in order for the positions
// not listed in index.
//
// In the variant case index must be sorted in ascending order, because only
// its head is compared against the current output position.
func Combine(vec Any, index []uint32, add Any) Any {
	tags := make([]uint32, int(vec.Len())+len(index))
	variant, ok := vec.(*Variant)
	if !ok {
		for _, k := range index {
			tags[k] = 1
		}
		return NewVariant(tags, []Any{vec, add})
	}
	// Cap the slice at its length so the append below always copies rather
	// than writing into spare capacity of variant.Values, which would be
	// shared with (and could be overwritten by) other users of the variant.
	vecs := variant.Values[:len(variant.Values):len(variant.Values)]
	varTags := variant.Tags
	addTag := uint32(len(vecs))
	for i := range tags {
		if len(index) > 0 && uint32(i) == index[0] {
			tags[i] = addTag
			index = index[1:]
		} else {
			tags[i] = varTags[0]
			varTags = varTags[1:]
		}
	}
	return NewVariant(tags, append(vecs, add))
}
Loading

0 comments on commit c7bf0b4

Please sign in to comment.