From 37c02f0565e214262240693c9ba8fc853331438f Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Mon, 16 Oct 2023 17:12:59 -0700 Subject: [PATCH] zq: Call user-defined op with source This change allows zq users to start a query with a call to a user-defined op that contains a source (file) with no additional input sources. Previously, attempting to do this would result in the error "redundant inputs". --- cli/queryflags/flags.go | 41 ++++----------- cmd/zq/ztests/call-user-op-with-src.yaml | 16 ++++++ compiler/job.go | 2 +- compiler/semantic/analyzer.go | 66 +++++++++++++++++------- 4 files changed, 73 insertions(+), 52 deletions(-) create mode 100644 cmd/zq/ztests/call-user-op-with-src.yaml diff --git a/cli/queryflags/flags.go b/cli/queryflags/flags.go index 5feee91447..8eb8f485be 100644 --- a/cli/queryflags/flags.go +++ b/cli/queryflags/flags.go @@ -1,6 +1,7 @@ package queryflags import ( + "context" "flag" "fmt" "net/url" @@ -9,8 +10,8 @@ import ( "github.com/brimdata/zed/cli" "github.com/brimdata/zed/compiler" "github.com/brimdata/zed/compiler/ast" + "github.com/brimdata/zed/compiler/semantic" "github.com/brimdata/zed/zbuf" - "github.com/brimdata/zed/zfmt" "github.com/brimdata/zed/zson" "golang.org/x/exp/slices" ) @@ -36,11 +37,13 @@ func (f *Flags) ParseSourcesAndInputs(paths []string) ([]string, ast.Seq, bool, // and appears to start with a from or yield operator. // Otherwise, consider it a file. 
if query, err := compiler.Parse(src, f.Includes...); err == nil { - if isFrom(query) { - return nil, query, false, nil - } - if isYield(query) { - return nil, query, true, nil + if s, err := semantic.Analyze(context.Background(), query, nil, nil); err == nil { + if semantic.HasSource(s) { + return nil, query, false, nil + } + if semantic.StartsWithYield(s) { + return nil, query, true, nil + } } } return nil, nil, false, fmt.Errorf("no such file: %s", src) @@ -53,32 +56,6 @@ func (f *Flags) ParseSourcesAndInputs(paths []string) ([]string, ast.Seq, bool, return paths, query, false, nil } -func isFrom(seq ast.Seq) bool { - if len(seq) > 0 { - switch op := seq[0].(type) { - case *ast.From: - return true - case *ast.Scope: - return isFrom(op.Body) - } - } - return false -} - -func isYield(seq ast.Seq) bool { - if len(seq) > 0 { - switch op := seq[0].(type) { - case *ast.Yield: - return true - case *ast.Scope: - return isYield(op.Body) - case *ast.OpExpr: - return !zfmt.IsSearch(op.Expr) && !zfmt.IsBool(op.Expr) - } - } - return false -} - func isURLWithKnownScheme(path string, schemes ...string) bool { u, err := url.Parse(path) if err != nil { diff --git a/cmd/zq/ztests/call-user-op-with-src.yaml b/cmd/zq/ztests/call-user-op-with-src.yaml new file mode 100644 index 0000000000..fe4119c694 --- /dev/null +++ b/cmd/zq/ztests/call-user-op-with-src.yaml @@ -0,0 +1,16 @@ +script: | + zq -z -I countfile.zed 'countfile()' + +inputs: + - name: countfile.zed + data: | + op countfile(): ( + file test.zson | count() + ) + - name: test.zson + data: '{} {} {} {}' + +outputs: + - name: stdout + data: | + 4(uint64) diff --git a/compiler/job.go b/compiler/job.go index 98342bea09..ac23da59f1 100644 --- a/compiler/job.go +++ b/compiler/job.go @@ -40,7 +40,7 @@ func NewJob(octx *op.Context, in ast.Seq, src *data.Source, head *lakeparse.Comm if len(seq) == 0 { return nil, errors.New("internal error: AST seq cannot be empty") } - entry, err := semantic.Analyze(octx.Context, seq, src, head) + 
entry, err := semantic.AnalyzeAddSource(octx.Context, seq, src, head) if err != nil { return nil, err } diff --git a/compiler/semantic/analyzer.go b/compiler/semantic/analyzer.go index 66e74adf81..ee2c46ae46 100644 --- a/compiler/semantic/analyzer.go +++ b/compiler/semantic/analyzer.go @@ -20,12 +20,21 @@ func Analyze(ctx context.Context, seq ast.Seq, source *data.Source, head *lakepa if err != nil { return nil, err } - op, err := a.buildFrom(s[0]) + return s, nil +} + +// AnalyzeAddSource is the same as Analyze but it adds a default source if the +// DAG does not have one. +func AnalyzeAddSource(ctx context.Context, seq ast.Seq, source *data.Source, head *lakeparse.Commitish) (dag.Seq, error) { + a := newAnalyzer(ctx, source, head) + s, err := a.semSeq(seq) if err != nil { return nil, err } - if op != nil { - s.Prepend(op) + if !HasSource(s) { + if err = a.addDefaultSource(&s); err != nil { + return nil, err + } } return s, nil } @@ -49,26 +58,26 @@ func newAnalyzer(ctx context.Context, source *data.Source, head *lakeparse.Commi } } -func (a *analyzer) enterScope() { - a.scope = NewScope(a.scope) -} - -func (a *analyzer) exitScope() { - a.scope = a.scope.parent -} - -func (a *analyzer) buildFrom(op dag.Op) (dag.Op, error) { - switch op := op.(type) { +func HasSource(seq dag.Seq) bool { + switch op := seq[0].(type) { case *dag.FileScan, *dag.HTTPScan, *dag.PoolScan, *dag.LakeMetaScan, *dag.PoolMetaScan, *dag.CommitMetaScan, *dag.DeleteScan: - return nil, nil + return true case *dag.Fork: - return a.buildFrom(op.Paths[0][0]) + return HasSource(op.Paths[0]) case *dag.Scope: - return a.buildFrom(op.Body[0]) + return HasSource(op.Body) + } + return false +} + +func (a *analyzer) addDefaultSource(seq *dag.Seq) error { + if HasSource(*seq) { + return nil } // No from so add a source. 
if a.head == nil { - return &kernel.Reader{}, nil + seq.Prepend(&kernel.Reader{}) + return nil } pool := &ast.Pool{ Kind: "Pool", @@ -81,9 +90,28 @@ func (a *analyzer) buildFrom(op dag.Op) (dag.Op, error) { } ops, err := a.semPool(pool) if err != nil { - return nil, err + return err + } + seq.Prepend(ops[0]) + return nil +} + +func StartsWithYield(seq dag.Seq) bool { + switch op := seq[0].(type) { + case *dag.Yield: + return true + case *dag.Scope: + return StartsWithYield(op.Body) } - return ops[0], nil + return false +} + +func (a *analyzer) enterScope() { + a.scope = NewScope(a.scope) +} + +func (a *analyzer) exitScope() { + a.scope = a.scope.parent } type opDecl struct {