diff --git a/go/mysql/sqlerror/constants.go b/go/mysql/sqlerror/constants.go index a247ca15aa4..42425fa68a4 100644 --- a/go/mysql/sqlerror/constants.go +++ b/go/mysql/sqlerror/constants.go @@ -34,8 +34,9 @@ func (e ErrorCode) ToString() string { // See above reference for more information on each code. const ( // Vitess specific errors, (100-999) - ERNotReplica = ErrorCode(100) - ERNonAtomicCommit = ErrorCode(301) + ERNotReplica = ErrorCode(100) + ERNonAtomicCommit = ErrorCode(301) + ERInAtomicRecovery = ErrorCode(302) // unknown ERUnknownError = ErrorCode(1105) diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index dc2aba61b1b..98bc158c4da 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -31,11 +31,13 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/callerid" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) // TestDTCommit tests distributed transaction commit for insert, update and delete operations @@ -580,6 +582,10 @@ func TestDTResolveAfterMMCommit(t *testing.T) { _, err = conn.Execute(newCtx, "commit", nil) require.ErrorContains(t, err, "Fail After MM commit") + testWarningAndTransactionStatus(t, conn, + "distributed transaction ID failed during metadata manager commit; transaction will be committed/rollbacked based on the state on recovery", + false, "COMMIT", "ks:40-80,ks:-40") + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. tableMap := make(map[string][]*querypb.Field) dtMap := make(map[string]string) @@ -656,6 +662,10 @@ func TestDTResolveAfterRMPrepare(t *testing.T) { _, err = conn.Execute(newCtx, "commit", nil) require.ErrorContains(t, err, "Fail After RM prepared") + testWarningAndTransactionStatus(t, conn, + "distributed transaction ID failed during transaction prepare phase; prepare transaction rollback attempted; conclude on recovery", + true /* transaction concluded */, "", "") + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. tableMap := make(map[string][]*querypb.Field) dtMap := make(map[string]string) @@ -714,6 +724,10 @@ func TestDTResolveDuringRMPrepare(t *testing.T) { _, err = conn.Execute(newCtx, "commit", nil) require.ErrorContains(t, err, "Fail During RM prepare") + testWarningAndTransactionStatus(t, conn, + "distributed transaction ID failed during transaction prepare phase; prepare transaction rollback attempted; conclude on recovery", + true, "", "") + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. tableMap := make(map[string][]*querypb.Field) dtMap := make(map[string]string) @@ -776,6 +790,10 @@ func TestDTResolveDuringRMCommit(t *testing.T) { _, err = conn.Execute(newCtx, "commit", nil) require.ErrorContains(t, err, "Fail During RM commit") + testWarningAndTransactionStatus(t, conn, + "distributed transaction ID failed during resource manager commit; transaction will be committed on recovery", + false, "COMMIT", "ks:40-80,ks:-40") + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. tableMap := make(map[string][]*querypb.Field) dtMap := make(map[string]string) @@ -851,18 +869,9 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) { _, err = conn.Execute(newCtx, "commit", nil) require.ErrorContains(t, err, "Fail After TR created") - t.Run("ReadTransactionState", func(t *testing.T) { - errStr := err.Error() - indx := strings.Index(errStr, "Fail") - require.Greater(t, indx, 0) - dtid := errStr[0 : indx-2] - res, err := conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil) - require.NoError(t, err) - resStr := fmt.Sprintf("%v", res.Rows) - require.Contains(t, resStr, `[[VARCHAR("ks:80-`) - require.Contains(t, resStr, `VARCHAR("PREPARE") DATETIME("`) - require.Contains(t, resStr, `+0000 UTC") VARCHAR("ks:40-80")]]`) - }) + testWarningAndTransactionStatus(t, conn, + "distributed transaction ID failed during transaction record creation; rollback attempted; conclude on recovery", + false, "PREPARE", "ks:40-80") // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. tableMap := make(map[string][]*querypb.Field) @@ -882,3 +891,67 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) { assert.Equal(t, expectations, logTable, "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) } + +type warn struct { + level string + code uint16 + msg string +} + +func toWarn(row sqltypes.Row) warn { + code, _ := row[1].ToUint16() + return warn{ + level: row[0].ToString(), + code: code, + msg: row[2].ToString(), + } +} + +type txStatus struct { + dtid string + state string + rTime string + participants string +} + +func toTxStatus(row sqltypes.Row) txStatus { + return txStatus{ + dtid: row[0].ToString(), + state: row[1].ToString(), + rTime: row[2].ToString(), + participants: row[3].ToString(), + } +} + +func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSession, warnMsg string, + txConcluded bool, txState string, txParticipants string) { + t.Helper() + + qr, err := conn.Execute(context.Background(), "show warnings", nil) + require.NoError(t, err) + require.Len(t, qr.Rows, 1) + + // validate warning output + w := toWarn(qr.Rows[0]) + assert.Equal(t, "Warning", w.level) + assert.EqualValues(t, 302, w.code) + assert.Contains(t, w.msg, warnMsg) + + // extract transaction ID + indx := strings.Index(w.msg, " ") + require.Greater(t, indx, 0) + dtid := w.msg[:indx] + + qr, err = conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil) + require.NoError(t, err) + + // validate transaction status + if txConcluded { + require.Empty(t, qr.Rows) + } else { + tx := toTxStatus(qr.Rows[0]) + assert.Equal(t, dtid, tx.dtid) + assert.Equal(t, txState, tx.state) + assert.Equal(t, txParticipants, tx.participants) + } +} diff --git a/go/vt/vtgate/debug_2pc.go b/go/vt/vtgate/debug_2pc.go index f31f1413007..dc052df33d6 100644 --- a/go/vt/vtgate/debug_2pc.go +++ b/go/vt/vtgate/debug_2pc.go @@ -18,4 +18,52 @@ limitations under the License. package vtgate +import ( + "context" + + "vitess.io/vitess/go/vt/callerid" + "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" +) + const DebugTwoPc = true + +// checkTestFailure is used to simulate failures in 2PC flow for testing when DebugTwoPc is true. +func checkTestFailure(ctx context.Context, expectCaller string, target *querypb.Target) error { + callerID := callerid.EffectiveCallerIDFromContext(ctx) + if callerID == nil || callerID.GetPrincipal() != expectCaller { + return nil + } + switch callerID.Principal { + case "TRCreated_FailNow": + log.Errorf("Fail After TR created") + // no commit decision is made. Transaction should be a rolled back. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After TR created") + case "RMPrepare_-40_FailNow": + if target.Shard != "-40" { + return nil + } + log.Errorf("Fail During RM prepare") + // no commit decision is made. Transaction should be a rolled back. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM prepare") + case "RMPrepared_FailNow": + log.Errorf("Fail After RM prepared") + // no commit decision is made. Transaction should be a rolled back. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After RM prepared") + case "MMCommitted_FailNow": + log.Errorf("Fail After MM commit") + // commit decision is made. Transaction should be committed. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After MM commit") + case "RMCommit_-40_FailNow": + if target.Shard != "-40" { + return nil + } + log.Errorf("Fail During RM commit") + // commit decision is made. Transaction should be a committed. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM commit") + default: + return nil + } +} diff --git a/go/vt/vtgate/engine/transaction_status.go b/go/vt/vtgate/engine/transaction_status.go index a087db08256..61cc72c08d9 100644 --- a/go/vt/vtgate/engine/transaction_status.go +++ b/go/vt/vtgate/engine/transaction_status.go @@ -84,7 +84,7 @@ func (t *TransactionStatus) TryExecute(ctx context.Context, vcursor VCursor, bin if wantfields { res.Fields = t.getFields() } - if transactionState != nil { + if transactionState != nil && transactionState.Dtid != "" { var participantString []string for _, participant := range transactionState.Participants { participantString = append(participantString, fmt.Sprintf("%s:%s", participant.Keyspace, participant.Shard)) diff --git a/go/vt/vtgate/production.go b/go/vt/vtgate/production.go index 83e0cbdddf5..d3b0ff4fe7e 100644 --- a/go/vt/vtgate/production.go +++ b/go/vt/vtgate/production.go @@ -18,6 +18,12 @@ limitations under the License. package vtgate +import ( + "context" + + querypb "vitess.io/vitess/go/vt/proto/query" +) + // This file defines debug constants that are always false. // This file is used for building production code. // We use go build directives to include a file that defines the constant to true @@ -26,3 +32,7 @@ package vtgate // production performance. const DebugTwoPc = false + +func checkTestFailure(_ context.Context, _ string, _ *querypb.Target) error { + return nil +} diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 56d45799175..f8b08def10c 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -22,10 +22,7 @@ import ( "strings" "sync" - "github.com/pkg/errors" - "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/dtids" "vitess.io/vitess/go/vt/log" @@ -62,6 +59,16 @@ var txAccessModeToEOTxAccessMode = map[sqlparser.TxAccessMode]querypb.ExecuteOpt sqlparser.ReadOnly: querypb.ExecuteOptions_READ_ONLY, } +type commitPhase int + +const ( + Commit2pcCreateTransaction commitPhase = iota + Commit2pcPrepare + Commit2pcStartCommit + Commit2pcPrepareCommit + Commit2pcConclude +) + // Begin begins a new transaction. If one is already in progress, it commits it // and starts a new one. func (txc *TxConn) Begin(ctx context.Context, session *SafeSession, txAccessModes []sqlparser.TxAccessMode) error { @@ -179,7 +186,7 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error } // commit2PC will not used the pinned tablets - to make sure we use the current source, we need to use the gateway's queryservice -func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { +func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err error) { if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 { _ = txc.Rollback(ctx, session) return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "pre or post actions not allowed for 2PC commits") @@ -190,114 +197,118 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { return txc.commitNormal(ctx, session) } - participants := make([]*querypb.Target, 0, len(session.ShardSessions)-1) - for _, s := range session.ShardSessions[1:] { - participants = append(participants, s.Target) - } mmShard := session.ShardSessions[0] + rmShards := session.ShardSessions[1:] dtid := dtids.New(mmShard) - err := txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants) - if err != nil { - // Normal rollback is safe because nothing was prepared yet. - _ = txc.Rollback(ctx, session) + participants := make([]*querypb.Target, len(rmShards)) + for i, s := range rmShards { + participants[i] = s.Target + } + + var txPhase commitPhase + defer func() { + if err == nil { + return + } + txc.errActionAndLogWarn(ctx, session, txPhase, dtid, mmShard, rmShards) + }() + + txPhase = Commit2pcCreateTransaction + if err = txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants); err != nil { return err } - if DebugTwoPc { - // Test code to simulate a failure after RM prepare - if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "TRCreated_FailNow", nil); failNow { - return errors.Wrapf(err, "%v", dtid) + if DebugTwoPc { // Test code to simulate a failure after RM prepare + if terr := checkTestFailure(ctx, "TRCreated_FailNow", nil); terr != nil { + return terr } } - err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { - if DebugTwoPc { - // Test code to simulate a failure during RM prepare - if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "RMPrepare_-40_FailNow", s.Target); failNow { - return err + txPhase = Commit2pcPrepare + prepareAction := func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { + if DebugTwoPc { // Test code to simulate a failure during RM prepare + if terr := checkTestFailure(ctx, "RMPrepare_-40_FailNow", s.Target); terr != nil { + return terr } } return txc.tabletGateway.Prepare(ctx, s.Target, s.TransactionId, dtid) - }) - if err != nil { - // TODO(sougou): Perform a more fine-grained cleanup - // including unprepared transactions. - if resumeErr := txc.rollbackTx(ctx, dtid, mmShard, session.ShardSessions[1:], session.logging); resumeErr != nil { - log.Warningf("Rollback failed after Prepare failure: %v", resumeErr) - } - // Return the original error even if the previous operation fails. + } + if err = txc.runSessions(ctx, rmShards, session.logging, prepareAction); err != nil { return err } - if DebugTwoPc { - // Test code to simulate a failure after RM prepare - if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "RMPrepared_FailNow", nil); failNow { - return err + if DebugTwoPc { // Test code to simulate a failure after RM prepare + if terr := checkTestFailure(ctx, "RMPrepared_FailNow", nil); terr != nil { + return terr } } + txPhase = Commit2pcStartCommit err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid) if err != nil { return err } - if DebugTwoPc { - // Test code to simulate a failure after MM commit - if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "MMCommitted_FailNow", nil); failNow { - return err + if DebugTwoPc { // Test code to simulate a failure after MM commit + if terr := checkTestFailure(ctx, "MMCommitted_FailNow", nil); terr != nil { + return terr } } - err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { - if DebugTwoPc { - // Test code to simulate a failure during RM prepare - if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "RMCommit_-40_FailNow", s.Target); failNow { - return err + txPhase = Commit2pcPrepareCommit + prepareCommitAction := func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { + if DebugTwoPc { // Test code to simulate a failure during RM prepare + if terr := checkTestFailure(ctx, "RMCommit_-40_FailNow", s.Target); terr != nil { + return terr } } return txc.tabletGateway.CommitPrepared(ctx, s.Target, dtid) - }) - if err != nil { + } + if err = txc.runSessions(ctx, rmShards, session.logging, prepareCommitAction); err != nil { return err } - return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) + // At this point, application can continue forward. + // The transaction is already committed. + // This step is to clean up the transaction metadata. + txPhase = Commit2pcConclude + _ = txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) + return nil } -func checkTestFailure(callerID *vtrpcpb.CallerID, expectCaller string, target *querypb.Target) (bool, error) { - if callerID == nil || callerID.GetPrincipal() != expectCaller { - return false, nil - } - switch callerID.Principal { - case "TRCreated_FailNow": - log.Errorf("Fail After TR created") - // no commit decision is made. Transaction should be a rolled back. - return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After TR created") - case "RMPrepare_-40_FailNow": - if target.Shard != "-40" { - return false, nil +func (txc *TxConn) errActionAndLogWarn(ctx context.Context, session *SafeSession, txPhase commitPhase, dtid string, mmShard *vtgatepb.Session_ShardSession, rmShards []*vtgatepb.Session_ShardSession) { + switch txPhase { + case Commit2pcCreateTransaction: + // Normal rollback is safe because nothing was prepared yet. + if rollbackErr := txc.Rollback(ctx, session); rollbackErr != nil { + log.Warningf("Rollback failed after Create Transaction failure: %v", rollbackErr) } - log.Errorf("Fail During RM prepare") - // no commit decision is made. Transaction should be a rolled back. - return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM prepare") - case "RMPrepared_FailNow": - log.Errorf("Fail After RM prepared") - // no commit decision is made. Transaction should be a rolled back. - return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After RM prepared") - case "MMCommitted_FailNow": - log.Errorf("Fail After MM commit") - // commit decision is made. Transaction should be committed. - return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After MM commit") - case "RMCommit_-40_FailNow": - if target.Shard != "-40" { - return false, nil + case Commit2pcPrepare: + // Rollback the prepared and unprepared transactions. + if resumeErr := txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging); resumeErr != nil { + log.Warningf("Rollback failed after Prepare failure: %v", resumeErr) } - log.Errorf("Fail During RM commit") - // commit decision is made. Transaction should be a committed. - return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM commit") - default: - return false, nil } + session.RecordWarning(&querypb.QueryWarning{ + Code: uint32(sqlerror.ERInAtomicRecovery), + Message: createWarningMessage(dtid, txPhase)}) +} + +func createWarningMessage(dtid string, txPhase commitPhase) string { + warningMsg := fmt.Sprintf("%s distributed transaction ID failed during", dtid) + switch txPhase { + case Commit2pcCreateTransaction: + warningMsg += " transaction record creation; rollback attempted; conclude on recovery" + case Commit2pcPrepare: + warningMsg += " transaction prepare phase; prepare transaction rollback attempted; conclude on recovery" + case Commit2pcStartCommit: + warningMsg += " metadata manager commit; transaction will be committed/rollbacked based on the state on recovery" + case Commit2pcPrepareCommit: + warningMsg += " resource manager commit; transaction will be committed on recovery" + case Commit2pcConclude: + warningMsg += " transaction conclusion" + } + return warningMsg } // Rollback rolls back the current transaction. There are no retries on this operation. @@ -467,24 +478,32 @@ func (txc *TxConn) resolveTx(ctx context.Context, target *querypb.Target, transa } // rollbackTx rollbacks the specified distributed transaction. +// Rollbacks happens on the metadata manager and all participants irrespective of the failure. func (txc *TxConn) rollbackTx(ctx context.Context, dtid string, mmShard *vtgatepb.Session_ShardSession, participants []*vtgatepb.Session_ShardSession, logging *executeLogger) error { - qs, err := txc.queryService(ctx, mmShard.TabletAlias) - if err != nil { - return err + var errs []error + if mmErr := txc.rollbackMM(ctx, dtid, mmShard); mmErr != nil { + errs = append(errs, mmErr) } - if err := qs.SetRollback(ctx, mmShard.Target, dtid, mmShard.TransactionId); err != nil { - return err - } - err = txc.runSessions(ctx, participants, logging, func(ctx context.Context, session *vtgatepb.Session_ShardSession, logger *executeLogger) error { + if rmErr := txc.runSessions(ctx, participants, logging, func(ctx context.Context, session *vtgatepb.Session_ShardSession, logger *executeLogger) error { return txc.tabletGateway.RollbackPrepared(ctx, session.Target, dtid, session.TransactionId) - }) - if err != nil { + }); rmErr != nil { + errs = append(errs, rmErr) + } + if err := vterrors.Aggregate(errs); err != nil { return err } return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) } +func (txc *TxConn) rollbackMM(ctx context.Context, dtid string, mmShard *vtgatepb.Session_ShardSession) error { + qs, err := txc.queryService(ctx, mmShard.TabletAlias) + if err != nil { + return err + } + return qs.SetRollback(ctx, mmShard.Target, dtid, mmShard.TransactionId) +} + func (txc *TxConn) resumeRollback(ctx context.Context, target *querypb.Target, transaction *querypb.TransactionMetadata) error { err := txc.runTargets(transaction.Participants, func(t *querypb.Target) error { return txc.tabletGateway.RollbackPrepared(ctx, t, transaction.Dtid, 0) diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index 74329153936..ed977b75051 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -1090,9 +1090,7 @@ func TestTxConnCommit2PCConcludeTransactionFail(t *testing.T) { sbc0.MustFailConcludeTransaction = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC err := sc.txConn.Commit(ctx, session) - want := "error: err" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Commit") + require.NoError(t, err) // ConcludeTransaction is best-effort as it does not impact the outcome. assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount") assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount") assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount")