Skip to content

Commit

Permalink
CSUP: Allow multiple objects
Browse files Browse the repository at this point in the history
This PR changes the protocol for CSUP files so that a single file can
contain multiple VNG objects. When writing a CSUP file, a new object is
created every 120,000 values.
  • Loading branch information
mattnibs committed Nov 26, 2024
1 parent f166ba9 commit 85b4740
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 5 deletions.
17 changes: 14 additions & 3 deletions vng/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/brimdata/super/zson"
)

// maxObjectSize is the number of values after which Writer.Write finalizes
// the current VNG object and starts a new one.
var maxObjectSize uint32 = 120_000

// Writer implements the zio.Writer interface. A Writer creates a vector
// VNG object from a stream of super.Records.
type Writer struct {
Expand All @@ -30,18 +32,24 @@ func NewWriter(w io.WriteCloser) *Writer {
}

// Close finalizes the object currently being built and then closes the
// underlying writer. The finalize error takes precedence; an error from
// closing the writer is returned only if finalizing succeeded.
//
// NOTE(review): this text is a diff rendering with the +/- markers stripped.
// The w.finalize() line appears to be the removed (pre-commit) form of the
// w.finalizeObject() line that follows it; only the latter belongs in the
// resulting code.
func (w *Writer) Close() error {
firstErr := w.finalize()
firstErr := w.finalizeObject()
if err := w.writer.Close(); err != nil && firstErr == nil {
firstErr = err
}
return firstErr
}

// Write appends val to the dynamic encoder. Once the encoder holds at least
// maxObjectSize values, the current object is finalized so that subsequent
// values go into a new object.
//
// NOTE(review): this text is a diff rendering with the +/- markers stripped.
// The bare "return w.dynamic.Write(val)" line appears to be the removed
// (pre-commit) body; the statements after it are the added replacement.
func (w *Writer) Write(val super.Value) error {
return w.dynamic.Write(val)
if err := w.dynamic.Write(val); err != nil {
return err
}
// Roll over to a new object once the size threshold is reached.
if w.dynamic.len >= maxObjectSize {
return w.finalizeObject()
}
return nil
}

func (w *Writer) finalize() error {
func (w *Writer) finalizeObject() error {
meta, dataSize, err := w.dynamic.Encode()
if err != nil {
return fmt.Errorf("system error: could not encode VNG metadata: %w", err)
Expand Down Expand Up @@ -74,5 +82,8 @@ func (w *Writer) finalize() error {
if err := w.dynamic.Emit(w.writer); err != nil {
return fmt.Errorf("system error: could not write VNG data section: %w", err)
}
// Set new dynamic so we can write the next section.
w.dynamic = NewDynamicEncoder()
w.zctx.Reset()
return nil
}
62 changes: 60 additions & 2 deletions zio/vngio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,72 @@ import (
"github.com/brimdata/super/zio"
)

// reader is a zio.Reader over a file holding one or more consecutive VNG
// objects. It yields the values of each object in file order.
type reader struct {
	// zctx is passed to each per-object reader when it is created.
	zctx *super.Context
	// readerAt is the underlying input from which object sections are read.
	readerAt io.ReaderAt

	// objects lists the location of every object in the input.
	objects []object
	// n is the index into objects of the next object to open.
	n int
	// reader reads the currently open object; nil when none is open.
	reader zio.Reader
}

// NewReader returns a zio.Reader for the Super Columnar data in r. The input
// must implement io.ReaderAt; otherwise an error is returned. The object
// locations are indexed up front via readObjects, and the returned reader
// walks them in order.
//
// NOTE(review): this text is a diff rendering with the +/- markers stripped.
// The "o, err := vng.NewObject(ra)" and "return o.NewReader(zctx)" lines
// appear to be the removed (pre-commit) single-object implementation; the
// readObjects call and the &reader{...} return are the added replacement.
// NOTE(review): the fields parameter is not used in the visible code.
func NewReader(zctx *super.Context, r io.Reader, fields []field.Path) (zio.Reader, error) {
ra, ok := r.(io.ReaderAt)
if !ok {
return nil, errors.New("Super Columnar requires a seekable input")
}
o, err := vng.NewObject(ra)
objects, err := readObjects(ra)
if err != nil {
return nil, err
}
return o.NewReader(zctx)
return &reader{
zctx: zctx,
objects: objects,
readerAt: ra,
}, nil
}

// Read returns the next value from the input, moving on to the next VNG
// object whenever the current one is exhausted. It returns (nil, nil) once
// every object has been fully read.
func (r *reader) Read() (*super.Value, error) {
	for {
		if r.reader == nil {
			// No object is open: open the next one, or report end of input.
			if r.n >= len(r.objects) {
				return nil, nil
			}
			loc := r.objects[r.n]
			r.n++
			// loc.offset is used as the section length here.
			section := io.NewSectionReader(r.readerAt, loc.start, loc.offset)
			obj, err := vng.NewObject(section)
			if err != nil {
				return nil, err
			}
			r.reader, err = obj.NewReader(r.zctx)
			if err != nil {
				return nil, err
			}
		}
		val, err := r.reader.Read()
		if val != nil || err != nil {
			return val, err
		}
		// The current object is exhausted; drop it and try the next one.
		r.reader = nil
	}
}

// object records where a single VNG object lives inside the input file.
type object struct {
	// start is the byte position of the object's header in the file.
	start int64
	// offset, despite its name, is the object's total length in bytes
	// (header plus metadata and data sections); it is used as the section
	// length when the object is opened.
	offset int64
}

// readObjects walks the chain of VNG object headers in r, starting at byte
// zero, and returns the location of each object found. Each entry holds the
// position of the object's header and the object's total length (header plus
// metadata and data sections). Scanning stops without error when reading a
// header reports io.EOF; any other header error is returned as-is.
func readObjects(r io.ReaderAt) ([]object, error) {
	var objects []object
	var start int64
	for {
		header, err := vng.ReadHeader(io.NewSectionReader(r, start, vng.HeaderSize))
		if err != nil {
			// errors.Is also recognizes an io.EOF that has been wrapped.
			if errors.Is(err, io.EOF) {
				return objects, nil
			}
			return nil, err
		}
		// Convert each size to int64 before adding so that the sum is not
		// computed (and possibly wrapped) in the fields' native type.
		size := int64(vng.HeaderSize) + int64(header.MetaSize) + int64(header.DataSize)
		objects = append(objects, object{start, size})
		start += size
	}
}

0 comments on commit 85b4740

Please sign in to comment.