Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui committed Mar 13, 2024
1 parent f889134 commit 00b25d8
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 10 deletions.
88 changes: 88 additions & 0 deletions go/vt/vtgate/engine/record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"context"
"sync"

"vitess.io/vitess/go/sqltypes"

querypb "vitess.io/vitess/go/vt/proto/query"
)

var _ Primitive = (*Record)(nil)

type Record struct {
Input Primitive
doOnce sync.Once
record func(result *sqltypes.Result)
}

// RouteType returns a description of the query routing type used by the primitive
func (rc *Record) RouteType() string {
return rc.Input.RouteType()
}

// GetKeyspaceName specifies the Keyspace that this primitive routes to.
func (rc *Record) GetKeyspaceName() string {
return rc.Input.GetKeyspaceName()
}

// GetTableName specifies the table that this primitive routes to.
func (rc *Record) GetTableName() string {
return rc.Input.GetTableName()
}

// TryExecute satisfies the Primitive interface.
func (rc *Record) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
r, err := rc.Input.TryExecute(ctx, vcursor, bindVars, wantfields)
if err != nil {
return nil, err
}
go rc.doOnce.Do(func() {
rc.record(r)
})
return r, nil
}

// TryStreamExecute satisfies the Primitive interface.
func (rc *Record) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
panic("not implemented")
}

// GetFields implements the Primitive interface.
func (rc *Record) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return rc.Input.GetFields(ctx, vcursor, bindVars)
}

// Inputs returns the input to limit
func (rc *Record) Inputs() ([]Primitive, []map[string]any) {
return []Primitive{rc.Input}, nil
}

// NeedsTransaction implements the Primitive interface.
func (rc *Record) NeedsTransaction() bool {
return rc.Input.NeedsTransaction()
}

func (rc *Record) description() PrimitiveDescription {
other := map[string]any{}
return PrimitiveDescription{
Other: other,
}
}
88 changes: 88 additions & 0 deletions go/vt/vtgate/engine/replan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"context"
"sync"

"vitess.io/vitess/go/sqltypes"

querypb "vitess.io/vitess/go/vt/proto/query"
)

var _ Primitive = (*RePlan)(nil)

type RePlan struct {
Input Primitive
replanOnce sync.Once
replan func()
}

// RouteType returns a description of the query routing type used by the primitive
func (rp *RePlan) RouteType() string {
return rp.Input.RouteType()
}

// GetKeyspaceName specifies the Keyspace that this primitive routes to.
func (rp *RePlan) GetKeyspaceName() string {
return rp.Input.GetKeyspaceName()
}

// GetTableName specifies the table that this primitive routes to.
func (rp *RePlan) GetTableName() string {
return rp.Input.GetTableName()
}

// TryExecute satisfies the Primitive interface.
func (rp *RePlan) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
defer func() {
go rp.replanOnce.Do(rp.replan)
}()
r, err := rp.Input.TryExecute(ctx, vcursor, bindVars, wantfields)
if err != nil {
return nil, err
}
return r, nil
}

// TryStreamExecute satisfies the Primitive interface.
func (rp *RePlan) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
panic("not implemented")
}

// GetFields implements the Primitive interface.
func (rp *RePlan) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return rp.Input.GetFields(ctx, vcursor, bindVars)
}

// Inputs returns the input to limit
func (rp *RePlan) Inputs() ([]Primitive, []map[string]any) {
return []Primitive{rp.Input}, nil
}

// NeedsTransaction implements the Primitive interface.
func (rp *RePlan) NeedsTransaction() bool {
return rp.Input.NeedsTransaction()
}

func (rp *RePlan) description() PrimitiveDescription {
other := map[string]any{}
return PrimitiveDescription{
Other: other,
}
}
31 changes: 27 additions & 4 deletions go/vt/vtgate/planbuilder/operators/offset_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,35 @@ func addColumnsToInput(ctx *plancontext.PlanningContext, root Operator) Operator
return TopDown(root, TableID, visitor, stopAtRoute)
}

// addColumnsToInput adds columns needed by an operator to its input.
// This happens only when the filter expression can be retrieved as an offset from the underlying mysql.
func pullDistinctFromUNION(_ *plancontext.PlanningContext, root Operator) Operator {
func expandUNION(ctx *plancontext.PlanningContext, root Operator) Operator {
visitor := func(in Operator, _ semantics.TableSet, isRoot bool) (Operator, *ApplyResult) {
union, ok := in.(*Union)
if !ok || !union.distinct {
if !ok {
return in, NoRewrite
}

var missingType bool
var newSources []Operator
for _, source := range union.Sources {
var typeNeeded []int
cols := source.GetColumns(ctx)
for colOffset, col := range cols {
if _, found := ctx.SemTable.TypeForExpr(col.Expr); !found {
typeNeeded = append(typeNeeded, colOffset)
}
}
if len(typeNeeded) > 0 {
//
missingType = true
} else {
newSources = append(newSources, source)
}
}
if missingType {
//
}

if !union.distinct {
return in, NoRewrite
}

Expand Down
12 changes: 6 additions & 6 deletions go/vt/vtgate/planbuilder/operators/phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type (
const (
physicalTransform Phase = iota
initialPlanning
pullDistinctFromUnion
expandUnion
delegateAggregation
addAggrOrdering
cleanOutPerfDistinct
Expand All @@ -49,8 +49,8 @@ func (p Phase) String() string {
return "physicalTransform"
case initialPlanning:
return "initial horizon planning optimization"
case pullDistinctFromUnion:
return "pull distinct from UNION1"
case expandUnion:
return "expand UNION"
case delegateAggregation:
return "split aggregation between vtgate and mysql"
case addAggrOrdering:
Expand All @@ -68,7 +68,7 @@ func (p Phase) String() string {

func (p Phase) shouldRun(s semantics.QuerySignature) bool {
switch p {
case pullDistinctFromUnion:
case expandUnion:
return s.Union
case delegateAggregation:
return s.Aggregation
Expand All @@ -87,8 +87,8 @@ func (p Phase) shouldRun(s semantics.QuerySignature) bool {

func (p Phase) act(ctx *plancontext.PlanningContext, op Operator) Operator {
switch p {
case pullDistinctFromUnion:
return pullDistinctFromUNION(ctx, op)
case expandUnion:
return expandUNION(ctx, op)
case delegateAggregation:
return enableDelegateAggregation(ctx, op)
case addAggrOrdering:
Expand Down
79 changes: 79 additions & 0 deletions go/vt/vtgate/planbuilder/operators/record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operators

import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

type Record struct {
Source Operator
}

func newRecord(src Operator) *Record {
return &Record{
Source: src,
}
}

// Clone implements the Operator interface
func (rc *Record) Clone(inputs []Operator) Operator {
newOp := *rc
newOp.Source = inputs[0]
return &newOp
}

func (rc *Record) GetOrdering(*plancontext.PlanningContext) []OrderBy {
return nil
}

// Inputs implements the Operator interface
func (rc *Record) Inputs() []Operator {
return []Operator{rc.Source}
}

// SetInputs implements the Operator interface
func (rc *Record) SetInputs(ops []Operator) {
rc.Source = ops[0]
}

func (rc *Record) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Operator {
newSource := rc.Source.AddPredicate(ctx, expr)
rc.Source = newSource
return rc
}

func (rc *Record) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool, expr *sqlparser.AliasedExpr) int {
return rc.Source.AddColumn(ctx, reuse, gb, expr)
}

func (rc *Record) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return rc.Source.FindCol(ctx, expr, underRoute)
}

func (rc *Record) GetColumns(ctx *plancontext.PlanningContext) (result []*sqlparser.AliasedExpr) {
return rc.Source.GetColumns(ctx)
}

func (rc *Record) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return rc.Source.GetSelectExprs(ctx)
}

func (rc *Record) ShortDescription() string {
return ""
}

0 comments on commit 00b25d8

Please sign in to comment.