diff --git a/vng/writer.go b/vng/writer.go index ff8c6d55cf..91184dd066 100644 --- a/vng/writer.go +++ b/vng/writer.go @@ -11,6 +11,8 @@ import ( "github.com/brimdata/super/zson" ) +var maxObjectSize uint32 = 120_000 + // Writer implements the zio.Writer interface. A Writer creates a vector // VNG object from a stream of super.Records. type Writer struct { @@ -30,7 +32,7 @@ func NewWriter(w io.WriteCloser) *Writer { } func (w *Writer) Close() error { - firstErr := w.finalize() + firstErr := w.finalizeObject() if err := w.writer.Close(); err != nil && firstErr == nil { firstErr = err } @@ -38,10 +40,16 @@ func (w *Writer) Close() error { } func (w *Writer) Write(val super.Value) error { - return w.dynamic.Write(val) + if err := w.dynamic.Write(val); err != nil { + return err + } + if w.dynamic.len >= maxObjectSize { + return w.finalizeObject() + } + return nil } -func (w *Writer) finalize() error { +func (w *Writer) finalizeObject() error { meta, dataSize, err := w.dynamic.Encode() if err != nil { return fmt.Errorf("system error: could not encode VNG metadata: %w", err) @@ -74,5 +82,8 @@ func (w *Writer) finalize() error { if err := w.dynamic.Emit(w.writer); err != nil { return fmt.Errorf("system error: could not write VNG data section: %w", err) } + // Set new dynamic so we can write the next section. + w.dynamic = NewDynamicEncoder() + w.zctx.Reset() return nil } diff --git a/zio/vngio/reader.go b/zio/vngio/reader.go index 3136aee0a3..c94667e827 100644 --- a/zio/vngio/reader.go +++ b/zio/vngio/reader.go @@ -10,14 +10,72 @@ import ( "github.com/brimdata/super/zio" ) +type reader struct { + zctx *super.Context + objects []object + n int + readerAt io.ReaderAt + reader zio.Reader +} + func NewReader(zctx *super.Context, r io.Reader, fields []field.Path) (zio.Reader, error) { ra, ok := r.(io.ReaderAt) if !ok { return nil, errors.New("Super Columnar requires a seekable input") } - o, err := vng.NewObject(ra) + objects, err := readObjects(ra) if err != nil { return nil, err } - return o.NewReader(zctx) + return &reader{ + zctx: zctx, + objects: objects, + readerAt: ra, + }, nil +} + +func (r *reader) Read() (*super.Value, error) { +again: + if r.reader == nil { + if r.n >= len(r.objects) { + return nil, nil + } + meta := r.objects[r.n] + r.n++ + o, err := vng.NewObject(io.NewSectionReader(r.readerAt, meta.start, meta.offset)) + if err != nil { + return nil, err + } + r.reader, err = o.NewReader(r.zctx) + if err != nil { + return nil, err + } + } + v, err := r.reader.Read() + if v == nil && err == nil { + r.reader = nil + goto again + } + return v, err +} + +type object struct { + start, offset int64 +} + +func readObjects(r io.ReaderAt) ([]object, error) { + var objects []object + var start int64 + for { + header, err := vng.ReadHeader(io.NewSectionReader(r, start, vng.HeaderSize)) + if err != nil { + if err == io.EOF && len(objects) > 0 { + return objects, nil + } + return nil, err + } + offset := int64(vng.HeaderSize) + int64(header.MetaSize+header.DataSize) + objects = append(objects, object{start, offset}) + start += offset + } }