Skip to content

Commit

Permalink
wip - handle last insert id in vtgate with selects
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui committed Dec 11, 2024
1 parent 63dfb9e commit b7c5b7e
Show file tree
Hide file tree
Showing 27 changed files with 148 additions and 115 deletions.
1 change: 1 addition & 0 deletions go/mysql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ func (c *Conn) ReadQueryResult(maxrows int, wantfields bool) (*sqltypes.Result,
result.SessionStateChanges = packetEof.sessionStateData
result.StatusFlags = packetEof.statusFlags
result.Info = packetEof.info
result.InsertID = packetEof.lastInsertID
}
return result, more, warnings, nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,11 @@ SELECT (~ (1 || 0)) IS NULL;

SELECT 1
WHERE (~ (1 || 0)) IS NULL;

SELECT c1 FROM t0 WHERE LAST_INSERT_ID(42);

SELECT LAST_INSERT_ID();

SELECT c1 FROM t0 WHERE LAST_INSERT_ID(0);

SELECT LAST_INSERT_ID();
4 changes: 2 additions & 2 deletions go/test/vschemawrapper/vschema_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,12 @@ func (vw *VSchemaWrapper) IsViewsEnabled() bool {

// FindMirrorRule finds the mirror rule for the requested keyspace, table
// name, and the tablet type in the VSchema.
func (vs *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.MirrorRule, error) {
func (vw *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.MirrorRule, error) {
destKeyspace, destTabletType, _, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY)
if err != nil {
return nil, err
}
mirrorRule, err := vs.V.FindMirrorRule(destKeyspace, tab.Name.String(), destTabletType)
mirrorRule, err := vw.V.FindMirrorRule(destKeyspace, tab.Name.String(), destTabletType)
if err != nil {
return nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions go/vt/vtgate/engine/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ import (
// each node does its part by combining the results of the
// sub-nodes.
type Plan struct {
Type sqlparser.StatementType // The type of query we have
Original string // Original is the original query.
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting
Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs
TablesUsed []string // TablesUsed is the list of tables that this plan will query
Type sqlparser.StatementType // The type of query we have
Original string // Original is the original query.
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting
Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs
TablesUsed []string // TablesUsed is the list of tables that this plan will query
ForceReadLastInsertID bool // ForceReadLastInsertID is set to true when we need to set the session's last insert ID value to what this plan returns no matter what

ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
Expand Down
20 changes: 13 additions & 7 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,12 @@ type streaminResultReceiver struct {
callback func(*sqltypes.Result) error
}

func (s *streaminResultReceiver) storeResultStats(typ sqlparser.StatementType, qr *sqltypes.Result) error {
func (s *streaminResultReceiver) storeResultStats(typ sqlparser.StatementType, qr *sqltypes.Result, forceReadLastInsertID bool) error {
s.mu.Lock()
defer s.mu.Unlock()
s.rowsAffected += qr.RowsAffected
s.rowsReturned += len(qr.Rows)
if qr.InsertID != 0 {
if forceReadLastInsertID || qr.InsertID != 0 {
s.insertID = qr.InsertID
}
s.stmtType = typ
Expand Down Expand Up @@ -344,7 +344,7 @@ func (e *Executor) StreamExecute(

// 4: Execute!
err := vc.StreamExecutePrimitive(ctx, plan.Instructions, bindVars, true, func(qr *sqltypes.Result) error {
return srr.storeResultStats(plan.Type, qr)
return srr.storeResultStats(plan.Type, qr, plan.ForceReadLastInsertID)
})

// Check if there was partial DML execution. If so, rollback the effect of the partially executed query.
Expand Down Expand Up @@ -439,8 +439,14 @@ func (e *Executor) execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
err = e.newExecute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats, func(ctx context.Context, plan *engine.Plan, vc *econtext.VCursorImpl, bindVars map[string]*querypb.BindVariable, time time.Time) error {
stmtType = plan.Type
qr, err = e.executePlan(ctx, safeSession, plan, vc, bindVars, logStats, time)
return err
}, func(typ sqlparser.StatementType, result *sqltypes.Result) error {
if err != nil {
return err
}
if plan.ForceReadLastInsertID {
safeSession.LastInsertId = qr.InsertID
}
return nil
}, func(typ sqlparser.StatementType, result *sqltypes.Result, _ bool) error {
stmtType = typ
qr = result
return nil
Expand Down Expand Up @@ -1144,7 +1150,7 @@ func (e *Executor) getPlan(
logStats.SQL = comments.Leading + query + comments.Trailing
logStats.BindVariables = sqltypes.CopyBindVariables(bindVars)

return e.cacheAndBuildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, logStats)
return e.getFromCacheOrBuildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, logStats)
}

func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey {
Expand Down Expand Up @@ -1179,7 +1185,7 @@ func (e *Executor) buildStatement(
return plan, err
}

func (e *Executor) cacheAndBuildStatement(
func (e *Executor) getFromCacheOrBuildStatement(
ctx context.Context,
vcursor *econtext.VCursorImpl,
query string,
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

type planExec func(ctx context.Context, plan *engine.Plan, vc *econtext.VCursorImpl, bindVars map[string]*querypb.BindVariable, startTime time.Time) error
type txResult func(sqlparser.StatementType, *sqltypes.Result) error
type txResult func(sqlparser.StatementType, *sqltypes.Result, bool) error

var vschemaWaitTimeout = 30 * time.Second

Expand Down Expand Up @@ -157,7 +157,7 @@ func (e *Executor) newExecute(
return err
}
if result != nil {
return recResult(plan.Type, result)
return recResult(plan.Type, result, plan.ForceReadLastInsertID)
}

// 4: Prepare for execution.
Expand Down
45 changes: 25 additions & 20 deletions go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@ var (

type (
planResult struct {
primitive engine.Primitive
tables []string
primitive engine.Primitive
tables []string
forceReadLastInsertID bool
}

stmtPlanner func(sqlparser.Statement, *sqlparser.ReservedVars, plancontext.VSchema) (*planResult, error)
)

func newPlanResult(prim engine.Primitive, tablesUsed ...string) *planResult {
return &planResult{primitive: prim, tables: tablesUsed}
func newPlanResult(prim engine.Primitive, lastInsertID bool, tablesUsed ...string) *planResult {
return &planResult{primitive: prim, tables: tablesUsed, forceReadLastInsertID: lastInsertID}
}

func singleTable(ks, tbl string) string {
Expand Down Expand Up @@ -115,16 +116,20 @@ func BuildFromStmt(ctx context.Context, query string, stmt sqlparser.Statement,

var primitive engine.Primitive
var tablesUsed []string
var forceReadLastInsertID bool
if planResult != nil {
primitive = planResult.primitive
tablesUsed = planResult.tables
forceReadLastInsertID = planResult.forceReadLastInsertID
}

plan := &engine.Plan{
Type: sqlparser.ASTToStatementType(stmt),
Original: query,
Instructions: primitive,
BindVarNeeds: bindVarNeeds,
TablesUsed: tablesUsed,
Type: sqlparser.ASTToStatementType(stmt),
Original: query,
Instructions: primitive,
BindVarNeeds: bindVarNeeds,
TablesUsed: tablesUsed,
ForceReadLastInsertID: forceReadLastInsertID,
}
return plan, nil
}
Expand Down Expand Up @@ -239,7 +244,7 @@ func createInstructionFor(ctx context.Context, query string, stmt sqlparser.Stat
case *sqlparser.CommentOnly:
// There is only a comment in the input.
// This is essentially a No-op
return newPlanResult(engine.NewRowsPrimitive(nil, nil)), nil
return newPlanResult(engine.NewRowsPrimitive(nil, nil), false), nil
}

return nil, vterrors.VT13001(fmt.Sprintf("unexpected statement type: %T", stmt))
Expand Down Expand Up @@ -279,7 +284,7 @@ func buildAnalyzePlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vsche
TargetDestination: dest,
Query: sqlparser.String(analyzeStmt),
}
return newPlanResult(prim, sqlparser.String(analyzeStmt.Table)), nil
return newPlanResult(prim, false, sqlparser.String(analyzeStmt.Table)), nil
}

func buildDBDDLPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vschema plancontext.VSchema) (*planResult, error) {
Expand All @@ -297,25 +302,25 @@ func buildDBDDLPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vschema
switch dbDDL := dbDDLstmt.(type) {
case *sqlparser.DropDatabase:
if dbDDL.IfExists && !ksExists {
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0))), nil
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0)), false), nil
}
if !ksExists {
return nil, vterrors.VT05001(ksName)
}
return newPlanResult(engine.NewDBDDL(ksName, false, queryTimeout(dbDDL.Comments.Directives()))), nil
return newPlanResult(engine.NewDBDDL(ksName, false, queryTimeout(dbDDL.Comments.Directives())), false), nil
case *sqlparser.AlterDatabase:
if !ksExists {
return nil, vterrors.VT05002(ksName)
}
return nil, vterrors.VT12001("ALTER DATABASE")
case *sqlparser.CreateDatabase:
if dbDDL.IfNotExists && ksExists {
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0))), nil
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0)), false), nil
}
if !dbDDL.IfNotExists && ksExists {
return nil, vterrors.VT06001(ksName)
}
return newPlanResult(engine.NewDBDDL(ksName, true, queryTimeout(dbDDL.Comments.Directives()))), nil
return newPlanResult(engine.NewDBDDL(ksName, true, queryTimeout(dbDDL.Comments.Directives())), false), nil
}
return nil, vterrors.VT13001(fmt.Sprintf("database DDL not recognized: %s", sqlparser.String(dbDDLstmt)))
}
Expand All @@ -340,7 +345,7 @@ func buildLoadPlan(query string, vschema plancontext.VSchema) (*planResult, erro
Query: query,
IsDML: true,
SingleShardOnly: true,
}), nil
}, false), nil
}

func buildVSchemaDDLPlan(stmt *sqlparser.AlterVschema, vschema plancontext.VSchema) (*planResult, error) {
Expand All @@ -351,7 +356,7 @@ func buildVSchemaDDLPlan(stmt *sqlparser.AlterVschema, vschema plancontext.VSche
return newPlanResult(&engine.AlterVSchema{
Keyspace: keyspace,
AlterVschemaDDL: stmt,
}, singleTable(keyspace.Name, stmt.Table.Name.String())), nil
}, false, singleTable(keyspace.Name, stmt.Table.Name.String())), nil
}

func buildFlushPlan(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*planResult, error) {
Expand Down Expand Up @@ -381,7 +386,7 @@ func buildFlushOptions(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*pla
TargetDestination: dest,
Query: sqlparser.String(stmt),
ReservedConnectionNeeded: stmt.WithLock,
}), nil
}, false), nil
}

func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*planResult, error) {
Expand Down Expand Up @@ -433,7 +438,7 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tables)),
ReservedConnectionNeeded: stmt.WithLock,
}, tc.getTables()...), nil
}, false, tc.getTables()...), nil
}
}

Expand All @@ -451,7 +456,7 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan
}
sources = append(sources, plan)
}
return newPlanResult(engine.NewConcatenate(sources, nil), tc.getTables()...), nil
return newPlanResult(engine.NewConcatenate(sources, nil), false, tc.getTables()...), nil
}

type tableCollector struct {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func buildPlanForBypass(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vsc
MultishardAutocommit: hints.multiShardAutocommit,
QueryTimeout: hints.queryTimeout,
}
return newPlanResult(send), nil
return newPlanResult(send, false), nil
}

func GetShardRoute(vschema plancontext.VSchema, keyspace, shard string) (*vindexes.Keyspace, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/call_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func buildCallProcPlan(stmt *sqlparser.CallProc, vschema plancontext.VSchema) (*
Keyspace: keyspace,
TargetDestination: dest,
Query: sqlparser.String(stmt),
}), nil
}, false), nil
}

const errNotAllowWhenSharded = "CALL is not supported for sharded keyspace"
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser
tc.addASTTable(normalDDLPlan.Keyspace.Name, tbl)
}

return newPlanResult(eddl, tc.getTables()...), nil
return newPlanResult(eddl, false, tc.getTables()...), nil
}

func buildByPassPlan(sql string, vschema plancontext.VSchema, isDDL bool) (*planResult, error) {
Expand All @@ -89,7 +89,7 @@ func buildByPassPlan(sql string, vschema plancontext.VSchema, isDDL bool) (*plan
Query: sql,
IsDDL: isDDL,
}
return newPlanResult(send), nil
return newPlanResult(send, false), nil
}

func buildDDLPlans(ctx context.Context, sql string, ddlStatement sqlparser.DDLStatement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, cfg dynamicconfig.DDL) (*engine.Send, *engine.OnlineDDL, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func gen4DeleteStmtPlanner(
if ks, tables := ctx.SemTable.SingleUnshardedKeyspace(); ks != nil {
if !ctx.SemTable.ForeignKeysPresent() {
plan := deleteUnshardedShortcut(deleteStmt, ks, tables)
return newPlanResult(plan, operators.QualifiedTables(ks, tables)...), nil
return newPlanResult(plan, false, operators.QualifiedTables(ks, tables)...), nil
}
}

Expand All @@ -83,7 +83,7 @@ func gen4DeleteStmtPlanner(
return nil, err
}

return newPlanResult(plan, operators.TablesUsed(op)...), nil
return newPlanResult(plan, false, operators.TablesUsed(op)...), nil
}

func rewriteSingleTbl(del *sqlparser.Delete) (*sqlparser.Delete, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func gen4InsertStmtPlanner(version querypb.ExecuteOptions_PlannerVersion, insStm
if tables[0].AutoIncrement == nil && !ctx.SemTable.ForeignKeysPresent() {
plan := insertUnshardedShortcut(insStmt, ks, tables)
setCommentDirectivesOnPlan(plan, insStmt)
return newPlanResult(plan, operators.QualifiedTables(ks, tables)...), nil
return newPlanResult(plan, false, operators.QualifiedTables(ks, tables)...), nil
}
}

Expand All @@ -80,7 +80,7 @@ func gen4InsertStmtPlanner(version querypb.ExecuteOptions_PlannerVersion, insStm
return nil, err
}

return newPlanResult(plan, operators.TablesUsed(op)...), nil
return newPlanResult(plan, false, operators.TablesUsed(op)...), nil
}

func errOutIfPlanCannotBeConstructed(ctx *plancontext.PlanningContext, vTbl *vindexes.Table) error {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/locktables.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
// buildLockPlan plans lock tables statement.
func buildLockPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, _ plancontext.VSchema) (*planResult, error) {
log.Warningf("Lock Tables statement is ignored: %v", stmt)
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0))), nil
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0)), false), nil
}

// buildUnlockPlan plans lock tables statement.
func buildUnlockPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, _ plancontext.VSchema) (*planResult, error) {
return newPlanResult(&engine.Unlock{}), nil
return newPlanResult(&engine.Unlock{}, false), nil
}
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func buildAlterMigrationThrottleAppPlan(query string, alterMigration *sqlparser.
return newPlanResult(&engine.ThrottleApp{
Keyspace: keyspace,
ThrottledAppRule: throttledAppRule,
}), nil
}, false), nil
}

func buildAlterMigrationPlan(query string, alterMigration *sqlparser.AlterMigration, vschema plancontext.VSchema, cfg dynamicconfig.DDL) (*planResult, error) {
Expand Down Expand Up @@ -116,7 +116,7 @@ func buildAlterMigrationPlan(query string, alterMigration *sqlparser.AlterMigrat
TargetDestination: dest,
Query: query,
}
return newPlanResult(send), nil
return newPlanResult(send, false), nil
}

func buildRevertMigrationPlan(query string, stmt *sqlparser.RevertMigration, vschema plancontext.VSchema, cfg dynamicconfig.DDL) (*planResult, error) {
Expand Down Expand Up @@ -145,7 +145,7 @@ func buildRevertMigrationPlan(query string, stmt *sqlparser.RevertMigration, vsc
Stmt: stmt,
Query: query,
}
return newPlanResult(emig), nil
return newPlanResult(emig, false), nil
}

func buildShowMigrationLogsPlan(query string, vschema plancontext.VSchema, cfg dynamicconfig.DDL) (*planResult, error) {
Expand Down Expand Up @@ -173,5 +173,5 @@ func buildShowMigrationLogsPlan(query string, vschema plancontext.VSchema, cfg d
TargetDestination: dest,
Query: query,
}
return newPlanResult(send), nil
return newPlanResult(send, false), nil
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/other_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ func buildOtherReadAndAdmin(sql string, vschema plancontext.VSchema) (*planResul
TargetDestination: destination,
Query: sql, // This is original sql query to be passed as the parser can provide partial ddl AST.
SingleShardOnly: true,
}), nil
}, false), nil
}
Loading

0 comments on commit b7c5b7e

Please sign in to comment.