Add CSUP (Super Columnar) file support to the vector runtime.

  compiler/semantic: when a.formatArg returns an empty format, semFile now
  picks the format from the file extension: ".parquet" selects "parquet"
  and ".csup" selects "csup".

  runtime/exec: VectorOpen accepts "csup" in addition to "parquet" and
  builds the puller for it with vngio.NewVectorReader.

  zio/vngio: new VectorReader. NewVectorReader fails unless the input
  implements io.ReaderAt, then lists the objects once with readObjects.
  Pullers returned by NewConcurrentPuller share the same atomic counter,
  so every object index is claimed by exactly one Pull call. Pull returns
  (nil, nil) when done is set or when the objects are exhausted, and
  returns the context error once the context is cancelled.

NOTE(review): io.NewSectionReader takes (r, off, n) where n is a byte
count; confirm that object.offset holds the object's length and not an
end position.

NOTE(review): the line structure of this patch was restored from a
whitespace-collapsed copy using gofmt conventions. The alignment of the
`Kind:` context line in op.go cannot be recovered from that copy; verify
it against the tree or apply with whitespace-insensitive matching.

diff --git a/compiler/semantic/op.go b/compiler/semantic/op.go
index 17573b9f8d..c89cd132a5 100644
--- a/compiler/semantic/op.go
+++ b/compiler/semantic/op.go
@@ -237,8 +237,13 @@ func (a *analyzer) formatArg(args ast.FromArgs) string {
 
 func (a *analyzer) semFile(name string, args ast.FromArgs) dag.Op {
 	format := a.formatArg(args)
-	if format == "" && strings.HasSuffix(name, ".parquet") {
-		format = "parquet"
+	if format == "" {
+		switch filepath.Ext(name) {
+		case ".parquet":
+			format = "parquet"
+		case ".csup":
+			format = "csup"
+		}
 	}
 	return &dag.FileScan{
 		Kind: "FileScan",
diff --git a/runtime/exec/environment.go b/runtime/exec/environment.go
index 3c906f6434..c30222ff90 100644
--- a/runtime/exec/environment.go
+++ b/runtime/exec/environment.go
@@ -18,6 +18,7 @@ import (
 	"github.com/brimdata/super/zbuf"
 	"github.com/brimdata/super/zio/anyio"
 	"github.com/brimdata/super/zio/parquetio"
+	"github.com/brimdata/super/zio/vngio"
 	"github.com/segmentio/ksuid"
 )
 
@@ -124,8 +125,8 @@ func (c *closePuller) Pull(done bool) (zbuf.Batch, error) {
 }
 
 func (e *Environment) VectorOpen(ctx context.Context, zctx *super.Context, path, format string, fields []field.Path) (vector.Puller, error) {
-	if format != "parquet" {
-		return nil, fmt.Errorf("vector runtime supports only Parquet files")
+	if format != "parquet" && format != "csup" {
+		return nil, fmt.Errorf("vector runtime supports only Parquet and CSUP files")
 	}
 	if path == "-" {
 		path = "stdio:stdin"
@@ -138,7 +139,12 @@ func (e *Environment) VectorOpen(ctx context.Context, zctx *super.Context, path,
 	if err != nil {
 		return nil, err
 	}
-	puller, err := parquetio.NewVectorReader(ctx, zctx, r, fields)
+	var puller vector.Puller
+	if format == "parquet" {
+		puller, err = parquetio.NewVectorReader(ctx, zctx, r, fields)
+	} else {
+		puller, err = vngio.NewVectorReader(ctx, zctx, r, fields)
+	}
 	if err != nil {
 		r.Close()
 		return nil, err
diff --git a/zio/vngio/vectorreader.go b/zio/vngio/vectorreader.go
new file mode 100644
index 0000000000..3dce6f79e7
--- /dev/null
+++ b/zio/vngio/vectorreader.go
@@ -0,0 +1,73 @@
+package vngio
+
+import (
+	"context"
+	"errors"
+	"io"
+	"sync/atomic"
+
+	"github.com/brimdata/super"
+	"github.com/brimdata/super/pkg/field"
+	"github.com/brimdata/super/runtime/vcache"
+	"github.com/brimdata/super/vector"
+	"github.com/brimdata/super/vng"
+)
+
+type VectorReader struct {
+	ctx  context.Context
+	zctx *super.Context
+
+	objects    []object
+	nextObject *atomic.Int64
+	projection vcache.Path
+	readerAt   io.ReaderAt
+}
+
+func NewVectorReader(ctx context.Context, zctx *super.Context, r io.Reader, fields []field.Path) (*VectorReader, error) {
+	ra, ok := r.(io.ReaderAt)
+	if !ok {
+		return nil, errors.New("Super Columnar requires a seekable input")
+	}
+	objects, err := readObjects(ra)
+	if err != nil {
+		return nil, err
+	}
+	return &VectorReader{
+		ctx:        ctx,
+		zctx:       zctx,
+		objects:    objects,
+		nextObject: &atomic.Int64{},
+		projection: vcache.NewProjection(fields),
+		readerAt:   ra,
+	}, nil
+}
+
+func (v *VectorReader) NewConcurrentPuller() vector.Puller {
+	return &VectorReader{
+		ctx:        v.ctx,
+		zctx:       v.zctx,
+		objects:    v.objects,
+		nextObject: v.nextObject,
+		projection: v.projection,
+		readerAt:   v.readerAt,
+	}
+}
+
+func (v *VectorReader) Pull(done bool) (vector.Any, error) {
+	if done {
+		return nil, nil
+	}
+	if err := v.ctx.Err(); err != nil {
+		return nil, err
+	}
+	n := int(v.nextObject.Add(1) - 1)
+	if n >= len(v.objects) {
+		return nil, nil
+	}
+	meta := v.objects[n]
+	o, err := vng.NewObject(io.NewSectionReader(v.readerAt, meta.start, meta.offset))
+	if err != nil {
+		return nil, err
+	}
+	return vcache.NewObjectFromVNG(o).Fetch(v.zctx, v.projection)
+}