diff --git a/runtime/vcache/loader.go b/runtime/vcache/loader.go index f2a9e06f8f..33663f709b 100644 --- a/runtime/vcache/loader.go +++ b/runtime/vcache/loader.go @@ -441,7 +441,7 @@ func (l *loader) loadUint32(g *errgroup.Group, mu *sync.Mutex, slice *[]uint32, if *slice != nil { return nil } - v, err := vng.ReadUint32Vector(loc, l.r) + v, err := vng.ReadUint32s(loc, l.r) if err != nil { return err } @@ -463,7 +463,7 @@ func (l *loader) loadOffsets(g *errgroup.Group, mu *sync.Mutex, slice *[]uint32, if *slice != nil { return nil } - v, err := vng.ReadUint32Vector(loc, l.r) + v, err := vng.ReadUint32s(loc, l.r) if err != nil { return err } diff --git a/runtime/vcache/nulls.go b/runtime/vcache/nulls.go index 3563db00ae..4d17f0e44c 100644 --- a/runtime/vcache/nulls.go +++ b/runtime/vcache/nulls.go @@ -34,28 +34,24 @@ func (n *nulls) fetch(g *errgroup.Group, reader io.ReaderAt) { } length := n.meta.Count + n.meta.Values.Len() n.local = vector.NewBoolEmpty(length, nil) - runlens := vng.NewInt64Decoder(n.meta.Runs, reader) //XXX 32-bit reader? + runlens, err := vng.ReadUint32s(n.meta.Runs, reader) + if err != nil { + return err + } var null bool - var off int + var off uint32 b := n.local - for { - run, err := runlens.Next() - if err != nil { - if err == io.EOF { - n.meta = nil - err = nil - } - return err - } + for _, run := range runlens { if null { - for i := 0; int64(i) < run; i++ { - slot := uint32(off + i) + for i := range run { + slot := off + i b.Set(slot) } } - off += int(run) + off += run null = !null } + return nil }) } diff --git a/vng/array.go b/vng/array.go index 4e3722365f..2ad27b0fbd 100644 --- a/vng/array.go +++ b/vng/array.go @@ -58,39 +58,6 @@ func (a *ArrayEncoder) Metadata(off uint64) (uint64, Metadata) { } } -type ArrayBuilder struct { - Elems Builder - Lengths *Int64Decoder -} - -var _ Builder = (*ArrayBuilder)(nil) - -func NewArrayBuilder(array *Array, r io.ReaderAt) (*ArrayBuilder, error) { - elems, err := NewBuilder(array.Values, r) - if err != nil { - return nil, err - } - return &ArrayBuilder{ - Elems: elems, - Lengths: NewInt64Decoder(array.Lengths, r), - }, nil -} - -func (a *ArrayBuilder) Build(b *zcode.Builder) error { - len, err := a.Lengths.Next() - if err != nil { - return err - } - b.BeginContainer() - for k := 0; k < int(len); k++ { - if err := a.Elems.Build(b); err != nil { - return err - } - } - b.EndContainer() - return nil -} - type SetEncoder struct { ArrayEncoder } diff --git a/vng/builder.go b/vng/builder.go deleted file mode 100644 index 7bc7a9bcbb..0000000000 --- a/vng/builder.go +++ /dev/null @@ -1,48 +0,0 @@ -package vng - -import ( - "fmt" - "io" - - "github.com/brimdata/super/zcode" -) - -type Builder interface { - Build(*zcode.Builder) error -} - -func NewBuilder(meta Metadata, r io.ReaderAt) (Builder, error) { - switch meta := meta.(type) { - case nil: - return nil, nil - case *Nulls: - inner, err := NewBuilder(meta.Values, r) - if err != nil { - return nil, err - } - return NewNullsBuilder(inner, meta.Runs, r), nil - case *Named: - return NewBuilder(meta.Values, r) - case *Error: - return NewBuilder(meta.Values, r) - case *Record: - return NewRecordBuilder(meta, r) - case *Array: - return NewArrayBuilder(meta, r) - case *Set: - return NewArrayBuilder((*Array)(meta), r) - case *Map: - return NewMapBuilder(meta, r) - case *Union: - return NewUnionBuilder(meta, r) - case *Primitive: - if len(meta.Dict) != 0 { - return NewDictBuilder(meta, r), nil - } - return NewPrimitiveBuilder(meta, r), nil - case *Const: - return NewConstBuilder(meta), nil - default: - return nil, fmt.Errorf("unknown VNG metadata type: %T", meta) - } -} diff --git a/vng/dynamic.go b/vng/dynamic.go index 4cb7c918fe..435464652f 100644 --- a/vng/dynamic.go +++ b/vng/dynamic.go @@ -1,11 +1,9 @@ package vng import ( - "fmt" "io" "github.com/brimdata/super" - "github.com/brimdata/super/zcode" "github.com/brimdata/super/zio" "golang.org/x/sync/errgroup" ) @@ -86,84 +84,3 @@ func (d *DynamicEncoder) Emit(w io.Writer) error { } return nil } - -type dynamicBuilder struct { - types []super.Type - tags *Int64Decoder - values []Builder - builder *zcode.Builder -} - -func newDynamicBuilder(zctx *super.Context, d *Dynamic, reader io.ReaderAt) (*dynamicBuilder, error) { - values := make([]Builder, 0, len(d.Values)) - types := make([]super.Type, 0, len(d.Values)) - for _, val := range d.Values { - r, err := NewBuilder(val, reader) - if err != nil { - return nil, err - } - values = append(values, r) - types = append(types, val.Type(zctx)) - } - return &dynamicBuilder{ - types: types, - tags: NewInt64Decoder(d.Tags, reader), - values: values, - builder: zcode.NewBuilder(), - }, nil -} - -func (d *dynamicBuilder) Read() (*super.Value, error) { - b := d.builder - b.Truncate() - tag, err := d.tags.Next() - if err != nil { - if err == io.EOF { - err = nil - } - return nil, err - } - if int(tag) >= len(d.types) { - return nil, fmt.Errorf("bad tag encountered scanning VNG dynamic: tag %d when only %d types", tag, len(d.types)) - } - if err := d.values[tag].Build(b); err != nil { - return nil, err - } - return super.NewValue(d.types[tag], b.Bytes().Body()).Ptr(), nil -} - -func NewZedReader(zctx *super.Context, meta Metadata, r io.ReaderAt) (zio.Reader, error) { - if d, ok := meta.(*Dynamic); ok { - return newDynamicBuilder(zctx, d, r) - } - values, err := NewBuilder(meta, r) - if err != nil { - return nil, err - } - return &vectorBuilder{ - typ: meta.Type(zctx), - values: values, - builder: zcode.NewBuilder(), - count: meta.Len(), - }, nil -} - -type vectorBuilder struct { - typ super.Type - values Builder - builder *zcode.Builder - count uint32 -} - -func (v *vectorBuilder) Read() (*super.Value, error) { - if v.count == 0 { - return nil, nil - } - v.count-- - b := v.builder - b.Truncate() - if err := v.values.Build(b); err != nil { - return nil, err - } - return super.NewValue(v.typ, b.Bytes().Body()).Ptr(), nil -} diff --git a/vng/field.go b/vng/field.go index bef0e013db..32e5ddc42c 100644 --- a/vng/field.go +++ b/vng/field.go @@ -31,21 +31,3 @@ func (f *FieldEncoder) Encode(group *errgroup.Group) { func (f *FieldEncoder) Emit(w io.Writer) error { return f.values.Emit(w) } - -type FieldBuilder struct { - Values Builder -} - -func NewFieldBuilder(field Field, r io.ReaderAt) (*FieldBuilder, error) { - values, err := NewBuilder(field.Values, r) - if err != nil { - return nil, err - } - return &FieldBuilder{ - Values: values, - }, nil -} - -func (f *FieldBuilder) Build(b *zcode.Builder) error { - return f.Values.Build(b) -} diff --git a/vng/int.go b/vng/int.go index cbbb149e20..9258408553 100644 --- a/vng/int.go +++ b/vng/int.go @@ -4,6 +4,7 @@ import ( "io" "github.com/brimdata/super" + "github.com/brimdata/super/zcode" ) type Int64Encoder struct { @@ -18,18 +19,17 @@ func (p *Int64Encoder) Write(v int64) { p.PrimitiveEncoder.Write(super.EncodeInt(v)) } -type Int64Decoder struct { - PrimitiveBuilder -} - -func NewInt64Decoder(loc Segment, r io.ReaderAt) *Int64Decoder { - return &Int64Decoder{*NewPrimitiveBuilder(&Primitive{Typ: super.TypeInt64, Location: loc}, r)} -} - -func (p *Int64Decoder) Next() (int64, error) { - zv, err := p.ReadBytes() - if err != nil { - return 0, err +func ReadUint32s(loc Segment, r io.ReaderAt) ([]uint32, error) { + buf := make([]byte, loc.MemLength) + if err := loc.Read(r, buf); err != nil { + if err == io.EOF { + return nil, nil + } + return nil, err + } + var vals []uint32 + for it := zcode.Iter(buf); !it.Done(); { + vals = append(vals, uint32(super.DecodeInt(it.Next()))) } - return super.DecodeInt(zv), err + return vals, nil } diff --git a/vng/map.go b/vng/map.go index 429ab8743f..bcc4df31f0 100644 --- a/vng/map.go +++ b/vng/map.go @@ -62,45 +62,3 @@ func (m *MapEncoder) Encode(group *errgroup.Group) { m.keys.Encode(group) m.values.Encode(group) } - -type MapBuilder struct { - Keys Builder - Values Builder - Lengths *Int64Decoder -} - -var _ Builder = (*MapBuilder)(nil) - -func NewMapBuilder(m *Map, r io.ReaderAt) (*MapBuilder, error) { - keys, err := NewBuilder(m.Keys, r) - if err != nil { - return nil, err - } - values, err := NewBuilder(m.Values, r) - if err != nil { - return nil, err - } - return &MapBuilder{ - Keys: keys, - Values: values, - Lengths: NewInt64Decoder(m.Lengths, r), - }, nil -} - -func (m *MapBuilder) Build(b *zcode.Builder) error { - len, err := m.Lengths.Next() - if err != nil { - return err - } - b.BeginContainer() - for k := 0; k < int(len); k++ { - if err := m.Keys.Build(b); err != nil { - return err - } - if err := m.Values.Build(b); err != nil { - return err - } - } - b.EndContainer() - return nil -} diff --git a/vng/nulls.go b/vng/nulls.go index 9e8108ee69..0e88695be7 100644 --- a/vng/nulls.go +++ b/vng/nulls.go @@ -87,40 +87,3 @@ func (n *NullsEncoder) Emit(w io.Writer) error { } return nil } - -type NullsBuilder struct { - Values Builder - Runs Int64Decoder - null bool - run int -} - -var _ (Builder) = (*NullsBuilder)(nil) - -func NewNullsBuilder(values Builder, loc Segment, r io.ReaderAt) *NullsBuilder { - // We start out with null true so it is immediately flipped to - // false on the first call to Read. - return &NullsBuilder{ - Values: values, - Runs: *NewInt64Decoder(loc, r), - null: true, - } -} - -func (n *NullsBuilder) Build(b *zcode.Builder) error { - run := n.run - for run == 0 { - n.null = !n.null - v, err := n.Runs.Next() - if err != nil { - return err - } - run = int(v) - } - n.run = run - 1 - if n.null { - b.Append(nil) - return nil - } - return n.Values.Build(b) -} diff --git a/vng/object.go b/vng/object.go index 27ed7cc869..605164eb79 100644 --- a/vng/object.go +++ b/vng/object.go @@ -29,7 +29,6 @@ import ( "io" "github.com/brimdata/super" - "github.com/brimdata/super/zio" "github.com/brimdata/super/zio/zngio" "github.com/brimdata/super/zson" ) @@ -71,10 +70,6 @@ func (o *Object) DataReader() io.ReaderAt { return o.readerAt } -func (o *Object) NewReader(zctx *super.Context) (zio.Reader, error) { - return NewZedReader(zctx, o.meta, o.readerAt) -} - func (o *Object) Size() uint64 { return HeaderSize + o.header.MetaSize + o.header.DataSize } @@ -100,34 +95,3 @@ func readMetadata(r io.Reader) (Metadata, error) { } return meta, nil } - -// XXX change this to single vector read -func ReadIntVector(loc Segment, r io.ReaderAt) ([]int32, error) { - decoder := NewInt64Decoder(loc, r) - var out []int32 - for { - val, err := decoder.Next() - if err != nil { - if err == io.EOF { - return out, nil - } - return nil, err - } - out = append(out, int32(val)) - } -} - -func ReadUint32Vector(loc Segment, r io.ReaderAt) ([]uint32, error) { - decoder := NewInt64Decoder(loc, r) - var out []uint32 - for { - val, err := decoder.Next() - if err != nil { - if err == io.EOF { - return out, nil - } - return nil, err - } - out = append(out, uint32(val)) - } -} diff --git a/vng/primitive.go b/vng/primitive.go index 970c876a6c..c1872dbe8a 100644 --- a/vng/primitive.go +++ b/vng/primitive.go @@ -1,7 +1,6 @@ package vng import ( - "fmt" "io" "sort" @@ -182,111 +181,3 @@ func sortDict(entries []DictEntry, cmp expr.CompareFn) { return cmp(entries[i].Value, entries[j].Value) < 0 }) } - -type PrimitiveBuilder struct { - Typ super.Type - - loc Segment - reader io.ReaderAt - - buf []byte - it zcode.Iter -} - -func NewPrimitiveBuilder(primitive *Primitive, reader io.ReaderAt) *PrimitiveBuilder { - return &PrimitiveBuilder{ - Typ: primitive.Typ, - reader: reader, - loc: primitive.Location, - } -} - -func (p *PrimitiveBuilder) Build(b *zcode.Builder) error { - zv, err := p.ReadBytes() - if err == nil { - b.Append(zv) - } - return err -} - -func (p *PrimitiveBuilder) ReadBytes() (zcode.Bytes, error) { - if p.buf == nil { - p.buf = make([]byte, p.loc.MemLength) - if err := p.loc.Read(p.reader, p.buf); err != nil { - return nil, err - } - p.it = zcode.Iter(p.buf) - } - if p.it == nil || p.it.Done() { - return nil, io.EOF - } - return p.it.Next(), nil -} - -type DictBuilder struct { - Typ super.Type - - loc Segment - reader io.ReaderAt - dict []DictEntry - selectors []byte - off int -} - -var _ Builder = (*DictBuilder)(nil) - -func NewDictBuilder(primitive *Primitive, reader io.ReaderAt) *DictBuilder { - return &DictBuilder{ - Typ: primitive.Typ, - reader: reader, - loc: primitive.Location, - dict: primitive.Dict, - } -} - -func (d *DictBuilder) Build(b *zcode.Builder) error { - bytes, err := d.ReadBytes() - if err == nil { - b.Append(bytes) - } - return err -} - -func (d *DictBuilder) ReadBytes() (zcode.Bytes, error) { - if d.selectors == nil { - d.selectors = make([]byte, d.loc.MemLength) - if err := d.loc.Read(d.reader, d.selectors); err != nil { - return nil, err - } - } - if d.off >= len(d.selectors) { - return nil, io.EOF - } - sel := int(d.selectors[d.off]) - d.off++ - if sel >= len(d.dict) { - return nil, fmt.Errorf("corrupt VNG: selector (%d) out of range (len %d)", sel, len(d.dict)) - } - return d.dict[sel].Value.Bytes(), nil -} - -type ConstBuilder struct { - Typ super.Type - bytes zcode.Bytes - cnt uint32 -} - -var _ Builder = (*ConstBuilder)(nil) - -func NewConstBuilder(c *Const) *ConstBuilder { - return &ConstBuilder{Typ: c.Value.Type(), bytes: c.Value.Bytes(), cnt: c.Count} -} - -func (c *ConstBuilder) Build(b *zcode.Builder) error { - if c.cnt == 0 { - return io.EOF - } - c.cnt-- - b.Append(c.bytes) - return nil -} diff --git a/vng/record.go b/vng/record.go index 14f88d357d..e67db68cd4 100644 --- a/vng/record.go +++ b/vng/record.go @@ -58,39 +58,3 @@ func (r *RecordEncoder) Emit(w io.Writer) error { } return nil } - -type RecordBuilder struct { - Names []string - Values []FieldBuilder -} - -var _ Builder = (*RecordBuilder)(nil) - -func NewRecordBuilder(record *Record, reader io.ReaderAt) (*RecordBuilder, error) { - names := make([]string, 0, len(record.Fields)) - values := make([]FieldBuilder, 0, len(record.Fields)) - for _, field := range record.Fields { - names = append(names, field.Name) - fr, err := NewFieldBuilder(field, reader) - if err != nil { - return nil, err - } - values = append(values, *fr) - } - result := &RecordBuilder{ - Names: names, - Values: values, - } - return result, nil -} - -func (r *RecordBuilder) Build(b *zcode.Builder) error { - b.BeginContainer() - for _, f := range r.Values { - if err := f.Build(b); err != nil { - return err - } - } - b.EndContainer() - return nil -} diff --git a/vng/union.go b/vng/union.go index f9a35f0924..7bddf00b25 100644 --- a/vng/union.go +++ b/vng/union.go @@ -1,7 +1,6 @@ package vng import ( - "errors" "io" "github.com/brimdata/super" @@ -71,42 +70,3 @@ func (u *UnionEncoder) Metadata(off uint64) (uint64, Metadata) { Length: u.count, } } - -type UnionBuilder struct { - builders []Builder - tags *Int64Decoder -} - -var _ Builder = (*UnionBuilder)(nil) - -func NewUnionBuilder(union *Union, r io.ReaderAt) (*UnionBuilder, error) { - builders := make([]Builder, 0, len(union.Values)) - for _, val := range union.Values { - b, err := NewBuilder(val, r) - if err != nil { - return nil, err - } - builders = append(builders, b) - } - return &UnionBuilder{ - builders: builders, - tags: NewInt64Decoder(union.Tags, r), - }, nil -} - -func (u *UnionBuilder) Build(b *zcode.Builder) error { - tag, err := u.tags.Next() - if err != nil { - return err - } - if tag < 0 || int(tag) >= len(u.builders) { - return errors.New("bad tag in VNG union builder") - } - b.BeginContainer() - b.Append(super.EncodeInt(tag)) - if err := u.builders[tag].Build(b); err != nil { - return err - } - b.EndContainer() - return nil -} diff --git a/zio/vngio/reader.go b/zio/vngio/reader.go index 1fec62ce72..cd71369150 100644 --- a/zio/vngio/reader.go +++ b/zio/vngio/reader.go @@ -1,22 +1,26 @@ package vngio import ( + "bytes" "errors" "io" "math" "github.com/brimdata/super" "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/runtime/vcache" + "github.com/brimdata/super/vector" "github.com/brimdata/super/vng" + "github.com/brimdata/super/zcode" "github.com/brimdata/super/zio" ) type reader struct { - zctx *super.Context - objects []*vng.Object - n int - readerAt io.ReaderAt - reader zio.Reader + zctx *super.Context + objects []*vng.Object + projection vcache.Path + readerAt io.ReaderAt + vals []super.Value } func NewReader(zctx *super.Context, r io.Reader, fields []field.Path) (zio.Reader, error) { @@ -29,31 +33,51 @@ func NewReader(zctx *super.Context, r io.Reader, fields []field.Path) (zio.Reade return nil, err } return &reader{ - zctx: zctx, - objects: objects, - readerAt: ra, + zctx: zctx, + objects: objects, + projection: vcache.NewProjection(fields), + readerAt: ra, }, nil } func (r *reader) Read() (*super.Value, error) { again: - if r.reader == nil { - if r.n >= len(r.objects) { + if len(r.vals) == 0 { + if len(r.objects) == 0 { return nil, nil } - o := r.objects[r.n] - r.n++ - var err error - if r.reader, err = o.NewReader(r.zctx); err != nil { + o := r.objects[0] + r.objects = r.objects[1:] + vec, err := vcache.NewObjectFromVNG(o).Fetch(r.zctx, r.projection) + if err != nil { return nil, err } - } - v, err := r.reader.Read() - if v == nil && err == nil { - r.reader = nil + r.materializeVector(vec) goto again } - return v, err + val := r.vals[0] + r.vals = r.vals[1:] + return &val, nil +} + +func (r *reader) materializeVector(vec vector.Any) { + r.vals = r.vals[:0] + d, _ := vec.(*vector.Dynamic) + var typ super.Type + if d == nil { + typ = vec.Type() + } + builder := zcode.NewBuilder() + n := vec.Len() + for slot := uint32(0); slot < n; slot++ { + vec.Serialize(builder, slot) + if d != nil { + typ = d.TypeOf(slot) + } + val := super.NewValue(typ, bytes.Clone(builder.Bytes().Body())) + r.vals = append(r.vals, val) + builder.Truncate() + } } func (r *reader) Close() error {