Skip to content

Commit

Permalink
Thread demand through anyio.NewReader
Browse files Browse the repository at this point in the history
  • Loading branch information
jamii committed Nov 7, 2023
1 parent 1552baa commit 9566145
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 41 deletions.
3 changes: 2 additions & 1 deletion cli/inputflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/brimdata/zed"
"github.com/brimdata/zed/cli/auto"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zio/anyio"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (f *Flags) Open(ctx context.Context, zctx *zed.Context, engine storage.Engi
if path == "-" {
path = "stdio:stdin"
}
file, err := anyio.Open(ctx, zctx, engine, path, f.ReaderOpts)
file, err := anyio.Open(ctx, zctx, engine, path, demand.All(), f.ReaderOpts)
if err != nil {
err = fmt.Errorf("%s: %w", path, err)
if stopOnErr {
Expand Down
3 changes: 2 additions & 1 deletion cmd/zed/dev/indexfile/create/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/brimdata/zed"
"github.com/brimdata/zed/cli/inputflags"
"github.com/brimdata/zed/cmd/zed/dev/indexfile"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/index"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/charm"
Expand Down Expand Up @@ -86,7 +87,7 @@ func (c *Command) Run(args []string) error {
}
zctx := zed.NewContext()
local := storage.NewLocalEngine()
file, err := anyio.Open(ctx, zctx, local, path, c.inputFlags.Options())
file, err := anyio.Open(ctx, zctx, local, path, demand.All(), c.inputFlags.Options())
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions compiler/data/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/brimdata/zed"
"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/lake"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/storage"
Expand Down Expand Up @@ -57,11 +58,11 @@ func (s *Source) SortKey(ctx context.Context, src dag.Op) order.SortKey {
return order.Nil
}

func (s *Source) Open(ctx context.Context, zctx *zed.Context, path, format string, pushdown zbuf.Filter) (zbuf.Puller, error) {
func (s *Source) Open(ctx context.Context, zctx *zed.Context, path, format string, pushdown zbuf.Filter, demandOut demand.Demand) (zbuf.Puller, error) {
if path == "-" {
path = "stdio:stdin"
}
file, err := anyio.Open(ctx, zctx, s.engine, path, anyio.ReaderOpts{Format: format})
file, err := anyio.Open(ctx, zctx, s.engine, path, demandOut, anyio.ReaderOpts{Format: format})
if err != nil {
return nil, fmt.Errorf("%s: %w", path, err)
}
Expand All @@ -74,7 +75,7 @@ func (s *Source) Open(ctx context.Context, zctx *zed.Context, path, format strin
return &closePuller{sn, file}, nil
}

func (s *Source) OpenHTTP(ctx context.Context, zctx *zed.Context, url, format, method string, headers http.Header, body io.Reader) (zbuf.Puller, error) {
func (s *Source) OpenHTTP(ctx context.Context, zctx *zed.Context, url, format, method string, headers http.Header, body io.Reader, demandOut demand.Demand) (zbuf.Puller, error) {
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
return nil, err
Expand All @@ -84,7 +85,7 @@ func (s *Source) OpenHTTP(ctx context.Context, zctx *zed.Context, url, format, m
if err != nil {
return nil, err
}
file, err := anyio.NewFile(zctx, resp.Body, url, anyio.ReaderOpts{Format: format})
file, err := anyio.NewFile(zctx, resp.Body, url, demandOut, anyio.ReaderOpts{Format: format})
if err != nil {
resp.Body.Close()
return nil, fmt.Errorf("%s: %w", url, err)
Expand Down
5 changes: 3 additions & 2 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/brimdata/zed/compiler/ast"
"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/lake"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
Expand Down Expand Up @@ -237,9 +238,9 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
return meta.NewLakeMetaScanner(b.octx.Context, b.octx.Zctx, b.source.Lake(), v.Meta)
case *dag.HTTPScan:
body := strings.NewReader(v.Body)
return b.source.OpenHTTP(b.octx.Context, b.octx.Zctx, v.URL, v.Format, v.Method, v.Headers, body)
return b.source.OpenHTTP(b.octx.Context, b.octx.Zctx, v.URL, v.Format, v.Method, v.Headers, body, demand.All())
case *dag.FileScan:
return b.source.Open(b.octx.Context, b.octx.Zctx, v.Path, v.Format, b.PushdownOf(v.Filter))
return b.source.Open(b.octx.Context, b.octx.Zctx, v.Path, v.Format, b.PushdownOf(v.Filter), demand.All())
case *Reader:
pushdown := b.PushdownOf(v.Filter)
if len(v.Readers) == 1 {
Expand Down
8 changes: 4 additions & 4 deletions fuzz/fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func ReadZNG(bs []byte) ([]zed.Value, error) {
return a.Values(), nil
}

func ReadVNG(bs []byte) ([]zed.Value, error) {
func ReadVNG(bs []byte, demandOut demand.Demand) ([]zed.Value, error) {
bytesReader := bytes.NewReader(bs)
context := zed.NewContext()
reader, err := vngio.NewReader(context, bytesReader)
reader, err := vngio.NewReader(context, bytesReader, demandOut)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -81,13 +81,13 @@ func RunQueryZNG(t *testing.T, buf *bytes.Buffer, querySource string) []zed.Valu

func RunQueryVNG(t *testing.T, buf *bytes.Buffer, querySource string) []zed.Value {
zctx := zed.NewContext()
reader, err := vngio.NewReader(zctx, bytes.NewReader(buf.Bytes()))
reader, err := vngio.NewReader(zctx, bytes.NewReader(buf.Bytes()), demand.All())
require.NoError(t, err)
readers := []zio.Reader{reader}
defer zio.CloseReaders(readers)
return RunQuery(t, zctx, readers, querySource, func(demandIn demand.Demand) {
if reader, ok := readers[0].(*vngio.Reader); ok {
reader.Opts.Demand = demandIn
reader.Demand = demandIn
}
})
}
Expand Down
3 changes: 2 additions & 1 deletion service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/brimdata/zed/api/queryio"
"github.com/brimdata/zed/compiler"
"github.com/brimdata/zed/compiler/ast"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/lake"
lakeapi "github.com/brimdata/zed/lake/api"
"github.com/brimdata/zed/lake/commits"
Expand Down Expand Up @@ -422,7 +423,7 @@ func handleBranchLoad(c *Core, w *ResponseWriter, r *Request) {
ZNG: zngio.ReaderOpts{Validate: true},
}
zctx := zed.NewContext()
zrc, err := anyio.NewReaderWithOpts(zctx, reader, opts)
zrc, err := anyio.NewReaderWithOpts(zctx, reader, demand.All(), opts)
if err != nil {
w.Error(srverr.ErrInvalid(err))
return
Expand Down
3 changes: 2 additions & 1 deletion service/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/brimdata/zed"
"github.com/brimdata/zed/api"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/compiler/parser"
"github.com/brimdata/zed/lake"
"github.com/brimdata/zed/lake/branches"
Expand Down Expand Up @@ -169,7 +170,7 @@ func (r *Request) Unmarshal(w *ResponseWriter, body interface{}, templates ...in
if !ok {
return false
}
zrc, err := anyio.NewReaderWithOpts(zed.NewContext(), r.Body, anyio.ReaderOpts{Format: format})
zrc, err := anyio.NewReaderWithOpts(zed.NewContext(), r.Body, demand.All(), anyio.ReaderOpts{Format: format})
if err != nil {
w.Error(srverr.ErrInvalid(err))
return false
Expand Down
9 changes: 5 additions & 4 deletions zio/anyio/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"io"

"github.com/brimdata/zed"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/zbuf"
)

// Open uses engine to open path for reading. path is a local file path or a
// URI whose scheme is understood by engine.
func Open(ctx context.Context, zctx *zed.Context, engine storage.Engine, path string, opts ReaderOpts) (*zbuf.File, error) {
func Open(ctx context.Context, zctx *zed.Context, engine storage.Engine, path string, demandOut demand.Demand, opts ReaderOpts) (*zbuf.File, error) {
uri, err := storage.ParseURI(path)
if err != nil {
return nil, err
Expand All @@ -27,7 +28,7 @@ func Open(ctx context.Context, zctx *zed.Context, engine storage.Engine, path st
return
}
// NewFile reads from sr, which might block.
zf, err = NewFile(zctx, sr, path, opts)
zf, err = NewFile(zctx, sr, path, demandOut, opts)
if err != nil {
sr.Close()
}
Expand All @@ -40,12 +41,12 @@ func Open(ctx context.Context, zctx *zed.Context, engine storage.Engine, path st
}
}

func NewFile(zctx *zed.Context, rc io.ReadCloser, path string, opts ReaderOpts) (*zbuf.File, error) {
func NewFile(zctx *zed.Context, rc io.ReadCloser, path string, demandOut demand.Demand, opts ReaderOpts) (*zbuf.File, error) {
r, err := GzipReader(rc)
if err != nil {
return nil, err
}
zr, err := NewReaderWithOpts(zctx, r, opts)
zr, err := NewReaderWithOpts(zctx, r, demandOut, opts)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions zio/anyio/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"

"github.com/brimdata/zed"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zio/arrowio"
"github.com/brimdata/zed/zio/csvio"
Expand All @@ -18,7 +19,7 @@ import (
"github.com/brimdata/zed/zio/zsonio"
)

func lookupReader(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.ReadCloser, error) {
func lookupReader(zctx *zed.Context, r io.Reader, demandOut demand.Demand, opts ReaderOpts) (zio.ReadCloser, error) {
switch opts.Format {
case "arrows":
return arrowio.NewReader(zctx, r)
Expand All @@ -35,7 +36,7 @@ func lookupReader(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.ReadClos
}
return zio.NopReadCloser(zr), nil
case "vng":
zr, err := vngio.NewReader(zctx, r)
zr, err := vngio.NewReader(zctx, r, demandOut)
if err != nil {
return nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions zio/anyio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/brimdata/zed"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zio/arrowio"
"github.com/brimdata/zed/zio/csvio"
Expand All @@ -27,13 +28,13 @@ type ReaderOpts struct {
ZNG zngio.ReaderOpts
}

func NewReader(zctx *zed.Context, r io.Reader) (zio.ReadCloser, error) {
return NewReaderWithOpts(zctx, r, ReaderOpts{})
func NewReader(zctx *zed.Context, r io.Reader, demandOut demand.Demand) (zio.ReadCloser, error) {
return NewReaderWithOpts(zctx, r, demandOut, ReaderOpts{})
}

func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.ReadCloser, error) {
func NewReaderWithOpts(zctx *zed.Context, r io.Reader, demandOut demand.Demand, opts ReaderOpts) (zio.ReadCloser, error) {
if opts.Format != "" && opts.Format != "auto" {
return lookupReader(zctx, r, opts)
return lookupReader(zctx, r, demandOut, opts)
}

var parquetErr, vngErr error
Expand All @@ -47,7 +48,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
if _, err := rs.Seek(n, io.SeekStart); err != nil {
return nil, err
}
zr, vngErr = vngio.NewReader(zctx, rs)
zr, vngErr = vngio.NewReader(zctx, rs, demandOut)
if vngErr == nil {
return zio.NopReadCloser(zr), nil
}
Expand Down
22 changes: 7 additions & 15 deletions zio/vngio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,15 @@ import (
"github.com/brimdata/zed/zio"
)

type ReaderOpts struct {
Demand demand.Demand
}

type Reader struct {
reader *vng.Reader
// TODO Opts should not be public but currently needed for testing.
Opts ReaderOpts
// TODO Demand should not be public but currently needed for testing.
Demand demand.Demand
// Initially nil
materializer *vector.Materializer
}

func NewReader(zctx *zed.Context, r io.Reader) (zio.Reader, error) {
return NewReaderWithOpts(zctx, r, ReaderOpts{})
}

func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Reader, error) {
func NewReader(zctx *zed.Context, r io.Reader, demandOut demand.Demand) (zio.Reader, error) {
s, ok := r.(io.Seeker)
if !ok {
return nil, errors.New("VNG must be used with a seekable input")
Expand All @@ -46,16 +38,16 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
return nil, err
}
if os.Getenv("ZED_USE_VECTOR") != "" {
if opts.Demand == nil {
opts.Demand = demand.All()
if demandOut == nil {
demandOut = demand.All()
}
vngReader, err := vng.NewReader(o)
if err != nil {
return nil, err
}
reader := &Reader{
reader: vngReader,
Opts: opts,
Demand: demandOut,
materializer: nil,
}
return reader, nil
Expand All @@ -66,7 +58,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea

func (r *Reader) Read() (*zed.Value, error) {
if r.materializer == nil {
vector, err := vector.Read(r.reader, r.Opts.Demand)
vector, err := vector.Read(r.reader, r.Demand)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion ztest/ztest.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ import (
"github.com/brimdata/zed/cli/inputflags"
"github.com/brimdata/zed/cli/outputflags"
"github.com/brimdata/zed/compiler"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/runtime/op"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
Expand Down Expand Up @@ -494,7 +495,7 @@ func runzq(path, zedProgram, input string, outputFlags []string, inputFlags []st
return "", err.Error(), err
}
zctx := zed.NewContext()
zrc, err := anyio.NewReaderWithOpts(zctx, r, inflags.Options())
zrc, err := anyio.NewReaderWithOpts(zctx, r, demand.All(), inflags.Options())
if err != nil {
return "", err.Error(), err
}
Expand Down

0 comments on commit 9566145

Please sign in to comment.