From 1e22b763be92aedab6f98a098a94724a1949489c Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 16 Oct 2024 17:30:55 +0530 Subject: [PATCH] added framework to test vttablet failure metrics and conclude those transactions Signed-off-by: Harshit Gangal --- .../transaction/twopc/metric/main_test.go | 1 + .../transaction/twopc/metric/metric_test.go | 88 +++++++++++++++++-- .../transaction/twopc/metric/schema.sql | 7 ++ .../transaction/twopc/metric/vschema.json | 11 +++ go/vt/vttablet/tabletserver/debug_2pc.go | 25 ++++++ go/vt/vttablet/tabletserver/dt_executor.go | 26 ++++-- go/vt/vttablet/tabletserver/production.go | 8 ++ go/vt/vttablet/tabletserver/tabletserver.go | 24 ++--- 8 files changed, 165 insertions(+), 25 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/metric/main_test.go b/go/test/endtoend/transaction/twopc/metric/main_test.go index 800c758b27c..73cc380a900 100644 --- a/go/test/endtoend/transaction/twopc/metric/main_test.go +++ b/go/test/endtoend/transaction/twopc/metric/main_test.go @@ -113,4 +113,5 @@ func start(t *testing.T) (*mysql.Conn, func()) { func cleanup(t *testing.T) { cluster.PanicHandler(t) twopcutil.ClearOutTable(t, vtParams, "twopc_user") + twopcutil.ClearOutTable(t, vtParams, "twopc_t1") } diff --git a/go/test/endtoend/transaction/twopc/metric/metric_test.go b/go/test/endtoend/transaction/twopc/metric/metric_test.go index c5755228e65..40645628f45 100644 --- a/go/test/endtoend/transaction/twopc/metric/metric_test.go +++ b/go/test/endtoend/transaction/twopc/metric/metric_test.go @@ -26,9 +26,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" - "vitess.io/vitess/go/test/endtoend/cluster" + twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/vtgate/vtgateconn" @@ -164,6 +163,7 @@ func TestVTTablet2PCMetrics(t *testing.T) { conn := vtgateConn.Session("", nil) ctx, cancel := context.WithCancel(context.Background()) + ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil) defer cancel() for i := 1; i <= 20; i++ { @@ -176,8 +176,7 @@ func TestVTTablet2PCMetrics(t *testing.T) { multi := len(conn.SessionPb().ShardSessions) > 1 // fail after mm commit. - newCtx := callerid.NewContext(ctx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil) - _, err = conn.Execute(newCtx, "commit", nil) + _, err = conn.Execute(ctx, "commit", nil) if multi { assert.ErrorContains(t, err, "Fail After MM commit") } else { @@ -208,6 +207,76 @@ func TestVTTablet2PCMetrics(t *testing.T) { } } +// TestVTTablet2PCMetricsFailCommitPrepared tests 2pc metrics on VTTablet on commit prepared failure..';;/ +func TestVTTablet2PCMetricsFailCommitPrepared(t *testing.T) { + defer cleanup(t) + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + conn := vtgateConn.Session("", nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + newCtx := callerid.NewContext(ctx, callerid.NewEffectiveCallerID("CP_80-_R", "", ""), nil) + execute(t, newCtx, conn, "begin") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (4, 1)") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (6, 2)") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (9, 3)") + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "commit prepared: retryable error") + dtidRetryable := getDTIDFromWarnings(ctx, t, conn) + require.NotEmpty(t, dtidRetryable) + + cpFail := getVarValue[map[string]any](t, "CommitPreparedFail", clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().VttabletProcess.GetVars) + require.EqualValues(t, 1, cpFail["Retryable"]) + require.Nil(t, cpFail["NonRetryable"]) + + newCtx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("CP_80-_NR", "", ""), nil) + execute(t, newCtx, conn, "begin") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (20, 11)") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (22, 21)") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (25, 31)") + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "commit prepared: non retryable error") + dtidNonRetryable := getDTIDFromWarnings(ctx, t, conn) + require.NotEmpty(t, dtidNonRetryable) + + cpFail = getVarValue[map[string]any](t, "CommitPreparedFail", clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().VttabletProcess.GetVars) + require.EqualValues(t, 1, cpFail["Retryable"]) // old counter value + require.EqualValues(t, 1, cpFail["NonRetryable"]) + + // restart to trigger unresolved transactions + err = clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().RestartOnlyTablet() + require.NoError(t, err) + + // dtid with retryable error should be resolved. + waitForDTIDResolve(ctx, t, conn, dtidRetryable, 5*time.Second) + + // dtid with non retryable error should remain unresolved. + qr, err := conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%s'`, dtidNonRetryable), nil) + require.NoError(t, err) + require.NotZero(t, qr.Rows, "should remain unresolved") + + // running conclude transaction for it. + out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput( + "DistributedTransaction", "conclude", "--dtid", dtidNonRetryable) + require.NoError(t, err) + require.Contains(t, out, "Successfully concluded the distributed transaction") + // now verifying + qr, err = conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%s'`, dtidNonRetryable), nil) + require.NoError(t, err) + require.Empty(t, qr.Rows) +} + +func execute(t *testing.T, ctx context.Context, conn *vtgateconn.VTGateSession, query string) { + t.Helper() + + _, err := conn.Execute(ctx, query, nil) + require.NoError(t, err) +} + func getUnresolvedTxCount(t *testing.T) float64 { unresolvedCount := 0.0 for _, shard := range clusterInstance.Keyspaces[0].Shards { @@ -290,6 +359,11 @@ func getVarValue[T any](t *testing.T, key string, varFunc func() map[string]any) func waitForResolve(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSession, waitTime time.Duration) { t.Helper() + dtid := getDTIDFromWarnings(ctx, t, conn) + waitForDTIDResolve(ctx, t, conn, dtid, waitTime) +} + +func getDTIDFromWarnings(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSession) string { qr, err := conn.Execute(ctx, "show warnings", nil) require.NoError(t, err) require.Len(t, qr.Rows, 1) @@ -302,8 +376,10 @@ func waitForResolve(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSe // extract transaction ID indx := strings.Index(w.Msg, " ") require.Greater(t, indx, 0) - dtid := w.Msg[:indx] + return w.Msg[:indx] +} +func waitForDTIDResolve(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSession, dtid string, waitTime time.Duration) { unresolved := true totalTime := time.After(waitTime) for unresolved { @@ -312,7 +388,7 @@ func waitForResolve(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSe t.Errorf("transaction resolution exceeded wait time of %v", waitTime) unresolved = false // break the loop. case <-time.After(100 * time.Millisecond): - qr, err = conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%v'`, dtid), nil) + qr, err := conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%s'`, dtid), nil) require.NoError(t, err) unresolved = len(qr.Rows) != 0 } diff --git a/go/test/endtoend/transaction/twopc/metric/schema.sql b/go/test/endtoend/transaction/twopc/metric/schema.sql index a52d39a10a3..da6e5cf289a 100644 --- a/go/test/endtoend/transaction/twopc/metric/schema.sql +++ b/go/test/endtoend/transaction/twopc/metric/schema.sql @@ -3,4 +3,11 @@ create table twopc_user id bigint, name varchar(64), primary key (id) +) Engine=InnoDB; + +create table twopc_t1 +( + id bigint, + col bigint, + primary key (id) ) Engine=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/metric/vschema.json b/go/test/endtoend/transaction/twopc/metric/vschema.json index 9a248563ecd..c6be1426a87 100644 --- a/go/test/endtoend/transaction/twopc/metric/vschema.json +++ b/go/test/endtoend/transaction/twopc/metric/vschema.json @@ -3,6 +3,9 @@ "vindexes": { "xxhash": { "type": "xxhash" + }, + "reverse_bits": { + "type": "reverse_bits" } }, "tables": { @@ -13,6 +16,14 @@ "name": "xxhash" } ] + }, + "twopc_t1": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] } } } \ No newline at end of file diff --git a/go/vt/vttablet/tabletserver/debug_2pc.go b/go/vt/vttablet/tabletserver/debug_2pc.go index a0de20104db..5db72be0fba 100644 --- a/go/vt/vttablet/tabletserver/debug_2pc.go +++ b/go/vt/vttablet/tabletserver/debug_2pc.go @@ -19,12 +19,16 @@ limitations under the License. package tabletserver import ( + "context" "os" "path" "strconv" "time" + "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/log" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) const DebugTwoPc = true @@ -46,3 +50,24 @@ func commitPreparedDelayForTest(tsv *TabletServer) { time.Sleep(time.Duration(delVal) * time.Second) } } + +// checkTestFailure is used to simulate failures in 2PC flow for testing when DebugTwoPc is true. +func checkTestFailure(ctx context.Context, shard string) error { + if shard != "80-" { + return nil + } + callerID := callerid.EffectiveCallerIDFromContext(ctx) + if callerID == nil { + return nil + } + switch callerID.Principal { + case "CP_80-_R": + // retryable error. + return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "commit prepared: retryable error") + case "CP_80-_NR": + // non retryable error. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "commit prepared: non retryable error") + default: + return nil + } +} diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 9d280c36a41..126c99814b8 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -33,19 +33,21 @@ import ( // DTExecutor is used for executing a distributed transactional request. type DTExecutor struct { - ctx context.Context - logStats *tabletenv.LogStats - te *TxEngine - qe *QueryEngine + ctx context.Context + logStats *tabletenv.LogStats + te *TxEngine + qe *QueryEngine + shardFunc func() string } // NewDTExecutor creates a new distributed transaction executor. -func NewDTExecutor(ctx context.Context, te *TxEngine, qe *QueryEngine, logStats *tabletenv.LogStats) *DTExecutor { +func NewDTExecutor(ctx context.Context, logStats *tabletenv.LogStats, te *TxEngine, qe *QueryEngine, shardFunc func() string) *DTExecutor { return &DTExecutor{ - ctx: ctx, - te: te, - qe: qe, - logStats: logStats, + ctx: ctx, + logStats: logStats, + te: te, + qe: qe, + shardFunc: shardFunc, } } @@ -168,6 +170,12 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { } dte.te.txPool.RollbackAndRelease(ctx, conn) }() + if DebugTwoPc { + if err := checkTestFailure(dte.ctx, dte.shardFunc()); err != nil { + log.Errorf("failing test on commit prepared: %v", err) + return err + } + } if err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid); err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/production.go b/go/vt/vttablet/tabletserver/production.go index 70cb8b092fa..e0d8cb4fd66 100644 --- a/go/vt/vttablet/tabletserver/production.go +++ b/go/vt/vttablet/tabletserver/production.go @@ -18,6 +18,10 @@ limitations under the License. package tabletserver +import ( + "context" +) + // This file defines debug constants that are always false. // This file is used for building production code. // We use go build directives to include a file that defines the constant to true @@ -28,3 +32,7 @@ package tabletserver const DebugTwoPc = false func commitPreparedDelayForTest(tsv *TabletServer) {} + +func checkTestFailure(context.Context, string) error { + return nil +} diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 261bd900f41..f96911971be 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -703,7 +703,7 @@ func (tsv *TabletServer) Prepare(ctx context.Context, target *querypb.Target, tr "Prepare", "prepare", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.Prepare(transactionID, dtid) }, ) @@ -716,7 +716,7 @@ func (tsv *TabletServer) CommitPrepared(ctx context.Context, target *querypb.Tar "CommitPrepared", "commit_prepared", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) if DebugTwoPc { commitPreparedDelayForTest(tsv) } @@ -732,7 +732,7 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T "RollbackPrepared", "rollback_prepared", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.RollbackPrepared(dtid, originalID) }, ) @@ -765,7 +765,7 @@ func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb. "CreateTransaction", "create_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.CreateTransaction(dtid, participants) }, ) @@ -779,7 +779,7 @@ func (tsv *TabletServer) StartCommit(ctx context.Context, target *querypb.Target "StartCommit", "start_commit", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.StartCommit(transactionID, dtid) }, ) @@ -793,7 +793,7 @@ func (tsv *TabletServer) SetRollback(ctx context.Context, target *querypb.Target "SetRollback", "set_rollback", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.SetRollback(dtid, transactionID) }, ) @@ -807,7 +807,7 @@ func (tsv *TabletServer) ConcludeTransaction(ctx context.Context, target *queryp "ConcludeTransaction", "conclude_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.ConcludeTransaction(dtid) }, ) @@ -820,7 +820,7 @@ func (tsv *TabletServer) ReadTransaction(ctx context.Context, target *querypb.Ta "ReadTransaction", "read_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) metadata, err = txe.ReadTransaction(dtid) return err }, @@ -835,7 +835,7 @@ func (tsv *TabletServer) UnresolvedTransactions(ctx context.Context, target *que "UnresolvedTransactions", "unresolved_transaction", nil, target, nil, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) transactions, err = txe.UnresolvedTransactions(time.Duration(abandonAgeSeconds) * time.Second) return err }, @@ -1865,7 +1865,7 @@ func (tsv *TabletServer) registerQueryListHandlers(queryLists []*QueryList) { func (tsv *TabletServer) registerTwopczHandler() { tsv.exporter.HandleFunc("/twopcz", func(w http.ResponseWriter, r *http.Request) { ctx := tabletenv.LocalContext() - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, tabletenv.NewLogStats(ctx, "twopcz")) + txe := NewDTExecutor(ctx, tabletenv.NewLogStats(ctx, "twopcz"), tsv.te, tsv.qe, tsv.getShard) twopczHandler(txe, w, r) }) } @@ -2154,3 +2154,7 @@ func skipQueryPlanCache(options *querypb.ExecuteOptions) bool { } return options.SkipQueryPlanCache || options.HasCreatedTempTables } + +func (tsv *TabletServer) getShard() string { + return tsv.sm.Target().Shard +}