From f8758ac987c9964c2b88b8cdcbd37339b6b60efe Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 24 Jul 2024 17:51:28 +0530 Subject: [PATCH 1/7] refactor: move test code under the debug code file Signed-off-by: Harshit Gangal --- go/vt/vtgate/debug_2pc.go | 48 ++++++++++++++++++++++++++++++++ go/vt/vtgate/production.go | 10 +++++++ go/vt/vtgate/tx_conn.go | 57 +++++++------------------------------- 3 files changed, 68 insertions(+), 47 deletions(-) diff --git a/go/vt/vtgate/debug_2pc.go b/go/vt/vtgate/debug_2pc.go index f31f1413007..dc052df33d6 100644 --- a/go/vt/vtgate/debug_2pc.go +++ b/go/vt/vtgate/debug_2pc.go @@ -18,4 +18,52 @@ limitations under the License. package vtgate +import ( + "context" + + "vitess.io/vitess/go/vt/callerid" + "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" +) + const DebugTwoPc = true + +// checkTestFailure is used to simulate failures in 2PC flow for testing when DebugTwoPc is true. +func checkTestFailure(ctx context.Context, expectCaller string, target *querypb.Target) error { + callerID := callerid.EffectiveCallerIDFromContext(ctx) + if callerID == nil || callerID.GetPrincipal() != expectCaller { + return nil + } + switch callerID.Principal { + case "TRCreated_FailNow": + log.Errorf("Fail After TR created") + // no commit decision is made. Transaction should be a rolled back. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After TR created") + case "RMPrepare_-40_FailNow": + if target.Shard != "-40" { + return nil + } + log.Errorf("Fail During RM prepare") + // no commit decision is made. Transaction should be a rolled back. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM prepare") + case "RMPrepared_FailNow": + log.Errorf("Fail After RM prepared") + // no commit decision is made. Transaction should be a rolled back. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After RM prepared") + case "MMCommitted_FailNow": + log.Errorf("Fail After MM commit") + // commit decision is made. Transaction should be committed. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After MM commit") + case "RMCommit_-40_FailNow": + if target.Shard != "-40" { + return nil + } + log.Errorf("Fail During RM commit") + // commit decision is made. Transaction should be a committed. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM commit") + default: + return nil + } +} diff --git a/go/vt/vtgate/production.go b/go/vt/vtgate/production.go index 83e0cbdddf5..d3b0ff4fe7e 100644 --- a/go/vt/vtgate/production.go +++ b/go/vt/vtgate/production.go @@ -18,6 +18,12 @@ limitations under the License. package vtgate +import ( + "context" + + querypb "vitess.io/vitess/go/vt/proto/query" +) + // This file defines debug constants that are always false. // This file is used for building production code. // We use go build directives to include a file that defines the constant to true @@ -26,3 +32,7 @@ package vtgate // production performance. const DebugTwoPc = false + +func checkTestFailure(_ context.Context, _ string, _ *querypb.Target) error { + return nil +} diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 56d45799175..0aeab986ea6 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -25,7 +25,6 @@ import ( "github.com/pkg/errors" "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/dtids" "vitess.io/vitess/go/vt/log" @@ -205,16 +204,16 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { if DebugTwoPc { // Test code to simulate a failure after RM prepare - if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "TRCreated_FailNow", nil); failNow { - return errors.Wrapf(err, "%v", dtid) + if terr := checkTestFailure(ctx, "TRCreated_FailNow", nil); terr != nil { + return errors.Wrapf(terr, "%v", dtid) } } err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { if DebugTwoPc { // Test code to simulate a failure during RM prepare - if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "RMPrepare_-40_FailNow", s.Target); failNow { - return err + if terr := checkTestFailure(ctx, "RMPrepare_-40_FailNow", s.Target); terr != nil { + return terr } } return txc.tabletGateway.Prepare(ctx, s.Target, s.TransactionId, dtid) @@ -231,8 +230,8 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { if DebugTwoPc { // Test code to simulate a failure after RM prepare - if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "RMPrepared_FailNow", nil); failNow { - return err + if terr := checkTestFailure(ctx, "RMPrepared_FailNow", nil); terr != nil { + return terr } } @@ -243,16 +242,16 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { if DebugTwoPc { // Test code to simulate a failure after MM commit - if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "MMCommitted_FailNow", nil); failNow { - return err + if terr := checkTestFailure(ctx, "MMCommitted_FailNow", nil); terr != nil { + return terr } } err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { if DebugTwoPc { // Test code to simulate a failure during RM prepare - if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "RMCommit_-40_FailNow", s.Target); failNow { - return err + if terr := checkTestFailure(ctx, "RMCommit_-40_FailNow", s.Target); terr != nil { + return terr } } return txc.tabletGateway.CommitPrepared(ctx, s.Target, dtid) @@ -264,42 +263,6 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) } -func checkTestFailure(callerID *vtrpcpb.CallerID, expectCaller string, target *querypb.Target) (bool, error) { - if callerID == nil || callerID.GetPrincipal() != expectCaller { - return false, nil - } - switch callerID.Principal { - case "TRCreated_FailNow": - log.Errorf("Fail After TR created") - // no commit decision is made. Transaction should be a rolled back. - return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After TR created") - case "RMPrepare_-40_FailNow": - if target.Shard != "-40" { - return false, nil - } - log.Errorf("Fail During RM prepare") - // no commit decision is made. Transaction should be a rolled back. - return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM prepare") - case "RMPrepared_FailNow": - log.Errorf("Fail After RM prepared") - // no commit decision is made. Transaction should be a rolled back. - return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After RM prepared") - case "MMCommitted_FailNow": - log.Errorf("Fail After MM commit") - // commit decision is made. Transaction should be committed. - return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After MM commit") - case "RMCommit_-40_FailNow": - if target.Shard != "-40" { - return false, nil - } - log.Errorf("Fail During RM commit") - // commit decision is made. Transaction should be a committed. - return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM commit") - default: - return false, nil - } -} - // Rollback rolls back the current transaction. There are no retries on this operation. func (txc *TxConn) Rollback(ctx context.Context, session *SafeSession) error { if !session.InTransaction() { From 68eb896ca8e65d53df6e953a41f7a192f3c4c790 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 24 Jul 2024 20:26:32 +0530 Subject: [PATCH 2/7] feat: record warning on 2pc commit failure Signed-off-by: Harshit Gangal --- go/mysql/sqlerror/constants.go | 5 +++-- go/vt/vtgate/tx_conn.go | 41 +++++++++++++++++++++++++++++++--- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/go/mysql/sqlerror/constants.go b/go/mysql/sqlerror/constants.go index a247ca15aa4..42425fa68a4 100644 --- a/go/mysql/sqlerror/constants.go +++ b/go/mysql/sqlerror/constants.go @@ -34,8 +34,9 @@ func (e ErrorCode) ToString() string { // See above reference for more information on each code. const ( // Vitess specific errors, (100-999) - ERNotReplica = ErrorCode(100) - ERNonAtomicCommit = ErrorCode(301) + ERNotReplica = ErrorCode(100) + ERNonAtomicCommit = ErrorCode(301) + ERInAtomicRecovery = ErrorCode(302) // unknown ERUnknownError = ErrorCode(1105) diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 0aeab986ea6..d0bfc88bdd4 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -61,6 +61,16 @@ var txAccessModeToEOTxAccessMode = map[sqlparser.TxAccessMode]querypb.ExecuteOpt sqlparser.ReadOnly: querypb.ExecuteOptions_READ_ONLY, } +type CommitPhase int + +const ( + COMMIT2PC_CREATETRANSACTION = iota + COMMIT2PC_PREPARE = iota + COMMIT2PC_STARTCOMMIT = iota + COMMIT2PC_PREPARECOMMIT + COMMIT2PC_CONCLUDE +) + // Begin begins a new transaction. If one is already in progress, it commits it // and starts a new one. func (txc *TxConn) Begin(ctx context.Context, session *SafeSession, txAccessModes []sqlparser.TxAccessMode) error { @@ -178,7 +188,7 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error } // commit2PC will not used the pinned tablets - to make sure we use the current source, we need to use the gateway's queryservice -func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { +func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err error) { if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 { _ = txc.Rollback(ctx, session) return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "pre or post actions not allowed for 2PC commits") @@ -195,8 +205,29 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { } mmShard := session.ShardSessions[0] dtid := dtids.New(mmShard) - err := txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants) - if err != nil { + + txPhase := COMMIT2PC_CREATETRANSACTION + defer func() { + if err == nil { + return + } + warningMsg := fmt.Sprintf("%s distributed transaction ID failed during", dtid) + switch txPhase { + case COMMIT2PC_CREATETRANSACTION: + warningMsg += " transaction record creation" + case COMMIT2PC_PREPARE: + warningMsg += " transaction prepare phase" + case COMMIT2PC_STARTCOMMIT: + warningMsg += " metadata manager commit" + case COMMIT2PC_PREPARECOMMIT: + warningMsg += " resource manager commit" + case COMMIT2PC_CONCLUDE: + warningMsg += " transaction conclusion" + } + session.RecordWarning(&querypb.QueryWarning{Code: uint32(sqlerror.ERInAtomicRecovery), Message: warningMsg}) + }() + + if err = txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants); err != nil { // Normal rollback is safe because nothing was prepared yet. _ = txc.Rollback(ctx, session) return err @@ -209,6 +240,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { } } + txPhase = COMMIT2PC_PREPARE err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { if DebugTwoPc { // Test code to simulate a failure during RM prepare @@ -235,6 +267,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { } } + txPhase = COMMIT2PC_STARTCOMMIT err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid) if err != nil { return err @@ -247,6 +280,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { } } + txPhase = COMMIT2PC_PREPARECOMMIT err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { if DebugTwoPc { // Test code to simulate a failure during RM prepare @@ -260,6 +294,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { return err } + txPhase = COMMIT2PC_CONCLUDE return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) } From 27bfe7062bd5c2323936c8d647bc36c300701238 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 24 Jul 2024 21:38:37 +0530 Subject: [PATCH 3/7] test: added e2e test Signed-off-by: Harshit Gangal --- .../endtoend/transaction/twopc/twopc_test.go | 34 ++++++++++++++++++ go/vt/vtgate/tx_conn.go | 35 +++++++++++-------- 2 files changed, 55 insertions(+), 14 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index dc2aba61b1b..da8db0952a7 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -882,3 +882,37 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) { assert.Equal(t, expectations, logTable, "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) } + +// TestDTWarningAfterMMCommit tests that failure after MM commit returns a warning. +func TestDTWarningAfterMMCommit(t *testing.T) { + defer cleanup(t) + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + conn := vtgateConn.Session("", nil) + qCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Insert into multiple shards + _, err = conn.Execute(qCtx, "begin", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(7,'foo')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(9,'baz')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(10,'apa')", nil) + require.NoError(t, err) + + // The caller ID is used to simulate the failure at the desired point. + newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil) + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "Fail After MM commit") + + qr, err := conn.Execute(qCtx, "show warnings", nil) + require.NoError(t, err) + require.Contains(t, fmt.Sprintf("%v", qr.Rows), `[[VARCHAR("Warning") UINT16(302) VARCHAR("ks:80-:`) + require.Contains(t, fmt.Sprintf("%v", qr.Rows), `distributed transaction ID failed during metadata manager commit`) +} diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index d0bfc88bdd4..3a17e1a6647 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -211,20 +211,9 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err if err == nil { return } - warningMsg := fmt.Sprintf("%s distributed transaction ID failed during", dtid) - switch txPhase { - case COMMIT2PC_CREATETRANSACTION: - warningMsg += " transaction record creation" - case COMMIT2PC_PREPARE: - warningMsg += " transaction prepare phase" - case COMMIT2PC_STARTCOMMIT: - warningMsg += " metadata manager commit" - case COMMIT2PC_PREPARECOMMIT: - warningMsg += " resource manager commit" - case COMMIT2PC_CONCLUDE: - warningMsg += " transaction conclusion" - } - session.RecordWarning(&querypb.QueryWarning{Code: uint32(sqlerror.ERInAtomicRecovery), Message: warningMsg}) + session.RecordWarning(&querypb.QueryWarning{ + Code: uint32(sqlerror.ERInAtomicRecovery), + Message: createWarningMessage(dtid, txPhase)}) }() if err = txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants); err != nil { @@ -236,6 +225,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err if DebugTwoPc { // Test code to simulate a failure after RM prepare if terr := checkTestFailure(ctx, "TRCreated_FailNow", nil); terr != nil { + _ = txc.Rollback(ctx, session) return errors.Wrapf(terr, "%v", dtid) } } @@ -298,6 +288,23 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) } +func createWarningMessage(dtid string, txPhase int) string { + warningMsg := fmt.Sprintf("%s distributed transaction ID failed during", dtid) + switch txPhase { + case COMMIT2PC_CREATETRANSACTION: + warningMsg += " transaction record creation" + case COMMIT2PC_PREPARE: + warningMsg += " transaction prepare phase" + case COMMIT2PC_STARTCOMMIT: + warningMsg += " metadata manager commit" + case COMMIT2PC_PREPARECOMMIT: + warningMsg += " resource manager commit" + case COMMIT2PC_CONCLUDE: + warningMsg += " transaction conclusion" + } + return warningMsg +} + // Rollback rolls back the current transaction. There are no retries on this operation. func (txc *TxConn) Rollback(ctx context.Context, session *SafeSession) error { if !session.InTransaction() { From 11483db58f63eb5f73ca7a08b46da3478a4985d5 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 25 Jul 2024 18:00:19 +0530 Subject: [PATCH 4/7] update error flow for commit phase Signed-off-by: Harshit Gangal --- go/vt/vtgate/tx_conn.go | 118 ++++++++++++++++++++--------------- go/vt/vtgate/tx_conn_test.go | 4 +- 2 files changed, 67 insertions(+), 55 deletions(-) diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 3a17e1a6647..2301dea8ee2 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -22,8 +22,6 @@ import ( "strings" "sync" - "github.com/pkg/errors" - "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/dtids" @@ -61,7 +59,7 @@ var txAccessModeToEOTxAccessMode = map[sqlparser.TxAccessMode]querypb.ExecuteOpt sqlparser.ReadOnly: querypb.ExecuteOptions_READ_ONLY, } -type CommitPhase int +type commitPhase int const ( COMMIT2PC_CREATETRANSACTION = iota @@ -199,59 +197,47 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err return txc.commitNormal(ctx, session) } - participants := make([]*querypb.Target, 0, len(session.ShardSessions)-1) - for _, s := range session.ShardSessions[1:] { - participants = append(participants, s.Target) - } mmShard := session.ShardSessions[0] + rmShards := session.ShardSessions[1:] dtid := dtids.New(mmShard) + participants := make([]*querypb.Target, len(rmShards)) + for i, s := range rmShards { + participants[i] = s.Target + } - txPhase := COMMIT2PC_CREATETRANSACTION + var txPhase commitPhase defer func() { if err == nil { return } - session.RecordWarning(&querypb.QueryWarning{ - Code: uint32(sqlerror.ERInAtomicRecovery), - Message: createWarningMessage(dtid, txPhase)}) + txc.errActionAndLogWarn(ctx, session, txPhase, dtid, mmShard, rmShards) }() + txPhase = COMMIT2PC_CREATETRANSACTION if err = txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants); err != nil { - // Normal rollback is safe because nothing was prepared yet. - _ = txc.Rollback(ctx, session) return err } - if DebugTwoPc { - // Test code to simulate a failure after RM prepare + if DebugTwoPc { // Test code to simulate a failure after RM prepare if terr := checkTestFailure(ctx, "TRCreated_FailNow", nil); terr != nil { - _ = txc.Rollback(ctx, session) - return errors.Wrapf(terr, "%v", dtid) + return terr } } txPhase = COMMIT2PC_PREPARE - err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { - if DebugTwoPc { - // Test code to simulate a failure during RM prepare + prepareAction := func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { + if DebugTwoPc { // Test code to simulate a failure during RM prepare if terr := checkTestFailure(ctx, "RMPrepare_-40_FailNow", s.Target); terr != nil { return terr } } return txc.tabletGateway.Prepare(ctx, s.Target, s.TransactionId, dtid) - }) - if err != nil { - // TODO(sougou): Perform a more fine-grained cleanup - // including unprepared transactions. - if resumeErr := txc.rollbackTx(ctx, dtid, mmShard, session.ShardSessions[1:], session.logging); resumeErr != nil { - log.Warningf("Rollback failed after Prepare failure: %v", resumeErr) - } - // Return the original error even if the previous operation fails. + } + if err = txc.runSessions(ctx, rmShards, session.logging, prepareAction); err != nil { return err } - if DebugTwoPc { - // Test code to simulate a failure after RM prepare + if DebugTwoPc { // Test code to simulate a failure after RM prepare if terr := checkTestFailure(ctx, "RMPrepared_FailNow", nil); terr != nil { return terr } @@ -263,42 +249,62 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err return err } - if DebugTwoPc { - // Test code to simulate a failure after MM commit + if DebugTwoPc { // Test code to simulate a failure after MM commit if terr := checkTestFailure(ctx, "MMCommitted_FailNow", nil); terr != nil { return terr } } txPhase = COMMIT2PC_PREPARECOMMIT - err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { - if DebugTwoPc { - // Test code to simulate a failure during RM prepare + prepareCommitAction := func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { + if DebugTwoPc { // Test code to simulate a failure during RM prepare if terr := checkTestFailure(ctx, "RMCommit_-40_FailNow", s.Target); terr != nil { return terr } } return txc.tabletGateway.CommitPrepared(ctx, s.Target, dtid) - }) - if err != nil { + } + if err = txc.runSessions(ctx, rmShards, session.logging, prepareCommitAction); err != nil { return err } + // At this point, application can continue forward. + // The transaction is already committed. + // This step is to clean up the transaction metadata. txPhase = COMMIT2PC_CONCLUDE - return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) + _ = txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) + return nil +} + +func (txc *TxConn) errActionAndLogWarn(ctx context.Context, session *SafeSession, txPhase commitPhase, dtid string, mmShard *vtgatepb.Session_ShardSession, rmShards []*vtgatepb.Session_ShardSession) { + switch txPhase { + case COMMIT2PC_CREATETRANSACTION: + // Normal rollback is safe because nothing was prepared yet. + if rollbackErr := txc.Rollback(ctx, session); rollbackErr != nil { + log.Warningf("Rollback failed after Create Transaction failure: %v", rollbackErr) + } + case COMMIT2PC_PREPARE: + // Rollback the prepared and unprepared transactions. + if resumeErr := txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging); resumeErr != nil { + log.Warningf("Rollback failed after Prepare failure: %v", resumeErr) + } + } + session.RecordWarning(&querypb.QueryWarning{ + Code: uint32(sqlerror.ERInAtomicRecovery), + Message: createWarningMessage(dtid, txPhase)}) } -func createWarningMessage(dtid string, txPhase int) string { +func createWarningMessage(dtid string, txPhase commitPhase) string { warningMsg := fmt.Sprintf("%s distributed transaction ID failed during", dtid) switch txPhase { case COMMIT2PC_CREATETRANSACTION: - warningMsg += " transaction record creation" + warningMsg += " transaction record creation; rollback attempted; conclude on recovery" case COMMIT2PC_PREPARE: - warningMsg += " transaction prepare phase" + warningMsg += " transaction prepare phase; prepare transaction rollback attempted; conclude on recovery" case COMMIT2PC_STARTCOMMIT: - warningMsg += " metadata manager commit" + warningMsg += " metadata manager commit; transaction will be committed/roll based on the state on recovery" case COMMIT2PC_PREPARECOMMIT: - warningMsg += " resource manager commit" + warningMsg += " resource manager commit; transaction will be committed on recovery" case COMMIT2PC_CONCLUDE: warningMsg += " transaction conclusion" } @@ -472,24 +478,32 @@ func (txc *TxConn) resolveTx(ctx context.Context, target *querypb.Target, transa } // rollbackTx rollbacks the specified distributed transaction. +// Rollbacks happens on the metadata manager and all participants irrespective of the failure. func (txc *TxConn) rollbackTx(ctx context.Context, dtid string, mmShard *vtgatepb.Session_ShardSession, participants []*vtgatepb.Session_ShardSession, logging *executeLogger) error { - qs, err := txc.queryService(ctx, mmShard.TabletAlias) - if err != nil { - return err - } - if err := qs.SetRollback(ctx, mmShard.Target, dtid, mmShard.TransactionId); err != nil { - return err + var errs []error + if mmErr := txc.rollbackMM(ctx, dtid, mmShard); mmErr != nil { + errs = append(errs, mmErr) } - err = txc.runSessions(ctx, participants, logging, func(ctx context.Context, session *vtgatepb.Session_ShardSession, logger *executeLogger) error { + if rmErr := txc.runSessions(ctx, participants, logging, func(ctx context.Context, session *vtgatepb.Session_ShardSession, logger *executeLogger) error { return txc.tabletGateway.RollbackPrepared(ctx, session.Target, dtid, session.TransactionId) - }) - if err != nil { + }); rmErr != nil { + errs = append(errs, rmErr) + } + if err := vterrors.Aggregate(errs); err != nil { return err } return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) } +func (txc *TxConn) rollbackMM(ctx context.Context, dtid string, mmShard *vtgatepb.Session_ShardSession) error { + qs, err := txc.queryService(ctx, mmShard.TabletAlias) + if err != nil { + return err + } + return qs.SetRollback(ctx, mmShard.Target, dtid, mmShard.TransactionId) +} + func (txc *TxConn) resumeRollback(ctx context.Context, target *querypb.Target, transaction *querypb.TransactionMetadata) error { err := txc.runTargets(transaction.Participants, func(t *querypb.Target) error { return txc.tabletGateway.RollbackPrepared(ctx, t, transaction.Dtid, 0) diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index 74329153936..ed977b75051 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -1090,9 +1090,7 @@ func TestTxConnCommit2PCConcludeTransactionFail(t *testing.T) { sbc0.MustFailConcludeTransaction = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC err := sc.txConn.Commit(ctx, session) - want := "error: err" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Commit") + require.NoError(t, err) // ConcludeTransaction is best-effort as it does not impact the outcome. assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount") assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount") assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount") From bec8da64969f5996b328abac3505bd49bc30f1dc Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 25 Jul 2024 22:47:14 +0530 Subject: [PATCH 5/7] test: warning and transaction status testing Signed-off-by: Harshit Gangal --- .../endtoend/transaction/twopc/twopc_test.go | 115 ++++++++++++------ go/vt/vtgate/engine/transaction_status.go | 2 +- 2 files changed, 78 insertions(+), 39 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index da8db0952a7..0560ed28ebe 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -31,11 +31,13 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/callerid" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) // TestDTCommit tests distributed transaction commit for insert, update and delete operations @@ -580,6 +582,10 @@ func TestDTResolveAfterMMCommit(t *testing.T) { _, err = conn.Execute(newCtx, "commit", nil) require.ErrorContains(t, err, "Fail After MM commit") + testWarningAndTransactionStatus(t, conn, + "distributed transaction ID failed during metadata manager commit; transaction will be committed/roll based on the state on recovery", + false, "COMMIT", "ks:40-80,ks:-40") + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. tableMap := make(map[string][]*querypb.Field) dtMap := make(map[string]string) @@ -656,6 +662,10 @@ func TestDTResolveAfterRMPrepare(t *testing.T) { _, err = conn.Execute(newCtx, "commit", nil) require.ErrorContains(t, err, "Fail After RM prepared") + testWarningAndTransactionStatus(t, conn, + "distributed transaction ID failed during transaction prepare phase; prepare transaction rollback attempted; conclude on recovery", + true /* transaction concluded */, "", "") + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. tableMap := make(map[string][]*querypb.Field) dtMap := make(map[string]string) @@ -714,6 +724,10 @@ func TestDTResolveDuringRMPrepare(t *testing.T) { _, err = conn.Execute(newCtx, "commit", nil) require.ErrorContains(t, err, "Fail During RM prepare") + testWarningAndTransactionStatus(t, conn, + "distributed transaction ID failed during transaction prepare phase; prepare transaction rollback attempted; conclude on recovery", + true, "", "") + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. tableMap := make(map[string][]*querypb.Field) dtMap := make(map[string]string) @@ -776,6 +790,10 @@ func TestDTResolveDuringRMCommit(t *testing.T) { _, err = conn.Execute(newCtx, "commit", nil) require.ErrorContains(t, err, "Fail During RM commit") + testWarningAndTransactionStatus(t, conn, + "distributed transaction ID failed during resource manager commit; transaction will be committed on recovery", + false, "COMMIT", "ks:40-80,ks:-40") + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. tableMap := make(map[string][]*querypb.Field) dtMap := make(map[string]string) @@ -851,18 +869,9 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) { _, err = conn.Execute(newCtx, "commit", nil) require.ErrorContains(t, err, "Fail After TR created") - t.Run("ReadTransactionState", func(t *testing.T) { - errStr := err.Error() - indx := strings.Index(errStr, "Fail") - require.Greater(t, indx, 0) - dtid := errStr[0 : indx-2] - res, err := conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil) - require.NoError(t, err) - resStr := fmt.Sprintf("%v", res.Rows) - require.Contains(t, resStr, `[[VARCHAR("ks:80-`) - require.Contains(t, resStr, `VARCHAR("PREPARE") DATETIME("`) - require.Contains(t, resStr, `+0000 UTC") VARCHAR("ks:40-80")]]`) - }) + testWarningAndTransactionStatus(t, conn, + "distributed transaction ID failed during transaction record creation; rollback attempted; conclude on recovery", + false, "PREPARE", "ks:40-80") // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. tableMap := make(map[string][]*querypb.Field) @@ -883,36 +892,66 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) { "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) } -// TestDTWarningAfterMMCommit tests that failure after MM commit returns a warning. -func TestDTWarningAfterMMCommit(t *testing.T) { - defer cleanup(t) +type warn struct { + level string + code uint16 + msg string +} - vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") - require.NoError(t, err) - defer vtgateConn.Close() +func toWarn(row sqltypes.Row) warn { + code, _ := row[1].ToUint16() + return warn{ + level: row[0].ToString(), + code: code, + msg: row[2].ToString(), + } +} - conn := vtgateConn.Session("", nil) - qCtx, cancel := context.WithCancel(context.Background()) - defer cancel() - // Insert into multiple shards - _, err = conn.Execute(qCtx, "begin", nil) - require.NoError(t, err) - _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(7,'foo')", nil) - require.NoError(t, err) - _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil) - require.NoError(t, err) - _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(9,'baz')", nil) - require.NoError(t, err) - _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(10,'apa')", nil) +type txStatus struct { + dtid string + state string + rTime string + participants string +} + +func toTxStatus(row sqltypes.Row) txStatus { + return txStatus{ + dtid: row[0].ToString(), + state: row[1].ToString(), + rTime: row[2].ToString(), + participants: row[3].ToString(), + } +} + +func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSession, warnMsg string, + txConcluded bool, txState string, txParticipants string) { + t.Helper() + + qr, err := conn.Execute(context.Background(), "show warnings", nil) require.NoError(t, err) + require.Len(t, qr.Rows, 1) - // The caller ID is used to simulate the failure at the desired point. - newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil) - _, err = conn.Execute(newCtx, "commit", nil) - require.ErrorContains(t, err, "Fail After MM commit") + // validate warning output + w := toWarn(qr.Rows[0]) + assert.Equal(t, "Warning", w.level) + assert.EqualValues(t, 302, w.code) + assert.Contains(t, w.msg, warnMsg) + + // extract transaction ID + indx := strings.Index(w.msg, " ") + require.Greater(t, indx, 0) + dtid := w.msg[:indx] - qr, err := conn.Execute(qCtx, "show warnings", nil) + qr, err = conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil) require.NoError(t, err) - require.Contains(t, fmt.Sprintf("%v", qr.Rows), `[[VARCHAR("Warning") UINT16(302) VARCHAR("ks:80-:`) - require.Contains(t, fmt.Sprintf("%v", qr.Rows), `distributed transaction ID failed during metadata manager commit`) + + // validate transaction status + if txConcluded { + require.Empty(t, qr.Rows) + } else { + tx := toTxStatus(qr.Rows[0]) + assert.Equal(t, dtid, tx.dtid) + assert.Equal(t, txState, tx.state) + assert.Equal(t, txParticipants, tx.participants) + } } diff --git a/go/vt/vtgate/engine/transaction_status.go b/go/vt/vtgate/engine/transaction_status.go index a087db08256..61cc72c08d9 100644 --- a/go/vt/vtgate/engine/transaction_status.go +++ b/go/vt/vtgate/engine/transaction_status.go @@ -84,7 +84,7 @@ func (t *TransactionStatus) TryExecute(ctx context.Context, vcursor VCursor, bin if wantfields { res.Fields = t.getFields() } - if transactionState != nil { + if transactionState != nil && transactionState.Dtid != "" { var participantString []string for _, participant := range transactionState.Participants { participantString = append(participantString, fmt.Sprintf("%s:%s", participant.Keyspace, participant.Shard)) From 016db5d61da22df9396b26a6b51cbccb9cdf282b Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Sat, 27 Jul 2024 10:32:04 +0530 Subject: [PATCH 6/7] chore: error message Signed-off-by: Harshit Gangal --- go/test/endtoend/transaction/twopc/twopc_test.go | 2 +- go/vt/vtgate/tx_conn.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 0560ed28ebe..98bc158c4da 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -583,7 +583,7 @@ func TestDTResolveAfterMMCommit(t *testing.T) { require.ErrorContains(t, err, "Fail After MM commit") testWarningAndTransactionStatus(t, conn, - "distributed transaction ID failed during metadata manager commit; transaction will be committed/roll based on the state on recovery", + "distributed transaction ID failed during metadata manager commit; transaction will be committed/rollbacked based on the state on recovery", false, "COMMIT", "ks:40-80,ks:-40") // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 2301dea8ee2..32b2fb55cdc 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -302,7 +302,7 @@ func createWarningMessage(dtid string, txPhase commitPhase) string { case COMMIT2PC_PREPARE: warningMsg += " transaction prepare phase; prepare transaction rollback attempted; conclude on recovery" case COMMIT2PC_STARTCOMMIT: - warningMsg += " metadata manager commit; transaction will be committed/roll based on the state on recovery" + warningMsg += " metadata manager commit; transaction will be committed/rollbacked based on the state on recovery" case COMMIT2PC_PREPARECOMMIT: warningMsg += " resource manager commit; transaction will be committed on recovery" case COMMIT2PC_CONCLUDE: From 6e22f8aa8a1d1cd1fea506ffe8d206da930cabbd Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 30 Jul 2024 16:25:06 +0530 Subject: [PATCH 7/7] addressed review comments Signed-off-by: Harshit Gangal --- go/vt/vtgate/tx_conn.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 32b2fb55cdc..f8b08def10c 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -62,11 +62,11 @@ var txAccessModeToEOTxAccessMode = map[sqlparser.TxAccessMode]querypb.ExecuteOpt type commitPhase int const ( - COMMIT2PC_CREATETRANSACTION = iota - COMMIT2PC_PREPARE = iota - COMMIT2PC_STARTCOMMIT = iota - COMMIT2PC_PREPARECOMMIT - COMMIT2PC_CONCLUDE + Commit2pcCreateTransaction commitPhase = iota + Commit2pcPrepare + Commit2pcStartCommit + Commit2pcPrepareCommit + Commit2pcConclude ) // Begin begins a new transaction. If one is already in progress, it commits it @@ -213,7 +213,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err txc.errActionAndLogWarn(ctx, session, txPhase, dtid, mmShard, rmShards) }() - txPhase = COMMIT2PC_CREATETRANSACTION + txPhase = Commit2pcCreateTransaction if err = txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants); err != nil { return err } @@ -224,7 +224,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err } } - txPhase = COMMIT2PC_PREPARE + txPhase = Commit2pcPrepare prepareAction := func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { if DebugTwoPc { // Test code to simulate a failure during RM prepare if terr := checkTestFailure(ctx, "RMPrepare_-40_FailNow", s.Target); terr != nil { @@ -243,7 +243,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err } } - txPhase = COMMIT2PC_STARTCOMMIT + txPhase = Commit2pcStartCommit err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid) if err != nil { return err @@ -255,7 +255,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err } } - txPhase = COMMIT2PC_PREPARECOMMIT + txPhase = Commit2pcPrepareCommit prepareCommitAction := func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { if DebugTwoPc { // Test code to simulate a failure during RM prepare if terr := checkTestFailure(ctx, "RMCommit_-40_FailNow", s.Target); terr != nil { @@ -271,19 +271,19 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err // At this point, application can continue forward. // The transaction is already committed. // This step is to clean up the transaction metadata. - txPhase = COMMIT2PC_CONCLUDE + txPhase = Commit2pcConclude _ = txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) return nil } func (txc *TxConn) errActionAndLogWarn(ctx context.Context, session *SafeSession, txPhase commitPhase, dtid string, mmShard *vtgatepb.Session_ShardSession, rmShards []*vtgatepb.Session_ShardSession) { switch txPhase { - case COMMIT2PC_CREATETRANSACTION: + case Commit2pcCreateTransaction: // Normal rollback is safe because nothing was prepared yet. if rollbackErr := txc.Rollback(ctx, session); rollbackErr != nil { log.Warningf("Rollback failed after Create Transaction failure: %v", rollbackErr) } - case COMMIT2PC_PREPARE: + case Commit2pcPrepare: // Rollback the prepared and unprepared transactions. if resumeErr := txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging); resumeErr != nil { log.Warningf("Rollback failed after Prepare failure: %v", resumeErr) @@ -297,15 +297,15 @@ func (txc *TxConn) errActionAndLogWarn(ctx context.Context, session *SafeSession func createWarningMessage(dtid string, txPhase commitPhase) string { warningMsg := fmt.Sprintf("%s distributed transaction ID failed during", dtid) switch txPhase { - case COMMIT2PC_CREATETRANSACTION: + case Commit2pcCreateTransaction: warningMsg += " transaction record creation; rollback attempted; conclude on recovery" - case COMMIT2PC_PREPARE: + case Commit2pcPrepare: warningMsg += " transaction prepare phase; prepare transaction rollback attempted; conclude on recovery" - case COMMIT2PC_STARTCOMMIT: + case Commit2pcStartCommit: warningMsg += " metadata manager commit; transaction will be committed/rollbacked based on the state on recovery" - case COMMIT2PC_PREPARECOMMIT: + case Commit2pcPrepareCommit: warningMsg += " resource manager commit; transaction will be committed on recovery" - case COMMIT2PC_CONCLUDE: + case Commit2pcConclude: warningMsg += " transaction conclusion" } return warningMsg