Skip to content

Commit

Permalink
Support dynamic paths for put and cut op
Browse files Browse the repository at this point in the history
  • Loading branch information
mattnibs committed Oct 11, 2023
1 parent 7ee1443 commit bed68b9
Show file tree
Hide file tree
Showing 27 changed files with 673 additions and 277 deletions.
38 changes: 37 additions & 1 deletion compiler/ast/dag/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ type (
Expr interface {
ExprDAG()
}
PathElem interface {
pathElem()
}
RecordElem interface {
recordAST()
}
Expand All @@ -27,7 +30,7 @@ type (
}
Assignment struct {
Kind string `json:"kind" unpack:""`
LHS Expr `json:"lhs"`
LHS Path `json:"lhs"`
RHS Expr `json:"rhs"`
}
BinaryExpr struct {
Expand Down Expand Up @@ -72,6 +75,10 @@ type (
Exprs []Expr `json:"exprs"`
Body Seq `json:"body"`
}
Path struct {
Kind string `json:"kind" unpack:""`
Path []PathElem `json:"path"`
}
RecordExpr struct {
Kind string `json:"kind" unpack:""`
Elems []RecordElem `json:"elems"`
Expand Down Expand Up @@ -123,6 +130,7 @@ func (*Func) ExprDAG() {}
func (*Literal) ExprDAG() {}
func (*MapExpr) ExprDAG() {}
func (*OverExpr) ExprDAG() {}
func (*Path) ExprDAG() {}
func (*RecordExpr) ExprDAG() {}
func (*RegexpMatch) ExprDAG() {}
func (*RegexpSearch) ExprDAG() {}
Expand Down Expand Up @@ -159,6 +167,34 @@ func (*Spread) recordAST() {}
func (*Spread) vectorElem() {}
func (*VectorValue) vectorElem() {}

// StaticPath flattens p into a *This whose Path holds the Name of each
// element, in order. It returns nil as soon as it meets an element that is
// not a *StaticPathElem.
func (p *Path) StaticPath() *This {
	static := &This{Kind: "This"}
	for i := range p.Path {
		elem, isStatic := p.Path[i].(*StaticPathElem)
		if !isStatic {
			// At least one element is not a plain field name, so the
			// path has no static form.
			return nil
		}
		static.Path = append(static.Path, elem.Name)
	}
	return static
}

// NewStaticPath builds a Path of kind "Path" containing one StaticPathElem
// per supplied name, preserving argument order.
func NewStaticPath(path ...string) *Path {
	out := &Path{Kind: "Path"}
	for i := range path {
		elem := &StaticPathElem{Kind: "StaticPathElem", Name: path[i]}
		out.Path = append(out.Path, elem)
	}
	return out
}

// StaticPathElem is a PathElem that names a single field by a fixed string.
type StaticPathElem struct {
// Kind is the node-kind discriminator; NewStaticPath sets it to
// "StaticPathElem".
Kind string `json:"kind" unpack:""`
// Name is the field name this element refers to.
Name string `json:"name"`
}

// pathElem is a marker method: implementing it makes *This satisfy PathElem
// so it can appear as an element of Path.Path.
func (*This) pathElem() {}

// pathElem is a marker method: implementing it makes *StaticPathElem satisfy
// PathElem so it can appear as an element of Path.Path.
func (*StaticPathElem) pathElem() {}

func NewBinaryExpr(op string, lhs, rhs Expr) *BinaryExpr {
return &BinaryExpr{
Kind: "BinaryExpr",
Expand Down
5 changes: 3 additions & 2 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ type (
Args []Assignment `json:"args"`
}
Rename struct {
Kind string `json:"kind" unpack:""`
Args []Assignment `json:"args"`
Kind string `json:"kind" unpack:""`
Dsts []*This `json:"dsts"`
Srcs []*This `json:"srcs"`
}
Scatter struct {
Kind string `json:"kind" unpack:""`
Expand Down
2 changes: 2 additions & 0 deletions compiler/ast/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var unpacker = unpack.New(
Over{},
OverExpr{},
Pass{},
Path{},
PoolScan{},
Put{},
RecordExpr{},
Expand All @@ -51,6 +52,7 @@ var unpacker = unpack.New(
Slicer{},
Sort{},
Spread{},
StaticPathElem{},
Summarize{},
Switch{},
Tail{},
Expand Down
28 changes: 23 additions & 5 deletions compiler/kernel/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ func (b *Builder) compileExpr(e dag.Expr) (expr.Evaluator, error) {
return expr.NewDottedExpr(b.zctx(), field.Path(e.Path)), nil
case *dag.Dot:
return b.compileDotExpr(e)
case *dag.Path:
// Path only works as a general expression if it is a static path.
if this := e.StaticPath(); this != nil {
return expr.NewDottedExpr(b.zctx(), field.Path(this.Path)), nil
}
return nil, fmt.Errorf("internal error: invalid path expression %s", e)
case *dag.UnaryExpr:
return b.compileUnary(*e)
case *dag.BinaryExpr:
Expand Down Expand Up @@ -263,15 +269,27 @@ func (b *Builder) compileDotExpr(dot *dag.Dot) (expr.Evaluator, error) {
return expr.NewDotExpr(b.zctx(), record, dot.RHS), nil
}

func compileLval(e dag.Expr) (field.Path, error) {
if this, ok := e.(*dag.This); ok {
return field.Path(this.Path), nil
func (b *Builder) compilePath(e dag.Path) (*expr.Path, error) {
elems := make([]expr.PathElem, 0, len(e.Path))
for _, elem := range e.Path {
switch e := elem.(type) {
case *dag.This:
eval, err := b.compileExpr(e)
if err != nil {
return nil, err
}
elems = append(elems, expr.NewPathElemExpr(b.octx.Zctx, eval))
case *dag.StaticPathElem:
elems = append(elems, &expr.StaticPathElem{Name: e.Name})
default:
return nil, fmt.Errorf("internal error: invalid lval type %T", e)
}
}
return nil, errors.New("invalid expression on lhs of assignment")
return expr.NewPath(elems), nil
}

func (b *Builder) compileAssignment(node *dag.Assignment) (expr.Assignment, error) {
lhs, err := compileLval(node.LHS)
lhs, err := b.compilePath(node.LHS)
if err != nil {
return expr.Assignment{}, err
}
Expand Down
14 changes: 7 additions & 7 deletions compiler/kernel/groupby.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package kernel

import (
"errors"
"fmt"

"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/runtime/expr"
"github.com/brimdata/zed/runtime/op/groupby"
"github.com/brimdata/zed/zbuf"
"golang.org/x/exp/slices"
)

func (b *Builder) compileGroupBy(parent zbuf.Puller, summarize *dag.Summarize) (*groupby.Op, error) {
keys, err := b.compileAssignments(summarize.Keys)
keyPaths, keyVals, err := b.compileStaticAssignments(summarize.Keys)
if err != nil {
return nil, err
}
Expand All @@ -22,7 +22,7 @@ func (b *Builder) compileGroupBy(parent zbuf.Puller, summarize *dag.Summarize) (
return nil, err
}
dir := order.Direction(summarize.InputSortDir)
return groupby.New(b.octx, parent, keys, names, reducers, summarize.Limit, dir, summarize.PartialsIn, summarize.PartialsOut)
return groupby.New(b.octx, parent, keyPaths, keyVals, names, reducers, summarize.Limit, dir, summarize.PartialsIn, summarize.PartialsOut)
}

func (b *Builder) compileAggAssignments(assignments []dag.Assignment) (field.List, []*expr.Aggregator, error) {
Expand All @@ -44,12 +44,12 @@ func (b *Builder) compileAggAssignment(assignment dag.Assignment) (field.Path, *
if !ok {
return nil, nil, errors.New("aggregator is not an aggregation expression")
}
lhs, err := compileLval(assignment.LHS)
if err != nil {
return nil, nil, fmt.Errorf("lhs of aggregation: %w", err)
this := assignment.LHS.StaticPath()
if this == nil {
return nil, nil, errors.New("internal error: aggregator assignment must be a static path")
}
m, err := b.compileAgg(aggAST)
return lhs, m, err
return slices.Clone(this.Path), m, err
}

func (b *Builder) compileAgg(agg *dag.Agg) (*expr.Aggregator, error) {
Expand Down
53 changes: 24 additions & 29 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,36 +174,13 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
if err != nil {
return nil, err
}
putter, err := expr.NewPutter(b.octx.Zctx, clauses)
if err != nil {
return nil, err
}
putter := expr.NewPutter(b.octx.Zctx, clauses)
return op.NewApplier(b.octx, parent, putter), nil
case *dag.Rename:
var srcs, dsts field.List
for _, fa := range v.Args {
dst, err := compileLval(fa.LHS)
if err != nil {
return nil, err
}
// We call compileLval on the RHS because renames are
// restricted to dotted field name expressions.
src, err := compileLval(fa.RHS)
if err != nil {
return nil, err
}
if len(dst) != len(src) {
return nil, fmt.Errorf("cannot rename %s to %s", src, dst)
}
// Check that the prefixes match and, if not, report first place
// that they don't.
for i := 0; i <= len(src)-2; i++ {
if src[i] != dst[i] {
return nil, fmt.Errorf("cannot rename %s to %s (differ in %s vs %s)", src, dst, src[i], dst[i])
}
}
dsts = append(dsts, dst)
srcs = append(srcs, src)
for k := range v.Dsts {
srcs = append(srcs, v.Srcs[k].Path)
dsts = append(dsts, v.Dsts[k].Path)
}
renamer := expr.NewRenamer(b.octx.Zctx, srcs, dsts)
return op.NewApplier(b.octx, parent, renamer), nil
Expand Down Expand Up @@ -376,6 +353,24 @@ func (b *Builder) compileOver(parent zbuf.Puller, over *dag.Over) (zbuf.Puller,
return scope.NewExit(exit), nil
}

// compileStaticAssignments compiles assignments whose left-hand sides must
// all be static paths. It returns two parallel slices: a copy of each
// assignment's static LHS path and the evaluator compiled from its RHS.
// It fails if any LHS has no static form or any RHS does not compile.
func (b *Builder) compileStaticAssignments(assignments []dag.Assignment) ([]field.Path, []expr.Evaluator, error) {
	n := len(assignments)
	paths := make([]field.Path, 0, n)
	evals := make([]expr.Evaluator, 0, n)
	for i := range assignments {
		static := assignments[i].LHS.StaticPath()
		if static == nil {
			return nil, nil, errors.New("internal error: dynamic path in assignment when expecting a static path")
		}
		eval, err := b.compileExpr(assignments[i].RHS)
		if err != nil {
			return nil, nil, err
		}
		// Clone so the returned path does not alias the slice built by
		// StaticPath.
		paths = append(paths, slices.Clone(static.Path))
		evals = append(evals, eval)
	}
	return paths, evals, nil
}

func (b *Builder) compileAssignments(assignments []dag.Assignment) ([]expr.Assignment, error) {
keys := make([]expr.Assignment, 0, len(assignments))
for _, assignment := range assignments {
Expand All @@ -388,9 +383,9 @@ func (b *Builder) compileAssignments(assignments []dag.Assignment) ([]expr.Assig
return keys, nil
}

func splitAssignments(assignments []expr.Assignment) (field.List, []expr.Evaluator) {
func splitAssignments(assignments []expr.Assignment) ([]*expr.Path, []expr.Evaluator) {
n := len(assignments)
lhs := make(field.List, 0, n)
lhs := make([]*expr.Path, 0, n)
rhs := make([]expr.Evaluator, 0, n)
for _, a := range assignments {
lhs = append(lhs, a.LHS)
Expand Down
17 changes: 11 additions & 6 deletions compiler/optimizer/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func (o *Optimizer) analyzeSortKey(op dag.Op, in order.SortKey) (order.SortKey,
return in, nil
case *dag.Rename:
out := in
for _, assignment := range op.Args {
if fieldOf(assignment.RHS).Equal(key) {
lhs := fieldOf(assignment.LHS)
for k := range op.Dsts {
if fieldOf(op.Srcs[k]).Equal(key) {
lhs := fieldOf(op.Dsts[k])
out = order.NewSortKey(in.Order, field.List{lhs})
}
}
Expand All @@ -66,7 +66,7 @@ func (o *Optimizer) analyzeSortKey(op dag.Op, in order.SortKey) (order.SortKey,
return order.Nil, nil
case *dag.Put:
for _, assignment := range op.Args {
if fieldOf(assignment.LHS).Equal(key) {
if fieldOf(&assignment.LHS).Equal(key) {
return order.Nil, nil
}
}
Expand Down Expand Up @@ -101,7 +101,7 @@ func isKeyOfSummarize(summarize *dag.Summarize, in order.SortKey) bool {
}
key := in.Keys[0]
for _, outputKeyExpr := range summarize.Keys {
groupByKey := fieldOf(outputKeyExpr.LHS)
groupByKey := fieldOf(&outputKeyExpr.LHS)
if groupByKey.Equal(key) {
rhsExpr := outputKeyExpr.RHS
rhs := fieldOf(rhsExpr)
Expand Down Expand Up @@ -147,7 +147,7 @@ func analyzeCuts(assignments []dag.Assignment, sortKey order.SortKey) order.Sort
scoreboard := make(map[string]field.Path)
scoreboard[fieldKey(key)] = key
for _, a := range assignments {
lhs := fieldOf(a.LHS)
lhs := fieldOf(&a.LHS)
rhs := fieldOf(a.RHS)
if lhs == nil {
// If we cannot statically determine the data flow,
Expand Down Expand Up @@ -211,6 +211,11 @@ func fieldOf(e dag.Expr) field.Path {
if this, ok := e.(*dag.This); ok {
return this.Path
}
if path, ok := e.(*dag.Path); ok {
if this := path.StaticPath(); this != nil {
return this.Path
}
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (o *Optimizer) propagateSortKeyOp(op dag.Op, parents []order.SortKey) ([]or
//XXX handle only primary key for now
key := parent.Primary()
for _, k := range op.Keys {
if groupByKey := fieldOf(k.LHS); groupByKey.Equal(key) {
if groupByKey := fieldOf(&k.LHS); groupByKey.Equal(key) {
rhsExpr := k.RHS
rhs := fieldOf(rhsExpr)
if rhs.Equal(key) || orderPreservingCall(rhsExpr, groupByKey) {
Expand Down
2 changes: 1 addition & 1 deletion compiler/optimizer/parallelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (o *Optimizer) liftIntoParPaths(ops []dag.Op) {
// so the ingress aggregator should simply reference the key
// by its name. This loop updates the ingress to do so.
for k := range op.Keys {
op.Keys[k].RHS = op.Keys[k].LHS
op.Keys[k].RHS = &op.Keys[k].LHS
}
case *dag.Sort:
if len(op.Args) != 1 {
Expand Down
Loading

0 comments on commit bed68b9

Please sign in to comment.