Skip to content

Commit

Permalink
Support dynamic paths for put and cut op
Browse files Browse the repository at this point in the history
  • Loading branch information
mattnibs committed Oct 12, 2023
1 parent 7ee1443 commit 9e6e2c8
Show file tree
Hide file tree
Showing 23 changed files with 537 additions and 247 deletions.
5 changes: 3 additions & 2 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ type (
Args []Assignment `json:"args"`
}
Rename struct {
Kind string `json:"kind" unpack:""`
Args []Assignment `json:"args"`
Kind string `json:"kind" unpack:""`
Dsts []Expr `json:"dsts"`
Srcs []Expr `json:"srcs"`
}
Scatter struct {
Kind string `json:"kind" unpack:""`
Expand Down
37 changes: 32 additions & 5 deletions compiler/kernel/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,42 @@ func (b *Builder) compileDotExpr(dot *dag.Dot) (expr.Evaluator, error) {
return expr.NewDotExpr(b.zctx(), record, dot.RHS), nil
}

func compileLval(e dag.Expr) (field.Path, error) {
if this, ok := e.(*dag.This); ok {
return field.Path(this.Path), nil
// compileLval compiles the left-hand side of an assignment into an
// expr.Lval: a sequence of path elements that may mix static field names
// with elements whose value is computed from an expression.
//
// Three expression shapes are accepted:
//   - *dag.This: each name in its Path becomes a static element.
//   - *dag.BinaryExpr with Op "[": the LHS is compiled recursively as an
//     lval and the compiled RHS expression is appended as a dynamic element.
//   - *dag.Dot: the LHS is compiled recursively as an lval and the RHS name
//     is appended as a static element.
//
// Any other expression (including a BinaryExpr with a different operator)
// produces an "internal error" error. Errors from compiling a nested lval or
// an index expression are returned unchanged.
func (b *Builder) compileLval(e dag.Expr) (*expr.Lval, error) {
	switch e := e.(type) {
	case *dag.This:
		var elems []expr.LvalElem
		for _, elem := range e.Path {
			elems = append(elems, &expr.StaticLvalElem{Name: elem})
		}
		return expr.NewLval(elems), nil
	case *dag.BinaryExpr:
		// Only index expressions are valid inside a path.
		if e.Op != "[" {
			return nil, fmt.Errorf("internal error: invalid path expression %T", e)
		}
		lhs, err := b.compileLval(e.LHS)
		if err != nil {
			return nil, err
		}
		rhs, err := b.compileExpr(e.RHS)
		if err != nil {
			return nil, err
		}
		lhs.Elems = append(lhs.Elems, expr.NewExprLvalElem(b.zctx(), rhs))
		return lhs, nil
	case *dag.Dot:
		lhs, err := b.compileLval(e.LHS)
		if err != nil {
			return nil, err
		}
		lhs.Elems = append(lhs.Elems, &expr.StaticLvalElem{Name: e.RHS})
		return lhs, nil
	default:
		return nil, fmt.Errorf("internal error: invalid path expression %T", e)
	}
	// No trailing return: every arm of the switch above returns, so a
	// statement placed here would be unreachable.
}

func (b *Builder) compileAssignment(node *dag.Assignment) (expr.Assignment, error) {
lhs, err := compileLval(node.LHS)
lhs, err := b.compileLval(node.LHS)
if err != nil {
return expr.Assignment{}, err
}
Expand Down
14 changes: 7 additions & 7 deletions compiler/kernel/groupby.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package kernel

import (
"errors"
"fmt"

"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/runtime/expr"
"github.com/brimdata/zed/runtime/op/groupby"
"github.com/brimdata/zed/zbuf"
"golang.org/x/exp/slices"
)

func (b *Builder) compileGroupBy(parent zbuf.Puller, summarize *dag.Summarize) (*groupby.Op, error) {
keys, err := b.compileAssignments(summarize.Keys)
keyPaths, keyVals, err := b.compileStaticAssignments(summarize.Keys)
if err != nil {
return nil, err
}
Expand All @@ -22,7 +22,7 @@ func (b *Builder) compileGroupBy(parent zbuf.Puller, summarize *dag.Summarize) (
return nil, err
}
dir := order.Direction(summarize.InputSortDir)
return groupby.New(b.octx, parent, keys, names, reducers, summarize.Limit, dir, summarize.PartialsIn, summarize.PartialsOut)
return groupby.New(b.octx, parent, keyPaths, keyVals, names, reducers, summarize.Limit, dir, summarize.PartialsIn, summarize.PartialsOut)
}

func (b *Builder) compileAggAssignments(assignments []dag.Assignment) (field.List, []*expr.Aggregator, error) {
Expand All @@ -44,12 +44,12 @@ func (b *Builder) compileAggAssignment(assignment dag.Assignment) (field.Path, *
if !ok {
return nil, nil, errors.New("aggregator is not an aggregation expression")
}
lhs, err := compileLval(assignment.LHS)
if err != nil {
return nil, nil, fmt.Errorf("lhs of aggregation: %w", err)
this, ok := assignment.LHS.(*dag.This)
if !ok {
return nil, nil, errors.New("internal error: aggregator assignment must be a static path")
}
m, err := b.compileAgg(aggAST)
return lhs, m, err
return slices.Clone(this.Path), m, err
}

func (b *Builder) compileAgg(agg *dag.Agg) (*expr.Aggregator, error) {
Expand Down
53 changes: 24 additions & 29 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,36 +174,13 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
if err != nil {
return nil, err
}
putter, err := expr.NewPutter(b.octx.Zctx, clauses)
if err != nil {
return nil, err
}
putter := expr.NewPutter(b.octx.Zctx, clauses)
return op.NewApplier(b.octx, parent, putter), nil
case *dag.Rename:
var srcs, dsts field.List
for _, fa := range v.Args {
dst, err := compileLval(fa.LHS)
if err != nil {
return nil, err
}
// We call CompileLval on the RHS because renames are
// restricted to dotted field name expressions.
src, err := compileLval(fa.RHS)
if err != nil {
return nil, err
}
if len(dst) != len(src) {
return nil, fmt.Errorf("cannot rename %s to %s", src, dst)
}
// Check that the prefixes match and, if not, report first place
// that they don't.
for i := 0; i <= len(src)-2; i++ {
if src[i] != dst[i] {
return nil, fmt.Errorf("cannot rename %s to %s (differ in %s vs %s)", src, dst, src[i], dst[i])
}
}
dsts = append(dsts, dst)
srcs = append(srcs, src)
for k := range v.Dsts {
srcs = append(srcs, v.Srcs[k].(*dag.This).Path)
dsts = append(dsts, v.Dsts[k].(*dag.This).Path)
}
renamer := expr.NewRenamer(b.octx.Zctx, srcs, dsts)
return op.NewApplier(b.octx, parent, renamer), nil
Expand Down Expand Up @@ -376,6 +353,24 @@ func (b *Builder) compileOver(parent zbuf.Puller, over *dag.Over) (zbuf.Puller,
return scope.NewExit(exit), nil
}

// compileStaticAssignments compiles assignments whose left-hand sides must
// all be static paths (*dag.This). It returns two parallel slices: a copy of
// each left-hand path and the compiled evaluator for the matching right-hand
// side, in input order.
//
// An "internal error" is returned when a left-hand side is not a *dag.This.
// An error from compiling a right-hand side is returned unchanged. On any
// error both slices are nil.
func (b *Builder) compileStaticAssignments(assignments []dag.Assignment) ([]field.Path, []expr.Evaluator, error) {
	n := len(assignments)
	paths := make([]field.Path, 0, n)
	evals := make([]expr.Evaluator, 0, n)
	for i := range assignments {
		assignment := &assignments[i]
		this, isStatic := assignment.LHS.(*dag.This)
		if !isStatic {
			return nil, nil, errors.New("internal error: dynamic path in assignment when expecting a static path")
		}
		eval, err := b.compileExpr(assignment.RHS)
		if err != nil {
			return nil, nil, err
		}
		// Clone so the returned path does not alias the DAG node's slice.
		paths = append(paths, slices.Clone(this.Path))
		evals = append(evals, eval)
	}
	return paths, evals, nil
}

func (b *Builder) compileAssignments(assignments []dag.Assignment) ([]expr.Assignment, error) {
keys := make([]expr.Assignment, 0, len(assignments))
for _, assignment := range assignments {
Expand All @@ -388,9 +383,9 @@ func (b *Builder) compileAssignments(assignments []dag.Assignment) ([]expr.Assig
return keys, nil
}

func splitAssignments(assignments []expr.Assignment) (field.List, []expr.Evaluator) {
func splitAssignments(assignments []expr.Assignment) ([]*expr.Lval, []expr.Evaluator) {
n := len(assignments)
lhs := make(field.List, 0, n)
lhs := make([]*expr.Lval, 0, n)
rhs := make([]expr.Evaluator, 0, n)
for _, a := range assignments {
lhs = append(lhs, a.LHS)
Expand Down
6 changes: 3 additions & 3 deletions compiler/optimizer/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func (o *Optimizer) analyzeSortKey(op dag.Op, in order.SortKey) (order.SortKey,
return in, nil
case *dag.Rename:
out := in
for _, assignment := range op.Args {
if fieldOf(assignment.RHS).Equal(key) {
lhs := fieldOf(assignment.LHS)
for k := range op.Dsts {
if fieldOf(op.Srcs[k]).Equal(key) {
lhs := fieldOf(op.Dsts[k])
out = order.NewSortKey(in.Order, field.List{lhs})
}
}
Expand Down
110 changes: 64 additions & 46 deletions compiler/semantic/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,10 +507,10 @@ func (a *analyzer) semExprs(in []ast.Expr) ([]dag.Expr, error) {
return exprs, nil
}

func (a *analyzer) semAssignments(assignments []ast.Assignment, summarize bool) ([]dag.Assignment, error) {
func (a *analyzer) semAssignments(assignments []ast.Assignment) ([]dag.Assignment, error) {
out := make([]dag.Assignment, 0, len(assignments))
for _, e := range assignments {
a, err := a.semAssignment(e, summarize)
a, err := a.semAssignment(e)
if err != nil {
return nil, err
}
Expand All @@ -519,64 +519,82 @@ func (a *analyzer) semAssignments(assignments []ast.Assignment, summarize bool)
return out, nil
}

func (a *analyzer) semAssignment(assign ast.Assignment, summarize bool) (dag.Assignment, error) {
rhs, err := a.semExpr(assign.RHS)
func (a *analyzer) semAssignment(e ast.Assignment) (dag.Assignment, error) {
rhs, err := a.semExpr(e.RHS)
if err != nil {
return dag.Assignment{}, fmt.Errorf("rhs of assignment expression: %w", err)
}
if _, ok := rhs.(*dag.Agg); ok {
summarize = true
return dag.Assignment{}, err
}
var lhs dag.Expr
if assign.LHS != nil {
lhs, err = a.semExpr(assign.LHS)
if e.LHS == nil {
path, err := derriveLHSPath(rhs)
if err != nil {
return dag.Assignment{}, fmt.Errorf("lhs of assigment expression: %w", err)
return dag.Assignment{}, err
}
lhs = &dag.This{Kind: "This", Path: path}
} else {
if lhs, err = a.semExpr(e.LHS); err != nil {
return dag.Assignment{}, err
}
}
if !a.isLval(lhs) {
return dag.Assignment{}, errors.New("illegal left-hand side of assignment")
}
if this, ok := lhs.(*dag.This); ok {
if len(this.Path) == 0 {
return dag.Assignment{}, errors.New("cannot assign to 'this'")
}
}
return dag.Assignment{Kind: "Assignment", LHS: lhs, RHS: rhs}, nil
}

// isLval reports whether e has the shape of an assignable path: a
// *dag.This, optionally wrapped in any number of *dag.Dot accesses and
// *dag.BinaryExpr index operations (Op "["). Anything else, including a
// BinaryExpr with a different operator, is not an lval.
func (a *analyzer) isLval(e dag.Expr) bool {
	// Walk down the left-hand spine of the expression until it bottoms out.
	for {
		switch node := e.(type) {
		case *dag.This:
			return true
		case *dag.Dot:
			e = node.LHS
		case *dag.BinaryExpr:
			if node.Op != "[" {
				return false
			}
			e = node.LHS
		default:
			return false
		}
	}
}

func assignHasDynamicPath(assignments []dag.Assignment) bool {
for _, a := range assignments {
if _, ok := a.LHS.(*dag.This); !ok {
return true
}
} else if call, ok := assign.RHS.(*ast.Call); ok {
path := []string{call.Name}
switch call.Name {
}
return false
}

func derriveLHSPath(rhs dag.Expr) ([]string, error) {
var path []string
switch rhs := rhs.(type) {
case *dag.Call:
path = []string{rhs.Name}
switch rhs.Name {
case "every":
// If LHS is nil and the call is every(), make the LHS field ts, since
// the ts field is assumed with every.
path = []string{"ts"}
case "quiet":
if len(call.Args) > 0 {
if p, ok := rhs.(*dag.Call).Args[0].(*dag.This); ok {
path = p.Path
if len(rhs.Args) > 0 {
if this, ok := rhs.Args[0].(*dag.This); ok {
path = this.Path
}
}
}
lhs = &dag.This{Kind: "This", Path: path}
} else if agg, ok := assign.RHS.(*ast.Agg); ok {
lhs = &dag.This{Kind: "This", Path: []string{agg.Name}}
} else if v, ok := rhs.(*dag.Var); ok {
lhs = &dag.This{Kind: "This", Path: []string{v.Name}}
} else {
lhs, err = a.semExpr(assign.RHS)
if err != nil {
return dag.Assignment{}, errors.New("assignment name could not be inferred from rhs expression")
}
}
if summarize {
// Summarize always outputs its results as new records of "this"
// so if we have an "as" that overrides "this", we just
// convert it back to a local this.
if dot, ok := lhs.(*dag.Dot); ok {
if v, ok := dot.LHS.(*dag.Var); ok && v.Name == "this" {
lhs = &dag.This{Kind: "This", Path: []string{dot.RHS}}
}
}
}
// Make sure we have a valid lval for lhs.
this, ok := lhs.(*dag.This)
if !ok {
return dag.Assignment{}, errors.New("illegal left-hand side of assignment")
}
if len(this.Path) == 0 {
return dag.Assignment{}, errors.New("cannot assign to 'this'")
case *dag.Agg:
path = []string{rhs.Name}
case *dag.Var:
path = []string{rhs.Name}
case *dag.This:
path = rhs.Path
default:
return nil, errors.New("assignment name could not be inferred from rhs expression")
}
return dag.Assignment{Kind: "Assignment", LHS: lhs, RHS: rhs}, nil
return path, nil
}

func (a *analyzer) semFields(exprs []ast.Expr) ([]dag.Expr, error) {
Expand Down
Loading

0 comments on commit 9e6e2c8

Please sign in to comment.