Skip to content

Commit

Permalink
feat: mark the preparedPool shutdown after the connections have been …
Browse files Browse the repository at this point in the history
…fetched for rolling back

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Aug 10, 2024
1 parent 9e9595a commit 1543fab
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func (te *TxEngine) transition(state txEngineState) {
te.txPool.Open(te.env.Config().DB.AppWithDB(), te.env.Config().DB.DbaWithDB(), te.env.Config().DB.AppDebugWithDB())

if te.twopcEnabled && te.state == AcceptingReadAndWrite {
// Set the preparedPool to start accepting connections.
te.preparedPool.shutdown = false
// If there are errors, we choose to raise an alert and
// continue anyway. Serving traffic is considered more important
// than blocking everything for the sake of a few transactions.
Expand Down Expand Up @@ -442,7 +444,7 @@ func (te *TxEngine) shutdownTransactions() {

func (te *TxEngine) rollbackPrepared() {
ctx := tabletenv.LocalContext()
for _, conn := range te.preparedPool.FetchAll() {
for _, conn := range te.preparedPool.FetchAllForRollback() {
te.txPool.Rollback(ctx, conn)
conn.Release(tx.TxRollback)
}
Expand Down
16 changes: 14 additions & 2 deletions go/vt/vttablet/tabletserver/tx_prep_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type TxPreparedPool struct {
mu sync.Mutex
conns map[string]*StatefulConnection
reserved map[string]error
// shutdown tells if the prepared pool has been drained and shutdown.
shutdown bool
capacity int
}

Expand All @@ -55,6 +57,10 @@ func NewTxPreparedPool(capacity int) *TxPreparedPool {
func (pp *TxPreparedPool) Put(c *StatefulConnection, dtid string) error {
pp.mu.Lock()
defer pp.mu.Unlock()
// If the pool is shutdown, we don't accept new prepared transactions.
if pp.shutdown {
return errors.New("pool is shutdown")
}
if _, ok := pp.reserved[dtid]; ok {
return errors.New("duplicate DTID in Prepare: " + dtid)
}
Expand Down Expand Up @@ -95,6 +101,11 @@ func (pp *TxPreparedPool) FetchForRollback(dtid string) *StatefulConnection {
func (pp *TxPreparedPool) FetchForCommit(dtid string) (*StatefulConnection, error) {
pp.mu.Lock()
defer pp.mu.Unlock()
// If the pool is shutdown, we don't have any connections to return.
// That however doesn't mean this transaction was committed, it could very well have been rollbacked.
if pp.shutdown {
return nil, errors.New("pool is shutdown")
}
if err, ok := pp.reserved[dtid]; ok {
return nil, err
}
Expand All @@ -121,11 +132,12 @@ func (pp *TxPreparedPool) Forget(dtid string) {
delete(pp.reserved, dtid)
}

// FetchAll removes all connections and returns them as a list.
// FetchAllForRollback removes all connections and returns them as a list.
// It also forgets all reserved dtids.
func (pp *TxPreparedPool) FetchAll() []*StatefulConnection {
func (pp *TxPreparedPool) FetchAllForRollback() []*StatefulConnection {
pp.mu.Lock()
defer pp.mu.Unlock()
pp.shutdown = true
conns := make([]*StatefulConnection, 0, len(pp.conns))
for _, c := range pp.conns {
conns = append(conns, c)
Expand Down
12 changes: 5 additions & 7 deletions go/vt/vttablet/tabletserver/tx_prep_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,9 @@ func TestPrepFetchAll(t *testing.T) {
conn2 := &StatefulConnection{}
pp.Put(conn1, "aa")
pp.Put(conn2, "bb")
got := pp.FetchAll()
if len(got) != 2 {
t.Errorf("FetchAll len: %d, want 2", len(got))
}
if len(pp.conns) != 0 {
t.Errorf("len(pp.conns): %d, want 0", len(pp.conns))
}
got := pp.FetchAllForRollback()
require.Len(t, got, 2)
require.Len(t, pp.conns, 0)
_, err := pp.FetchForCommit("aa")
require.ErrorContains(t, err, "pool is shutdown")
}

0 comments on commit 1543fab

Please sign in to comment.