diff --git a/compiler/semantic/op.go b/compiler/semantic/op.go index 17573b9f8d..c89cd132a5 100644 --- a/compiler/semantic/op.go +++ b/compiler/semantic/op.go @@ -237,8 +237,13 @@ func (a *analyzer) formatArg(args ast.FromArgs) string { func (a *analyzer) semFile(name string, args ast.FromArgs) dag.Op { format := a.formatArg(args) - if format == "" && strings.HasSuffix(name, ".parquet") { - format = "parquet" + if format == "" { + switch filepath.Ext(name) { + case ".parquet": + format = "parquet" + case ".csup": + format = "csup" + } } return &dag.FileScan{ Kind: "FileScan", diff --git a/runtime/exec/environment.go b/runtime/exec/environment.go index 3c906f6434..c30222ff90 100644 --- a/runtime/exec/environment.go +++ b/runtime/exec/environment.go @@ -18,6 +18,7 @@ import ( "github.com/brimdata/super/zbuf" "github.com/brimdata/super/zio/anyio" "github.com/brimdata/super/zio/parquetio" + "github.com/brimdata/super/zio/vngio" "github.com/segmentio/ksuid" ) @@ -124,8 +125,8 @@ func (c *closePuller) Pull(done bool) (zbuf.Batch, error) { } func (e *Environment) VectorOpen(ctx context.Context, zctx *super.Context, path, format string, fields []field.Path) (vector.Puller, error) { - if format != "parquet" { - return nil, fmt.Errorf("vector runtime supports only Parquet files") + if format != "parquet" && format != "csup" { + return nil, fmt.Errorf("vector runtime supports only Parquet and CSUP files") } if path == "-" { path = "stdio:stdin" @@ -138,7 +139,12 @@ func (e *Environment) VectorOpen(ctx context.Context, zctx *super.Context, path, if err != nil { return nil, err } - puller, err := parquetio.NewVectorReader(ctx, zctx, r, fields) + var puller vector.Puller + if format == "parquet" { + puller, err = parquetio.NewVectorReader(ctx, zctx, r, fields) + } else { + puller, err = vngio.NewVectorReader(ctx, zctx, r, fields) + } if err != nil { r.Close() return nil, err diff --git a/zio/vngio/vectorreader.go b/zio/vngio/vectorreader.go new file mode 100644 index 0000000000..8210f383fa --- /dev/null +++ b/zio/vngio/vectorreader.go @@ -0,0 +1,75 @@ +package vngio + +import ( + "context" + "errors" + "io" + "sync/atomic" + + "github.com/brimdata/super" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/runtime/vcache" + "github.com/brimdata/super/vector" + "github.com/brimdata/super/vng" +) + +type VectorReader struct { + ctx context.Context + zctx *super.Context + + activeReaders *atomic.Int64 + nextObject *atomic.Int64 + objects []*vng.Object + projection vcache.Path + readerAt io.ReaderAt +} + +func NewVectorReader(ctx context.Context, zctx *super.Context, r io.Reader, fields []field.Path) (*VectorReader, error) { + ra, ok := r.(io.ReaderAt) + if !ok { + return nil, errors.New("Super Columnar requires a seekable input") + } + objects, err := readObjects(ra) + if err != nil { + return nil, err + } + return &VectorReader{ + ctx: ctx, + zctx: zctx, + activeReaders: &atomic.Int64{}, + nextObject: &atomic.Int64{}, + objects: objects, + projection: vcache.NewProjection(fields), + readerAt: ra, + }, nil +} + +func (v *VectorReader) NewConcurrentPuller() vector.Puller { + v.activeReaders.Add(1) + return v +} + +func (v *VectorReader) Pull(done bool) (vector.Any, error) { + if done { + return nil, v.close() + } + if err := v.ctx.Err(); err != nil { + v.close() + return nil, err + } + n := int(v.nextObject.Add(1) - 1) + if n >= len(v.objects) { + return nil, v.close() + } + o := v.objects[n] + return vcache.NewObjectFromVNG(o).Fetch(v.zctx, v.projection) +} + +func (v *VectorReader) close() error { + if v.activeReaders.Add(-1) <= 0 { + if closer, ok := v.readerAt.(io.Closer); ok { + return closer.Close() // coffee is for closers + } + } + return nil +}