Skip to content

Commit

Permalink
dev vector query: fix CSUP reader
Browse files Browse the repository at this point in the history
  • Loading branch information
mattnibs committed Dec 11, 2024
1 parent 6e7d041 commit 69a6333
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
18 changes: 8 additions & 10 deletions cmd/super/dev/vector/query/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package query
import (
"errors"
"flag"
"os"

"github.com/brimdata/super"
"github.com/brimdata/super/cli/outputflags"
Expand All @@ -11,10 +12,9 @@ import (
"github.com/brimdata/super/pkg/charm"
"github.com/brimdata/super/pkg/storage"
"github.com/brimdata/super/runtime"
"github.com/brimdata/super/runtime/vcache"
"github.com/brimdata/super/zbuf"
"github.com/brimdata/super/zio"
"github.com/segmentio/ksuid"
"github.com/brimdata/super/zio/vngio"
)

var spec = &charm.Spec{
Expand Down Expand Up @@ -57,23 +57,21 @@ func (c *Command) Run(args []string) error {
return errors.New("usage: query followed by a single path argument of VNG data")
}
text := args[0]
uri, err := storage.ParseURI(args[1])
f, err := os.Open(args[1])
if err != nil {
return err
}
local := storage.NewLocalEngine()
cache := vcache.NewCache(local)
object, err := cache.Fetch(ctx, uri, ksuid.Nil)
rctx := runtime.NewContext(ctx, super.NewContext())
r, err := vngio.NewVectorReader(ctx, rctx.Zctx, f, nil)
if err != nil {
return err
}
defer object.Close()
rctx := runtime.NewContext(ctx, super.NewContext())
puller, err := compiler.VectorCompile(rctx, text, object)
defer r.Pull(true)
puller, err := compiler.VectorCompile(rctx, text, r)
if err != nil {
return err
}
writer, err := c.outputFlags.Open(ctx, local)
writer, err := c.outputFlags.Open(ctx, storage.NewLocalEngine())
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions compiler/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/brimdata/super/runtime/sam/op"
"github.com/brimdata/super/runtime/vam"
vamop "github.com/brimdata/super/runtime/vam/op"
"github.com/brimdata/super/runtime/vcache"
"github.com/brimdata/super/vector"
"github.com/brimdata/super/zbuf"
"github.com/brimdata/super/zio"
)
Expand Down Expand Up @@ -118,7 +118,7 @@ func bundleOutputs(rctx *runtime.Context, outputs map[string]zbuf.Puller) zbuf.P
// where the entire query is vectorizable. It does not call optimize
// nor does it compute the demand of the query to prune the projection
// from the vcache.
func VectorCompile(rctx *runtime.Context, query string, object *vcache.Object) (zbuf.Puller, error) {
func VectorCompile(rctx *runtime.Context, query string, puller vector.Puller) (zbuf.Puller, error) {
ast, err := parser.ParseQuery(query)
if err != nil {
return nil, err
Expand All @@ -132,7 +132,6 @@ func VectorCompile(rctx *runtime.Context, query string, object *vcache.Object) (
panic("DAG assumptions violated")
}
entry = entry[1:]
puller := vam.NewVectorProjection(rctx.Zctx, object, nil) //XXX project all
builder := kernel.NewBuilder(rctx, env)
outputs, err := builder.BuildWithPuller(entry, puller)
if err != nil {
Expand Down

0 comments on commit 69a6333

Please sign in to comment.