diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index db73f67ed64..2d1754a0e6c 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -178,6 +178,7 @@ type Executor struct { ts *topo.Server lagThrottler *throttle.Throttler toggleBufferTableFunc func(cancelCtx context.Context, tableName string, timeout time.Duration, bufferQueries bool) + IsPreparedPoolEmpty func(tableName string) bool requestGCChecksFunc func() tabletAlias *topodatapb.TabletAlias @@ -238,6 +239,7 @@ func NewExecutor(env tabletenv.Env, tabletAlias *topodatapb.TabletAlias, ts *top tabletTypeFunc func() topodatapb.TabletType, toggleBufferTableFunc func(cancelCtx context.Context, tableName string, timeout time.Duration, bufferQueries bool), requestGCChecksFunc func(), + isPreparedPoolEmpty func(tableName string) bool, ) *Executor { // sanitize flags if maxConcurrentOnlineDDLs < 1 { @@ -255,6 +257,7 @@ func NewExecutor(env tabletenv.Env, tabletAlias *topodatapb.TabletAlias, ts *top ts: ts, lagThrottler: lagThrottler, toggleBufferTableFunc: toggleBufferTableFunc, + IsPreparedPoolEmpty: isPreparedPoolEmpty, requestGCChecksFunc: requestGCChecksFunc, ticks: timer.NewTimer(migrationCheckInterval), // Gracefully return an error if any caller tries to execute @@ -870,7 +873,10 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa threadId := row.AsInt64("trx_mysql_thread_id", 0) log.Infof("killTableLockHoldersAndAccessors: killing connection %v with transaction on table", threadId) killConnection := fmt.Sprintf("KILL %d", threadId) - _, _ = conn.Conn.ExecuteFetch(killConnection, 1, false) + _, err = conn.Conn.ExecuteFetch(killConnection, 1, false) + if err != nil { + log.Errorf("Unable to kill the connection %d: %v", threadId, err) + } } } } @@ -1102,6 +1108,11 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh time.Sleep(100 * time.Millisecond) if shouldForceCutOver { + // We should only proceed with forceful cut over if there is no pending atomic transaction for the table. + // This will help in keeping the atomicity guarantee of a prepared transaction. + if !e.IsPreparedPoolEmpty(onlineDDL.Table) { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot force cut-over on non-empty prepared pool for table: %s", onlineDDL.Table) + } if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 97dc035f461..395dd5e7a59 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -84,6 +84,9 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { return vterrors.VT10002("cannot prepare the transaction on a reserved connection") } + // Fail Prepare if any query rule disallows it. + // This could be due to ongoing cutover happening in vreplication workflow + // regarding OnlineDDL or MoveTables. for _, query := range conn.TxProperties().Queries { qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, query.PlanType, query.Tables...) if qr != nil { @@ -101,6 +104,28 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err) } + // Recheck the rules. As some prepare transaction could have passed the first check. + // If they are put in the prepared pool, then vreplication workflow waits. + // This check helps reject the prepare that came later. + for _, query := range conn.TxProperties().Queries { + qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, query.PlanType, query.Tables...) + if qr != nil { + act, _, _, _ := qr.GetAction("", "", nil, sqlparser.MarginComments{}) + if act != rules.QRContinue { + dte.te.txPool.RollbackAndRelease(dte.ctx, conn) + dte.te.preparedPool.FetchForRollback(dtid) + return vterrors.VT10002("cannot prepare the transaction due to query rule") + } + } + } + + // If OnlineDDL killed the connection. We should avoid the prepare for it. + if conn.IsClosed() { + dte.te.txPool.RollbackAndRelease(dte.ctx, conn) + dte.te.preparedPool.FetchForRollback(dtid) + return vterrors.VT10002("cannot prepare the transaction on a closed connection") + } + return dte.inTransaction(func(localConn *StatefulConnection) error { return dte.te.twoPC.SaveRedo(dte.ctx, localConn, dtid, conn.TxProperties().Queries) }) diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index fb45ab454fc..95b071f6dbb 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -26,6 +26,10 @@ import ( "time" "vitess.io/vitess/go/event/syslogger" + "vitess.io/vitess/go/vt/vtenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" + "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" + "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" "github.com/stretchr/testify/require" @@ -185,6 +189,61 @@ func TestTxExecutorPrepareRedoCommitFail(t *testing.T) { require.Contains(t, err.Error(), "commit fail") } +func TestExecutorPrepareRuleFailure(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + txe, tsv, db := newTestTxExecutor(t, ctx) + defer db.Close() + defer tsv.StopService() + + alterRule := rules.NewQueryRule("disable update", "disable update", rules.QRBuffer) + alterRule.AddTableCond("test_table") + + r := rules.New() + r.Add(alterRule) + txe.qe.queryRuleSources.RegisterSource("bufferQuery") + err := txe.qe.queryRuleSources.SetRules("bufferQuery", r) + require.NoError(t, err) + + // start a transaction + txid := newTxForPrep(ctx, tsv) + + // taint the connection. + sc, err := tsv.te.txPool.GetAndLock(txid, "adding query property") + require.NoError(t, err) + sc.txProps.Queries = append(sc.txProps.Queries, tx.Query{ + Sql: "update test_table set col = 5", + PlanType: planbuilder.PlanUpdate, + Tables: []string{"test_table"}, + }) + sc.Unlock() + + // try 2pc commit of Metadata Manager. + err = txe.Prepare(txid, "aa") + require.EqualError(t, err, "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction due to query rule") +} + +func TestExecutorPrepareConnFailure(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + txe, tsv, db := newTestTxExecutor(t, ctx) + defer db.Close() + defer tsv.StopService() + + // start a transaction + txid := newTxForPrep(ctx, tsv) + + // taint the connection. + sc, err := tsv.te.txPool.GetAndLock(txid, "adding query property") + require.NoError(t, err) + sc.Unlock() + sc.dbConn.Close() + + // try 2pc commit of Metadata Manager. + err = txe.Prepare(txid, "aa") + require.EqualError(t, err, "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction on a closed connection") +} + func TestTxExecutorCommit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -610,6 +669,11 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv db = setUpQueryExecutorTest(t) logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor") tsv = newTestTabletServer(ctx, smallTxPool, db) + cfg := tabletenv.NewDefaultConfig() + cfg.DB = newDBConfigs(db) + env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TabletServerTest") + se := schema.NewEngine(env) + qe := NewQueryEngine(env, se) db.AddQueryPattern("insert into _vt\\.redo_state\\(dtid, state, time_created\\) values \\('aa', 1,.*", &sqltypes.Result{}) db.AddQueryPattern("insert into _vt\\.redo_statement.*", &sqltypes.Result{}) db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{}) @@ -619,6 +683,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv ctx: ctx, logStats: logStats, te: tsv.te, + qe: qe, }, tsv, db } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 4741c9d1211..9f0a9f2e53e 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -186,7 +186,7 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler) - tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks) + tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks, tsv.te.preparedPool.IsPreparedForTable) tsv.sm = &stateManager{ statelessql: tsv.statelessql, diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool.go b/go/vt/vttablet/tabletserver/tx_prep_pool.go index c801e208e33..6f91b484f1d 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool.go @@ -169,3 +169,22 @@ func (pp *TxPreparedPool) FetchAllForRollback() []*StatefulConnection { pp.reserved = make(map[string]error) return conns } + +func (pp *TxPreparedPool) IsPreparedForTable(tableName string) bool { + pp.mu.Lock() + defer pp.mu.Unlock() + // If the pool is shutdown, we do not know the correct state of prepared transactions. + if !pp.open { + return false + } + for _, connection := range pp.conns { + for _, query := range connection.txProps.Queries { + for _, table := range query.Tables { + if table == tableName { + return false + } + } + } + } + return true +}