Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dyn fields #4793

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion compiler/ast/dag/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ type (
VectorElem interface {
vectorElem()
}
PathElem interface {
pathElem()
}
)

// Exprs
Expand All @@ -27,7 +30,7 @@ type (
}
Assignment struct {
Kind string `json:"kind" unpack:""`
LHS Expr `json:"lhs"`
LHS *Path `json:"lhs"`
RHS Expr `json:"rhs"`
}
BinaryExpr struct {
Expand Down Expand Up @@ -72,6 +75,10 @@ type (
Exprs []Expr `json:"exprs"`
Body Seq `json:"body"`
}
Path struct {
Kind string `json:"kind" unpack:""`
Path []PathElem `json:"path"`
}
RecordExpr struct {
Kind string `json:"kind" unpack:""`
Elems []RecordElem `json:"elems"`
Expand Down Expand Up @@ -123,6 +130,7 @@ func (*Func) ExprDAG() {}
func (*Literal) ExprDAG() {}
func (*MapExpr) ExprDAG() {}
func (*OverExpr) ExprDAG() {}
func (*Path) ExprDAG() {}
func (*RecordExpr) ExprDAG() {}
func (*RegexpMatch) ExprDAG() {}
func (*RegexpSearch) ExprDAG() {}
Expand Down Expand Up @@ -159,6 +167,34 @@ func (*Spread) recordAST() {}
func (*Spread) vectorElem() {}
func (*VectorValue) vectorElem() {}

func (p *Path) StaticPath() *This {
this := &This{Kind: "This"}
for _, elem := range p.Path {
if p, ok := elem.(*StaticPathElem); ok {
this.Path = append(this.Path, p.Name)
} else {
return nil
}
}
return this
}

func NewStaticPath(path ...string) *Path {
p := &Path{Kind: "Path"}
for _, name := range path {
p.Path = append(p.Path, &StaticPathElem{Kind: "StaticPathElem", Name: name})
}
return p
}

type StaticPathElem struct {
Kind string `json:"kind" unpack:""`
Name string `json:"name"`
}

func (*This) pathElem() {}
func (*StaticPathElem) pathElem() {}

func NewBinaryExpr(op string, lhs, rhs Expr) *BinaryExpr {
return &BinaryExpr{
Kind: "BinaryExpr",
Expand Down
5 changes: 3 additions & 2 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ type (
Args []Assignment `json:"args"`
}
Rename struct {
Kind string `json:"kind" unpack:""`
Args []Assignment `json:"args"`
Kind string `json:"kind" unpack:""`
Dsts []*This `json:"dsts"`
Srcs []*This `json:"srcs"`
}
Scatter struct {
Kind string `json:"kind" unpack:""`
Expand Down
2 changes: 2 additions & 0 deletions compiler/ast/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var unpacker = unpack.New(
Over{},
OverExpr{},
Pass{},
Path{},
StaticPathElem{},
PoolScan{},
Put{},
RecordExpr{},
Expand Down
28 changes: 23 additions & 5 deletions compiler/kernel/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ func (b *Builder) compileExpr(e dag.Expr) (expr.Evaluator, error) {
return expr.NewDottedExpr(b.zctx(), field.Path(e.Path)), nil
case *dag.Dot:
return b.compileDotExpr(e)
case *dag.Path:
// Path only works as a general expression if it is a static path.
if this := e.StaticPath(); this != nil {
return expr.NewDottedExpr(b.zctx(), field.Path(this.Path)), nil
}
return nil, fmt.Errorf("internal error: invalid path expression %s", e)
case *dag.UnaryExpr:
return b.compileUnary(*e)
case *dag.BinaryExpr:
Expand Down Expand Up @@ -263,15 +269,27 @@ func (b *Builder) compileDotExpr(dot *dag.Dot) (expr.Evaluator, error) {
return expr.NewDotExpr(b.zctx(), record, dot.RHS), nil
}

func compileLval(e dag.Expr) (field.Path, error) {
if this, ok := e.(*dag.This); ok {
return field.Path(this.Path), nil
func (b *Builder) compilePath(e *dag.Path) (*expr.Path, error) {
elems := make([]expr.PathElem, 0, len(e.Path))
for _, elem := range e.Path {
switch e := elem.(type) {
case *dag.This:
eval, err := b.compileExpr(e)
if err != nil {
return nil, err
}
elems = append(elems, expr.NewPathElemExpr(b.octx.Zctx, eval))
case *dag.StaticPathElem:
elems = append(elems, &expr.StaticPathElem{Name: e.Name})
default:
return nil, fmt.Errorf("internal error: invalid lval type %T", e)
}
}
return nil, errors.New("invalid expression on lhs of assignment")
return expr.NewPath(elems), nil
}

func (b *Builder) compileAssignment(node *dag.Assignment) (expr.Assignment, error) {
lhs, err := compileLval(node.LHS)
lhs, err := b.compilePath(node.LHS)
if err != nil {
return expr.Assignment{}, err
}
Expand Down
14 changes: 7 additions & 7 deletions compiler/kernel/groupby.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package kernel

import (
"errors"
"fmt"

"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/runtime/expr"
"github.com/brimdata/zed/runtime/op/groupby"
"github.com/brimdata/zed/zbuf"
"golang.org/x/exp/slices"
)

func (b *Builder) compileGroupBy(parent zbuf.Puller, summarize *dag.Summarize) (*groupby.Op, error) {
keys, err := b.compileAssignments(summarize.Keys)
keyPaths, keyVals, err := b.compileStaticAssignments(summarize.Keys)
if err != nil {
return nil, err
}
Expand All @@ -22,7 +22,7 @@ func (b *Builder) compileGroupBy(parent zbuf.Puller, summarize *dag.Summarize) (
return nil, err
}
dir := order.Direction(summarize.InputSortDir)
return groupby.New(b.octx, parent, keys, names, reducers, summarize.Limit, dir, summarize.PartialsIn, summarize.PartialsOut)
return groupby.New(b.octx, parent, keyPaths, keyVals, names, reducers, summarize.Limit, dir, summarize.PartialsIn, summarize.PartialsOut)
}

func (b *Builder) compileAggAssignments(assignments []dag.Assignment) (field.List, []*expr.Aggregator, error) {
Expand All @@ -44,12 +44,12 @@ func (b *Builder) compileAggAssignment(assignment dag.Assignment) (field.Path, *
if !ok {
return nil, nil, errors.New("aggregator is not an aggregation expression")
}
lhs, err := compileLval(assignment.LHS)
if err != nil {
return nil, nil, fmt.Errorf("lhs of aggregation: %w", err)
this := assignment.LHS.StaticPath()
if this == nil {
return nil, nil, errors.New("internal error: aggregator assignment must be a static path")
}
m, err := b.compileAgg(aggAST)
return lhs, m, err
return slices.Clone(this.Path), m, err
}

func (b *Builder) compileAgg(agg *dag.Agg) (*expr.Aggregator, error) {
Expand Down
53 changes: 24 additions & 29 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,36 +174,13 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
if err != nil {
return nil, err
}
putter, err := expr.NewPutter(b.octx.Zctx, clauses)
if err != nil {
return nil, err
}
putter := expr.NewPutter(b.octx.Zctx, clauses)
return op.NewApplier(b.octx, parent, putter), nil
case *dag.Rename:
var srcs, dsts field.List
for _, fa := range v.Args {
dst, err := compileLval(fa.LHS)
if err != nil {
return nil, err
}
// We call CompileLval on the RHS because renames are
// restricted to dotted field name expressions.
src, err := compileLval(fa.RHS)
if err != nil {
return nil, err
}
if len(dst) != len(src) {
return nil, fmt.Errorf("cannot rename %s to %s", src, dst)
}
// Check that the prefixes match and, if not, report first place
// that they don't.
for i := 0; i <= len(src)-2; i++ {
if src[i] != dst[i] {
return nil, fmt.Errorf("cannot rename %s to %s (differ in %s vs %s)", src, dst, src[i], dst[i])
}
}
dsts = append(dsts, dst)
srcs = append(srcs, src)
for k := range v.Dsts {
srcs = append(srcs, v.Srcs[k].Path)
dsts = append(dsts, v.Dsts[k].Path)
}
renamer := expr.NewRenamer(b.octx.Zctx, srcs, dsts)
return op.NewApplier(b.octx, parent, renamer), nil
Expand Down Expand Up @@ -376,6 +353,24 @@ func (b *Builder) compileOver(parent zbuf.Puller, over *dag.Over) (zbuf.Puller,
return scope.NewExit(exit), nil
}

func (b *Builder) compileStaticAssignments(assignments []dag.Assignment) ([]field.Path, []expr.Evaluator, error) {
lhs := make([]field.Path, 0, len(assignments))
rhs := make([]expr.Evaluator, 0, len(assignments))
for _, a := range assignments {
this := a.LHS.StaticPath()
if this == nil {
return nil, nil, errors.New("internal error: dynamic lhs assignment when expecting a static path")
}
lhs = append(lhs, slices.Clone(this.Path))
r, err := b.compileExpr(a.RHS)
if err != nil {
return nil, nil, err
}
rhs = append(rhs, r)
}
return lhs, rhs, nil
}

func (b *Builder) compileAssignments(assignments []dag.Assignment) ([]expr.Assignment, error) {
keys := make([]expr.Assignment, 0, len(assignments))
for _, assignment := range assignments {
Expand All @@ -388,9 +383,9 @@ func (b *Builder) compileAssignments(assignments []dag.Assignment) ([]expr.Assig
return keys, nil
}

func splitAssignments(assignments []expr.Assignment) (field.List, []expr.Evaluator) {
func splitAssignments(assignments []expr.Assignment) ([]*expr.Path, []expr.Evaluator) {
n := len(assignments)
lhs := make(field.List, 0, n)
lhs := make([]*expr.Path, 0, n)
rhs := make([]expr.Evaluator, 0, n)
for _, a := range assignments {
lhs = append(lhs, a.LHS)
Expand Down
11 changes: 8 additions & 3 deletions compiler/optimizer/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func (o *Optimizer) analyzeSortKey(op dag.Op, in order.SortKey) (order.SortKey,
return in, nil
case *dag.Rename:
out := in
for _, assignment := range op.Args {
if fieldOf(assignment.RHS).Equal(key) {
lhs := fieldOf(assignment.LHS)
for k := range op.Dsts {
if fieldOf(op.Srcs[k]).Equal(key) {
lhs := fieldOf(op.Dsts[k])
out = order.NewSortKey(in.Order, field.List{lhs})
}
}
Expand Down Expand Up @@ -211,6 +211,11 @@ func fieldOf(e dag.Expr) field.Path {
if this, ok := e.(*dag.This); ok {
return this.Path
}
if path, ok := e.(*dag.Path); ok {
if this := path.StaticPath(); this != nil {
return this.Path
}
}
return nil
}

Expand Down
Loading