Skip to content

Commit

Permalink
Wait for prepared transactions to go through when disabling query ser…
Browse files Browse the repository at this point in the history
…vice

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Sep 25, 2024
1 parent 3743f09 commit f2ef7e5
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 22 deletions.
7 changes: 4 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver"

replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -354,7 +355,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string

// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)
tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_SemiSync, semiSyncAction == SemiSyncActionSet)

// Setting super_read_only `OFF` so that we can run the DDL commands
if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false); err != nil {
Expand Down Expand Up @@ -600,7 +601,7 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context, semiSync bool) e

// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)
tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_SemiSync, semiSyncAction == SemiSyncActionSet)

// If using semi-sync, we need to enable source-side.
if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil {
Expand Down Expand Up @@ -925,7 +926,7 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context, semiSync bool) (str

// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)
tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_SemiSync, semiSyncAction == SemiSyncActionSet)

pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv())
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,13 +545,17 @@ func (tm *TabletManager) createKeyspaceShard(ctx context.Context) (*topo.ShardIn
return nil, err
}

tm.tmState.RefreshFromTopoInfo(ctx, shardInfo, nil)
if err := tm.tmState.RefreshFromTopoInfo(ctx, shardInfo, nil); err != nil {
return nil, err
}

// Rebuild keyspace if this the first tablet in this keyspace/cell
srvKeyspace, err := tm.TopoServer.GetSrvKeyspace(ctx, tm.tabletAlias.Cell, tablet.Keyspace)
switch {
case err == nil:
tm.tmState.RefreshFromTopoInfo(ctx, nil, srvKeyspace)
if err := tm.tmState.RefreshFromTopoInfo(ctx, nil, srvKeyspace); err != nil {
return nil, err
}
case topo.IsErrType(err, topo.NoNode):
var rebuildKsCtx context.Context
rebuildKsCtx, tm._rebuildKeyspaceCancel = context.WithCancel(tm.BatchCtx)
Expand Down Expand Up @@ -603,7 +607,7 @@ func (tm *TabletManager) rebuildKeyspace(ctx context.Context, done chan<- struct
defer func() {
log.Infof("Keyspace rebuilt: %v", keyspace)
if ctx.Err() == nil {
tm.tmState.RefreshFromTopoInfo(tm.BatchCtx, nil, srvKeyspace)
_ = tm.tmState.RefreshFromTopoInfo(tm.BatchCtx, nil, srvKeyspace)
}
close(done)
}()
Expand Down
27 changes: 22 additions & 5 deletions go/vt/vttablet/tabletmanager/tm_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -135,11 +136,10 @@ func (ts *tmState) RefreshFromTopo(ctx context.Context) error {
if err != nil {
return err
}
ts.RefreshFromTopoInfo(ctx, shardInfo, srvKeyspace)
return nil
return ts.RefreshFromTopoInfo(ctx, shardInfo, srvKeyspace)
}

func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.ShardInfo, srvKeyspace *topodatapb.SrvKeyspace) {
func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.ShardInfo, srvKeyspace *topodatapb.SrvKeyspace) error {
ts.mu.Lock()
defer ts.mu.Unlock()

Expand All @@ -157,6 +157,7 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar
if srvKeyspace != nil {
ts.isShardServing = make(map[topodatapb.TabletType]bool)
ts.tabletControls = make(map[topodatapb.TabletType]bool)
ts.tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_TabletControls, true)

for _, partition := range srvKeyspace.GetPartitions() {

Expand All @@ -169,15 +170,31 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar
for _, tabletControl := range partition.GetShardTabletControls() {
if key.KeyRangeEqual(tabletControl.GetKeyRange(), ts.KeyRange()) {
if tabletControl.QueryServiceDisabled {
ts.tabletControls[partition.GetServedType()] = true
err := ts.prepareForDisableQueryService(ctx, partition.GetServedType())
if err != nil {
return err
}
}
break
}
}
}
}

_ = ts.updateLocked(ctx)
return ts.updateLocked(ctx)
}

// prepareForDisableQueryService prepares the tablet for disabling query service.
func (ts *tmState) prepareForDisableQueryService(ctx context.Context, servType topodatapb.TabletType) error {
if servType == topodatapb.TabletType_PRIMARY {
ts.tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_TabletControls, false)
err := ts.tm.QueryServiceControl.WaitForPreparedTwoPCTransactions(ctx)
if err != nil {
return err
}
}
ts.tabletControls[servType] = true
return nil
}

func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.TabletType, action DBAction) error {
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type Controller interface {
RedoPreparedTransactions()

// SetTwoPCAllowed sets whether TwoPC is allowed or not.
SetTwoPCAllowed(bool)
SetTwoPCAllowed(int, bool)

// UnresolvedTransactions returns all unresolved transactions list
UnresolvedTransactions(ctx context.Context, target *querypb.Target, abandonAgeSeconds int64) ([]*querypb.TransactionMetadata, error)
Expand All @@ -111,6 +111,9 @@ type Controller interface {

// RollbackPrepared rolls back the prepared transaction and removes the transaction log.
RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) error

// WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved.
WaitForPreparedTwoPCTransactions(ctx context.Context) error
}

// Ensure TabletServer satisfies Controller interface.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
if !dte.te.twopcAllowed {
if !dte.te.IsTwoPCAllowed() {
return vterrors.VT10002("two-pc is enabled, but semi-sync is not")
}
defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now())
Expand Down
24 changes: 21 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c
tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)

tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler)
tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks, tsv.te.preparedPool.IsEmpty)
tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks, tsv.te.preparedPool.IsEmptyForTable)

tsv.sm = &stateManager{
statelessql: tsv.statelessql,
Expand Down Expand Up @@ -700,6 +700,24 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T
)
}

// WaitForPreparedTwoPCTransactions waits for all the prepared transactions to complete.
func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) error {
if tsv.te.preparedPool.IsEmpty() {
return nil
}
for {
select {
case <-ctx.Done():
// Return an error if we run out of time.
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Prepared transactions have not been resolved yet")
case <-time.After(100 * time.Millisecond):
if tsv.te.preparedPool.IsEmpty() {
return nil
}
}
}
}

// CreateTransaction creates the metadata for a 2PC transaction.
func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) {
return tsv.execRequest(
Expand Down Expand Up @@ -1700,8 +1718,8 @@ func (tsv *TabletServer) RedoPreparedTransactions() {
}

// SetTwoPCAllowed sets whether TwoPC is allowed or not.
func (tsv *TabletServer) SetTwoPCAllowed(allowed bool) {
tsv.te.twopcAllowed = allowed
func (tsv *TabletServer) SetTwoPCAllowed(twoPCAllowedPosition int, allowed bool) {
tsv.te.twopcAllowed[twoPCAllowedPosition] = allowed
}

// HandlePanic is part of the queryservice.QueryService interface
Expand Down
25 changes: 21 additions & 4 deletions go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ type TxEngine struct {

// twopcEnabled is the flag value of whether the user has enabled twopc or not.
twopcEnabled bool
// twopcAllowed is wether it is safe to allow two pc transactions or not.
// If the primary tablet doesn't run with semi-sync we set this to false, and disallow any prepared calls.
twopcAllowed bool
// twopcAllowed is whether it is safe to allow two pc transactions or not.
// There are multiple reasons to disallow TwoPC:
// 1. If the primary tablet doesn't run with semi-sync we set this to false, and disallow any prepared calls.
// 2. TabletControls have been set in the tablet record, and Query service is going to be disabled.
twopcAllowed []bool
shutdownGracePeriod time.Duration
coordinatorAddress string
abandonAge time.Duration
Expand All @@ -94,6 +96,11 @@ type TxEngine struct {
dxNotify func()
}

const (
TwoPCAllowed_SemiSync = iota
TwoPCAllowed_TabletControls
)

// NewTxEngine creates a new TxEngine.
func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine {
config := env.Config()
Expand All @@ -106,7 +113,7 @@ func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine {
te.txPool = NewTxPool(env, limiter)
// We initially allow twoPC (handles vttablet restarts).
// We will disallow them, when a new tablet is promoted if semi-sync is turned off.
te.twopcAllowed = true
te.twopcAllowed = []bool{true, true}
te.twopcEnabled = config.TwoPCEnable
if te.twopcEnabled {
if config.TwoPCAbandonAge <= 0 {
Expand Down Expand Up @@ -708,3 +715,13 @@ func (te *TxEngine) beginNewDbaConnection(ctx context.Context) (*StatefulConnect
_, _, err = te.txPool.begin(ctx, nil, false, sc, nil)
return sc, err
}

// IsTwoPCAllowed checks if TwoPC is allowed.
func (te *TxEngine) IsTwoPCAllowed() bool {
for _, allowed := range te.twopcAllowed {
if !allowed {
return false
}
}
return true
}
17 changes: 16 additions & 1 deletion go/vt/vttablet/tabletserver/tx_prep_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ func (pp *TxPreparedPool) FetchAllForRollback() []*StatefulConnection {
return conns
}

func (pp *TxPreparedPool) IsEmpty(tableName string) bool {
// IsEmptyForTable returns true if no prepared transactions are found for the table.
func (pp *TxPreparedPool) IsEmptyForTable(tableName string) bool {
pp.mu.Lock()
defer pp.mu.Unlock()
if !pp.twoPCEnabled {
Expand All @@ -194,3 +195,17 @@ func (pp *TxPreparedPool) IsEmpty(tableName string) bool {
}
return true
}

// IsEmpty returns true if the pool is empty.
func (pp *TxPreparedPool) IsEmpty() bool {
pp.mu.Lock()
defer pp.mu.Unlock()
if !pp.twoPCEnabled {
return true
}
// If the pool is shutdown, we do not know the correct state of prepared transactions.
if !pp.open {
return false
}
return len(pp.conns) == 0
}
8 changes: 7 additions & 1 deletion go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (tqsc *Controller) GetThrottlerStatus(ctx context.Context) *throttle.Thrott
func (tqsc *Controller) RedoPreparedTransactions() {}

// SetTwoPCAllowed sets whether TwoPC is allowed or not.
func (tqsc *Controller) SetTwoPCAllowed(bool) {
func (tqsc *Controller) SetTwoPCAllowed(int, bool) {
}

// UnresolvedTransactions is part of the tabletserver.Controller interface
Expand All @@ -260,6 +260,12 @@ func (tqsc *Controller) RollbackPrepared(context.Context, *querypb.Target, strin
return nil
}

// WaitForPreparedTwoPCTransactions is part of the tabletserver.Controller interface
func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error {
tqsc.MethodCalled["WaitForPreparedTwoPCTransactions"] = true
return nil
}

// EnterLameduck implements tabletserver.Controller.
func (tqsc *Controller) EnterLameduck() {
tqsc.mu.Lock()
Expand Down

0 comments on commit f2ef7e5

Please sign in to comment.