From a32651f7e2f3281fb4ef7f0327e5863af72e9a6d Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Fri, 13 Dec 2024 09:08:22 -0800 Subject: [PATCH] dev vector query: support concatenated csup files (#5526) dev vector query: fix CSUP reader --- cmd/super/dev/vector/query/command.go | 18 ++++++++---------- compiler/package.go | 5 ++--- ztest/ztest.go | 18 ++++++++---------- 3 files changed, 18 insertions(+), 23 deletions(-) diff --git a/cmd/super/dev/vector/query/command.go b/cmd/super/dev/vector/query/command.go index 94b02287f9..fbdf9b9805 100644 --- a/cmd/super/dev/vector/query/command.go +++ b/cmd/super/dev/vector/query/command.go @@ -3,6 +3,7 @@ package query import ( "errors" "flag" + "os" "github.com/brimdata/super" "github.com/brimdata/super/cli/outputflags" @@ -11,10 +12,9 @@ import ( "github.com/brimdata/super/pkg/charm" "github.com/brimdata/super/pkg/storage" "github.com/brimdata/super/runtime" - "github.com/brimdata/super/runtime/vcache" "github.com/brimdata/super/zbuf" "github.com/brimdata/super/zio" - "github.com/segmentio/ksuid" + "github.com/brimdata/super/zio/vngio" ) var spec = &charm.Spec{ @@ -57,23 +57,21 @@ func (c *Command) Run(args []string) error { return errors.New("usage: query followed by a single path argument of VNG data") } text := args[0] - uri, err := storage.ParseURI(args[1]) + f, err := os.Open(args[1]) if err != nil { return err } - local := storage.NewLocalEngine() - cache := vcache.NewCache(local) - object, err := cache.Fetch(ctx, uri, ksuid.Nil) + rctx := runtime.NewContext(ctx, super.NewContext()) + r, err := vngio.NewVectorReader(ctx, rctx.Zctx, f, nil) if err != nil { return err } - defer object.Close() - rctx := runtime.NewContext(ctx, super.NewContext()) - puller, err := compiler.VectorCompile(rctx, text, object) + defer r.Pull(true) + puller, err := compiler.VectorCompile(rctx, text, r) if err != nil { return err } - writer, err := c.outputFlags.Open(ctx, local) + writer, err := c.outputFlags.Open(ctx, storage.NewLocalEngine()) if err != nil { return err } diff --git a/compiler/package.go b/compiler/package.go index 11e88b1223..d053e63fca 100644 --- a/compiler/package.go +++ b/compiler/package.go @@ -17,7 +17,7 @@ import ( "github.com/brimdata/super/runtime/sam/op" "github.com/brimdata/super/runtime/vam" vamop "github.com/brimdata/super/runtime/vam/op" - "github.com/brimdata/super/runtime/vcache" + "github.com/brimdata/super/vector" "github.com/brimdata/super/zbuf" "github.com/brimdata/super/zio" ) @@ -118,7 +118,7 @@ func bundleOutputs(rctx *runtime.Context, outputs map[string]zbuf.Puller) zbuf.P // where the entire query is vectorizable. It does not call optimize // nor does it compute the demand of the query to prune the projection // from the vcache. -func VectorCompile(rctx *runtime.Context, query string, object *vcache.Object) (zbuf.Puller, error) { +func VectorCompile(rctx *runtime.Context, query string, puller vector.Puller) (zbuf.Puller, error) { ast, err := parser.ParseQuery(query) if err != nil { return nil, err @@ -132,7 +132,6 @@ func VectorCompile(rctx *runtime.Context, query string, object *vcache.Object) ( panic("DAG assumptions violated") } entry = entry[1:] - puller := vam.NewVectorProjection(rctx.Zctx, object, nil) //XXX project all builder := kernel.NewBuilder(rctx, env) outputs, err := builder.BuildWithPuller(entry, puller) if err != nil { diff --git a/ztest/ztest.go b/ztest/ztest.go index aea8a39c77..b420e06c6b 100644 --- a/ztest/ztest.go +++ b/ztest/ztest.go @@ -139,11 +139,12 @@ import ( "github.com/brimdata/super/compiler" "github.com/brimdata/super/compiler/parser" "github.com/brimdata/super/runtime" - "github.com/brimdata/super/runtime/vcache" + "github.com/brimdata/super/vector" "github.com/brimdata/super/vng" "github.com/brimdata/super/zbuf" "github.com/brimdata/super/zio" "github.com/brimdata/super/zio/anyio" + "github.com/brimdata/super/zio/vngio" "github.com/brimdata/super/zio/zsonio" "github.com/pmezard/go-difflib/difflib" "gopkg.in/yaml.v3" @@ -554,12 +555,12 @@ func runvec(zedProgram string, input string, outputFlags []string) (string, erro if err := flags.Parse(outputFlags); err != nil { return "", err } - object, err := createVCacheObject(input) + r, err := createVPuller(input) if err != nil { return "", err } - defer object.Close() - puller, err := compiler.VectorCompile(runtime.DefaultContext(), zedProgram, object) + defer r.Pull(true) + puller, err := compiler.VectorCompile(runtime.DefaultContext(), zedProgram, r) if err != nil { return "", err } @@ -575,16 +576,13 @@ func runvec(zedProgram string, input string, outputFlags []string) (string, erro return outbuf.String(), err } -func createVCacheObject(input string) (*vcache.Object, error) { +func createVPuller(input string) (vector.Puller, error) { var buf bytes.Buffer w := vng.NewWriter(zio.NopCloser(&buf)) r := zsonio.NewReader(super.NewContext(), strings.NewReader(input)) if err := errors.Join(zio.Copy(w, r), w.Close()); err != nil { return nil, err } - o, err := vng.NewObject(bytes.NewReader(buf.Bytes())) - if err != nil { - return nil, err - } - return vcache.NewObjectFromVNG(o), nil + rctx := runtime.DefaultContext() + return vngio.NewVectorReader(rctx.Context, rctx.Zctx, bytes.NewReader(buf.Bytes()), nil) }