Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework field selection for projection pushdown (aka demand) #5553

Merged
merged 1 commit into from
Jan 8, 2025
Merged

Conversation

nwt
Copy link
Member

@nwt nwt commented Jan 3, 2025

Simplify compiler/optimizer/demand.go and extend it to handle all DAG expressions and operators.

I'm not sure how precise this is, but in an attempt to verify that it doesn't omit any required fields, I tested it with the following diff, which adds a cut operator after every scan when a field list is available. make test-unit test-system test-heavy succeeds with it.

diff --git a/compiler/dag/op.go b/compiler/dag/op.go
index c8149b0e6..20cb35018 100644
--- a/compiler/dag/op.go
+++ b/compiler/dag/op.go
@@ -202,6 +202,7 @@ type (
 	// DefaultScan scans an input stream provided by the runtime.
 	DefaultScan struct {
 		Kind     string         `json:"kind" unpack:""`
+		Fields   []field.Path   `json:"fields"`
 		Filter   Expr           `json:"filter"`
 		SortKeys order.SortKeys `json:"sort_keys"`
 	}
diff --git a/compiler/kernel/op.go b/compiler/kernel/op.go
index 5ace22a2c..9f0a00b05 100644
--- a/compiler/kernel/op.go
+++ b/compiler/kernel/op.go
@@ -280,7 +280,11 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
 		body := strings.NewReader(v.Body)
 		return b.env.OpenHTTP(b.rctx.Context, b.zctx(), v.URL, v.Format, v.Method, v.Headers, body, nil)
 	case *dag.FileScan:
-		return b.env.Open(b.rctx.Context, b.zctx(), v.Path, v.Format, v.Fields, b.PushdownOf(v.Filter))
+		p, err := b.env.Open(b.rctx.Context, b.zctx(), v.Path, v.Format, v.Fields, b.PushdownOf(v.Filter))
+		if err != nil {
+			return nil, err
+		}
+		return b.cutFields(p, v.Fields)
 	case *dag.RobotScan:
 		e, err := compileExpr(v.Expr)
 		if err != nil {
@@ -290,7 +294,11 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
 	case *dag.DefaultScan:
 		pushdown := b.PushdownOf(v.Filter)
 		if len(b.readers) == 1 {
-			return zbuf.NewScanner(b.rctx.Context, b.readers[0], pushdown)
+			p, err := zbuf.NewScanner(b.rctx.Context, b.readers[0], pushdown)
+			if err != nil {
+				return nil, err
+			}
+			return b.cutFields(p, v.Fields)
 		}
 		scanners := make([]zbuf.Scanner, 0, len(b.readers))
 		for _, r := range b.readers {
@@ -300,7 +308,7 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
 			}
 			scanners = append(scanners, scanner)
 		}
-		return zbuf.MultiScanner(scanners...), nil
+		return b.cutFields(zbuf.MultiScanner(scanners...), v.Fields)
 	case *dag.NullScan:
 		//XXX we need something that implements the done protocol and restarst
 		return zbuf.NewPuller(zbuf.NewArray([]super.Value{super.Null})), nil
@@ -334,7 +342,7 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
 				return nil, err
 			}
 		}
-		return meta.NewSequenceScanner(b.rctx, parent, pool, b.PushdownOf(v.Filter), pruner, b.progress), nil
+		return b.cutFields(meta.NewSequenceScanner(b.rctx, parent, pool, b.PushdownOf(v.Filter), pruner, b.progress), v.Fields)
 	case *dag.Deleter:
 		pool, err := b.lookupPool(v.Pool)
 		if err != nil {
@@ -388,6 +396,29 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
 	}
 }
 
+func (b *Builder) cutFields(parent zbuf.Puller, fields []field.Path) (zbuf.Puller, error) {
+	if len(fields) == 0 {
+		return parent, nil
+	}
+	var assignments []dag.Assignment
+	for _, f := range fields {
+		this := &dag.This{
+			Kind: "This",
+			Path: f,
+		}
+		assignments = append(assignments, dag.Assignment{
+			Kind: "Assignment",
+			LHS:  this,
+			RHS:  this,
+		})
+	}
+	cut := &dag.Cut{
+		Kind: "Cut",
+		Args: assignments,
+	}
+	return b.compileLeaf(cut, parent)
+}
+
 func (b *Builder) compileDefs(defs []dag.Def) ([]string, []expr.Evaluator, error) {
 	exprs := make([]expr.Evaluator, 0, len(defs))
 	names := make([]string, 0, len(defs))
diff --git a/compiler/optimizer/demand.go b/compiler/optimizer/demand.go
index f6f68f6b5..f174dd934 100644
--- a/compiler/optimizer/demand.go
+++ b/compiler/optimizer/demand.go
@@ -104,7 +104,11 @@ func demandForOp(op dag.Op, downstream demand.Demand) demand.Demand {
 		}
 		return d
 
-	case *dag.CommitMetaScan, *dag.DefaultScan, *dag.Deleter, *dag.DeleteScan, *dag.LakeMetaScan:
+	case *dag.CommitMetaScan, *dag.Deleter, *dag.DeleteScan, *dag.LakeMetaScan:
+		return demand.None()
+	case *dag.DefaultScan:
+		d := demand.Union(downstream, demandForExpr(op.Filter))
+		op.Fields = demand.Fields(d)
 		return demand.None()
 	case *dag.FileScan:
 		d := demand.Union(downstream, demandForExpr(op.Filter))
diff --git a/runtime/ztests/expr/array-expr.yaml b/runtime/ztests/expr/array-expr.yaml
index 0ef22d0d2..7124fbc54 100644
--- a/runtime/ztests/expr/array-expr.yaml
+++ b/runtime/ztests/expr/array-expr.yaml
@@ -1,3 +1,5 @@
+skip: Adding a cut operator changes error("quiet") to error("missing")
+
 zed: yield [a,b,c]
 
 vector: true

Simplify compiler/optimizer/demand.go and extend it to handle all DAG
expressions and operators.
@nwt nwt requested a review from mattnibs January 3, 2025 00:00
@mattnibs
Copy link
Collaborator

mattnibs commented Jan 6, 2025

Nice! Do you think we should add some suite of tests that checks we have the correct fields from given queries?

@nwt nwt merged commit 86dc144 into main Jan 8, 2025
3 checks passed
@nwt nwt deleted the rework-demand branch January 8, 2025 16:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants