Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify distributed transaction commit flow #16468

Merged
merged 7 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go/mysql/sqlerror/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func (e ErrorCode) ToString() string {
// See above reference for more information on each code.
const (
// Vitess specific errors, (100-999)
ERNotReplica = ErrorCode(100)
ERNonAtomicCommit = ErrorCode(301)
ERNotReplica = ErrorCode(100)
ERNonAtomicCommit = ErrorCode(301)
ERInAtomicRecovery = ErrorCode(302)

// unknown
ERUnknownError = ErrorCode(1105)
Expand Down
97 changes: 85 additions & 12 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/callerid"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

// TestDTCommit tests distributed transaction commit for insert, update and delete operations
Expand Down Expand Up @@ -580,6 +582,10 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After MM commit")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during metadata manager commit; transaction will be committed/rollbacked based on the state on recovery",
false, "COMMIT", "ks:40-80,ks:-40")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -656,6 +662,10 @@ func TestDTResolveAfterRMPrepare(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After RM prepared")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during transaction prepare phase; prepare transaction rollback attempted; conclude on recovery",
true /* transaction concluded */, "", "")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -714,6 +724,10 @@ func TestDTResolveDuringRMPrepare(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail During RM prepare")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during transaction prepare phase; prepare transaction rollback attempted; conclude on recovery",
true, "", "")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -776,6 +790,10 @@ func TestDTResolveDuringRMCommit(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail During RM commit")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during resource manager commit; transaction will be committed on recovery",
false, "COMMIT", "ks:40-80,ks:-40")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -851,18 +869,9 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After TR created")

t.Run("ReadTransactionState", func(t *testing.T) {
errStr := err.Error()
indx := strings.Index(errStr, "Fail")
require.Greater(t, indx, 0)
dtid := errStr[0 : indx-2]
res, err := conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil)
require.NoError(t, err)
resStr := fmt.Sprintf("%v", res.Rows)
require.Contains(t, resStr, `[[VARCHAR("ks:80-`)
require.Contains(t, resStr, `VARCHAR("PREPARE") DATETIME("`)
require.Contains(t, resStr, `+0000 UTC") VARCHAR("ks:40-80")]]`)
})
testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during transaction record creation; rollback attempted; conclude on recovery",
false, "PREPARE", "ks:40-80")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
Expand All @@ -882,3 +891,67 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) {
assert.Equal(t, expectations, logTable,
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
}

type warn struct {
level string
code uint16
msg string
}

func toWarn(row sqltypes.Row) warn {
code, _ := row[1].ToUint16()
return warn{
level: row[0].ToString(),
code: code,
msg: row[2].ToString(),
}
}

type txStatus struct {
dtid string
state string
rTime string
participants string
}

func toTxStatus(row sqltypes.Row) txStatus {
return txStatus{
dtid: row[0].ToString(),
state: row[1].ToString(),
rTime: row[2].ToString(),
participants: row[3].ToString(),
}
}

func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSession, warnMsg string,
txConcluded bool, txState string, txParticipants string) {
t.Helper()

qr, err := conn.Execute(context.Background(), "show warnings", nil)
require.NoError(t, err)
require.Len(t, qr.Rows, 1)

// validate warning output
w := toWarn(qr.Rows[0])
assert.Equal(t, "Warning", w.level)
assert.EqualValues(t, 302, w.code)
assert.Contains(t, w.msg, warnMsg)

// extract transaction ID
indx := strings.Index(w.msg, " ")
require.Greater(t, indx, 0)
dtid := w.msg[:indx]

qr, err = conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil)
require.NoError(t, err)

// validate transaction status
if txConcluded {
require.Empty(t, qr.Rows)
} else {
tx := toTxStatus(qr.Rows[0])
assert.Equal(t, dtid, tx.dtid)
assert.Equal(t, txState, tx.state)
assert.Equal(t, txParticipants, tx.participants)
}
}
48 changes: 48 additions & 0 deletions go/vt/vtgate/debug_2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,52 @@ limitations under the License.

package vtgate

import (
"context"

"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)

const DebugTwoPc = true

// checkTestFailure is used to simulate failures in 2PC flow for testing when DebugTwoPc is true.
func checkTestFailure(ctx context.Context, expectCaller string, target *querypb.Target) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why put this in this file? and then a corresponding function in the other file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are multiple reasons

  1. This is an unused function in the production build.
  2. Anyone can end up using it if this implementation remains in production code.
  3. Simple no-op production code will avoid any incidental calls to it.

callerID := callerid.EffectiveCallerIDFromContext(ctx)
if callerID == nil || callerID.GetPrincipal() != expectCaller {
return nil
}
switch callerID.Principal {
case "TRCreated_FailNow":
log.Errorf("Fail After TR created")
// no commit decision is made. Transaction should be a rolled back.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After TR created")
case "RMPrepare_-40_FailNow":
if target.Shard != "-40" {
return nil
}
log.Errorf("Fail During RM prepare")
// no commit decision is made. Transaction should be a rolled back.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM prepare")
case "RMPrepared_FailNow":
log.Errorf("Fail After RM prepared")
// no commit decision is made. Transaction should be a rolled back.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After RM prepared")
case "MMCommitted_FailNow":
log.Errorf("Fail After MM commit")
// commit decision is made. Transaction should be committed.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After MM commit")
case "RMCommit_-40_FailNow":
if target.Shard != "-40" {
return nil
}
log.Errorf("Fail During RM commit")
// commit decision is made. Transaction should be a committed.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM commit")
default:
return nil
}
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/transaction_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (t *TransactionStatus) TryExecute(ctx context.Context, vcursor VCursor, bin
if wantfields {
res.Fields = t.getFields()
}
if transactionState != nil {
if transactionState != nil && transactionState.Dtid != "" {
var participantString []string
for _, participant := range transactionState.Participants {
participantString = append(participantString, fmt.Sprintf("%s:%s", participant.Keyspace, participant.Shard))
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vtgate/production.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ limitations under the License.

package vtgate

import (
"context"

querypb "vitess.io/vitess/go/vt/proto/query"
)

// This file defines debug constants that are always false.
// This file is used for building production code.
// We use go build directives to include a file that defines the constant to true
Expand All @@ -26,3 +32,7 @@ package vtgate
// production performance.

const DebugTwoPc = false

func checkTestFailure(_ context.Context, _ string, _ *querypb.Target) error {
return nil
}
Loading
Loading