Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jamii committed Nov 9, 2023
1 parent d30c7d7 commit b2abf39
Show file tree
Hide file tree
Showing 16 changed files with 412 additions and 95 deletions.
21 changes: 17 additions & 4 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package dag
import (
"slices"

"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
"github.com/segmentio/ksuid"
Expand Down Expand Up @@ -161,6 +162,11 @@ type (
Commit ksuid.KSUID `json:"commit"`
KeyPruner Expr `json:"key_pruner"`
}
VecLister struct {
Kind string `json:"kind" unpack:""`
Pool ksuid.KSUID `json:"pool"`
Commit ksuid.KSUID `json:"commit"`
}
Slicer struct {
Kind string `json:"kind" unpack:""`
}
Expand All @@ -170,6 +176,11 @@ type (
Filter Expr `json:"filter"`
KeyPruner Expr `json:"key_pruner"`
}
VecSeqScan struct {
Kind string `json:"kind" unpack:""`
Pool ksuid.KSUID `json:"pool"`
Demand demand.Demand `json:"demand"`
}
Deleter struct {
Kind string `json:"kind" unpack:""`
Pool ksuid.KSUID `json:"pool"`
Expand Down Expand Up @@ -251,10 +262,12 @@ func (*LakeMetaScan) OpNode() {}
func (*PoolMetaScan) OpNode() {}
func (*CommitMetaScan) OpNode() {}

func (*Lister) OpNode() {}
func (*Slicer) OpNode() {}
func (*SeqScan) OpNode() {}
func (*Deleter) OpNode() {}
func (*Lister) OpNode() {}
func (*VecLister) OpNode() {}
func (*Slicer) OpNode() {}
func (*SeqScan) OpNode() {}
func (*VecSeqScan) OpNode() {}
func (*Deleter) OpNode() {}

// Various Op fields

Expand Down
17 changes: 16 additions & 1 deletion compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
}
}
return meta.NewSortedLister(b.octx.Context, b.mctx, b.source.Lake(), pool, v.Commit, pruner)
case *dag.VecLister:
if parent != nil {
return nil, errors.New("internal error: data source cannot have a parent operator")
}
pool, err := b.lookupPool(v.Pool)
if err != nil {
return nil, err
}
return meta.NewVecLister(b.octx.Context, b.mctx, b.source.Lake(), pool, v.Commit)
case *dag.Slicer:
return meta.NewSlicer(parent, b.mctx), nil
case *dag.SeqScan:
Expand All @@ -286,6 +295,12 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
}
}
return meta.NewSequenceScanner(b.octx, parent, pool, b.PushdownOf(v.Filter), pruner, b.progress), nil
case *dag.VecSeqScan:
pool, err := b.lookupPool(v.Pool)
if err != nil {
return nil, err
}
return meta.NewVecSequenceScanner(b.octx, parent, pool, b.progress, v.Demand), nil
case *dag.Deleter:
pool, err := b.lookupPool(v.Pool)
if err != nil {
Expand Down Expand Up @@ -663,7 +678,7 @@ func isEntry(seq dag.Seq) bool {
return false
}
switch op := seq[0].(type) {
case *Reader, *dag.Lister, *dag.FileScan, *dag.HTTPScan, *dag.PoolScan, *dag.LakeMetaScan, *dag.PoolMetaScan, *dag.CommitMetaScan:
case *Reader, *dag.Lister, *dag.VecLister, *dag.FileScan, *dag.HTTPScan, *dag.PoolScan, *dag.LakeMetaScan, *dag.PoolMetaScan, *dag.CommitMetaScan:
return true
case *dag.Scope:
return isEntry(op.Body)
Expand Down
71 changes: 49 additions & 22 deletions compiler/optimizer/demand.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@ import (
"github.com/brimdata/zed/compiler/optimizer/demand"
)

// insertDemand computes the demand on the output of every op in seq
// and records it on each VecSeqScan so the vectorized scanner can
// restrict itself to the fields downstream ops actually use.
// NOTE(review): demands[op] is nil for ops not reached by
// InferDemandSeqOut; VecSeqScan consumers appear to tolerate a nil
// Demand (see the nil check added to demand.IsValid) — confirm.
func insertDemand(seq dag.Seq) dag.Seq {
	demands := InferDemandSeqOut(seq)
	return walk(seq, true, func(seq dag.Seq) dag.Seq {
		for _, op := range seq {
			// A type assertion reads better than a single-case
			// type switch (gocritic: singleCaseSwitch).
			if scan, ok := op.(*dag.VecSeqScan); ok {
				scan.Demand = demands[op]
			}
		}
		return seq
	})
}

// Returns a map from op to the demand on the output of that op.
func InferDemandSeqOut(seq dag.Seq) map[dag.Op]demand.Demand {
demands := make(map[dag.Op]demand.Demand)
Expand All @@ -29,73 +42,87 @@ func inferDemandSeqOutWith(demands map[dag.Op]demand.Demand, demandSeqOut demand
// Infer the demand that `op` places on its input.
var demandOpIn demand.Demand
switch op := op.(type) {
case *dag.FileScan:
demandOpIn = demand.None()
case *dag.Filter:
demandOpIn = demand.Union(
// Everything that downstream operations need.
demandOpOut,
// Everything that affects the outcome of this filter.
inferDemandExprIn(demand.All(), op.Expr),
)
case *dag.Summarize:
demandOpIn = demand.None()
// TODO If LHS not in demandOut, we can ignore RHS
for _, assignment := range op.Keys {
demandOpIn = demand.Union(demandOpIn, inferDemandExprIn(demand.All(), assignment.RHS))
}
for _, assignment := range op.Aggs {
demandOpIn = demand.Union(demandOpIn, inferDemandExprIn(demand.All(), assignment.RHS))
}
case *dag.Yield:
demandOpIn = demand.None()
for _, expr := range op.Exprs {
demandOpIn = demand.Union(demandOpIn, inferDemandExprIn(demandOpOut, expr))
}
default:
// Conservatively assume that `op` uses its entire input, regardless of output demand.
_ = op
demandOpIn = demand.All()
}
demandOpOut = demandOpIn
}
}

func inferDemandExprIn(demandExprOut demand.Demand, expr dag.Expr) demand.Demand {
if demand.IsNone(demandExprOut) {
return demandExprOut
func inferDemandExprIn(demandOut demand.Demand, expr dag.Expr) demand.Demand {
if demand.IsNone(demandOut) {
return demand.None()
}
if expr == nil {
return demand.None()
}
var demandIn demand.Demand
switch expr := expr.(type) {
case *dag.Agg:
// Since we don't know how the expr.Name will transform the inputs, we have to assume demand.All.
demandIn = demand.Union(
inferDemandExprIn(demand.All(), expr.Expr),
inferDemandExprIn(demand.All(), expr.Where),
)
case *dag.BinaryExpr:
// Since we don't know how the expr.Op will transform the inputs, we have to assume demand.All.
return demand.Union(
demandIn = demand.Union(
inferDemandExprIn(demand.All(), expr.LHS),
inferDemandExprIn(demand.All(), expr.RHS),
)
case *dag.Dot:
return demand.Key(expr.RHS, inferDemandExprIn(demandExprOut, expr.LHS))
demandIn = demand.Key(expr.RHS, inferDemandExprIn(demandOut, expr.LHS))
case *dag.Literal:
return demand.None()
demandIn = demand.None()
case *dag.MapExpr:
demandExprIn := demand.None()
demandIn = demand.None()
for _, entry := range expr.Entries {
demandExprIn = demand.Union(demandExprIn, inferDemandExprIn(demand.All(), entry.Key))
demandExprIn = demand.Union(demandExprIn, inferDemandExprIn(demand.All(), entry.Value))
demandIn = demand.Union(demandIn, inferDemandExprIn(demand.All(), entry.Key))
demandIn = demand.Union(demandIn, inferDemandExprIn(demand.All(), entry.Value))
}
return demandExprIn
case *dag.RecordExpr:
demandExprIn := demand.None()
demandIn = demand.None()
for _, elem := range expr.Elems {
switch elem := elem.(type) {
case *dag.Field:
demandValueOut := demand.GetKey(demandExprOut, elem.Name)
demandValueOut := demand.GetKey(demandOut, elem.Name)
if !demand.IsNone(demandValueOut) {
demandExprIn = demand.Union(demandExprIn, inferDemandExprIn(demandValueOut, elem.Value))
demandIn = demand.Union(demandIn, inferDemandExprIn(demandValueOut, elem.Value))
}
case *dag.Spread:
demandExprIn = demand.Union(demandExprIn, inferDemandExprIn(demand.All(), elem.Expr))
demandIn = demand.Union(demandIn, inferDemandExprIn(demand.All(), elem.Expr))
}
}
return demandExprIn
case *dag.This:
demandExprIn := demandExprOut
demandIn = demandOut
for i := len(expr.Path) - 1; i >= 0; i-- {
demandExprIn = demand.Key(expr.Path[i], demandExprIn)
demandIn = demand.Key(expr.Path[i], demandIn)
}
return demandExprIn
default:
// Conservatively assume that `expr` uses its entire input, regardless of output demand.
return demand.All()
demandIn = demand.All()
}
return demandIn
}
3 changes: 3 additions & 0 deletions compiler/optimizer/demand/demand.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type all struct{}
type keys map[string]Demand // No empty values.

func IsValid(demand Demand) bool {
if demand == nil {
return false
}
switch demand := demand.(type) {
case all:
return true
Expand Down
82 changes: 54 additions & 28 deletions compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"

"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/compiler/data"
Expand Down Expand Up @@ -136,6 +137,7 @@ func (o *Optimizer) Optimize(seq dag.Seq) (dag.Seq, error) {
if err != nil {
return nil, err
}
seq = insertDemand(seq)
seq = removePassOps(seq)
return seq, nil
}
Expand Down Expand Up @@ -203,36 +205,60 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
filter, chain := matchFilter(chain)
switch op := seq[0].(type) {
case *dag.PoolScan:
// Here we transform a PoolScan into a Lister followed by one or more chains
// of slicers and sequence scanners. We'll eventually choose other configurations
// here based on metadata and availability of VNG.
lister := &dag.Lister{
Kind: "Lister",
Pool: op.ID,
Commit: op.Commit,
}
// Check to see if we can add a range pruner when the pool key is used
// in a normal filtering operation.
sortKey, err := o.sortKeyOfSource(op)
if err != nil {
return nil, err
}
lister.KeyPruner = maybeNewRangePruner(filter, sortKey)
seq = dag.Seq{lister}
_, _, orderRequired, _, err := o.concurrentPath(chain, sortKey)
if err != nil {
return nil, err
}
if orderRequired {
seq = append(seq, &dag.Slicer{Kind: "Slicer"})
seq = dag.Seq{}
if os.Getenv("ZED_USE_VECTOR") != "" {
// TODO Decide whether to use vectors based on whether zng files exist and whether sorting is needed.
seq = dag.Seq{
&dag.VecLister{
Kind: "VecLister",
Pool: op.ID,
Commit: op.Commit,
},
&dag.VecSeqScan{
Kind: "VecSeqScan",
Pool: op.ID,
},
}
if filter != nil {
// TODO Push filter into VecLister and VecSeqScan where possible.
seq = append(seq,
&dag.Filter{
Kind: "Filter",
Expr: filter,
})
}
} else {
// Here we transform a PoolScan into a Lister followed by one or more chains
// of slicers and sequence scanners.
lister := &dag.Lister{
Kind: "Lister",
Pool: op.ID,
Commit: op.Commit,
}
// Check to see if we can add a range pruner when the pool key is used
// in a normal filtering operation.
sortKey, err := o.sortKeyOfSource(op)
if err != nil {
return nil, err
}
lister.KeyPruner = maybeNewRangePruner(filter, sortKey)
seq = dag.Seq{lister}
_, _, orderRequired, _, err := o.concurrentPath(chain, sortKey)
if err != nil {
return nil, err
}
if orderRequired {
seq = append(seq, &dag.Slicer{Kind: "Slicer"})
}
seq = append(seq, &dag.SeqScan{
Kind: "SeqScan",
Pool: op.ID,
Filter: filter,
KeyPruner: lister.KeyPruner,
})
}
seq = append(seq, &dag.SeqScan{
Kind: "SeqScan",
Pool: op.ID,
Filter: filter,
KeyPruner: lister.KeyPruner,
})
seq = append(seq, chain...)

case *dag.FileScan:
op.Filter = filter
seq = append(dag.Seq{op}, chain...)
Expand Down
4 changes: 4 additions & 0 deletions lake/commits/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (p *Patch) SelectAllIndexes() []*index.Object {
return append(p.base.SelectAllIndexes(), p.diff.SelectAllIndexes()...)
}

// SelectAllVectors returns the vector object IDs visible through the
// patch: those of the underlying base view followed by those added by
// the diff.
func (p *Patch) SelectAllVectors() []ksuid.KSUID {
	ids := p.base.SelectAllVectors()
	ids = append(ids, p.diff.SelectAllVectors()...)
	return ids
}

func (p *Patch) DataObjects() []ksuid.KSUID {
var ids []ksuid.KSUID
for _, dataObject := range p.diff.SelectAll() {
Expand Down
6 changes: 6 additions & 0 deletions lake/commits/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commits
import (
"errors"
"fmt"
"golang.org/x/exp/maps"
"io"

"github.com/brimdata/zed/lake/data"
Expand All @@ -25,6 +26,7 @@ type View interface {
SelectAll() DataObjects
SelectIndexes(extent.Span, order.Which) []*index.Object
SelectAllIndexes() []*index.Object
SelectAllVectors() []ksuid.KSUID
}

type Writeable interface {
Expand Down Expand Up @@ -188,6 +190,10 @@ func (s *Snapshot) SelectAllIndexes() []*index.Object {
return s.indexes.All()
}

// SelectAllVectors returns the ID of every entry in the snapshot's
// vectors map.
// NOTE(review): maps.Keys returns keys in nondeterministic order, so
// callers presumably must not rely on ordering — confirm.
func (s *Snapshot) SelectAllVectors() []ksuid.KSUID {
	return maps.Keys(s.vectors)
}

func (s *Snapshot) Unindexed(rules []index.Rule) map[ksuid.KSUID][]index.Rule {
unindexed := make(map[ksuid.KSUID][]index.Rule)
for id := range s.objects {
Expand Down
3 changes: 1 addition & 2 deletions runtime/op/combine/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ type Op struct {
nblocked int
}

func New(octx *op.Context, parents []zbuf.Puller) *Op {
ctx := octx.Context
func New(ctx context.Context, parents []zbuf.Puller) *Op {
queue := make(chan *puller, len(parents))
pullers := make([]*puller, 0, len(parents))
waitCh := make(chan struct{})
Expand Down
4 changes: 2 additions & 2 deletions runtime/op/meta/deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (d *Deleter) nextDeletion() (zbuf.Puller, error) {
}
// Use a no-op progress so stats are not inflated.
var progress zbuf.Progress
scanner, object, err := newScanner(d.octx.Context, d.octx.Zctx, d.pool, d.unmarshaler, d.pruner, d.filter, &progress, &vals[0])
scanner, object, err := newSequenceScanner(d.octx.Context, d.octx.Zctx, d.pool, d.unmarshaler, d.pruner, d.filter, &progress, &vals[0])
if err != nil {
return nil, err
}
Expand All @@ -104,7 +104,7 @@ func (d *Deleter) nextDeletion() (zbuf.Puller, error) {
}

func (d *Deleter) hasDeletes(val *zed.Value) (bool, error) {
scanner, object, err := newScanner(d.octx.Context, d.octx.Zctx, d.pool, d.unmarshaler, d.pruner, d.filter, d.progress, val)
scanner, object, err := newSequenceScanner(d.octx.Context, d.octx.Zctx, d.pool, d.unmarshaler, d.pruner, d.filter, d.progress, val)
if err != nil {
return false, err
}
Expand Down
Loading

0 comments on commit b2abf39

Please sign in to comment.