Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

align from-less query semantics with SQL #5418

Merged
merged 2 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/brimdata/super/compiler/srcfiles"
"github.com/brimdata/super/lakeparse"
"github.com/brimdata/super/order"
"github.com/brimdata/super/pkg/field"
"github.com/brimdata/super/pkg/nano"
Expand Down Expand Up @@ -98,8 +97,7 @@ type EventBranch struct {
}

type QueryRequest struct {
Query string `json:"query"`
Head lakeparse.Commitish `json:"head"`
Query string `json:"query"`
}

type QueryChannelSet struct {
Expand Down
6 changes: 1 addition & 5 deletions api/client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/brimdata/super/compiler/srcfiles"
"github.com/brimdata/super/lake"
"github.com/brimdata/super/lake/branches"
"github.com/brimdata/super/lakeparse"
"github.com/brimdata/super/runtime/exec"
"github.com/brimdata/super/zio/zngio"
"github.com/brimdata/super/zson"
Expand Down Expand Up @@ -296,15 +295,12 @@ func (c *Connection) Revert(ctx context.Context, poolID ksuid.KSUID, branchName
//
// As for Connection.Do, if the returned error is nil, the user is expected to
// call Response.Body.Close.
func (c *Connection) Query(ctx context.Context, head *lakeparse.Commitish, src string, filenames ...string) (*Response, error) {
func (c *Connection) Query(ctx context.Context, src string, filenames ...string) (*Response, error) {
files, err := srcfiles.Concat(filenames, src)
if err != nil {
return nil, err
}
body := api.QueryRequest{Query: string(files.Text)}
if head != nil {
body.Head = *head
}
req := c.NewRequest(ctx, http.MethodPost, "/query?ctrl=T", body)
res, err := c.Do(req)
if ae := (*api.Error)(nil); errors.As(err, &ae) && len(ae.CompilationErrors) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions cmd/super/compile/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func (s *Shared) Run(ctx context.Context, args []string, lakeFlags *lakeflags.Fl
}
return s.writeValue(ctx, ast.Parsed())
}
runtime, err := compiler.NewJob(runtime.DefaultContext(), ast, data.NewSource(nil, lk), nil)
runtime, err := compiler.NewJob(runtime.DefaultContext(), ast, data.NewSource(nil, lk), false)
if err != nil {
return err
}
if desc {
description, err := describe.AnalyzeDAG(ctx, runtime.Entry(), data.NewSource(nil, lk), nil)
description, err := describe.AnalyzeDAG(ctx, runtime.Entry(), data.NewSource(nil, lk))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/super/db/branch/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *Command) list(ctx context.Context, lake api.Interface) error {
if err != nil {
return err
}
q, err := lake.Query(ctx, nil, query)
q, err := lake.Query(ctx, query)
if err != nil {
w.Close()
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/super/db/log/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *Command) Run(args []string) error {
return err
}
defer w.Close()
q, err := lake.Query(ctx, nil, query)
q, err := lake.Query(ctx, query)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/super/db/ls/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (c *Command) Run(args []string) error {
if err != nil {
return err
}
q, err := lake.Query(ctx, nil, query)
q, err := lake.Query(ctx, query)
if err != nil {
w.Close()
return err
Expand Down
6 changes: 1 addition & 5 deletions cmd/super/db/query/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"os"

"github.com/brimdata/super/cli/outputflags"
"github.com/brimdata/super/cli/poolflags"
"github.com/brimdata/super/cli/queryflags"
"github.com/brimdata/super/cli/runtimeflags"
"github.com/brimdata/super/cmd/super/db"
Expand Down Expand Up @@ -33,15 +32,13 @@ func init() {
type Command struct {
*db.Command
outputFlags outputflags.Flags
poolFlags poolflags.Flags
queryFlags queryflags.Flags
runtimeFlags runtimeflags.Flags
}

func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
c := &Command{Command: parent.(*db.Command)}
c.outputFlags.SetFlags(f)
c.poolFlags.SetFlags(f)
c.queryFlags.SetFlags(f)
c.runtimeFlags.SetFlags(f)
return c, nil
Expand All @@ -68,8 +65,7 @@ func (c *Command) Run(args []string) error {
if err != nil {
return err
}
head, _ := c.poolFlags.HEAD()
query, err := lake.Query(ctx, head, src, c.queryFlags.Includes...)
query, err := lake.Query(ctx, src, c.queryFlags.Includes...)
if err != nil {
w.Close()
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/super/internal/lakemanage/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type objectIterator struct {

func newObjectIterator(ctx context.Context, lake api.Interface, head *lakeparse.Commitish) (*objectIterator, error) {
query := fmt.Sprintf(iteratorQuery, head.Pool, head.Branch, head.Pool, head.Branch)
q, err := lake.Query(ctx, nil, query)
q, err := lake.Query(ctx, query)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions compiler/ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,10 @@ type (
Loc `json:"loc"`
}
Delete struct {
Kind string `json:"kind" unpack:""`
Loc `json:"loc"` // dummy field, not needed except to implement Node
Kind string `json:"kind" unpack:""`
Pool string `json:"pool"`
Branch string `json:"branch"`
Loc `json:"loc"` // dummy field, not needed except to implement Node
}
)

Expand Down
4 changes: 4 additions & 0 deletions compiler/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ type (
Kind string `json:"kind" unpack:""`
Meta string `json:"meta"`
}
NullScan struct {
Kind string `json:"kind" unpack:""`
}
)

var LakeMetas = map[string]struct{}{
Expand Down Expand Up @@ -273,6 +276,7 @@ func (*DeleteScan) OpNode() {}
func (*LakeMetaScan) OpNode() {}
func (*PoolMetaScan) OpNode() {}
func (*CommitMetaScan) OpNode() {}
func (*NullScan) OpNode() {}

func (*Lister) OpNode() {}
func (*Slicer) OpNode() {}
Expand Down
1 change: 1 addition & 0 deletions compiler/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var unpacker = unpack.New(
MapExpr{},
Merge{},
Mirror{},
NullScan{},
Output{},
Over{},
OverExpr{},
Expand Down
62 changes: 30 additions & 32 deletions compiler/describe/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/brimdata/super/compiler/parser"
"github.com/brimdata/super/compiler/semantic"
"github.com/brimdata/super/lake"
"github.com/brimdata/super/lakeparse"
"github.com/brimdata/super/order"
"github.com/brimdata/super/pkg/field"
"github.com/segmentio/ksuid"
Expand All @@ -31,48 +30,46 @@ type (
Meta string `json:"meta"`
}
Pool struct {
Kind string `json:"kind"`
Name string `json:"name"`
ID ksuid.KSUID `json:"id"`
Inferred bool `json:"inferred"`
Kind string `json:"kind"`
Name string `json:"name"`
ID ksuid.KSUID `json:"id"`
}
Path struct {
Kind string `json:"kind"`
URI string `json:"uri"`
Inferred bool `json:"inferred"`
Kind string `json:"kind"`
URI string `json:"uri"`
}
Null struct {
Kind string `json:"kind"`
}
)

func (*LakeMeta) Source() {}
func (*Pool) Source() {}
func (*Path) Source() {}
func (*Null) Source() {}

type Channel struct {
Name string `json:"name"`
AggregationKeys field.List `json:"aggregation_keys"`
Sort order.SortKeys `json:"sort"`
}

func Analyze(ctx context.Context, query string, src *data.Source, head *lakeparse.Commitish) (*Info, error) {
func Analyze(ctx context.Context, query string, src *data.Source) (*Info, error) {
ast, err := parser.ParseQuery(query)
if err != nil {
return nil, err
}
entry, err := semantic.Analyze(ctx, ast, src, head)
entry, err := semantic.Analyze(ctx, ast, src, false)
if err != nil {
return nil, err
}
return AnalyzeDAG(ctx, entry, src, head)
return AnalyzeDAG(ctx, entry, src)
}

func AnalyzeDAG(ctx context.Context, entry dag.Seq, src *data.Source, head *lakeparse.Commitish) (*Info, error) {
srcInferred := !semantic.HasSource(entry)
if err := semantic.AddDefaultSource(ctx, &entry, src, head); err != nil {
return nil, err
}
func AnalyzeDAG(ctx context.Context, entry dag.Seq, src *data.Source) (*Info, error) {
var err error
var info Info
if info.Sources, err = describeSources(ctx, src.Lake(), entry[0], srcInferred); err != nil {
if info.Sources, err = describeSources(ctx, src.Lake(), entry[0]); err != nil {
return nil, err
}
sortKeys, err := optimizer.New(ctx, src).SortKeys(entry)
Expand Down Expand Up @@ -102,51 +99,52 @@ func AnalyzeDAG(ctx context.Context, entry dag.Seq, src *data.Source, head *lake
return &info, nil
}

func describeSources(ctx context.Context, lk *lake.Root, o dag.Op, inferred bool) ([]Source, error) {
func describeSources(ctx context.Context, lk *lake.Root, o dag.Op) ([]Source, error) {
switch o := o.(type) {
case *dag.Scope:
return describeSources(ctx, lk, o.Body[0], inferred)
return describeSources(ctx, lk, o.Body[0])
case *dag.Fork:
var s []Source
for _, p := range o.Paths {
out, err := describeSources(ctx, lk, p[0], inferred)
out, err := describeSources(ctx, lk, p[0])
if err != nil {
return nil, err
}
s = append(s, out...)
}
return s, nil
case *dag.DefaultScan:
return []Source{&Path{Kind: "Path", URI: "stdio://stdin", Inferred: inferred}}, nil
return []Source{&Path{Kind: "Path", URI: "stdio://stdin"}}, nil
case *dag.NullScan:
return []Source{&Null{Kind: "Null"}}, nil
case *dag.FileScan:
return []Source{&Path{Kind: "Path", URI: o.Path, Inferred: inferred}}, nil
return []Source{&Path{Kind: "Path", URI: o.Path}}, nil
case *dag.HTTPScan:
return []Source{&Path{Kind: "Path", URI: o.URL, Inferred: inferred}}, nil
return []Source{&Path{Kind: "Path", URI: o.URL}}, nil
case *dag.PoolScan:
return sourceOfPool(ctx, lk, o.ID, inferred)
return sourceOfPool(ctx, lk, o.ID)
case *dag.Lister:
return sourceOfPool(ctx, lk, o.Pool, inferred)
return sourceOfPool(ctx, lk, o.Pool)
case *dag.SeqScan:
return sourceOfPool(ctx, lk, o.Pool, inferred)
return sourceOfPool(ctx, lk, o.Pool)
case *dag.CommitMetaScan:
return sourceOfPool(ctx, lk, o.Pool, inferred)
return sourceOfPool(ctx, lk, o.Pool)
case *dag.LakeMetaScan:
return []Source{&LakeMeta{Kind: "LakeMeta", Meta: o.Meta}}, nil
default:
return nil, fmt.Errorf("unsupported source type %T", o)
}
}

func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID, inferred bool) ([]Source, error) {
func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID) ([]Source, error) {
p, err := lk.OpenPool(ctx, id)
if err != nil {
return nil, err
}
return []Source{&Pool{
Kind: "Pool",
ID: id,
Name: p.Name,
Inferred: inferred,
Kind: "Pool",
ID: id,
Name: p.Name,
}}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions compiler/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewFileSystemCompiler(engine storage.Engine) runtime.Compiler {
}

func (f *fsCompiler) NewQuery(rctx *runtime.Context, ast *parser.AST, readers []zio.Reader) (runtime.Query, error) {
job, err := NewJob(rctx, ast, f.src, nil)
job, err := NewJob(rctx, ast, f.src, len(readers) > 0)
if err != nil {
return nil, err
}
Expand All @@ -45,7 +45,7 @@ func (f *fsCompiler) NewQuery(rctx *runtime.Context, ast *parser.AST, readers []
return optimizeAndBuild(job, readers)
}

func (*fsCompiler) NewLakeQuery(_ *runtime.Context, ast *parser.AST, parallelism int, head *lakeparse.Commitish) (runtime.Query, error) {
func (*fsCompiler) NewLakeQuery(_ *runtime.Context, ast *parser.AST, parallelism int) (runtime.Query, error) {
panic("NewLakeQuery called on compiler.fsCompiler")
}

Expand Down
Loading