Skip to content

Commit

Permalink
added framework to test vttablet failure metrics and conclude those t…
Browse files Browse the repository at this point in the history
…ransactions

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Oct 16, 2024
1 parent 4131de8 commit 1e22b76
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 25 deletions.
1 change: 1 addition & 0 deletions go/test/endtoend/transaction/twopc/metric/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,5 @@ func start(t *testing.T) (*mysql.Conn, func()) {
func cleanup(t *testing.T) {
cluster.PanicHandler(t)
twopcutil.ClearOutTable(t, vtParams, "twopc_user")
twopcutil.ClearOutTable(t, vtParams, "twopc_t1")
}
88 changes: 82 additions & 6 deletions go/test/endtoend/transaction/twopc/metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"

"vitess.io/vitess/go/test/endtoend/cluster"
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
Expand Down Expand Up @@ -164,6 +163,7 @@ func TestVTTablet2PCMetrics(t *testing.T) {

conn := vtgateConn.Session("", nil)
ctx, cancel := context.WithCancel(context.Background())
ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil)
defer cancel()

for i := 1; i <= 20; i++ {
Expand All @@ -176,8 +176,7 @@ func TestVTTablet2PCMetrics(t *testing.T) {
multi := len(conn.SessionPb().ShardSessions) > 1

// fail after mm commit.
newCtx := callerid.NewContext(ctx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil)
_, err = conn.Execute(newCtx, "commit", nil)
_, err = conn.Execute(ctx, "commit", nil)
if multi {
assert.ErrorContains(t, err, "Fail After MM commit")
} else {
Expand Down Expand Up @@ -208,6 +207,76 @@ func TestVTTablet2PCMetrics(t *testing.T) {
}
}

// TestVTTablet2PCMetricsFailCommitPrepared tests 2pc metrics on VTTablet on commit prepared failure..';;/
func TestVTTablet2PCMetricsFailCommitPrepared(t *testing.T) {
defer cleanup(t)

vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
require.NoError(t, err)
defer vtgateConn.Close()

conn := vtgateConn.Session("", nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

newCtx := callerid.NewContext(ctx, callerid.NewEffectiveCallerID("CP_80-_R", "", ""), nil)
execute(t, newCtx, conn, "begin")
execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (4, 1)")
execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (6, 2)")
execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (9, 3)")
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "commit prepared: retryable error")
dtidRetryable := getDTIDFromWarnings(ctx, t, conn)
require.NotEmpty(t, dtidRetryable)

cpFail := getVarValue[map[string]any](t, "CommitPreparedFail", clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().VttabletProcess.GetVars)
require.EqualValues(t, 1, cpFail["Retryable"])
require.Nil(t, cpFail["NonRetryable"])

newCtx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("CP_80-_NR", "", ""), nil)
execute(t, newCtx, conn, "begin")
execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (20, 11)")
execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (22, 21)")
execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (25, 31)")
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "commit prepared: non retryable error")
dtidNonRetryable := getDTIDFromWarnings(ctx, t, conn)
require.NotEmpty(t, dtidNonRetryable)

cpFail = getVarValue[map[string]any](t, "CommitPreparedFail", clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().VttabletProcess.GetVars)
require.EqualValues(t, 1, cpFail["Retryable"]) // old counter value
require.EqualValues(t, 1, cpFail["NonRetryable"])

// restart to trigger unresolved transactions
err = clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().RestartOnlyTablet()
require.NoError(t, err)

// dtid with retryable error should be resolved.
waitForDTIDResolve(ctx, t, conn, dtidRetryable, 5*time.Second)

// dtid with non retryable error should remain unresolved.
qr, err := conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%s'`, dtidNonRetryable), nil)
require.NoError(t, err)
require.NotZero(t, qr.Rows, "should remain unresolved")

// running conclude transaction for it.
out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"DistributedTransaction", "conclude", "--dtid", dtidNonRetryable)
require.NoError(t, err)
require.Contains(t, out, "Successfully concluded the distributed transaction")
// now verifying
qr, err = conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%s'`, dtidNonRetryable), nil)
require.NoError(t, err)
require.Empty(t, qr.Rows)
}

func execute(t *testing.T, ctx context.Context, conn *vtgateconn.VTGateSession, query string) {
t.Helper()

_, err := conn.Execute(ctx, query, nil)
require.NoError(t, err)
}

func getUnresolvedTxCount(t *testing.T) float64 {
unresolvedCount := 0.0
for _, shard := range clusterInstance.Keyspaces[0].Shards {
Expand Down Expand Up @@ -290,6 +359,11 @@ func getVarValue[T any](t *testing.T, key string, varFunc func() map[string]any)
func waitForResolve(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSession, waitTime time.Duration) {
t.Helper()

dtid := getDTIDFromWarnings(ctx, t, conn)
waitForDTIDResolve(ctx, t, conn, dtid, waitTime)
}

func getDTIDFromWarnings(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSession) string {
qr, err := conn.Execute(ctx, "show warnings", nil)
require.NoError(t, err)
require.Len(t, qr.Rows, 1)
Expand All @@ -302,8 +376,10 @@ func waitForResolve(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSe
// extract transaction ID
indx := strings.Index(w.Msg, " ")
require.Greater(t, indx, 0)
dtid := w.Msg[:indx]
return w.Msg[:indx]
}

func waitForDTIDResolve(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSession, dtid string, waitTime time.Duration) {
unresolved := true
totalTime := time.After(waitTime)
for unresolved {
Expand All @@ -312,7 +388,7 @@ func waitForResolve(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSe
t.Errorf("transaction resolution exceeded wait time of %v", waitTime)
unresolved = false // break the loop.
case <-time.After(100 * time.Millisecond):
qr, err = conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%v'`, dtid), nil)
qr, err := conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%s'`, dtid), nil)
require.NoError(t, err)
unresolved = len(qr.Rows) != 0
}
Expand Down
7 changes: 7 additions & 0 deletions go/test/endtoend/transaction/twopc/metric/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,11 @@ create table twopc_user
id bigint,
name varchar(64),
primary key (id)
) Engine=InnoDB;

create table twopc_t1
(
id bigint,
col bigint,
primary key (id)
) Engine=InnoDB;
11 changes: 11 additions & 0 deletions go/test/endtoend/transaction/twopc/metric/vschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"vindexes": {
"xxhash": {
"type": "xxhash"
},
"reverse_bits": {
"type": "reverse_bits"
}
},
"tables": {
Expand All @@ -13,6 +16,14 @@
"name": "xxhash"
}
]
},
"twopc_t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
}
}
}
25 changes: 25 additions & 0 deletions go/vt/vttablet/tabletserver/debug_2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ limitations under the License.
package tabletserver

import (
"context"
"os"
"path"
"strconv"
"time"

"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/log"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)

const DebugTwoPc = true
Expand All @@ -46,3 +50,24 @@ func commitPreparedDelayForTest(tsv *TabletServer) {
time.Sleep(time.Duration(delVal) * time.Second)
}
}

// checkTestFailure is used to simulate failures in 2PC flow for testing when DebugTwoPc is true.
func checkTestFailure(ctx context.Context, shard string) error {
if shard != "80-" {
return nil
}
callerID := callerid.EffectiveCallerIDFromContext(ctx)
if callerID == nil {
return nil
}
switch callerID.Principal {
case "CP_80-_R":
// retryable error.
return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "commit prepared: retryable error")
case "CP_80-_NR":
// non retryable error.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "commit prepared: non retryable error")
default:
return nil
}
}
26 changes: 17 additions & 9 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,21 @@ import (

// DTExecutor is used for executing a distributed transactional request.
type DTExecutor struct {
ctx context.Context
logStats *tabletenv.LogStats
te *TxEngine
qe *QueryEngine
ctx context.Context
logStats *tabletenv.LogStats
te *TxEngine
qe *QueryEngine
shardFunc func() string
}

// NewDTExecutor creates a new distributed transaction executor.
func NewDTExecutor(ctx context.Context, te *TxEngine, qe *QueryEngine, logStats *tabletenv.LogStats) *DTExecutor {
func NewDTExecutor(ctx context.Context, logStats *tabletenv.LogStats, te *TxEngine, qe *QueryEngine, shardFunc func() string) *DTExecutor {
return &DTExecutor{
ctx: ctx,
te: te,
qe: qe,
logStats: logStats,
ctx: ctx,
logStats: logStats,
te: te,
qe: qe,
shardFunc: shardFunc,
}
}

Expand Down Expand Up @@ -168,6 +170,12 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
}
dte.te.txPool.RollbackAndRelease(ctx, conn)
}()
if DebugTwoPc {
if err := checkTestFailure(dte.ctx, dte.shardFunc()); err != nil {
log.Errorf("failing test on commit prepared: %v", err)
return err
}
}
if err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid); err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/production.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ limitations under the License.

package tabletserver

import (
"context"
)

// This file defines debug constants that are always false.
// This file is used for building production code.
// We use go build directives to include a file that defines the constant to true
Expand All @@ -28,3 +32,7 @@ package tabletserver
const DebugTwoPc = false

func commitPreparedDelayForTest(tsv *TabletServer) {}

func checkTestFailure(context.Context, string) error {
return nil
}
24 changes: 14 additions & 10 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func (tsv *TabletServer) Prepare(ctx context.Context, target *querypb.Target, tr
"Prepare", "prepare", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard)
return txe.Prepare(transactionID, dtid)
},
)
Expand All @@ -716,7 +716,7 @@ func (tsv *TabletServer) CommitPrepared(ctx context.Context, target *querypb.Tar
"CommitPrepared", "commit_prepared", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard)
if DebugTwoPc {
commitPreparedDelayForTest(tsv)
}
Expand All @@ -732,7 +732,7 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T
"RollbackPrepared", "rollback_prepared", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard)
return txe.RollbackPrepared(dtid, originalID)
},
)
Expand Down Expand Up @@ -765,7 +765,7 @@ func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.
"CreateTransaction", "create_transaction", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard)
return txe.CreateTransaction(dtid, participants)
},
)
Expand All @@ -779,7 +779,7 @@ func (tsv *TabletServer) StartCommit(ctx context.Context, target *querypb.Target
"StartCommit", "start_commit", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard)
return txe.StartCommit(transactionID, dtid)
},
)
Expand All @@ -793,7 +793,7 @@ func (tsv *TabletServer) SetRollback(ctx context.Context, target *querypb.Target
"SetRollback", "set_rollback", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard)
return txe.SetRollback(dtid, transactionID)
},
)
Expand All @@ -807,7 +807,7 @@ func (tsv *TabletServer) ConcludeTransaction(ctx context.Context, target *queryp
"ConcludeTransaction", "conclude_transaction", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard)
return txe.ConcludeTransaction(dtid)
},
)
Expand All @@ -820,7 +820,7 @@ func (tsv *TabletServer) ReadTransaction(ctx context.Context, target *querypb.Ta
"ReadTransaction", "read_transaction", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard)
metadata, err = txe.ReadTransaction(dtid)
return err
},
Expand All @@ -835,7 +835,7 @@ func (tsv *TabletServer) UnresolvedTransactions(ctx context.Context, target *que
"UnresolvedTransactions", "unresolved_transaction", nil,
target, nil, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats)
txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard)
transactions, err = txe.UnresolvedTransactions(time.Duration(abandonAgeSeconds) * time.Second)
return err
},
Expand Down Expand Up @@ -1865,7 +1865,7 @@ func (tsv *TabletServer) registerQueryListHandlers(queryLists []*QueryList) {
func (tsv *TabletServer) registerTwopczHandler() {
tsv.exporter.HandleFunc("/twopcz", func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
txe := NewDTExecutor(ctx, tsv.te, tsv.qe, tabletenv.NewLogStats(ctx, "twopcz"))
txe := NewDTExecutor(ctx, tabletenv.NewLogStats(ctx, "twopcz"), tsv.te, tsv.qe, tsv.getShard)
twopczHandler(txe, w, r)
})
}
Expand Down Expand Up @@ -2154,3 +2154,7 @@ func skipQueryPlanCache(options *querypb.ExecuteOptions) bool {
}
return options.SkipQueryPlanCache || options.HasCreatedTempTables
}

func (tsv *TabletServer) getShard() string {
return tsv.sm.Target().Shard
}

0 comments on commit 1e22b76

Please sign in to comment.