diff --git a/cmd/zed/dev/vcache/agg/command.go b/cmd/zed/dev/vcache/agg/command.go new file mode 100644 index 0000000000..1d0752d5f1 --- /dev/null +++ b/cmd/zed/dev/vcache/agg/command.go @@ -0,0 +1,81 @@ +package agg + +import ( + "errors" + "flag" + + "github.com/brimdata/zed/cli/outputflags" + devvcache "github.com/brimdata/zed/cmd/zed/dev/vcache" + "github.com/brimdata/zed/cmd/zed/root" + "github.com/brimdata/zed/pkg/charm" + "github.com/brimdata/zed/pkg/storage" + "github.com/brimdata/zed/runtime/vam" + "github.com/brimdata/zed/runtime/vcache" + "github.com/brimdata/zed/zbuf" + "github.com/brimdata/zed/zio" + "github.com/segmentio/ksuid" +) + +var Agg = &charm.Spec{ + Name: "agg", + Usage: "agg [flags] path field", + Short: "read a VNG file and run an aggregate as a test", + Long: ` +The agg command reads VNG vectors from +a VNG storage object (local file or s3 object) and outputs +the result of a count-by-string aggregate over the given field. + +This command is most useful for testing the VNG vector cache.
+`, + New: newCommand, +} + +func init() { + devvcache.Cmd.Add(Agg) +} + +type Command struct { + *root.Command + outputFlags outputflags.Flags +} + +func newCommand(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { + c := &Command{Command: parent.(*root.Command)} + c.outputFlags.SetFlags(f) + return c, nil +} + +func (c *Command) Run(args []string) error { + ctx, cleanup, err := c.Init(&c.outputFlags) + if err != nil { + return err + } + defer cleanup() + if len(args) != 2 { + //XXX + return errors.New("VNG read: must be run with a single path argument followed by one or more fields") + } + uri, err := storage.ParseURI(args[0]) + if err != nil { + return err + } + field := args[1] + local := storage.NewLocalEngine() + cache := vcache.NewCache(local) + object, err := cache.Fetch(ctx, uri, ksuid.Nil) + if err != nil { + return err + } + defer object.Close() + //XXX nil puller + agg := vam.NewCountByString(object.LocalContext(), nil, field) + writer, err := c.outputFlags.Open(ctx, local) + if err != nil { + return err + } + if err := zio.Copy(writer, zbuf.PullerReader(agg)); err != nil { + writer.Close() + return err + } + return writer.Close() +} diff --git a/cmd/zed/dev/vcache/copy/command.go b/cmd/zed/dev/vcache/copy/command.go index 3c1c7960f9..9d65d99a54 100644 --- a/cmd/zed/dev/vcache/copy/command.go +++ b/cmd/zed/dev/vcache/copy/command.go @@ -10,7 +10,6 @@ import ( "github.com/brimdata/zed/pkg/charm" "github.com/brimdata/zed/pkg/storage" "github.com/brimdata/zed/runtime/vcache" - "github.com/brimdata/zed/zio" "github.com/segmentio/ksuid" ) @@ -67,9 +66,11 @@ func (c *Command) Run(args []string) error { if err != nil { return err } - if err := zio.Copy(writer, object.NewReader()); err != nil { - writer.Close() - return err - } + /* + if err := zio.Copy(writer, object.NewReader()); err != nil { + writer.Close() + return err + } + */ return writer.Close() } diff --git a/cmd/zed/dev/vcache/project/command.go b/cmd/zed/dev/vcache/project/command.go 
index 6c41fbd90e..224f65f38c 100644 --- a/cmd/zed/dev/vcache/project/command.go +++ b/cmd/zed/dev/vcache/project/command.go @@ -9,6 +9,7 @@ import ( "github.com/brimdata/zed/cmd/zed/root" "github.com/brimdata/zed/pkg/charm" "github.com/brimdata/zed/pkg/storage" + "github.com/brimdata/zed/runtime/vam" "github.com/brimdata/zed/runtime/vcache" "github.com/brimdata/zed/zio" "github.com/segmentio/ksuid" @@ -64,7 +65,7 @@ func (c *Command) Run(args []string) error { return err } defer object.Close() - projection, err := object.NewProjection(fields) + projection, err := vam.NewProjection(object, fields) if err != nil { return err } diff --git a/cmd/zed/main.go b/cmd/zed/main.go index c62c372413..c51f1ba481 100644 --- a/cmd/zed/main.go +++ b/cmd/zed/main.go @@ -18,6 +18,7 @@ import ( _ "github.com/brimdata/zed/cmd/zed/dev/indexfile" _ "github.com/brimdata/zed/cmd/zed/dev/indexfile/create" _ "github.com/brimdata/zed/cmd/zed/dev/indexfile/lookup" + _ "github.com/brimdata/zed/cmd/zed/dev/vcache/agg" _ "github.com/brimdata/zed/cmd/zed/dev/vcache/copy" _ "github.com/brimdata/zed/cmd/zed/dev/vcache/project" "github.com/brimdata/zed/cmd/zed/drop" diff --git a/compiler/ast/dag/op.go b/compiler/ast/dag/op.go index 42330204c4..e724f45d97 100644 --- a/compiler/ast/dag/op.go +++ b/compiler/ast/dag/op.go @@ -145,6 +145,11 @@ type ( Kind string `json:"kind" unpack:""` Cflag bool `json:"cflag"` } + VecScan struct { + Kind string `json:"kind" unpack:""` + Pool ksuid.KSUID `json:"pool"` + Paths [][]string `json:"paths"` //XXX + } Yield struct { Kind string `json:"kind" unpack:""` Exprs []Expr `json:"exprs"` @@ -293,6 +298,7 @@ func (*Merge) OpNode() {} func (*Combine) OpNode() {} func (*Scope) OpNode() {} func (*Load) OpNode() {} +func (*VecScan) OpNode() {} // NewFilter returns a filter node for e. 
func NewFilter(e Expr) *Filter { diff --git a/compiler/ast/dag/unpack.go b/compiler/ast/dag/unpack.go index 0a03034870..093e04756c 100644 --- a/compiler/ast/dag/unpack.go +++ b/compiler/ast/dag/unpack.go @@ -59,8 +59,12 @@ var unpacker = unpack.New( UnaryExpr{}, Uniq{}, Var{}, + VecScan{}, VectorValue{}, Yield{}, + //XXX + CountByStringHack{}, + SumHack{}, ) // UnmarshalOp transforms a JSON representation of an operator into an Op. diff --git a/compiler/ast/dag/vop.go b/compiler/ast/dag/vop.go new file mode 100644 index 0000000000..4d05521c03 --- /dev/null +++ b/compiler/ast/dag/vop.go @@ -0,0 +1,21 @@ +package dag + +type Vop interface { + vopNode() +} + +type CountByStringHack struct { + Kind string `json:"kind" unpack:""` + Field string `json:"field"` +} + +func (*CountByStringHack) vopNode() {} +func (*CountByStringHack) OpNode() {} + +type SumHack struct { + Kind string `json:"kind" unpack:""` + Field string `json:"field"` +} + +func (*SumHack) vopNode() {} +func (*SumHack) OpNode() {} diff --git a/compiler/kernel/op.go b/compiler/kernel/op.go index e51e960209..541e96f891 100644 --- a/compiler/kernel/op.go +++ b/compiler/kernel/op.go @@ -35,6 +35,7 @@ import ( "github.com/brimdata/zed/runtime/op/traverse" "github.com/brimdata/zed/runtime/op/uniq" "github.com/brimdata/zed/runtime/op/yield" + "github.com/brimdata/zed/runtime/vam" "github.com/brimdata/zed/zbuf" "github.com/brimdata/zed/zio" "github.com/brimdata/zed/zson" @@ -325,6 +326,29 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error) return meta.NewDeleter(b.octx, parent, pool, filter, pruner, b.progress, b.deletes), nil case *dag.Load: return load.New(b.octx, b.source.Lake(), parent, v.Pool, v.Branch, v.Author, v.Message, v.Meta), nil + case *dag.VecScan: + pool, err := b.lookupPool(v.Pool) + if err != nil { + return nil, err + } + var paths []field.Path + for _, s := range v.Paths { + paths = append(paths, s) + } + //XXX check VectorCache not nil + return 
vam.NewVecScanner(b.octx, b.source.Lake().VectorCache(), parent, pool, paths, nil, nil), nil + case *dag.CountByStringHack: + puller, ok := parent.(vam.Puller) + if !ok { + return nil, errors.New("CountByStringHack parent not a vam.Puller") //XXX + } + return vam.NewCountByString(b.octx.Zctx, puller, v.Field), nil + case *dag.SumHack: + puller, ok := parent.(vam.Puller) + if !ok { + return nil, errors.New("SumHack parent not a vam.Puller") //XXX + } + return vam.NewSum(b.octx.Zctx, puller, v.Field), nil default: return nil, fmt.Errorf("unknown DAG operator type: %v", v) } diff --git a/compiler/optimizer/parallelize.go b/compiler/optimizer/parallelize.go index b1f034fc30..bf74e6416e 100644 --- a/compiler/optimizer/parallelize.go +++ b/compiler/optimizer/parallelize.go @@ -29,6 +29,28 @@ func (o *Optimizer) parallelizeScan(ops []dag.Op, replicas int) ([]dag.Op, error } func (o *Optimizer) parallelizeSeqScan(scan *dag.SeqScan, ops []dag.Op, replicas int) ([]dag.Op, error) { + if vscan := hackCountByString(scan, ops); vscan != nil { + scatter := &dag.Scatter{ + Kind: "Scatter", + Paths: make([]dag.Seq, replicas), + } + for k := 0; k < replicas; k++ { + scatter.Paths[k] = copyOps(vscan[0:2]) + } + combine := &dag.Combine{Kind: "Combine"} + return []dag.Op{scatter, combine, vscan[2]}, nil + } + if vscan := hackSum(scan, ops); vscan != nil { + scatter := &dag.Scatter{ + Kind: "Scatter", + Paths: make([]dag.Seq, replicas), + } + for k := 0; k < replicas; k++ { + scatter.Paths[k] = copyOps(vscan[0:2]) + } + combine := &dag.Combine{Kind: "Combine"} + return []dag.Op{scatter, combine, vscan[2], vscan[3]}, nil + } if len(ops) == 1 && scan.Filter == nil { // We don't try to parallelize the path if it's simply scanning and does no // other work.
We might want to revisit this down the road if diff --git a/compiler/optimizer/vam.go b/compiler/optimizer/vam.go new file mode 100644 index 0000000000..959bb2744a --- /dev/null +++ b/compiler/optimizer/vam.go @@ -0,0 +1,137 @@ +package optimizer + +import ( + "github.com/brimdata/zed/compiler/ast/dag" + "github.com/brimdata/zed/pkg/field" +) + +func hackCountByString(scan *dag.SeqScan, ops []dag.Op) []dag.Op { + if len(ops) != 2 || scan.Filter != nil { + return nil + } + summarize, ok := ops[1].(*dag.Summarize) + if !ok { + return nil + } + if len(summarize.Aggs) != 1 || len(summarize.Keys) != 1 { + return nil + } + if ok := isCount(summarize.Aggs[0]); !ok { + return nil + } + field, ok := isSingleField(summarize.Keys[0]) + if !ok { + return nil + } + return []dag.Op{ + &dag.VecScan{ + Kind: "VecScan", + Pool: scan.Pool, + Paths: [][]string{{field}}, + }, + &dag.CountByStringHack{ + Kind: "CountByStringHack", + Field: field, + }, + &dag.Summarize{ + Kind: "Summarize", + Keys: []dag.Assignment{{ + Kind: "Assignment", + LHS: &dag.This{Kind: "This", Path: []string{field}}, + RHS: &dag.This{Kind: "This", Path: []string{field}}, + }}, + Aggs: []dag.Assignment{{ + Kind: "Assignment", + LHS: &dag.This{Kind: "This", Path: []string{"count"}}, + RHS: &dag.Agg{ + Kind: "Agg", + Name: "count", + }, + }}, + PartialsIn: true, + }, + } +} + +func isCount(a dag.Assignment) bool { + this, ok := a.LHS.(*dag.This) + if !ok || len(this.Path) != 1 || this.Path[0] != "count" { + return false + } + agg, ok := a.RHS.(*dag.Agg) + return ok && agg.Name == "count" && agg.Expr == nil && agg.Where == nil +} + +func isSum(a dag.Assignment) (field.Path, bool) { + this, ok := a.LHS.(*dag.This) + if !ok || len(this.Path) != 1 || this.Path[0] != "sum" { + return nil, false + } + agg, ok := a.RHS.(*dag.Agg) + if ok && agg.Name == "sum" && agg.Where == nil { + return isThis(agg.Expr) + } + return nil, false +} + +func isSingleField(a dag.Assignment) (string, bool) { + lhs := fieldOf(a.LHS) + rhs := fieldOf(a.RHS) + if len(lhs) != 1 || len(rhs) != 1
|| !lhs.Equal(rhs) { + return "", false + } + return lhs[0], true +} + +func isThis(e dag.Expr) (field.Path, bool) { + if this, ok := e.(*dag.This); ok && len(this.Path) >= 1 { + return this.Path, true + } + return nil, false +} + +func hackSum(scan *dag.SeqScan, ops []dag.Op) []dag.Op { + if len(ops) != 3 || scan.Filter != nil { + return nil + } + summarize, ok := ops[1].(*dag.Summarize) + if !ok { + return nil + } + if len(summarize.Aggs) != 1 { + return nil + } + if len(summarize.Keys) != 0 { + return nil + } + path, ok := isSum(summarize.Aggs[0]) + if !ok { + return nil + } + field := path[len(path)-1] //XXX + return []dag.Op{ + &dag.VecScan{ + Kind: "VecScan", + Pool: scan.Pool, + Paths: [][]string{path}, + }, + &dag.SumHack{ + Kind: "SumHack", + Field: field, //XXX + }, + &dag.Summarize{ + Kind: "Summarize", + Aggs: []dag.Assignment{{ + Kind: "Assignment", + LHS: &dag.This{Kind: "This", Path: []string{"sum"}}, + RHS: &dag.Agg{ + Kind: "Agg", + Name: "sum", + Expr: &dag.This{Kind: "This", Path: []string{field}}, + }, + }}, + PartialsIn: true, + }, + ops[2], + } +} diff --git a/lake/root.go b/lake/root.go index 260e0e651e..07741a7d94 100644 --- a/lake/root.go +++ b/lake/root.go @@ -17,6 +17,7 @@ import ( "github.com/brimdata/zed/order" "github.com/brimdata/zed/pkg/storage" "github.com/brimdata/zed/runtime/expr" + "github.com/brimdata/zed/runtime/vcache" "github.com/brimdata/zed/zbuf" "github.com/brimdata/zed/zio/zngio" "github.com/brimdata/zed/zngbytes" @@ -49,6 +50,7 @@ type Root struct { poolCache *lru.ARCCache[ksuid.KSUID, *Pool] pools *pools.Store indexRules *index.Store + vCache *vcache.Cache } type LakeMagic struct { @@ -66,6 +68,7 @@ func newRoot(engine storage.Engine, logger *zap.Logger, path *storage.URI) *Root logger: logger, path: path, poolCache: poolCache, + vCache: vcache.NewCache(engine), } } @@ -487,3 +490,7 @@ func (r *Root) BatchifyIndexRules(ctx context.Context, zctx *zed.Context, f expr func (r *Root) Open(context.Context, *zed.Context, string, string,
zbuf.Filter) (zbuf.Puller, error) { return nil, errors.New("cannot use 'file' or 'http' source in a lake query") } + +func (r *Root) VectorCache() *vcache.Cache { + return r.vCache +} diff --git a/runtime/vam/agg.go b/runtime/vam/agg.go new file mode 100644 index 0000000000..2effcad281 --- /dev/null +++ b/runtime/vam/agg.go @@ -0,0 +1,158 @@ +package vam + +import ( + "fmt" + + "github.com/brimdata/zed" + "github.com/brimdata/zed/vector" + "github.com/brimdata/zed/zbuf" + "github.com/brimdata/zed/zcode" + "github.com/brimdata/zed/zson" +) + +//XXX need to make sure vam operator objects are returned to GC as they are finished + +type CountByString struct { + parent Puller + zctx *zed.Context + name string + table countByString + done bool +} + +func NewCountByString(zctx *zed.Context, parent Puller, name string) *CountByString { + return &CountByString{ + parent: parent, + zctx: zctx, + name: name, + table: countByString{table: make(map[string]uint64)}, //XXX + } +} + +func (c *CountByString) Pull(done bool) (zbuf.Batch, error) { + if done { + _, err := c.parent.PullVec(done) + return nil, err + } + if c.done { + return nil, nil + } + for { + //XXX check context Done + vec, err := c.parent.PullVec(false) + if err != nil { + return nil, err + } + if vec == nil { + c.done = true + return c.table.materialize(c.zctx, c.name), nil + } + if vec, ok := vec.(*vector.String); ok { + c.table.count(vec) + continue + } + //xxx + fmt.Println("vector.CountByString: bad vec", zson.String(vec.Type())) + } +} + +/* +// XXX Let's use Pull() here... 
read whole column into Batch for better perf +func (c *CountByString) AsReader() zio.Reader { + cbs := countByString{make(map[string]uint64)} + for _, vec := range c.vecs { + cbs.count(vec) + } + return cbs.materialize(c.zctx, c.name) +} +*/ + +type countByString struct { + table map[string]uint64 +} + +func (c *countByString) count(vec *vector.String) { + for _, s := range vec.Values { + c.table[s] += 1 + } +} + +func (c *countByString) materialize(zctx *zed.Context, name string) *zbuf.Array { + typ := zctx.MustLookupTypeRecord([]zed.Field{ + {Type: zed.TypeString, Name: name}, + {Type: zed.TypeUint64, Name: "count"}, + }) + var b zcode.Builder + vals := make([]zed.Value, len(c.table)) + var off int + for key, count := range c.table { + b.Reset() + b.BeginContainer() + b.Append(zed.EncodeString(key)) + b.Append(zed.EncodeUint(count)) + b.EndContainer() + vals[off] = *zed.NewValue(typ, b.Bytes().Body()) + off++ + } + return zbuf.NewArray(vals) +} + +type Sum struct { + parent Puller + zctx *zed.Context + name string + sum int64 + done bool +} + +func NewSum(zctx *zed.Context, parent Puller, name string) *Sum { + return &Sum{ + parent: parent, + zctx: zctx, + name: name, + } +} + +func (c *Sum) Pull(done bool) (zbuf.Batch, error) { + if done { + _, err := c.parent.PullVec(done) + return nil, err + } + if c.done { + return nil, nil + } + for { + //XXX check context Done + // XXX PullVec returns a single vector and enumerates through the + // different underlying types that match a particular projection + vec, err := c.parent.PullVec(false) + if err != nil { + return nil, err + } + if vec == nil { + c.done = true + return c.materialize(c.zctx, c.name), nil + } + if vec, ok := vec.(*vector.Int); ok { + for _, x := range vec.Values { + c.sum += x + } + } + if vec, ok := vec.(*vector.Uint); ok { + for _, x := range vec.Values { + c.sum += int64(x) + } + } + } +} + +func (c *Sum) materialize(zctx *zed.Context, name string) *zbuf.Array { + typ := 
zctx.MustLookupTypeRecord([]zed.Field{ + {Type: zed.TypeInt64, Name: "sum"}, + }) + var b zcode.Builder + b.BeginContainer() + b.Append(zed.EncodeInt(c.sum)) + b.EndContainer() + return zbuf.NewArray([]zed.Value{*zed.NewValue(typ, b.Bytes().Body())}) +} diff --git a/runtime/vam/index.go b/runtime/vam/index.go new file mode 100644 index 0000000000..f96d4b932f --- /dev/null +++ b/runtime/vam/index.go @@ -0,0 +1,30 @@ +package vam + +// XXX for now this is a list of slots, but it probably should be a roaring bitmap +type Index []int32 + +// And returns the intersection of i and with, which are both assumed to be +// sorted in ascending order. The result is written over the front of i's +// backing array and returned as a prefix of i, so i is clobbered. An empty +// input on either side yields an empty result. +func (i Index) And(with Index) Index { + var head, tail, from int + for tail < len(i) && from < len(with) { + switch { + case i[tail] < with[from]: + tail++ + case i[tail] > with[from]: + from++ + default: + i[head] = i[tail] + head++ + tail++ + from++ + } + } + return i[:head] +} + +func (i Index) Or(with Index) Index { + panic("Index.Or TBD") +} diff --git a/runtime/vam/materialize.go b/runtime/vam/materialize.go new file mode 100644 index 0000000000..f8368d7494 --- /dev/null +++ b/runtime/vam/materialize.go @@ -0,0 +1,73 @@ +package vam + +/* no slots +func newIntBuilderIndexed(vec *vector.Int, index Index) builder { + slots := vec.Slots + vals := vec.Vals + nulls := vec.Nulls + var voff, ioff int + return func(b *zcode.Builder) bool { + for voff < len(index) && ioff < len(vals) { + if slots[voff] < index[ioff] { + voff++ + continue + } + if slots[voff] > index[ioff] { + ioff++ + } + if !nulls.Has(uint32(voff)) { + b.Append(zed.EncodeInt(vals[voff])) + } else { + b.Append(nil) + } + return true + + } + return false + } +} +*/ + +/* no slots +func newUintBuilderIndexed(vec *vector.Uint, index Index) builder { + slots := vec.Slots + vals := vec.Vals + var voff, ioff int + return func(b *zcode.Builder) bool { + for voff < len(index) && ioff < len(vals) { + if slots[voff] < index[ioff] { + voff++ + continue + } + if slots[voff] > index[ioff] { + ioff++ + } + b.Append(zed.EncodeUint(vals[voff])) + return true + } + return
false + } +} +*/ + +/* no slots +func newStringBuilderIndexed(vec *vector.String, index Index) builder { + slots := vec.Slots + vals := vec.Vals + var voff, ioff int + return func(b *zcode.Builder) bool { + for voff < len(index) && ioff < len(vals) { + if slots[voff] < index[ioff] { + voff++ + continue + } + if slots[voff] > index[ioff] { + ioff++ + } + b.Append(zed.EncodeString(vals[voff])) + return true + } + return false + } +} +*/ diff --git a/runtime/vam/projection.go b/runtime/vam/projection.go new file mode 100644 index 0000000000..a7c872e95c --- /dev/null +++ b/runtime/vam/projection.go @@ -0,0 +1,133 @@ +package vam + +import ( + "fmt" + "strings" + "sync" + + "github.com/brimdata/zed" + "github.com/brimdata/zed/runtime/vcache" + "github.com/brimdata/zed/vector" + "github.com/brimdata/zed/zcode" + "github.com/brimdata/zed/zio" + "golang.org/x/sync/errgroup" +) + +type Projection struct { + object *vcache.Object + typeKeys []int32 + projectors []projector + builder zcode.Builder + off int +} + +// One projector per top-level type +type projector struct { + recType *zed.TypeRecord + build vector.Builder +} + +var _ zio.Reader = (*Projection)(nil) + +func NewProjection(o *vcache.Object, names []string) (*Projection, error) { + //XXX just handles top-level names for now, fix this to build records + // with "as field.Path"... use compiler to compile a cut proc. + var group errgroup.Group + projectors := make([]projector, len(o.Types())) + //XXX need concurrency over the typekeys too + // For each type, we create a record vector comprising each of the fields + // that are present in that type (or we skip the type if no such fields are + // found). When we encounter a partial projection, we fill the missing fields + // with a const vector of error("missing"). 
Then, if we have no matching fields + // in any of the types we return an error; if we have one matching type, we + // use the builder on the corresponding record vector; if we have more than one, + // we create a union vector and map the type keys from the vector object to + // the tags of the computed union. + for typeKey := range o.Types() { + typeKey := uint32(typeKey) + //XXX instead of doing this we should just make vector.Records that + // represent the projection and call NewBuilder on that. We still have + // to load the underlying fields. + vecs := make([]vector.Any, len(names)) + var mu sync.Mutex + for pos, name := range names { + pos := pos + name := name + group.Go(func() error { + vec, err := o.Load(typeKey, []string{name}) //XXX need full path + if err != nil { + return err + } + mu.Lock() + vecs[pos] = vec + mu.Unlock() + return nil + }) + } + if err := group.Wait(); err != nil { + return nil, err + } + var fields []zed.Field + for k, vec := range vecs { + if vec != nil { + fields = append(fields, zed.Field{Type: vec.Type(), Name: names[k]}) + } + } + if len(fields) == 0 { + continue + } + recType, err := o.LocalContext().LookupTypeRecord(fields) + if err != nil { + return nil, err + } + projectors[typeKey] = projector{ + recType: recType, + build: func(b *zcode.Builder) bool { + b.BeginContainer() + for _, materialize := range packed { + if ok := materialize(b); !ok { + return false + } + } + b.EndContainer() + return true + }, + } + } + empty := true + for k := 0; k < len(projectors); k++ { + if projectors[k].build != nil { + empty = false + break + } + } + if empty { + return nil, fmt.Errorf("none of the specified fields were found: %s", strings.Join(names, ", ")) + } + return &Projection{ + object: o, + /* XXX this is Jamie's UnionVector idea though + it's not quite the same as a zed union */ + typeKeys: o.TypeKeys(), + projectors: projectors, + }, nil +} + +// XXX Let's use Pull() here... 
read whole column into Batch for better perf +func (p *Projection) Read() (*zed.Value, error) { + for { + if p.off >= len(p.typeKeys) { + return nil, nil + } + typeKey := p.typeKeys[p.off] + p.off++ + projector := p.projectors[typeKey] + if projector.build != nil { + p.builder.Truncate() + if ok := projector.build(&p.builder); !ok { + return nil, nil + } + return zed.NewValue(projector.recType, p.builder.Bytes().Body()), nil + } + } +} diff --git a/runtime/vam/scan.go b/runtime/vam/scan.go new file mode 100644 index 0000000000..32e74b31d4 --- /dev/null +++ b/runtime/vam/scan.go @@ -0,0 +1,214 @@ +package vam + +import ( + "errors" + "fmt" + "sync" + + "github.com/brimdata/zed" + "github.com/brimdata/zed/lake" + "github.com/brimdata/zed/lake/data" + "github.com/brimdata/zed/pkg/field" + "github.com/brimdata/zed/runtime/expr" + "github.com/brimdata/zed/runtime/op" + "github.com/brimdata/zed/runtime/vcache" + "github.com/brimdata/zed/vector" + "github.com/brimdata/zed/zbuf" + "github.com/brimdata/zed/zson" + "golang.org/x/sync/errgroup" +) + +//XXX change PullVec to Pull and lets partition the dag operators into +// vecops and ops and have a materlize op that has a vop parent but it +// an op. + +type Puller interface { + PullVec(done bool) (vector.Any, error) +} + +// XXX need a semaphore pattern here we scanner can run ahead and load objects +// and vectors concurrently but be limited by the semaphore so there is a reasonable +// amount of locality while still being highly parallel. 
+ +// project (pull from downstream) => iterator +// filter+project (pull from downstram) +// agg +// filter+agg->project-partials (pull from group-by) + +type VecScanner struct { + parent zbuf.Puller + pruner expr.Evaluator + octx *op.Context + pool *lake.Pool + once sync.Once + paths []field.Path + cache *vcache.Cache + progress *zbuf.Progress + unmarshaler *zson.UnmarshalZNGContext + resultCh chan result + doneCh chan struct{} +} + +func NewVecScanner(octx *op.Context, cache *vcache.Cache, parent zbuf.Puller, pool *lake.Pool, paths []field.Path, pruner expr.Evaluator, progress *zbuf.Progress) *VecScanner { + return &VecScanner{ + cache: cache, + octx: octx, + parent: parent, + pruner: pruner, + pool: pool, + paths: paths, + progress: progress, + unmarshaler: zson.NewZNGUnmarshaler(), + doneCh: make(chan struct{}), + resultCh: make(chan result), + } +} + +// XXX this is here for the compiler to be ablet o create it as a zbuf.Puller, +// but we will fix the compiler to understand vops and vam/vector.Puller soon. +func (v *VecScanner) Pull(done bool) (zbuf.Batch, error) { + panic("VecScanner.Pull") +} + +// XXX we need vector scannerstats and means to update them here. + +// XXX change this to pull/load vector by each type within an object and +// return an object containing the overall projection, which might be a record +// or could just be a single vector. the downstream operator has to be +// configured to expect it, e.g., project x:=a.b,y:=a.b.c (like cut but in vspace) +// this would be Record{x:(proj a.b),y:(proj:a.b.c)} so the elements would be +// single fields. For each object/type that matches the projection we would make +// a Record vec and let GC reclaim them. Note if a col is missing, it's a constant +// vector of error("missing"). + +func (v *VecScanner) PullVec(done bool) (vector.Any, error) { + v.once.Do(func() { + // Block p.ctx's cancel function until p.run finishes its + // cleanup. 
+ v.octx.WaitGroup.Add(1) + go v.run() + }) + if done { + select { + case v.doneCh <- struct{}{}: + return nil, nil + case <-v.octx.Done(): + return nil, v.octx.Err() + } + } + if r, ok := <-v.resultCh; ok { + return r.vector, r.err + } + return nil, v.octx.Err() +} + +func (v *VecScanner) run() { + defer func() { + v.octx.WaitGroup.Done() + }() + for { + //XXX should make an object puller that wraps this... + batch, err := v.parent.Pull(false) + if batch == nil || err != nil { + v.sendResult(nil, err) + return + } + vals := batch.Values() + if len(vals) != 1 { + // We require exactly one data object per pull. + err := errors.New("system error: VecScanner encountered multi-valued batch") + v.sendResult(nil, err) + return + } + named, ok := vals[0].Type.(*zed.TypeNamed) + if !ok { + v.sendResult(nil, fmt.Errorf("system error: VecScanner encountered unnamed object: %s", zson.String(vals[0]))) + return + } + if named.Name != "data.Object" { + v.sendResult(nil, fmt.Errorf("system error: VecScanner encountered unexpected named type: %q", named.Name)) + return + } + var meta data.Object + if err := v.unmarshaler.Unmarshal(&vals[0], &meta); err != nil { + v.sendResult(nil, fmt.Errorf("system error: VecScanner could not unmarshal value %q: %w", zson.String(vals[0]), err)) + return + } + object, err := v.cache.Fetch(v.octx.Context, meta.VectorURI(v.pool.DataPath), meta.ID) + if err != nil { + v.sendResult(nil, err) + return + } + if err := v.genVecs(object, v.resultCh); err != nil { + v.sendResult(nil, err) + return + } + } +} + +func (v *VecScanner) sendResult(vec vector.Any, err error) (bool, bool) { + select { + case v.resultCh <- result{vec, err}: + return false, true + case <-v.doneCh: + if vec != nil { + vec.Unref() //XXX add + } + b, pullErr := v.parent.Pull(true) + if err == nil { + err = pullErr + } + if err != nil { + select { + case v.resultCh <- result{err: err}: + return true, false + case <-v.octx.Done(): + return false, false + } + } + if b != nil { + b.Unref() + } + return
true, true + case <-v.octx.Done(): + return false, false + } +} + +type result struct { + vector vector.Any + err error +} + +// XXX for each type that has target columns, we return a bundle of the vectors. +// this will usually just be one bundle but for eclectic data, could be +// a bundle per relevant type. Note that each slot has a unique type so the +// the bundles are interleaved but non-overlapping in terms of their output slots. +func (v *VecScanner) genVecs(o *vcache.Object, ch chan result) error { + //XXX we should map the type to a shared context and have a table to + // memoize the per-type lookup so we don't have to spin through every type? + var group errgroup.Group + for typeKey := range o.Types() { + typeKey := uint32(typeKey) + for _, path := range v.paths { + path := path + group.Go(func() error { + vec, err := o.Load(typeKey, path) + //XXX for now ignore error, e.g., need to distinguish between + // missing/ignore and other real errors + if err != nil { + err = nil + } + if vec == nil || err != nil { + return err + } + v.sendResult(vec, nil) + return nil + }) + } + if err := group.Wait(); err != nil { + return err + } + } + return nil +} diff --git a/runtime/vcache/array.go b/runtime/vcache/array.go index 36967c058d..1e8a11e8ad 100644 --- a/runtime/vcache/array.go +++ b/runtime/vcache/array.go @@ -1,54 +1,38 @@ package vcache import ( + "fmt" "io" + "github.com/brimdata/zed" + "github.com/brimdata/zed/pkg/field" + "github.com/brimdata/zed/vector" "github.com/brimdata/zed/vng" - "github.com/brimdata/zed/vng/vector" - "github.com/brimdata/zed/zcode" + meta "github.com/brimdata/zed/vng/vector" //XXX rename package ) -type Array struct { - segmap []vector.Segment - values Vector - lengths []int32 -} - -func NewArray(array *vector.Array, r io.ReaderAt) (*Array, error) { - values, err := NewVector(array.Values, r) - if err != nil { - return nil, err - } - return &Array{ - segmap: array.Lengths, - values: values, - }, nil -} - -func (a *Array) 
NewIter(reader io.ReaderAt) (iterator, error) { - // The lengths vector is typically large and is loaded on demand. - if a.lengths == nil { - lengths, err := vng.ReadIntVector(a.segmap, reader) +func loadArray(any *vector.Any, typ zed.Type, path field.Path, m *meta.Array, r io.ReaderAt) (*vector.Array, error) { + if *any == nil { + var innerType zed.Type + switch typ := typ.(type) { + case *zed.TypeArray: + innerType = typ.Type + case *zed.TypeSet: + innerType = typ.Type + default: + return nil, fmt.Errorf("internal error: vcache.loadArray encountered bad type: %s", typ) + } + lengths, err := vng.ReadIntVector(m.Lengths, r) if err != nil { return nil, err } - a.lengths = lengths - } - values, err := a.values.NewIter(reader) - if err != nil { - return nil, err - } - off := 0 - return func(b *zcode.Builder) error { - b.BeginContainer() - len := a.lengths[off] - off++ - for ; len > 0; len-- { - if err := values(b); err != nil { - return err - } + values, err := loadVector(nil, innerType, path, m.Values, r) + if err != nil { + return nil, err } - b.EndContainer() - return nil - }, nil + *any = vector.NewArray(typ.(*zed.TypeArray), lengths, values) + } + //XXX always return the array as the vector engine needs to know how to handle + // manipulating the array no matter what it contains + return (*any).(*vector.Array), nil } diff --git a/runtime/vcache/cache.go b/runtime/vcache/cache.go index 4dfbd04806..6bf9d1ff0b 100644 --- a/runtime/vcache/cache.go +++ b/runtime/vcache/cache.go @@ -2,12 +2,14 @@ package vcache import ( "context" + "sync" "github.com/brimdata/zed/pkg/storage" "github.com/segmentio/ksuid" ) type Cache struct { + mu sync.Mutex engine storage.Engine // objects is currently a simple map but we will turn this into an // LRU cache sometime soon. 
First step is object-level granularity, though @@ -27,6 +29,10 @@ func NewCache(engine storage.Engine) *Cache { } func (c *Cache) Fetch(ctx context.Context, uri *storage.URI, id ksuid.KSUID) (*Object, error) { + //XXX do we want finer grained mutex? might be ok if lookups always done inside + // a dedicated goroutine + c.mu.Lock() + defer c.mu.Unlock() if object, ok := c.objects[id]; ok { return object, nil } diff --git a/runtime/vcache/map.go b/runtime/vcache/map.go index ac736ae124..7a124554ae 100644 --- a/runtime/vcache/map.go +++ b/runtime/vcache/map.go @@ -1,67 +1,31 @@ package vcache import ( + "fmt" "io" - "github.com/brimdata/zed/vng" - "github.com/brimdata/zed/vng/vector" - "github.com/brimdata/zed/zcode" + "github.com/brimdata/zed" + "github.com/brimdata/zed/pkg/field" + "github.com/brimdata/zed/vector" + meta "github.com/brimdata/zed/vng/vector" ) -type Map struct { - segmap []vector.Segment - keys Vector - values Vector - lengths []int32 -} - -func NewMap(m *vector.Map, r io.ReaderAt) (*Map, error) { - keys, err := NewVector(m.Keys, r) - if err != nil { - return nil, err - } - values, err := NewVector(m.Values, r) - if err != nil { - return nil, err - } - return &Map{ - segmap: m.Lengths, - keys: keys, - values: values, - }, nil -} - -func (m *Map) NewIter(reader io.ReaderAt) (iterator, error) { - // The lengths vector is typically large and is loaded on demand. 
- if m.lengths == nil { - lengths, err := vng.ReadIntVector(m.segmap, reader) +func loadMap(any *vector.Any, typ zed.Type, path field.Path, m *meta.Map, r io.ReaderAt) (*vector.Map, error) { + if *any == nil { + mapType, ok := typ.(*zed.TypeMap) + if !ok { + return nil, fmt.Errorf("internal error: vcache.loadMap encountered bad type: %s", typ) + } + var keys, values vector.Any + _, err := loadVector(&keys, mapType.KeyType, path, m.Keys, r) if err != nil { return nil, err } - m.lengths = lengths - } - keys, err := m.keys.NewIter(reader) - if err != nil { - return nil, err - } - values, err := m.values.NewIter(reader) - if err != nil { - return nil, err - } - off := 0 - return func(b *zcode.Builder) error { - len := m.lengths[off] - off++ - b.BeginContainer() - for ; len > 0; len-- { - if err := keys(b); err != nil { - return err - } - if err := values(b); err != nil { - return err - } + _, err = loadVector(&values, mapType.ValType, path, m.Values, r) + if err != nil { + return nil, err } - b.EndContainer() - return nil - }, nil + *any = vector.NewMap(mapType, keys, values) + } + return (*any).(*vector.Map), nil } diff --git a/runtime/vcache/nulls.go b/runtime/vcache/nulls.go index 432b67a706..9f44e01cf5 100644 --- a/runtime/vcache/nulls.go +++ b/runtime/vcache/nulls.go @@ -3,28 +3,24 @@ package vcache import ( "io" - "github.com/brimdata/zed/vng/vector" - "github.com/brimdata/zed/zcode" + "github.com/brimdata/zed" + "github.com/brimdata/zed/pkg/field" + "github.com/brimdata/zed/vector" + meta "github.com/brimdata/zed/vng/vector" ) -type Nulls struct { - // The runs array encodes the run lengths of values and nulls in - // the same fashion as the VNG Nulls vector. - // This data structure provides a nice way to creator an iterator closure - // and (somewhat) efficiently build all the values that comprise a field - // into an zcode.Builder while allowing projections to intermix the calls - // to the iterator. 
There's probably a better data structure for this - // but this is a prototype for now. - runs []int - values Vector -} - -func NewNulls(nulls *vector.Nulls, values Vector, r io.ReaderAt) (*Nulls, error) { +func loadNulls(any *vector.Any, typ zed.Type, path field.Path, m *meta.Nulls, r io.ReaderAt) (vector.Any, error) { // The runlengths are typically small so we load them with the metadata // and don't bother waiting for a reference. - runlens := vector.NewInt64Reader(nulls.Runs, r) - var runs []int + runlens := meta.NewInt64Reader(m.Runs, r) //XXX 32-bit reader? + var off, nulls uint32 + null := true + //XXX finish this loop... need to remove slots covered by nulls and subtract + // cumulative number of nulls for each surviving value slot. + // In zed, nulls are generally bad and not really needed because we don't + // need super-wide uber schemas with lots of nulls. for { + //XXX need nullslots array to build vector.Nullmask and need a way to pass down Nullmask XXX run, err := runlens.Read() if err != nil { if err == io.EOF { @@ -32,37 +28,12 @@ func NewNulls(nulls *vector.Nulls, values Vector, r io.ReaderAt) (*Nulls, error) } return nil, err } - runs = append(runs, int(run)) - } - return &Nulls{ - runs: runs, - values: values, - }, nil -} - -func (n *Nulls) NewIter(reader io.ReaderAt) (iterator, error) { - null := true - var run, off int - values, err := n.values.NewIter(reader) - if err != nil { - return nil, err - } - return func(b *zcode.Builder) error { - for run == 0 { - if off >= len(n.runs) { - //XXX this shouldn't happen... call panic? 
- b.Append(nil) - return nil - } - null = !null - run = n.runs[off] - off++ - } - run-- + off += uint32(run) if null { - b.Append(nil) - return nil + nulls += uint32(run) } - return values(b) - }, nil + null = !null + } + //newSlots := slots //XXX need to create this above + return loadVector(any, typ, path, m.Values, r) } diff --git a/runtime/vcache/object.go b/runtime/vcache/object.go index 2fb6274197..e6364986e2 100644 --- a/runtime/vcache/object.go +++ b/runtime/vcache/object.go @@ -3,12 +3,15 @@ package vcache import ( "context" "fmt" + "sync" "github.com/brimdata/zed" + "github.com/brimdata/zed/pkg/field" "github.com/brimdata/zed/pkg/storage" + "github.com/brimdata/zed/vector" "github.com/brimdata/zed/vng" + meta "github.com/brimdata/zed/vng/vector" "github.com/segmentio/ksuid" - "golang.org/x/sync/errgroup" ) const MaxTypesPerObject = 2500 @@ -19,6 +22,7 @@ const MaxTypesPerObject = 2500 // we support dynamic loading of vectors as they are needed and data and // metadata are all cached in memory. type Object struct { + mu []sync.Mutex id ksuid.KSUID uri *storage.URI engine storage.Engine @@ -27,20 +31,26 @@ type Object struct { // for each query and we need to map the VNG object context to the query // context. Of course, with Zed, this is very cheap. local *zed.Context - // There is one vector per Zed type and the typeIDs array provides - // the sequence order of each vector to be accessed. When - // ordering doesn't matter, the vectors can be traversed directly - // without an indirection through the typeIDs array. - vectors []Vector - types []zed.Type - typeIDs []int32 + metas []meta.Metadata + // There is one vector per Zed type and the typeKeys array provides + // the sequence order of each vector to be accessed. + vectors []vector.Any + typeDict []zed.Type + typeKeys []int32 + //slots map[int32][]int32 //XXX handle this differently? } // NewObject creates a new in-memory Object corresponding to a VNG object // residing in storage. 
It loads the list of VNG root types (one per value -// in the file) and the VNG metadata for vector reassembly. This provides -// the metadata needed to load vector chunks on demand only as they are -// referenced. +// in the file) and the VNG metadata for vector reassembly. A table for each +// type is also created to map the global slot number in the object to the local +// slot number in the type so that an element's local position in the vector +// (within a particular type) can be related to its slot number in the object, +// e.g., so that filtering of a local vector can be turned into the list of +// matching object slots. The object provides the metadata needed to load vectors +// on demand only as they are referenced. A vector is loaded by calling its Load method, +// which decodes its zcode.Bytes into its native representation. +// XXX we may want to change the VNG format to code vectors in native format. func NewObject(ctx context.Context, engine storage.Engine, uri *storage.URI, id ksuid.KSUID) (*Object, error) { // XXX currently we open a storage.Reader for every object and never close it. // We should either close after a timeout and reopen when needed or change the @@ -55,12 +65,14 @@ func NewObject(ctx context.Context, engine storage.Engine, uri *storage.URI, id if err != nil { return nil, err } + // XXX use the query's zctx so we don't have to map?, + // or maybe use a single context across all objects in the cache? 
zctx := zed.NewContext() z, err := vng.NewObject(zctx, reader, size) if err != nil { return nil, err } - typeIDs, metas, err := z.FetchMetadata() + typeKeys, metas, err := z.FetchMetadata() if err != nil { return nil, err } @@ -70,36 +82,23 @@ func NewObject(ctx context.Context, engine storage.Engine, uri *storage.URI, id if len(metas) > MaxTypesPerObject { return nil, fmt.Errorf("too many types in VNG object: %s", uri) } - types := make([]zed.Type, 0, len(metas)) + typeDict := make([]zed.Type, 0, len(metas)) for _, meta := range metas { - types = append(types, meta.Type(zctx)) - } - var group errgroup.Group - vectors := make([]Vector, len(metas)) - for k, meta := range metas { - which := k - this := meta - group.Go(func() error { - v, err := NewVector(this, reader) - if err != nil { - return err - } - vectors[which] = v - return nil - }) - } - if err := group.Wait(); err != nil { - return nil, err + typeDict = append(typeDict, meta.Type(zctx)) //XXX commanet about context locality } + vectors := make([]vector.Any, len(metas)) return &Object{ - id: id, - uri: uri, - engine: engine, - reader: reader, - local: zctx, - vectors: vectors, - types: types, - typeIDs: typeIDs, + mu: make([]sync.Mutex, len(typeDict)), + id: id, + uri: uri, + engine: engine, + reader: reader, + local: zctx, + metas: metas, + vectors: vectors, + typeDict: typeDict, + typeKeys: typeKeys, + //slots: slots, }, nil } @@ -110,13 +109,33 @@ func (o *Object) Close() error { return nil } -func (o *Object) NewReader() *Reader { - return &Reader{ - object: o, - iters: make([]iterator, len(o.vectors)), - } +func (o *Object) LocalContext() *zed.Context { + return o.local +} + +func (o *Object) Types() []zed.Type { + return o.typeDict +} + +func (o *Object) TypeKeys() []int32 { + return o.typeKeys +} + +func (o *Object) LookupType(typeKey uint32) zed.Type { + return o.typeDict[typeKey] +} + +func (o *Object) Len() int { + return len(o.typeKeys) } -func (o *Object) NewProjection(fields []string) 
(*Projection, error) { - return NewProjection(o, fields) +// XXX fix comment +// Due to the heterogenous nature of Zed data, a given path can appear in +// multiple types and a given type can have multiple vectors XXX (due to union +// types in the hiearchy). Load returns a Group for each type and the Group +// may contain multiple vectors. +func (o *Object) Load(typeKey uint32, path field.Path) (vector.Any, error) { + o.mu[typeKey].Lock() + defer o.mu[typeKey].Unlock() + return loadVector(&o.vectors[typeKey], o.typeDict[typeKey], path, o.metas[typeKey], o.reader) } diff --git a/runtime/vcache/primitive.go b/runtime/vcache/primitive.go index 0a7c573804..70f413dc5a 100644 --- a/runtime/vcache/primitive.go +++ b/runtime/vcache/primitive.go @@ -1,68 +1,144 @@ package vcache import ( + "fmt" "io" - "github.com/brimdata/zed/vng/vector" + "github.com/brimdata/zed" + "github.com/brimdata/zed/vector" + meta "github.com/brimdata/zed/vng/vector" "github.com/brimdata/zed/zcode" ) -type Primitive struct { - meta *vector.Primitive - bytes zcode.Bytes -} - -func NewPrimitive(meta *vector.Primitive) (*Primitive, error) { - return &Primitive{meta: meta}, nil -} - -func (p *Primitive) NewIter(r io.ReaderAt) (iterator, error) { - if p.bytes == nil { - // The VNG primitive columns are stored as one big - // list of Zed values. So we can just read the data in - // all at once, compute the byte offsets of each value - // (for random access, not used yet). - var n int - for _, segment := range p.meta.Segmap { - n += int(segment.MemLength) +func loadPrimitive(typ zed.Type, m *meta.Primitive, r io.ReaderAt) (vector.Any, error) { + // The VNG primitive columns are stored as one big + // list of Zed values. So we can just read the data in + // all at once, compute the byte offsets of each value + // (for random access, not used yet). 
+ var n int + for _, segment := range m.Segmap { + n += int(segment.MemLength) + } + bytes := make([]byte, n) + var off int + for _, segment := range m.Segmap { + if err := segment.Read(r, bytes[off:]); err != nil { + return nil, err } - data := make([]byte, n) - var off int - for _, segment := range p.meta.Segmap { - if err := segment.Read(r, data[off:]); err != nil { - return nil, err + off += int(segment.MemLength) + } + switch typ := typ.(type) { + case *zed.TypeOfUint8, *zed.TypeOfUint16, *zed.TypeOfUint32, *zed.TypeOfUint64, *zed.TypeOfTime: + //XXX put valcnt in vng meta and use vector allocator + var vals []uint64 + var nullslots []uint32 + it := zcode.Bytes(bytes).Iter() + for !it.Done() { + val := it.Next() + if val == nil { + nullslots = append(nullslots, uint32(len(vals))) + vals = append(vals, 0) + } else { + vals = append(vals, zed.DecodeUint(val)) } - off += int(segment.MemLength) } - p.bytes = data - } - if dict := p.meta.Dict; dict != nil { - bytes := p.bytes - return func(b *zcode.Builder) error { - pos := bytes[0] - bytes = bytes[1:] - b.Append(dict[pos].Value.Bytes()) - return nil - }, nil + return vector.NewUint(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil + case *zed.TypeOfInt8, *zed.TypeOfInt16, *zed.TypeOfInt32, *zed.TypeOfInt64, *zed.TypeOfDuration: + //XXX put valcnt in vng meta and use vector allocator + var vals []int64 + var nullslots []uint32 + it := zcode.Bytes(bytes).Iter() + for !it.Done() { + val := it.Next() + if val == nil { + nullslots = append(nullslots, uint32(len(vals))) + vals = append(vals, 0) + } else { + vals = append(vals, zed.DecodeInt(val)) + } + } + return vector.NewInt(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil + case *zed.TypeOfFloat16: + return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ) + case *zed.TypeOfFloat32: + return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ) + case *zed.TypeOfFloat64: + return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ) + 
case *zed.TypeOfBool: + var vals []bool + var nullslots []uint32 + it := zcode.Bytes(bytes).Iter() + for !it.Done() { + val := it.Next() + if val == nil { + nullslots = append(nullslots, uint32(len(vals))) + vals = append(vals, false) + } else { + vals = append(vals, zed.DecodeBool(val)) + } + } + return vector.NewBool(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil + case *zed.TypeOfBytes: + return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ) + case *zed.TypeOfString: + var vals []string + var nullslots []uint32 + it := zcode.Bytes(bytes).Iter() + for !it.Done() { + val := it.Next() + if val == nil { + nullslots = append(nullslots, uint32(len(vals))) + } else { + vals = append(vals, zed.DecodeString(val)) + } + } + return vector.NewString(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil + case *zed.TypeOfIP: + return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ) + case *zed.TypeOfNet: + return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ) + case *zed.TypeOfNull: + return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ) + case *zed.TypeOfType: + return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ) } - it := zcode.Iter(p.bytes) - return func(b *zcode.Builder) error { - b.Append(it.Next()) - return nil - }, nil + return nil, nil + /* + XXX + if dict := p.meta.Dict; dict != nil { + bytes := p.bytes + return func(b *zcode.Builder) error { + pos := bytes[0] + bytes = bytes[1:] + b.Append(dict[pos].Value.Bytes()) + return nil + }, nil + } + it := zcode.Iter(p.bytes) + return func(b *zcode.Builder) error { + b.Append(it.Next()) + return nil + }, nil + + /* XXX + + return nil, fmt.Errorf("internal error: vcache.Primitive.Load uknown type %T", typ) + */ } type Const struct { bytes zcode.Bytes } -func NewConst(meta *vector.Const) *Const { - return &Const{bytes: meta.Value.Bytes()} +func NewConst(m *meta.Const) *Const { + return &Const{bytes: m.Value.Bytes()} } +/* func (c *Const) NewIter(r io.ReaderAt) 
(iterator, error) { return func(b *zcode.Builder) error { b.Append(c.bytes) return nil }, nil } +*/ diff --git a/runtime/vcache/projection.go b/runtime/vcache/projection.go deleted file mode 100644 index a12691ed8a..0000000000 --- a/runtime/vcache/projection.go +++ /dev/null @@ -1,134 +0,0 @@ -package vcache - -import ( - "fmt" - "strings" - - "github.com/brimdata/zed" - "github.com/brimdata/zed/pkg/storage" - "github.com/brimdata/zed/zcode" - "github.com/brimdata/zed/zio" - "golang.org/x/sync/errgroup" -) - -type Projection struct { - object *Object - cuts []*cut - off int - builder zcode.Builder - val zed.Value -} - -type cut struct { - it iterator - typ zed.Type -} - -var _ zio.Reader = (*Projection)(nil) - -func NewProjection(o *Object, names []string) (*Projection, error) { - cuts, err := findCuts(o, names) - if err != nil { - return nil, err - } - return &Projection{ - object: o, - cuts: cuts, - }, nil -} - -func (p *Projection) Read() (*zed.Value, error) { - o := p.object - var c *cut - for c == nil { - if p.off >= len(o.typeIDs) { - return nil, nil - } - id := o.typeIDs[p.off] - p.off++ - c = p.cuts[id] - } - p.builder.Truncate() - if err := c.it(&p.builder); err != nil { - return nil, err - } - p.val = *zed.NewValue(c.typ, p.builder.Bytes().Body()) - return &p.val, nil -} - -func findCuts(o *Object, names []string) ([]*cut, error) { - var dirty bool - cuts := make([]*cut, len(o.types)) - var group errgroup.Group - // Loop through each type to determine if there is a cut and build - // a cut for that type. The creation of all the iterators is done - // in parallel to avoid synchronous round trips to storage. 
- for k, typ := range o.types { - recType := zed.TypeRecordOf(typ) - if recType == nil { - continue - } - fields := Under(o.vectors[k]).(Record) - var actuals []string - for _, name := range names { - if _, ok := recType.IndexOfField(name); !ok { - continue - } - actuals = append(actuals, name) - } - if len(actuals) == 0 { - continue - } - dirty = true - whichCut := k - group.Go(func() error { - c, err := newCut(o.local, recType, fields, actuals, o.reader) - cuts[whichCut] = c - return err - }) - } - if err := group.Wait(); err != nil { - return nil, err - } - if !dirty { - return nil, fmt.Errorf("none of the specified fields were found: %s", strings.Join(names, ", ")) - } - return cuts, nil -} - -func newCut(zctx *zed.Context, typ *zed.TypeRecord, fields []Vector, actuals []string, reader storage.Reader) (*cut, error) { - var group errgroup.Group - iters := make([]iterator, len(actuals)) - outFields := make([]zed.Field, len(actuals)) - for k, name := range actuals { - i, _ := typ.IndexOfField(name) - outFields[k] = typ.Fields[i] - which := k - group.Go(func() error { - it, err := fields[i].NewIter(reader) - if err != nil { - return err - } - iters[which] = it - return nil - }) - } - if err := group.Wait(); err != nil { - return nil, err - } - outType, err := zctx.LookupTypeRecord(outFields) - if err != nil { - return nil, err - } - project := func(b *zcode.Builder) error { - b.BeginContainer() - for _, it := range iters { - if err := it(b); err != nil { - return err - } - } - b.EndContainer() - return nil - } - return &cut{it: project, typ: outType}, nil -} diff --git a/runtime/vcache/reader.go b/runtime/vcache/reader.go index 844457a120..11277a51d6 100644 --- a/runtime/vcache/reader.go +++ b/runtime/vcache/reader.go @@ -1,11 +1,6 @@ package vcache -import ( - "github.com/brimdata/zed" - "github.com/brimdata/zed/zcode" - "github.com/brimdata/zed/zio" -) - +/* type Reader struct { object *Object iters []iterator @@ -17,25 +12,32 @@ type Reader struct { var _ 
zio.Reader = (*Reader)(nil) func (r *Reader) Read() (*zed.Value, error) { - o := r.object - if r.off >= len(o.typeIDs) { - return nil, nil - } - id := o.typeIDs[r.off] - r.off++ - it := r.iters[id] - if it == nil { - var err error - it, err = o.vectors[id].NewIter(o.reader) - if err != nil { - return nil, err - } - r.iters[id] = it - } - r.builder.Truncate() - if err := it(&r.builder); err != nil { - return nil, err - } - r.val = *zed.NewValue(o.types[id], r.builder.Bytes().Body()) - return &r.val, nil + o := r.object + + if r.off >= len(o.typeIDs) { + return nil, nil + } + + id := o.typeIDs[r.off] + r.off++ + it := r.iters[id] + + if it == nil { + var err error + it, err = o.vectors[id].NewIter(o.reader) + if err != nil { + return nil, err + } + r.iters[id] = it + } + + r.builder.Truncate() + + if err := it(&r.builder); err != nil { + return nil, err + } + + r.val = *zed.NewValue(o.types[id], r.builder.Bytes().Body()) + return &r.val, nil } +*/ diff --git a/runtime/vcache/record.go b/runtime/vcache/record.go index c9384496f9..d154461a0c 100644 --- a/runtime/vcache/record.go +++ b/runtime/vcache/record.go @@ -1,53 +1,37 @@ package vcache import ( + "fmt" "io" - "github.com/brimdata/zed/vng/vector" - "github.com/brimdata/zed/zcode" - "golang.org/x/sync/errgroup" + "github.com/brimdata/zed" + "github.com/brimdata/zed/pkg/field" + "github.com/brimdata/zed/vector" + meta "github.com/brimdata/zed/vng/vector" + "github.com/brimdata/zed/zson" ) -type Record []Vector +//XXX we need locking as multiple threads can access Native columns concurrently +// should do a fast lookup on the path -func NewRecord(fields []vector.Field, r io.ReaderAt) (Record, error) { - record := make([]Vector, 0, len(fields)) - for _, field := range fields { - v, err := NewVector(field.Values, r) - if err != nil { - return nil, err - } - record = append(record, v) +func loadRecord(any *vector.Any, typ *zed.TypeRecord, path field.Path, meta *meta.Record, r io.ReaderAt) (vector.Any, error) { + if *any 
== nil { + *any = vector.NewRecord(typ) } - return record, nil -} - -func (r Record) NewIter(reader io.ReaderAt) (iterator, error) { - fields := make([]iterator, len(r)) - var group errgroup.Group - for k, f := range r { - which := k - field := f - group.Go(func() error { - it, err := field.NewIter(reader) - if err != nil { - return err - } - fields[which] = it - return nil - }) + vec, ok := (*any).(*vector.Record) + if !ok { + return nil, fmt.Errorf("system error: vcache.loadRecord not a record type %q", zson.String(vec.Typ)) } - if err := group.Wait(); err != nil { - return nil, err + if len(path) == 0 { + return vec, nil } - return func(b *zcode.Builder) error { - b.BeginContainer() - for _, it := range fields { - if err := it(b); err != nil { - return err - } - } - b.EndContainer() - return nil - }, nil + fieldName := path[0] + off, ok := vec.Typ.IndexOfField(fieldName) + if !ok { + return nil, fmt.Errorf("system error: vcache.loadRecord no such field %q in record type %q", fieldName, zson.String(vec.Typ)) + } + return loadVector(&vec.Fields[off], typ.Fields[off].Type, path[1:], meta.Fields[off].Values, r) } + +// XXX since cache is persistent across queries, does it still make sense to +// have context.Context buried in the reader? 
diff --git a/runtime/vcache/union.go b/runtime/vcache/union.go index 946a7f838c..ea6d31bc16 100644 --- a/runtime/vcache/union.go +++ b/runtime/vcache/union.go @@ -5,33 +5,32 @@ import ( "io" "github.com/brimdata/zed" - "github.com/brimdata/zed/vng" - "github.com/brimdata/zed/vng/vector" - "github.com/brimdata/zed/zcode" - "golang.org/x/sync/errgroup" + "github.com/brimdata/zed/pkg/field" + "github.com/brimdata/zed/vector" + meta "github.com/brimdata/zed/vng/vector" + "github.com/brimdata/zed/zson" ) -type Union struct { - values []Vector - tags []int32 - segmap []vector.Segment -} - -func NewUnion(union *vector.Union, r io.ReaderAt) (*Union, error) { - values := make([]Vector, 0, len(union.Values)) - for _, val := range union.Values { - v, err := NewVector(val, r) +func loadUnion(any *vector.Any, typ *zed.TypeUnion, path field.Path, m *meta.Union, r io.ReaderAt) (*vector.Union, error) { + if *any == nil { + *any = vector.NewUnion(typ) + } + vec, ok := (*any).(*vector.Union) + if !ok { + return nil, fmt.Errorf("system error: vcache.loadUnion not a union type %q", zson.String(vec.Typ)) + } + //XXX should just load paths we want here? for now, load everything. 
+ for k := range vec.Values { + var err error + _, err = loadVector(&vec.Values[k], typ.Types[k], path, m.Values[k], r) if err != nil { return nil, err } - values = append(values, v) } - return &Union{ - values: values, - segmap: union.Tags, - }, nil + return vec, nil } +/* func (u *Union) NewIter(reader io.ReaderAt) (iterator, error) { if u.tags == nil { tags, err := vng.ReadIntVector(u.segmap, reader) @@ -73,3 +72,4 @@ func (u *Union) NewIter(reader io.ReaderAt) (iterator, error) { return nil }, nil } +*/ diff --git a/runtime/vcache/vector.go b/runtime/vcache/vector.go index f36e684f38..3606398d2f 100644 --- a/runtime/vcache/vector.go +++ b/runtime/vcache/vector.go @@ -3,59 +3,46 @@ package vcache import ( "fmt" "io" + "strings" - "github.com/brimdata/zed/vng/vector" - "github.com/brimdata/zed/zcode" + "github.com/brimdata/zed" + "github.com/brimdata/zed/pkg/field" + "github.com/brimdata/zed/vector" + meta "github.com/brimdata/zed/vng/vector" ) -type iterator func(*zcode.Builder) error - -// Vector is the primary interface to in-memory sequences of Zed values -// representing the VNG vector format. As we implement additional optimizations -// and various forms of pushdown, we will enhance this interface with -// corresponding methods. -type Vector interface { - NewIter(io.ReaderAt) (iterator, error) -} - -// NewVector converts a VNG metadata reader to its equivalent vector cache -// metadata manager. 
-func NewVector(meta vector.Metadata, r io.ReaderAt) (Vector, error) { - switch meta := meta.(type) { - case *vector.Named: - return NewVector(meta.Values, r) - case *vector.Record: - return NewRecord(meta.Fields, r) - case *vector.Primitive: - return NewPrimitive(meta) - case *vector.Array: - return NewArray(meta, r) - case *vector.Set: - a := *(*vector.Array)(meta) - return NewArray(&a, r) - case *vector.Map: - return NewMap(meta, r) - case *vector.Union: - return NewUnion(meta, r) - case *vector.Nulls: - values, err := NewVector(meta.Values, r) - if err != nil { - return nil, err +func loadVector(any *vector.Any, typ zed.Type, path field.Path, m meta.Metadata, r io.ReaderAt) (vector.Any, error) { + switch m := m.(type) { + case *meta.Named: + return loadVector(any, typ.(*zed.TypeNamed).Type, path, m.Values, r) + case *meta.Record: + return loadRecord(any, typ.(*zed.TypeRecord), path, m, r) + case *meta.Primitive: + if len(path) != 0 { + return nil, fmt.Errorf("internal error: vcache encountered path at primitive element: %q", strings.Join(path, ".")) } - return NewNulls(meta, values, r) - case *vector.Const: - return NewConst(meta), nil - default: - return nil, fmt.Errorf("vector cache: type %T not supported", meta) - } -} - -func Under(v Vector) Vector { - for { - if nulls, ok := v.(*Nulls); ok { - v = nulls.values - continue + if *any == nil { + v, err := loadPrimitive(typ, m, r) + if err != nil { + return nil, err + } + *any = v } - return v + return *any, nil + case *meta.Array: + return loadArray(any, typ, path, m, r) + case *meta.Set: + a := *(*meta.Array)(m) + return loadArray(any, typ, path, &a, r) + case *meta.Map: + return loadMap(any, typ, path, m, r) + case *meta.Union: + return loadUnion(any, typ.(*zed.TypeUnion), path, m, r) + case *meta.Nulls: + return loadNulls(any, typ, path, m, r) + case *meta.Const: + return vector.NewConst(m.Value), nil + default: + return nil, fmt.Errorf("vector cache: type %T not supported", m) } } diff --git 
a/vector/any.go b/vector/any.go new file mode 100644 index 0000000000..61781913f5 --- /dev/null +++ b/vector/any.go @@ -0,0 +1,27 @@ +package vector + +import ( + "github.com/brimdata/zed" + "github.com/brimdata/zed/zcode" +) + +type Any interface { + Type() zed.Type + Ref() + Unref() + NewBuilder() Builder +} + +/* XXX don't need this anymore? Nullmask carries the nulls without a special vector +func Under(a Any) Any { + for { + if nulls, ok := a.(*Nulls); ok { + a = nulls.values + continue + } + return a + } +} +*/ + +type Builder func(*zcode.Builder) bool diff --git a/vector/array.go b/vector/array.go new file mode 100644 index 0000000000..31e9729fbf --- /dev/null +++ b/vector/array.go @@ -0,0 +1,26 @@ +package vector + +import ( + "github.com/brimdata/zed" +) + +type Array struct { + mem + Typ *zed.TypeArray //XXX type array or set + Lengths []int32 + Values Any +} + +var _ Any = (*Array)(nil) + +func NewArray(typ *zed.TypeArray, lengths []int32, values Any) *Array { + return &Array{Typ: typ, Lengths: lengths, Values: values} +} + +func (a *Array) Type() zed.Type { + return a.Typ +} + +func (a *Array) NewBuilder() Builder { + return nil //XXX +} diff --git a/vector/bool.go b/vector/bool.go new file mode 100644 index 0000000000..e0213b6b70 --- /dev/null +++ b/vector/bool.go @@ -0,0 +1,42 @@ +package vector + +import ( + "github.com/brimdata/zed" + "github.com/brimdata/zed/zcode" +) + +type Bool struct { + mem + Typ zed.Type + Values []bool //XXX bit vector + Nulls Nullmask +} + +var _ Any = (*Int)(nil) + +func NewBool(typ zed.Type, vals []bool, nulls Nullmask) *Bool { + return &Bool{Typ: typ, Values: vals, Nulls: nulls} +} + +func (b *Bool) Type() zed.Type { + return b.Typ +} + +func (b *Bool) NewBuilder() Builder { + vals := b.Values + nulls := b.Nulls + var voff int + return func(b *zcode.Builder) bool { + if voff < len(vals) { + if !nulls.Has(uint32(voff)) { + b.Append(zed.EncodeBool(vals[voff])) + } else { + b.Append(nil) + } + voff++ + return true + + } + 
return false + } +} diff --git a/vector/const.go b/vector/const.go new file mode 100644 index 0000000000..d4d8220c4b --- /dev/null +++ b/vector/const.go @@ -0,0 +1,24 @@ +package vector + +import ( + "github.com/brimdata/zed" +) + +type Const struct { + val *zed.Value +} + +func NewConst(val *zed.Value) *Const { + return &Const{val: val} +} + +func (c *Const) Type() zed.Type { + return c.val.Type +} + +func (*Const) Ref() {} +func (*Const) Unref() {} + +func (c *Const) NewBuilder() Builder { + return nil //XXX +} diff --git a/vector/int.go b/vector/int.go new file mode 100644 index 0000000000..8af3ef4fcc --- /dev/null +++ b/vector/int.go @@ -0,0 +1,42 @@ +package vector + +import ( + "github.com/brimdata/zed" + "github.com/brimdata/zed/zcode" +) + +type Int struct { + mem + Typ zed.Type + Values []int64 + Nulls Nullmask +} + +var _ Any = (*Int)(nil) + +func NewInt(typ zed.Type, vals []int64, nulls Nullmask) *Int { + return &Int{Typ: typ, Values: vals, Nulls: nulls} +} + +func (i *Int) Type() zed.Type { + return i.Typ +} + +func (i *Int) NewBuilder() Builder { + vals := i.Values + nulls := i.Nulls + var voff int + return func(b *zcode.Builder) bool { + if voff < len(vals) { + if nulls.Has(uint32(voff)) { + b.Append(nil) + } else { + b.Append(zed.EncodeInt(vals[voff])) + } + voff++ + return true + + } + return false + } +} diff --git a/vector/map.go b/vector/map.go new file mode 100644 index 0000000000..96acd7a4b7 --- /dev/null +++ b/vector/map.go @@ -0,0 +1,26 @@ +package vector + +import ( + "github.com/brimdata/zed" +) + +type Map struct { + mem + Typ *zed.TypeMap + Keys Any + Values Any +} + +var _ Any = (*Map)(nil) + +func NewMap(typ *zed.TypeMap, keys Any, values Any) *Map { + return &Map{Typ: typ, Keys: keys, Values: values} +} + +func (m *Map) Type() zed.Type { + return m.Typ +} + +func (m *Map) NewBuilder() Builder { + return nil //XXX +} diff --git a/vector/nulls.go b/vector/nulls.go new file mode 100644 index 0000000000..8bab5807d9 --- /dev/null +++ 
b/vector/nulls.go @@ -0,0 +1,23 @@ +package vector + +type Nullmask []byte //XXX change to uint64 + +func NewNullmask(slots []uint32, nvals int) Nullmask { + var nulls Nullmask + if len(slots) > 0 { + nulls = make([]byte, (nvals+7)/8) + for _, slot := range slots { + nulls[slot>>3] |= 1 << (slot & 7) + } + } + return nulls +} + +func (n Nullmask) Has(slot uint32) bool { + off := slot / 8 + if off >= uint32(len(n)) { + return false + } + pos := slot & 7 + return (n[off] & (1 << pos)) != 0 +} diff --git a/vector/record.go b/vector/record.go new file mode 100644 index 0000000000..e2c812736a --- /dev/null +++ b/vector/record.go @@ -0,0 +1,55 @@ +package vector + +import ( + "github.com/brimdata/zed" + "github.com/brimdata/zed/zcode" +) + +// XXX need to create memory model +type mem struct{} + +func (*mem) Ref() {} +func (*mem) Unref() {} + +type Record struct { + mem + Typ *zed.TypeRecord + Fields []Any +} + +var _ Any = (*Record)(nil) + +func NewRecord(typ *zed.TypeRecord) *Record { + return NewRecordWithFields(typ, make([]Any, len(typ.Fields))) +} + +func NewRecordWithFields(typ *zed.TypeRecord, fields []Any) *Record { + return &Record{Typ: typ, Fields: fields} +} + +func (r *Record) Type() zed.Type { + return r.Typ +} + +func (r *Record) NewBuilder() Builder { + //XXX + fields := make([]Builder, 0, len(r.Fields)) + for _, v := range r.Fields { + fields = append(fields, v.NewBuilder()) + } + //XXX should change Builder API to not return bool because + // you should never be called if you would return a nil... + // the top level needs to know how much stuff there is, no? + // That said, we should be robust to file errors like bad runlens + // and return an error instead of panic. 
+ return func(b *zcode.Builder) bool { + b.BeginContainer() + for _, f := range fields { + if !f(b) { + return false + } + } + b.EndContainer() + return true + } +} diff --git a/vector/string.go b/vector/string.go new file mode 100644 index 0000000000..c868bfa1f4 --- /dev/null +++ b/vector/string.go @@ -0,0 +1,42 @@ +package vector + +import ( + "github.com/brimdata/zed" + "github.com/brimdata/zed/zcode" +) + +type String struct { + mem + Typ zed.Type + Values []string + Nulls Nullmask +} + +var _ Any = (*String)(nil) + +func NewString(typ zed.Type, vals []string, nulls Nullmask) *String { + return &String{Typ: typ, Values: vals, Nulls: nulls} +} + +func (s *String) Type() zed.Type { + return s.Typ +} + +func (s *String) NewBuilder() Builder { + vals := s.Values + nulls := s.Nulls + var voff int + return func(b *zcode.Builder) bool { + if voff < len(vals) { + if !nulls.Has(uint32(voff)) { + b.Append(zed.EncodeString(vals[voff])) + } else { + b.Append(nil) + } + voff++ + return true + + } + return false + } +} diff --git a/vector/uint.go b/vector/uint.go new file mode 100644 index 0000000000..ca9a5f2207 --- /dev/null +++ b/vector/uint.go @@ -0,0 +1,42 @@ +package vector + +import ( + "github.com/brimdata/zed" + "github.com/brimdata/zed/zcode" +) + +type Uint struct { + mem + Typ zed.Type + Values []uint64 + Nulls Nullmask +} + +var _ Any = (*Uint)(nil) + +func NewUint(typ zed.Type, vals []uint64, nulls Nullmask) *Uint { + return &Uint{Typ: typ, Values: vals, Nulls: nulls} +} + +func (u *Uint) Type() zed.Type { + return u.Typ +} + +func (u *Uint) NewBuilder() Builder { + vals := u.Values + nulls := u.Nulls + var voff int + return func(b *zcode.Builder) bool { + if voff < len(vals) { + if !nulls.Has(uint32(voff)) { + b.Append(zed.EncodeUint(vals[voff])) + } else { + b.Append(nil) + } + voff++ + return true + + } + return false + } +} diff --git a/vector/union.go b/vector/union.go new file mode 100644 index 0000000000..5ecfb759bb --- /dev/null +++ b/vector/union.go @@ 
-0,0 +1,25 @@ +package vector + +import ( + "github.com/brimdata/zed" +) + +type Union struct { + mem + Typ *zed.TypeUnion + Values []Any +} + +var _ Any = (*Union)(nil) + +func NewUnion(typ *zed.TypeUnion) *Union { + return &Union{Typ: typ, Values: make([]Any, len(typ.Types))} +} + +func (u *Union) Type() zed.Type { + return u.Typ +} + +func (u *Union) NewBuilder() Builder { + return nil //XXX +}