Skip to content

Commit

Permalink
refactor: move vcursorImpl to another package - wip
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Dec 2, 2024
1 parent 79162c5 commit 231b8cb
Show file tree
Hide file tree
Showing 11 changed files with 646 additions and 543 deletions.
87 changes: 55 additions & 32 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ import (
)

var (
errNoKeyspace = vterrors.VT09005()
defaultTabletType = topodatapb.TabletType_PRIMARY

// TODO: @rafael - These two counters should be deprecated in favor of the ByTable ones in v17+. They are kept for now for backwards compatibility.
Expand Down Expand Up @@ -168,6 +167,7 @@ func NewExecutor(
pv plancontext.PlannerVersion,
warmingReadsPercent int,
) *Executor {
warnings.Add("WarnUnshardedOnly", 1)
e := &Executor{
env: env,
serv: serv,
Expand Down Expand Up @@ -301,7 +301,7 @@ func (e *Executor) StreamExecute(
srr := &streaminResultReceiver{callback: callback}
var err error

resultHandler := func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, execStart time.Time) error {
resultHandler := func(ctx context.Context, plan *engine.Plan, vc *econtext.VCursorImpl, bindVars map[string]*querypb.BindVariable, execStart time.Time) error {
var seenResults atomic.Bool
var resultMu sync.Mutex
result := &sqltypes.Result{}
Expand Down Expand Up @@ -369,7 +369,7 @@ func (e *Executor) StreamExecute(
logStats.TablesUsed = plan.TablesUsed
logStats.TabletType = vc.TabletType().String()
logStats.ExecuteTime = time.Since(execStart)
logStats.ActiveKeyspace = vc.keyspace
logStats.ActiveKeyspace = vc.GetKeyspace()

e.updateQueryCounts(plan.Instructions.RouteType(), plan.Instructions.GetKeyspaceName(), plan.Instructions.GetTableName(), int64(logStats.ShardQueries))

Expand Down Expand Up @@ -435,7 +435,7 @@ func (e *Executor) execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
var err error
var qr *sqltypes.Result
var stmtType sqlparser.StatementType
err = e.newExecute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats, func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, time time.Time) error {
err = e.newExecute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats, func(ctx context.Context, plan *engine.Plan, vc *econtext.VCursorImpl, bindVars map[string]*querypb.BindVariable, time time.Time) error {
stmtType = plan.Type
qr, err = e.executePlan(ctx, safeSession, plan, vc, bindVars, logStats, time)
return err
Expand All @@ -449,7 +449,7 @@ func (e *Executor) execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
}

// addNeededBindVars adds bind vars that are needed by the plan
func (e *Executor) addNeededBindVars(vcursor *vcursorImpl, bindVarNeeds *sqlparser.BindVarNeeds, bindVars map[string]*querypb.BindVariable, session *econtext.SafeSession) error {
func (e *Executor) addNeededBindVars(vcursor *econtext.VCursorImpl, bindVarNeeds *sqlparser.BindVarNeeds, bindVars map[string]*querypb.BindVariable, session *econtext.SafeSession) error {
for _, funcName := range bindVarNeeds.NeedFunctionResult {
switch funcName {
case sqlparser.DBVarName:
Expand Down Expand Up @@ -542,7 +542,7 @@ func (e *Executor) addNeededBindVars(vcursor *vcursorImpl, bindVarNeeds *sqlpars
}

evalExpr, err := evalengine.Translate(expr, &evalengine.Config{
Collation: vcursor.collation,
Collation: vcursor.ConnCollation(),
Environment: e.env,
SQLMode: evalengine.ParseSQLMode(vcursor.SQLMode()),
})
Expand All @@ -553,7 +553,7 @@ func (e *Executor) addNeededBindVars(vcursor *vcursorImpl, bindVarNeeds *sqlpars
if err != nil {
return err
}
bindVars[key] = sqltypes.ValueBindVariable(evaluated.Value(vcursor.collation))
bindVars[key] = sqltypes.ValueBindVariable(evaluated.Value(vcursor.ConnCollation()))
}
}
}
Expand Down Expand Up @@ -723,7 +723,7 @@ func (e *Executor) CloseSession(ctx context.Context, safeSession *econtext.SafeS
return e.txConn.ReleaseAll(ctx, safeSession)
}

func (e *Executor) setVitessMetadata(ctx context.Context, name, value string) error {
func (e *Executor) SetVitessMetadata(ctx context.Context, name, value string) error {
// TODO(kalfonso): move to its own acl check and consolidate into an acl component that can handle multiple operations (vschema, metadata)
user := callerid.ImmediateCallerIDFromContext(ctx)
allowed := vschemaacl.Authorized(user)
Expand All @@ -742,7 +742,7 @@ func (e *Executor) setVitessMetadata(ctx context.Context, name, value string) er
return ts.UpsertMetadata(ctx, name, value)
}

func (e *Executor) showVitessMetadata(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
func (e *Executor) ShowVitessMetadata(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
ts, err := e.serv.GetTopoServer()
if err != nil {
return nil, err
Expand Down Expand Up @@ -775,7 +775,7 @@ func (e *Executor) showVitessMetadata(ctx context.Context, filter *sqlparser.Sho

type tabletFilter func(tablet *topodatapb.Tablet, servingState string, primaryTermStartTime int64) bool

func (e *Executor) showShards(ctx context.Context, filter *sqlparser.ShowFilter, destTabletType topodatapb.TabletType) (*sqltypes.Result, error) {
func (e *Executor) ShowShards(ctx context.Context, filter *sqlparser.ShowFilter, destTabletType topodatapb.TabletType) (*sqltypes.Result, error) {
showVitessShardsFilters := func(filter *sqlparser.ShowFilter) ([]func(string) bool, []func(string, *topodatapb.ShardReference) bool) {
keyspaceFilters := []func(string) bool{}
shardFilters := []func(string, *topodatapb.ShardReference) bool{}
Expand Down Expand Up @@ -859,7 +859,7 @@ func (e *Executor) showShards(ctx context.Context, filter *sqlparser.ShowFilter,
}, nil
}

func (e *Executor) showTablets(filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
func (e *Executor) ShowTablets(filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
getTabletFilters := func(filter *sqlparser.ShowFilter) []tabletFilter {
var filters []tabletFilter

Expand Down Expand Up @@ -932,7 +932,7 @@ func (e *Executor) showTablets(filter *sqlparser.ShowFilter) (*sqltypes.Result,
}, nil
}

func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
func (e *Executor) ShowVitessReplicationStatus(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
ctx, cancel := context.WithTimeout(ctx, healthCheckTimeout)
defer cancel()
rows := [][]sqltypes.Value{}
Expand Down Expand Up @@ -1093,7 +1093,7 @@ func (e *Executor) ParseDestinationTarget(targetString string) (string, topodata
// the cache, it reuses it.
func (e *Executor) getPlan(
ctx context.Context,
vcursor *vcursorImpl,
vcursor *econtext.VCursorImpl,
query string,
stmt sqlparser.Statement,
comments sqlparser.MarginComments,
Expand Down Expand Up @@ -1131,10 +1131,10 @@ func (e *Executor) getPlan(
reservedVars,
bindVars,
parameterize,
vcursor.keyspace,
vcursor.safeSession.GetSelectLimit(),
vcursor.GetKeyspace(),
vcursor.GetSelectLimit(),
setVarComment,
vcursor.safeSession.SystemVariables,
vcursor.GetSystemVariablesCopy(),
vcursor.GetForeignKeyChecksState(),
vcursor,
)
Expand All @@ -1153,9 +1153,9 @@ func (e *Executor) getPlan(
return e.cacheAndBuildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, logStats)
}

func (e *Executor) hashPlan(ctx context.Context, vcursor *vcursorImpl, query string) PlanCacheKey {
func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey {
hasher := vthash.New256()
vcursor.keyForPlan(ctx, query, hasher)
vcursor.KeyForPlan(ctx, query, hasher)

var planKey PlanCacheKey
hasher.Sum(planKey[:0])
Expand All @@ -1164,7 +1164,7 @@ func (e *Executor) hashPlan(ctx context.Context, vcursor *vcursorImpl, query str

func (e *Executor) buildStatement(
ctx context.Context,
vcursor *vcursorImpl,
vcursor *econtext.VCursorImpl,
query string,
stmt sqlparser.Statement,
reservedVars *sqlparser.ReservedVars,
Expand All @@ -1175,23 +1175,22 @@ func (e *Executor) buildStatement(
return nil, err
}

plan.Warnings = vcursor.warnings
vcursor.warnings = nil
plan.Warnings = vcursor.GetAndEmptyWarnings()

err = e.checkThatPlanIsValid(stmt, plan)
return plan, err
}

func (e *Executor) cacheAndBuildStatement(
ctx context.Context,
vcursor *vcursorImpl,
vcursor *econtext.VCursorImpl,
query string,
stmt sqlparser.Statement,
reservedVars *sqlparser.ReservedVars,
bindVarNeeds *sqlparser.BindVarNeeds,
logStats *logstats.LogStats,
) (*engine.Plan, error) {
planCachable := sqlparser.CachePlan(stmt) && vcursor.safeSession.CachePlan()
planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan()
if planCachable {
planKey := e.hashPlan(ctx, vcursor, query)

Expand All @@ -1209,7 +1208,7 @@ func (e *Executor) canNormalizeStatement(stmt sqlparser.Statement, setVarComment
return sqlparser.CanNormalize(stmt) || setVarComment != ""
}

func prepareSetVarComment(vcursor *vcursorImpl, stmt sqlparser.Statement) (string, error) {
func prepareSetVarComment(vcursor *econtext.VCursorImpl, stmt sqlparser.Statement) (string, error) {
if vcursor == nil || vcursor.Session().InReservedConn() {
return "", nil
}
Expand Down Expand Up @@ -1405,9 +1404,29 @@ func (e *Executor) prepare(ctx context.Context, safeSession *econtext.SafeSessio
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unrecognized prepare statement: %s", sql)
}

func (e *Executor) getVCursorConfig() econtext.VCursorConfig {
connCollation := collations.Unknown
if gw, isTabletGw := e.resolver.resolver.GetGateway().(*TabletGateway); isTabletGw {
connCollation = gw.DefaultConnCollation()
}
return econtext.VCursorConfig{
WarmingReadsPercent: warmingReadsPercent,
Collation: connCollation,
MaxMemoryRows: 0,
EnableShardRouting: false,
DefaultTabletType: 0,
QueryTimeout: 0,
DBDDLPlugin: "",
ForeignKeyMode: 0,
SetVarEnabled: false,
EnableViews: false,
}
}

func (e *Executor) handlePrepare(ctx context.Context, safeSession *econtext.SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) {
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv)

vcursor, _ := econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, e.getVCursorConfig())

stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
Expand Down Expand Up @@ -1456,12 +1475,12 @@ func parseAndValidateQuery(query string, parser *sqlparser.Parser) (sqlparser.St
}

// ExecuteMultiShard implements the IExecutor interface
func (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *econtext.SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver resultsObserver) (qr *sqltypes.Result, errs []error) {
func (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *econtext.SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver econtext.ResultsObserver) (qr *sqltypes.Result, errs []error) {
return e.scatterConn.ExecuteMultiShard(ctx, primitive, rss, queries, session, autocommit, ignoreMaxMemoryRows, resultsObserver)
}

// StreamExecuteMulti implements the IExecutor interface
func (e *Executor) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *econtext.SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, resultsObserver resultsObserver) []error {
func (e *Executor) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *econtext.SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, resultsObserver econtext.ResultsObserver) []error {
return e.scatterConn.StreamExecuteMulti(ctx, primitive, query, rss, vars, session, autocommit, callback, resultsObserver)
}

Expand Down Expand Up @@ -1581,21 +1600,21 @@ func (e *Executor) ReleaseLock(ctx context.Context, session *econtext.SafeSessio
return e.txConn.ReleaseLock(ctx, session)
}

// planPrepareStmt implements the IExecutor interface
func (e *Executor) planPrepareStmt(ctx context.Context, vcursor *vcursorImpl, query string) (*engine.Plan, sqlparser.Statement, error) {
// PlanPrepareStmt implements the IExecutor interface
func (e *Executor) PlanPrepareStmt(ctx context.Context, vcursor *econtext.VCursorImpl, query string) (*engine.Plan, sqlparser.Statement, error) {
stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
return nil, nil, err
}

// creating this log stats to not interfere with the original log stats.
lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.safeSession.SessionUUID, nil)
lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.Session().GetSessionUUID(), nil)
plan, err := e.getPlan(
ctx,
vcursor,
query,
sqlparser.Clone(stmt),
vcursor.marginComments,
vcursor.GetMarginComments(),
map[string]*querypb.BindVariable{},
reservedVars, /* normalize */
false,
Expand All @@ -1617,7 +1636,7 @@ func (e *Executor) Close() {
e.plans.Close()
}

func (e *Executor) environment() *vtenv.Environment {
func (e *Executor) Environment() *vtenv.Environment {
return e.env
}

Expand All @@ -1629,6 +1648,10 @@ func (e *Executor) UnresolvedTransactions(ctx context.Context, targets []*queryp
return e.txConn.UnresolvedTransactions(ctx, targets)
}

func (e *Executor) AddWarningCount(name string, count int64) {
warnings.Add(name, count)
}

type (
errorTransformer interface {
TransformError(err error) error
Expand Down
Loading

0 comments on commit 231b8cb

Please sign in to comment.