diff --git a/compiler/semantic/op.go b/compiler/semantic/op.go index 17573b9f8d..c89cd132a5 100644 --- a/compiler/semantic/op.go +++ b/compiler/semantic/op.go @@ -237,8 +237,13 @@ func (a *analyzer) formatArg(args ast.FromArgs) string { func (a *analyzer) semFile(name string, args ast.FromArgs) dag.Op { format := a.formatArg(args) - if format == "" && strings.HasSuffix(name, ".parquet") { - format = "parquet" + if format == "" { + switch filepath.Ext(name) { + case ".parquet": + format = "parquet" + case ".csup": + format = "csup" + } } return &dag.FileScan{ Kind: "FileScan", diff --git a/runtime/exec/environment.go b/runtime/exec/environment.go index 3c906f6434..c30222ff90 100644 --- a/runtime/exec/environment.go +++ b/runtime/exec/environment.go @@ -18,6 +18,7 @@ import ( "github.com/brimdata/super/zbuf" "github.com/brimdata/super/zio/anyio" "github.com/brimdata/super/zio/parquetio" + "github.com/brimdata/super/zio/vngio" "github.com/segmentio/ksuid" ) @@ -124,8 +125,8 @@ func (c *closePuller) Pull(done bool) (zbuf.Batch, error) { } func (e *Environment) VectorOpen(ctx context.Context, zctx *super.Context, path, format string, fields []field.Path) (vector.Puller, error) { - if format != "parquet" { - return nil, fmt.Errorf("vector runtime supports only Parquet files") + if format != "parquet" && format != "csup" { + return nil, fmt.Errorf("vector runtime supports only Parquet and CSUP files") } if path == "-" { path = "stdio:stdin" @@ -138,7 +139,12 @@ func (e *Environment) VectorOpen(ctx context.Context, zctx *super.Context, path, if err != nil { return nil, err } - puller, err := parquetio.NewVectorReader(ctx, zctx, r, fields) + var puller vector.Puller + if format == "parquet" { + puller, err = parquetio.NewVectorReader(ctx, zctx, r, fields) + } else { + puller, err = vngio.NewVectorReader(ctx, zctx, r, fields) + } if err != nil { r.Close() return nil, err diff --git a/vng/object.go b/vng/object.go index 735ee52a17..27ed7cc869 100644 --- a/vng/object.go +++ b/vng/object.go @@ -75,6 +75,10 @@ func (o *Object) NewReader(zctx *super.Context) (zio.Reader, error) { return NewZedReader(zctx, o.meta, o.readerAt) } +func (o *Object) Size() uint64 { + return HeaderSize + o.header.MetaSize + o.header.DataSize +} + func readMetadata(r io.Reader) (Metadata, error) { zctx := super.NewContext() zr := zngio.NewReader(zctx, r) diff --git a/zio/vngio/reader.go b/zio/vngio/reader.go index c94667e827..2bfa87e448 100644 --- a/zio/vngio/reader.go +++ b/zio/vngio/reader.go @@ -3,6 +3,7 @@ package vngio import ( "errors" "io" + "math" "github.com/brimdata/super" "github.com/brimdata/super/pkg/field" @@ -12,7 +13,7 @@ import ( type reader struct { zctx *super.Context - objects []object + objects []*vng.Object n int readerAt io.ReaderAt reader zio.Reader @@ -40,14 +41,10 @@ again: if r.n >= len(r.objects) { return nil, nil } - meta := r.objects[r.n] + o := r.objects[r.n] r.n++ - o, err := vng.NewObject(io.NewSectionReader(r.readerAt, meta.start, meta.offset)) - if err != nil { - return nil, err - } - r.reader, err = o.NewReader(r.zctx) - if err != nil { + var err error + if r.reader, err = o.NewReader(r.zctx); err != nil { return nil, err } } @@ -59,23 +56,20 @@ again: return v, err } -type object struct { - start, offset int64 -} - -func readObjects(r io.ReaderAt) ([]object, error) { - var objects []object +func readObjects(r io.ReaderAt) ([]*vng.Object, error) { + var objects []*vng.Object var start int64 for { - header, err := vng.ReadHeader(io.NewSectionReader(r, start, vng.HeaderSize)) + // NewObject puts the right end to the passed in SectionReader and since + // the end is unkown just pass MaxInt64. + o, err := vng.NewObject(io.NewSectionReader(r, start, math.MaxInt64)) if err != nil { if err == io.EOF && len(objects) > 0 { return objects, nil } return nil, err } - offset := int64(vng.HeaderSize) + int64(header.MetaSize+header.DataSize) - objects = append(objects, object{start, offset}) - start += offset + objects = append(objects, o) + start += int64(o.Size()) } } diff --git a/zio/vngio/vectorreader.go b/zio/vngio/vectorreader.go new file mode 100644 index 0000000000..3dce6f79e7 --- /dev/null +++ b/zio/vngio/vectorreader.go @@ -0,0 +1,73 @@ +package vngio + +import ( + "context" + "errors" + "io" + "sync/atomic" + + "github.com/brimdata/super" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/runtime/vcache" + "github.com/brimdata/super/vector" + "github.com/brimdata/super/vng" +) + +type VectorReader struct { + ctx context.Context + zctx *super.Context + + objects []object + nextObject *atomic.Int64 + projection vcache.Path + readerAt io.ReaderAt +} + +func NewVectorReader(ctx context.Context, zctx *super.Context, r io.Reader, fields []field.Path) (*VectorReader, error) { + ra, ok := r.(io.ReaderAt) + if !ok { + return nil, errors.New("Super Columnar requires a seekable input") + } + objects, err := readObjects(ra) + if err != nil { + return nil, err + } + return &VectorReader{ + ctx: ctx, + zctx: zctx, + objects: objects, + nextObject: &atomic.Int64{}, + projection: vcache.NewProjection(fields), + readerAt: ra, + }, nil +} + +func (v *VectorReader) NewConcurrentPuller() vector.Puller { + return &VectorReader{ + ctx: v.ctx, + zctx: v.zctx, + objects: v.objects, + nextObject: v.nextObject, + projection: v.projection, + readerAt: v.readerAt, + } +} + +func (v *VectorReader) Pull(done bool) (vector.Any, error) { + if done { + return nil, nil + } + if err := v.ctx.Err(); err != nil { + return nil, err + } + n := int(v.nextObject.Add(1) - 1) + if n >= len(v.objects) { + return nil, nil + } + meta := v.objects[n] + o, err := vng.NewObject(io.NewSectionReader(v.readerAt, meta.start, meta.offset)) + if err != nil { + return nil, err + } + return vcache.NewObjectFromVNG(o).Fetch(v.zctx, v.projection) +}