Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DONT MERGE. Vam. #4823

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions cmd/zed/dev/vcache/agg/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package agg

import (
"errors"
"flag"

"github.com/brimdata/zed/cli/outputflags"
devvcache "github.com/brimdata/zed/cmd/zed/dev/vcache"
"github.com/brimdata/zed/cmd/zed/root"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime/vam"
"github.com/brimdata/zed/runtime/vcache"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
"github.com/segmentio/ksuid"
)

var Agg = &charm.Spec{
Name: "agg",
Usage: "agg [flags] field[,field...] path",
Short: "read a VNG file and run an aggregate as a test",
Long: `
The project command reads VNG vectors from
a VNG storage objects (local files or s3 objects) and outputs
the reconstructed ZNG row data as an aggregate function.

This command is most useful for testing the VNG vector cache.
`,
New: newCommand,
}

func init() {
devvcache.Cmd.Add(Agg)
}

type Command struct {
*root.Command
outputFlags outputflags.Flags
}

func newCommand(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
c := &Command{Command: parent.(*root.Command)}
c.outputFlags.SetFlags(f)
return c, nil
}

func (c *Command) Run(args []string) error {
ctx, cleanup, err := c.Init(&c.outputFlags)
if err != nil {
return err
}
defer cleanup()
if len(args) != 2 {
//XXX
return errors.New("VNG read: must be run with a single path argument followed by one or more fields")
}
uri, err := storage.ParseURI(args[0])
if err != nil {
return err
}
field := args[1]
local := storage.NewLocalEngine()
cache := vcache.NewCache(local)
object, err := cache.Fetch(ctx, uri, ksuid.Nil)
if err != nil {
return err
}
defer object.Close()
//XXX nil puller
agg := vam.NewCountByString(object.LocalContext(), nil, field)
writer, err := c.outputFlags.Open(ctx, local)
if err != nil {
return err
}
if err := zio.Copy(writer, zbuf.PullerReader(agg)); err != nil {
writer.Close()
return err
}
return writer.Close()
}
11 changes: 6 additions & 5 deletions cmd/zed/dev/vcache/copy/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime/vcache"
"github.com/brimdata/zed/zio"
"github.com/segmentio/ksuid"
)

Expand Down Expand Up @@ -67,9 +66,11 @@ func (c *Command) Run(args []string) error {
if err != nil {
return err
}
if err := zio.Copy(writer, object.NewReader()); err != nil {
writer.Close()
return err
}
/*
if err := zio.Copy(writer, object.NewReader()); err != nil {
writer.Close()
return err
}
*/
return writer.Close()
}
3 changes: 2 additions & 1 deletion cmd/zed/dev/vcache/project/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/brimdata/zed/cmd/zed/root"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime/vam"
"github.com/brimdata/zed/runtime/vcache"
"github.com/brimdata/zed/zio"
"github.com/segmentio/ksuid"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (c *Command) Run(args []string) error {
return err
}
defer object.Close()
projection, err := object.NewProjection(fields)
projection, err := vam.NewProjection(object, fields)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/zed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
_ "github.com/brimdata/zed/cmd/zed/dev/indexfile"
_ "github.com/brimdata/zed/cmd/zed/dev/indexfile/create"
_ "github.com/brimdata/zed/cmd/zed/dev/indexfile/lookup"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/agg"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/copy"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/project"
"github.com/brimdata/zed/cmd/zed/drop"
Expand Down
6 changes: 6 additions & 0 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ type (
Kind string `json:"kind" unpack:""`
Cflag bool `json:"cflag"`
}
VecScan struct {
Kind string `json:"kind" unpack:""`
Pool ksuid.KSUID `json:"pool"`
Paths [][]string `json:"paths"` //XXX
}
Yield struct {
Kind string `json:"kind" unpack:""`
Exprs []Expr `json:"exprs"`
Expand Down Expand Up @@ -293,6 +298,7 @@ func (*Merge) OpNode() {}
func (*Combine) OpNode() {}
func (*Scope) OpNode() {}
func (*Load) OpNode() {}
func (*VecScan) OpNode() {}

// NewFilter returns a filter node for e.
func NewFilter(e Expr) *Filter {
Expand Down
4 changes: 4 additions & 0 deletions compiler/ast/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,12 @@ var unpacker = unpack.New(
UnaryExpr{},
Uniq{},
Var{},
VecScan{},
VectorValue{},
Yield{},
//XXX
CountByStringHack{},
SumHack{},
)

// UnmarshalOp transforms a JSON representation of an operator into an Op.
Expand Down
21 changes: 21 additions & 0 deletions compiler/ast/dag/vop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package dag

type Vop interface {
vopNode()
}

type CountByStringHack struct {
Kind string `json:"kind" unpack:""`
Field string `json:"field"`
}

func (*CountByStringHack) vopNode() {}
func (*CountByStringHack) OpNode() {}

type SumHack struct {
Kind string `json:"kind" unpack:""`
Field string `json:"field"`
}

func (*SumHack) vopNode() {}
func (*SumHack) OpNode() {}
24 changes: 24 additions & 0 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/brimdata/zed/runtime/op/traverse"
"github.com/brimdata/zed/runtime/op/uniq"
"github.com/brimdata/zed/runtime/op/yield"
"github.com/brimdata/zed/runtime/vam"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zson"
Expand Down Expand Up @@ -325,6 +326,29 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
return meta.NewDeleter(b.octx, parent, pool, filter, pruner, b.progress, b.deletes), nil
case *dag.Load:
return load.New(b.octx, b.source.Lake(), parent, v.Pool, v.Branch, v.Author, v.Message, v.Meta), nil
case *dag.VecScan:
pool, err := b.lookupPool(v.Pool)
if err != nil {
return nil, err
}
var paths []field.Path
for _, s := range v.Paths {
paths = append(paths, s)
}
//XXX check VectorCache not nil
return vam.NewVecScanner(b.octx, b.source.Lake().VectorCache(), parent, pool, paths, nil, nil), nil
case *dag.CountByStringHack:
puller, ok := parent.(vam.Puller)
if !ok {
return nil, errors.New("CountByStringHack parent not a vam.Puller") //XXX
}
return vam.NewCountByString(b.octx.Zctx, puller, v.Field), nil
case *dag.SumHack:
puller, ok := parent.(vam.Puller)
if !ok {
return nil, errors.New("CountByStringHack parent not a vam.Puller") //XXX
}
return vam.NewSum(b.octx.Zctx, puller, v.Field), nil
default:
return nil, fmt.Errorf("unknown DAG operator type: %v", v)
}
Expand Down
22 changes: 22 additions & 0 deletions compiler/optimizer/parallelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,28 @@ func (o *Optimizer) parallelizeScan(ops []dag.Op, replicas int) ([]dag.Op, error
}

func (o *Optimizer) parallelizeSeqScan(scan *dag.SeqScan, ops []dag.Op, replicas int) ([]dag.Op, error) {
if vscan := hackCountByString(scan, ops); vscan != nil {
scatter := &dag.Scatter{
Kind: "Scatter",
Paths: make([]dag.Seq, replicas),
}
for k := 0; k < replicas; k++ {
scatter.Paths[k] = copyOps(vscan[0:2])
}
combine := &dag.Combine{Kind: "Combine"}
return []dag.Op{scatter, combine, vscan[2]}, nil
}
if vscan := hackSum(scan, ops); vscan != nil {
scatter := &dag.Scatter{
Kind: "Scatter",
Paths: make([]dag.Seq, replicas),
}
for k := 0; k < replicas; k++ {
scatter.Paths[k] = copyOps(vscan[0:2])
}
combine := &dag.Combine{Kind: "Combine"}
return []dag.Op{scatter, combine, vscan[2], vscan[3]}, nil
}
if len(ops) == 1 && scan.Filter == nil {
// We don't try to parallelize the path if it's simply scanning and does no
// other work. We might want to revisit this down the road if
Expand Down
137 changes: 137 additions & 0 deletions compiler/optimizer/vam.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package optimizer

import (
"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/pkg/field"
)

func hackCountByString(scan *dag.SeqScan, ops []dag.Op) []dag.Op {
if len(ops) != 2 {
return nil
}
summarize, ok := ops[1].(*dag.Summarize)
if !ok {
return nil
}
if len(summarize.Aggs) != 1 {
return nil
}
if ok := isCount(summarize.Aggs[0]); !ok {
return nil
}
field, ok := isSingleField(summarize.Keys[0])
if !ok {
return nil
}
return []dag.Op{
&dag.VecScan{
Kind: "VecScan",
Pool: scan.Pool,
Paths: [][]string{{field}},
},
&dag.CountByStringHack{
Kind: "CountByStringHack",
Field: field,
},
&dag.Summarize{
Kind: "Summarize",
Keys: []dag.Assignment{{
Kind: "Assignment",
LHS: &dag.This{Kind: "This", Path: []string{field}},
RHS: &dag.This{Kind: "This", Path: []string{field}},
}},
Aggs: []dag.Assignment{{
Kind: "Assignment",
LHS: &dag.This{Kind: "This", Path: []string{"count"}},
RHS: &dag.Agg{
Kind: "Agg",
Name: "count",
},
}},
PartialsIn: true,
},
}
}

func isCount(a dag.Assignment) bool {
this, ok := a.LHS.(*dag.This)
if !ok || len(this.Path) != 1 || this.Path[0] != "count" {
return false
}
agg, ok := a.RHS.(*dag.Agg)
return ok && agg.Name == "count" && agg.Expr == nil && agg.Where == nil
}

func isSum(a dag.Assignment) (field.Path, bool) {
this, ok := a.LHS.(*dag.This)
if !ok || len(this.Path) != 1 || this.Path[0] != "sum" {
return nil, false
}
agg, ok := a.RHS.(*dag.Agg)
if ok && agg.Name == "sum" && agg.Where == nil {
return isThis(agg.Expr)
}
return nil, false
}

func isSingleField(a dag.Assignment) (string, bool) {
lhs := fieldOf(a.LHS)
rhs := fieldOf(a.RHS)
if len(lhs) != 1 || len(rhs) != 1 || !lhs.Equal(rhs) {
return "", false
}
return lhs[0], true
}

func isThis(e dag.Expr) (field.Path, bool) {
if this, ok := e.(*dag.This); ok && len(this.Path) >= 1 {
return this.Path, true
}
return nil, false
}

func hackSum(scan *dag.SeqScan, ops []dag.Op) []dag.Op {
if len(ops) != 3 {
return nil
}
summarize, ok := ops[1].(*dag.Summarize)
if !ok {
return nil
}
if len(summarize.Aggs) != 1 {
return nil
}
if len(summarize.Keys) != 0 {
return nil
}
path, ok := isSum(summarize.Aggs[0])
if !ok {
return nil
}
field := path[len(path)-1] //XXX
return []dag.Op{
&dag.VecScan{
Kind: "VecScan",
Pool: scan.Pool,
Paths: [][]string{path},
},
&dag.SumHack{
Kind: "SumHack",
Field: field, //XXX
},
&dag.Summarize{
Kind: "Summarize",
Aggs: []dag.Assignment{{
Kind: "Assignment",
LHS: &dag.This{Kind: "This", Path: []string{"sum"}},
RHS: &dag.Agg{
Kind: "Agg",
Name: "sum",
Expr: &dag.This{Kind: "This", Path: []string{field}},
},
}},
PartialsIn: true,
},
ops[2],
}
}
Loading
Loading