Skip to content

Commit

Permalink
feat: wait for prepare pool empty in onlineDDL, recheck query rules f…
Browse files Browse the repository at this point in the history
…or prepare transactions

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Aug 23, 2024
1 parent 0be1027 commit 9710dc0
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 2 deletions.
13 changes: 12 additions & 1 deletion go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type Executor struct {
ts *topo.Server
lagThrottler *throttle.Throttler
toggleBufferTableFunc func(cancelCtx context.Context, tableName string, timeout time.Duration, bufferQueries bool)
IsPreparedPoolEmpty func(tableName string) bool
requestGCChecksFunc func()
tabletAlias *topodatapb.TabletAlias

Expand Down Expand Up @@ -238,6 +239,7 @@ func NewExecutor(env tabletenv.Env, tabletAlias *topodatapb.TabletAlias, ts *top
tabletTypeFunc func() topodatapb.TabletType,
toggleBufferTableFunc func(cancelCtx context.Context, tableName string, timeout time.Duration, bufferQueries bool),
requestGCChecksFunc func(),
isPreparedPoolEmpty func(tableName string) bool,
) *Executor {
// sanitize flags
if maxConcurrentOnlineDDLs < 1 {
Expand All @@ -255,6 +257,7 @@ func NewExecutor(env tabletenv.Env, tabletAlias *topodatapb.TabletAlias, ts *top
ts: ts,
lagThrottler: lagThrottler,
toggleBufferTableFunc: toggleBufferTableFunc,
IsPreparedPoolEmpty: isPreparedPoolEmpty,
requestGCChecksFunc: requestGCChecksFunc,
ticks: timer.NewTimer(migrationCheckInterval),
// Gracefully return an error if any caller tries to execute
Expand Down Expand Up @@ -870,7 +873,10 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
threadId := row.AsInt64("trx_mysql_thread_id", 0)
log.Infof("killTableLockHoldersAndAccessors: killing connection %v with transaction on table", threadId)
killConnection := fmt.Sprintf("KILL %d", threadId)
_, _ = conn.Conn.ExecuteFetch(killConnection, 1, false)
_, err = conn.Conn.ExecuteFetch(killConnection, 1, false)
if err != nil {
log.Errorf("Unable to kill the connection %d: %v", threadId, err)
}
}
}
}
Expand Down Expand Up @@ -1102,6 +1108,11 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
time.Sleep(100 * time.Millisecond)

if shouldForceCutOver {
// We should only proceed with forceful cut over if there is no pending atomic transaction for the table.
// This will help in keeping the atomicity guarantee of a prepared transaction.
if !e.IsPreparedPoolEmpty(onlineDDL.Table) {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot force cut-over on non-empty prepared pool for table: %s", onlineDDL.Table)
}
if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil {
return err
}
Expand Down
25 changes: 25 additions & 0 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
return vterrors.VT10002("cannot prepare the transaction on a reserved connection")
}

// Fail Prepare if any query rule disallows it.
// This could be due to ongoing cutover happening in vreplication workflow
// regarding OnlineDDL or MoveTables.
for _, query := range conn.TxProperties().Queries {
qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, query.PlanType, query.Tables...)
if qr != nil {
Expand All @@ -101,6 +104,28 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err)
}

// Recheck the rules, as some prepare transactions could have passed the first check.
// If they are put in the prepared pool, then the vreplication workflow waits.
// This check helps reject the prepares that came later.
for _, query := range conn.TxProperties().Queries {
qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, query.PlanType, query.Tables...)
if qr != nil {
act, _, _, _ := qr.GetAction("", "", nil, sqlparser.MarginComments{})
if act != rules.QRContinue {
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
dte.te.preparedPool.FetchForRollback(dtid)
return vterrors.VT10002("cannot prepare the transaction due to query rule")
}
}
}

// If OnlineDDL killed the connection, we should avoid the prepare for it.
if conn.IsClosed() {
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
dte.te.preparedPool.FetchForRollback(dtid)
return vterrors.VT10002("cannot prepare the transaction on a closed connection")
}

return dte.inTransaction(func(localConn *StatefulConnection) error {
return dte.te.twoPC.SaveRedo(dte.ctx, localConn, dtid, conn.TxProperties().Queries)
})
Expand Down
65 changes: 65 additions & 0 deletions go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"time"

"vitess.io/vitess/go/event/syslogger"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -185,6 +189,61 @@ func TestTxExecutorPrepareRedoCommitFail(t *testing.T) {
require.Contains(t, err.Error(), "commit fail")
}

// TestExecutorPrepareRuleFailure expects Prepare to fail with a VT10002 error
// when a QRBuffer query rule is registered for a table referenced by a query
// recorded on the transaction.
func TestExecutorPrepareRuleFailure(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	txe, tsv, db := newTestTxExecutor(t, ctx)
	defer db.Close()
	defer tsv.StopService()

	const (
		ruleSource = "bufferQuery"
		tableName  = "test_table"
	)

	// Install a buffering rule scoped to the table under its own rule source.
	bufferRule := rules.NewQueryRule("disable update", "disable update", rules.QRBuffer)
	bufferRule.AddTableCond(tableName)
	ruleSet := rules.New()
	ruleSet.Add(bufferRule)
	txe.qe.queryRuleSources.RegisterSource(ruleSource)
	require.NoError(t, txe.qe.queryRuleSources.SetRules(ruleSource, ruleSet))

	// Open a transaction to prepare.
	txid := newTxForPrep(ctx, tsv)

	// Record an update against the ruled table on the transaction's connection.
	updateQuery := tx.Query{
		Sql:      "update test_table set col = 5",
		PlanType: planbuilder.PlanUpdate,
		Tables:   []string{tableName},
	}
	sc, err := tsv.te.txPool.GetAndLock(txid, "adding query property")
	require.NoError(t, err)
	sc.txProps.Queries = append(sc.txProps.Queries, updateQuery)
	sc.Unlock()

	// Prepare must be refused with the query rule error.
	require.EqualError(t, txe.Prepare(txid, "aa"), "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction due to query rule")
}

// TestExecutorPrepareConnFailure expects Prepare to fail with a VT10002 error
// when the database connection backing the transaction was closed beforehand.
func TestExecutorPrepareConnFailure(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	txe, tsv, db := newTestTxExecutor(t, ctx)
	defer db.Close()
	defer tsv.StopService()

	// Open a transaction to prepare.
	txid := newTxForPrep(ctx, tsv)

	// Look up the transaction's connection, unlock it again, and then close
	// its underlying database connection.
	statefulConn, lockErr := tsv.te.txPool.GetAndLock(txid, "adding query property")
	require.NoError(t, lockErr)
	statefulConn.Unlock()
	statefulConn.dbConn.Close()

	// Prepare must be refused because the connection is closed.
	prepareErr := txe.Prepare(txid, "aa")
	require.EqualError(t, prepareErr, "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction on a closed connection")
}

func TestTxExecutorCommit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -610,6 +669,11 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv
db = setUpQueryExecutorTest(t)
logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor")
tsv = newTestTabletServer(ctx, smallTxPool, db)
cfg := tabletenv.NewDefaultConfig()
cfg.DB = newDBConfigs(db)
env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TabletServerTest")
se := schema.NewEngine(env)
qe := NewQueryEngine(env, se)
db.AddQueryPattern("insert into _vt\\.redo_state\\(dtid, state, time_created\\) values \\('aa', 1,.*", &sqltypes.Result{})
db.AddQueryPattern("insert into _vt\\.redo_statement.*", &sqltypes.Result{})
db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{})
Expand All @@ -619,6 +683,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv
ctx: ctx,
logStats: logStats,
te: tsv.te,
qe: qe,
}, tsv, db
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c
tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)

tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler)
tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks)
tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks, tsv.te.preparedPool.IsPreparedForTable)

tsv.sm = &stateManager{
statelessql: tsv.statelessql,
Expand Down
19 changes: 19 additions & 0 deletions go/vt/vttablet/tabletserver/tx_prep_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,22 @@ func (pp *TxPreparedPool) FetchAllForRollback() []*StatefulConnection {
pp.reserved = make(map[string]error)
return conns
}

// IsPreparedForTable reports whether the prepared pool is free of transactions
// that reference tableName.
//
// Note the polarity: it returns true when the pool is open and no query
// recorded on any pooled connection lists tableName among its tables. It
// returns false when such a query exists, and also when the pool is not open.
func (pp *TxPreparedPool) IsPreparedForTable(tableName string) bool {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	// A pool that is not open cannot vouch for the state of prepared
	// transactions, so it is never reported as clear for the table.
	if !pp.open {
		return false
	}

	for _, sc := range pp.conns {
		queries := sc.txProps.Queries
		for i := range queries {
			for _, name := range queries[i].Tables {
				if name == tableName {
					return false
				}
			}
		}
	}
	return true
}

0 comments on commit 9710dc0

Please sign in to comment.