Skip to content

Commit

Permalink
align from-less query semantics with SQL (#5418)
Browse files Browse the repository at this point in the history
This commit changes the interpretation of a query that omits the
from operator.  Previously, a lake query would use the default
"HEAD" parameter as an implied "from HEAD".  This contradicts
SQL which assumes a table with one empty row (like the implied
yield null case).

The new assumption in a lake query for a from-less query is
an implied null source so that a "select 'hello'" query will
return a single 'hello' rather than a sequence based on the
implied read from HEAD.

Similarly, a from-less command-line query (i.e., super -c)
with no file arguments also has an automatic null scan inserted.

As part of this change, we moved the logic for inserting the
null scan into the DAG (along with the logic for inserting
DefaultScan of zio.Readers) into the semantic pass.

These changes mean that lake queries that read from a pool require
an explicit "from" and there is no implied pool from HEAD.
Because of this, we removed the HEAD references in the code paths
that involve lake queries.  HEAD is still used by the various
"super db" commands to refer to a default pool/branch/commit.

Currently, the null scan does not follow the done protocol
and restart.  We will need to add this when we want from-less
subqueries to restart.
  • Loading branch information
mccanne authored Nov 1, 2024
1 parent f2bf9e6 commit f1a996d
Show file tree
Hide file tree
Showing 69 changed files with 271 additions and 323 deletions.
4 changes: 1 addition & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/brimdata/super/compiler/srcfiles"
"github.com/brimdata/super/lakeparse"
"github.com/brimdata/super/order"
"github.com/brimdata/super/pkg/field"
"github.com/brimdata/super/pkg/nano"
Expand Down Expand Up @@ -98,8 +97,7 @@ type EventBranch struct {
}

type QueryRequest struct {
Query string `json:"query"`
Head lakeparse.Commitish `json:"head"`
Query string `json:"query"`
}

type QueryChannelSet struct {
Expand Down
6 changes: 1 addition & 5 deletions api/client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/brimdata/super/compiler/srcfiles"
"github.com/brimdata/super/lake"
"github.com/brimdata/super/lake/branches"
"github.com/brimdata/super/lakeparse"
"github.com/brimdata/super/runtime/exec"
"github.com/brimdata/super/zio/zngio"
"github.com/brimdata/super/zson"
Expand Down Expand Up @@ -296,15 +295,12 @@ func (c *Connection) Revert(ctx context.Context, poolID ksuid.KSUID, branchName
//
// As for Connection.Do, if the returned error is nil, the user is expected to
// call Response.Body.Close.
func (c *Connection) Query(ctx context.Context, head *lakeparse.Commitish, src string, filenames ...string) (*Response, error) {
func (c *Connection) Query(ctx context.Context, src string, filenames ...string) (*Response, error) {
files, err := srcfiles.Concat(filenames, src)
if err != nil {
return nil, err
}
body := api.QueryRequest{Query: string(files.Text)}
if head != nil {
body.Head = *head
}
req := c.NewRequest(ctx, http.MethodPost, "/query?ctrl=T", body)
res, err := c.Do(req)
if ae := (*api.Error)(nil); errors.As(err, &ae) && len(ae.CompilationErrors) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions cmd/super/compile/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func (s *Shared) Run(ctx context.Context, args []string, lakeFlags *lakeflags.Fl
}
return s.writeValue(ctx, ast.Parsed())
}
runtime, err := compiler.NewJob(runtime.DefaultContext(), ast, data.NewSource(nil, lk), nil)
runtime, err := compiler.NewJob(runtime.DefaultContext(), ast, data.NewSource(nil, lk), false)
if err != nil {
return err
}
if desc {
description, err := describe.AnalyzeDAG(ctx, runtime.Entry(), data.NewSource(nil, lk), nil)
description, err := describe.AnalyzeDAG(ctx, runtime.Entry(), data.NewSource(nil, lk))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/super/db/branch/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *Command) list(ctx context.Context, lake api.Interface) error {
if err != nil {
return err
}
q, err := lake.Query(ctx, nil, query)
q, err := lake.Query(ctx, query)
if err != nil {
w.Close()
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/super/db/log/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *Command) Run(args []string) error {
return err
}
defer w.Close()
q, err := lake.Query(ctx, nil, query)
q, err := lake.Query(ctx, query)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/super/db/ls/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (c *Command) Run(args []string) error {
if err != nil {
return err
}
q, err := lake.Query(ctx, nil, query)
q, err := lake.Query(ctx, query)
if err != nil {
w.Close()
return err
Expand Down
6 changes: 1 addition & 5 deletions cmd/super/db/query/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"os"

"github.com/brimdata/super/cli/outputflags"
"github.com/brimdata/super/cli/poolflags"
"github.com/brimdata/super/cli/queryflags"
"github.com/brimdata/super/cli/runtimeflags"
"github.com/brimdata/super/cmd/super/db"
Expand Down Expand Up @@ -33,15 +32,13 @@ func init() {
type Command struct {
*db.Command
outputFlags outputflags.Flags
poolFlags poolflags.Flags
queryFlags queryflags.Flags
runtimeFlags runtimeflags.Flags
}

func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
c := &Command{Command: parent.(*db.Command)}
c.outputFlags.SetFlags(f)
c.poolFlags.SetFlags(f)
c.queryFlags.SetFlags(f)
c.runtimeFlags.SetFlags(f)
return c, nil
Expand All @@ -68,8 +65,7 @@ func (c *Command) Run(args []string) error {
if err != nil {
return err
}
head, _ := c.poolFlags.HEAD()
query, err := lake.Query(ctx, head, src, c.queryFlags.Includes...)
query, err := lake.Query(ctx, src, c.queryFlags.Includes...)
if err != nil {
w.Close()
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/super/internal/lakemanage/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type objectIterator struct {

func newObjectIterator(ctx context.Context, lake api.Interface, head *lakeparse.Commitish) (*objectIterator, error) {
query := fmt.Sprintf(iteratorQuery, head.Pool, head.Branch, head.Pool, head.Branch)
q, err := lake.Query(ctx, nil, query)
q, err := lake.Query(ctx, query)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions compiler/ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,10 @@ type (
Loc `json:"loc"`
}
Delete struct {
Kind string `json:"kind" unpack:""`
Loc `json:"loc"` // dummy field, not needed except to implement Node
Kind string `json:"kind" unpack:""`
Pool string `json:"pool"`
Branch string `json:"branch"`
Loc `json:"loc"` // dummy field, not needed except to implement Node
}
)

Expand Down
4 changes: 4 additions & 0 deletions compiler/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ type (
Kind string `json:"kind" unpack:""`
Meta string `json:"meta"`
}
NullScan struct {
Kind string `json:"kind" unpack:""`
}
)

var LakeMetas = map[string]struct{}{
Expand Down Expand Up @@ -273,6 +276,7 @@ func (*DeleteScan) OpNode() {}
func (*LakeMetaScan) OpNode() {}
func (*PoolMetaScan) OpNode() {}
func (*CommitMetaScan) OpNode() {}
func (*NullScan) OpNode() {}

func (*Lister) OpNode() {}
func (*Slicer) OpNode() {}
Expand Down
1 change: 1 addition & 0 deletions compiler/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var unpacker = unpack.New(
MapExpr{},
Merge{},
Mirror{},
NullScan{},
Output{},
Over{},
OverExpr{},
Expand Down
62 changes: 30 additions & 32 deletions compiler/describe/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/brimdata/super/compiler/parser"
"github.com/brimdata/super/compiler/semantic"
"github.com/brimdata/super/lake"
"github.com/brimdata/super/lakeparse"
"github.com/brimdata/super/order"
"github.com/brimdata/super/pkg/field"
"github.com/segmentio/ksuid"
Expand All @@ -31,48 +30,46 @@ type (
Meta string `json:"meta"`
}
Pool struct {
Kind string `json:"kind"`
Name string `json:"name"`
ID ksuid.KSUID `json:"id"`
Inferred bool `json:"inferred"`
Kind string `json:"kind"`
Name string `json:"name"`
ID ksuid.KSUID `json:"id"`
}
Path struct {
Kind string `json:"kind"`
URI string `json:"uri"`
Inferred bool `json:"inferred"`
Kind string `json:"kind"`
URI string `json:"uri"`
}
Null struct {
Kind string `json:"kind"`
}
)

func (*LakeMeta) Source() {}
func (*Pool) Source() {}
func (*Path) Source() {}
func (*Null) Source() {}

type Channel struct {
Name string `json:"name"`
AggregationKeys field.List `json:"aggregation_keys"`
Sort order.SortKeys `json:"sort"`
}

func Analyze(ctx context.Context, query string, src *data.Source, head *lakeparse.Commitish) (*Info, error) {
func Analyze(ctx context.Context, query string, src *data.Source) (*Info, error) {
ast, err := parser.ParseQuery(query)
if err != nil {
return nil, err
}
entry, err := semantic.Analyze(ctx, ast, src, head)
entry, err := semantic.Analyze(ctx, ast, src, false)
if err != nil {
return nil, err
}
return AnalyzeDAG(ctx, entry, src, head)
return AnalyzeDAG(ctx, entry, src)
}

func AnalyzeDAG(ctx context.Context, entry dag.Seq, src *data.Source, head *lakeparse.Commitish) (*Info, error) {
srcInferred := !semantic.HasSource(entry)
if err := semantic.AddDefaultSource(ctx, &entry, src, head); err != nil {
return nil, err
}
func AnalyzeDAG(ctx context.Context, entry dag.Seq, src *data.Source) (*Info, error) {
var err error
var info Info
if info.Sources, err = describeSources(ctx, src.Lake(), entry[0], srcInferred); err != nil {
if info.Sources, err = describeSources(ctx, src.Lake(), entry[0]); err != nil {
return nil, err
}
sortKeys, err := optimizer.New(ctx, src).SortKeys(entry)
Expand Down Expand Up @@ -102,51 +99,52 @@ func AnalyzeDAG(ctx context.Context, entry dag.Seq, src *data.Source, head *lake
return &info, nil
}

func describeSources(ctx context.Context, lk *lake.Root, o dag.Op, inferred bool) ([]Source, error) {
func describeSources(ctx context.Context, lk *lake.Root, o dag.Op) ([]Source, error) {
switch o := o.(type) {
case *dag.Scope:
return describeSources(ctx, lk, o.Body[0], inferred)
return describeSources(ctx, lk, o.Body[0])
case *dag.Fork:
var s []Source
for _, p := range o.Paths {
out, err := describeSources(ctx, lk, p[0], inferred)
out, err := describeSources(ctx, lk, p[0])
if err != nil {
return nil, err
}
s = append(s, out...)
}
return s, nil
case *dag.DefaultScan:
return []Source{&Path{Kind: "Path", URI: "stdio://stdin", Inferred: inferred}}, nil
return []Source{&Path{Kind: "Path", URI: "stdio://stdin"}}, nil
case *dag.NullScan:
return []Source{&Null{Kind: "Null"}}, nil
case *dag.FileScan:
return []Source{&Path{Kind: "Path", URI: o.Path, Inferred: inferred}}, nil
return []Source{&Path{Kind: "Path", URI: o.Path}}, nil
case *dag.HTTPScan:
return []Source{&Path{Kind: "Path", URI: o.URL, Inferred: inferred}}, nil
return []Source{&Path{Kind: "Path", URI: o.URL}}, nil
case *dag.PoolScan:
return sourceOfPool(ctx, lk, o.ID, inferred)
return sourceOfPool(ctx, lk, o.ID)
case *dag.Lister:
return sourceOfPool(ctx, lk, o.Pool, inferred)
return sourceOfPool(ctx, lk, o.Pool)
case *dag.SeqScan:
return sourceOfPool(ctx, lk, o.Pool, inferred)
return sourceOfPool(ctx, lk, o.Pool)
case *dag.CommitMetaScan:
return sourceOfPool(ctx, lk, o.Pool, inferred)
return sourceOfPool(ctx, lk, o.Pool)
case *dag.LakeMetaScan:
return []Source{&LakeMeta{Kind: "LakeMeta", Meta: o.Meta}}, nil
default:
return nil, fmt.Errorf("unsupported source type %T", o)
}
}

func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID, inferred bool) ([]Source, error) {
func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID) ([]Source, error) {
p, err := lk.OpenPool(ctx, id)
if err != nil {
return nil, err
}
return []Source{&Pool{
Kind: "Pool",
ID: id,
Name: p.Name,
Inferred: inferred,
Kind: "Pool",
ID: id,
Name: p.Name,
}}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions compiler/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewFileSystemCompiler(engine storage.Engine) runtime.Compiler {
}

func (f *fsCompiler) NewQuery(rctx *runtime.Context, ast *parser.AST, readers []zio.Reader) (runtime.Query, error) {
job, err := NewJob(rctx, ast, f.src, nil)
job, err := NewJob(rctx, ast, f.src, len(readers) > 0)
if err != nil {
return nil, err
}
Expand All @@ -45,7 +45,7 @@ func (f *fsCompiler) NewQuery(rctx *runtime.Context, ast *parser.AST, readers []
return optimizeAndBuild(job, readers)
}

func (*fsCompiler) NewLakeQuery(_ *runtime.Context, ast *parser.AST, parallelism int, head *lakeparse.Commitish) (runtime.Query, error) {
func (*fsCompiler) NewLakeQuery(_ *runtime.Context, ast *parser.AST, parallelism int) (runtime.Query, error) {
panic("NewLakeQuery called on compiler.fsCompiler")
}

Expand Down
Loading

0 comments on commit f1a996d

Please sign in to comment.