Skip to content

Commit

Permalink
Modified StartCommit rpc on vttablet to emit the StartCommit state
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Oct 30, 2024
1 parent a027384 commit d9db2f4
Show file tree
Hide file tree
Showing 17 changed files with 85 additions and 63 deletions.
6 changes: 3 additions & 3 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,9 +539,9 @@ func (itc *internalTabletConn) CreateTransaction(
}

// StartCommit is part of queryservice.QueryService
func (itc *internalTabletConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) error {
err := itc.tablet.qsc.QueryService().StartCommit(ctx, target, transactionID, dtid)
return tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
func (itc *internalTabletConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (querypb.StartCommitState, error) {
state, err := itc.tablet.qsc.QueryService().StartCommit(ctx, target, transactionID, dtid)
return state, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

// SetRollback is part of queryservice.QueryService
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (t *explainTablet) CreateTransaction(ctx context.Context, target *querypb.T
}

// StartCommit is part of the QueryService interface.
func (t *explainTablet) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) {
func (t *explainTablet) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) {
t.mu.Lock()
t.currentTime = t.vte.batchTime.Wait()
t.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err
}

txPhase = Commit2pcStartCommit
err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid)
_, err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (client *QueryClient) CreateTransaction(dtid string, participants []*queryp
}

// StartCommit issues a StartCommit to TabletServer for the current transaction.
func (client *QueryClient) StartCommit(dtid string) error {
func (client *QueryClient) StartCommit(dtid string) (querypb.StartCommitState, error) {
defer func() { client.transactionID = 0 }()
return client.server.StartCommit(client.ctx, client.target, client.transactionID, dtid)
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,9 @@ func TestMMCommitFlow(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "Duplicate entry")

err = client.StartCommit("aa")
state, err := client.StartCommit("aa")
require.NoError(t, err)
assert.Equal(t, querypb.StartCommitState_Success, state)

err = client.SetRollback("aa", 0)
require.EqualError(t, err, "could not transition to ROLLBACK: aa (CallerID: dev)")
Expand Down
7 changes: 2 additions & 5 deletions go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,8 @@ func (q *query) StartCommit(ctx context.Context, request *querypb.StartCommitReq
request.EffectiveCallerId,
request.ImmediateCallerId,
)
if err := q.server.StartCommit(ctx, request.Target, request.TransactionId, request.Dtid); err != nil {
return nil, vterrors.ToGRPC(err)
}

return &querypb.StartCommitResponse{}, nil
state, err := q.server.StartCommit(ctx, request.Target, request.TransactionId, request.Dtid)
return &querypb.StartCommitResponse{State: state}, vterrors.ToGRPC(err)
}

// SetRollback is part of the queryservice.QueryServer interface
Expand Down
14 changes: 8 additions & 6 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,12 @@ func (conn *gRPCQueryClient) CreateTransaction(ctx context.Context, target *quer

// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) error {
func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (querypb.StartCommitState, error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
return tabletconn.ConnClosed
// This can be marked as fail as not other process will try to commit this transaction.
return querypb.StartCommitState_Fail, tabletconn.ConnClosed
}

req := &querypb.StartCommitRequest{
Expand All @@ -365,11 +366,12 @@ func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Ta
TransactionId: transactionID,
Dtid: dtid,
}
_, err := conn.c.StartCommit(ctx, req)
if err != nil {
return tabletconn.ErrorFromGRPC(err)
resp, err := conn.c.StartCommit(ctx, req)
err = tabletconn.ErrorFromGRPC(err)
if resp != nil {
return resp.State, err
}
return nil
return querypb.StartCommitState_Unknown, err
}

// SetRollback transitions the 2pc transaction to the Rollback state.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/queryservice/queryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type QueryService interface {

// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error)
StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error)

// SetRollback transitions the 2pc transaction to the Rollback state.
// If a transaction id is provided, that transaction is also rolled back.
Expand Down
8 changes: 5 additions & 3 deletions go/vt/vttablet/queryservice/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,13 @@ func (ws *wrappedService) CreateTransaction(ctx context.Context, target *querypb
})
}

func (ws *wrappedService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) {
return ws.wrapper(ctx, target, ws.impl, "StartCommit", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
innerErr := conn.StartCommit(ctx, target, transactionID, dtid)
func (ws *wrappedService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) {
err = ws.wrapper(ctx, target, ws.impl, "StartCommit", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var innerErr error
state, innerErr = conn.StartCommit(ctx, target, transactionID, dtid)
return canRetry(ctx, innerErr), innerErr
})
return state, err
}

func (ws *wrappedService) SetRollback(ctx context.Context, target *querypb.Target, dtid string, transactionID int64) (err error) {
Expand Down
10 changes: 7 additions & 3 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,14 +404,18 @@ func (sbc *SandboxConn) CreateTransaction(ctx context.Context, target *querypb.T

// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
func (sbc *SandboxConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) {
func (sbc *SandboxConn) StartCommit(context.Context, *querypb.Target, int64, string) (state querypb.StartCommitState, err error) {
sbc.panicIfNeeded()
sbc.StartCommitCount.Add(1)
if sbc.MustFailStartCommit > 0 {
sbc.MustFailStartCommit--
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "error: err")
return querypb.StartCommitState_Fail, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "error: err")
}
return sbc.getError()
err = sbc.getError()
if err != nil {
return querypb.StartCommitState_Unknown, err
}
return querypb.StartCommitState_Success, nil
}

// SetRollback transitions the 2pc transaction to the Rollback state.
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,9 @@ func (f *FakeQueryService) CreateTransaction(ctx context.Context, target *queryp
}

// StartCommit is part of the queryservice.QueryService interface
func (f *FakeQueryService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) {
func (f *FakeQueryService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) {
if f.HasError {
return f.TabletError
return querypb.StartCommitState_Fail, f.TabletError
}
if f.Panics {
panic(fmt.Errorf("test-triggered panic"))
Expand All @@ -300,7 +300,7 @@ func (f *FakeQueryService) StartCommit(ctx context.Context, target *querypb.Targ
if dtid != Dtid {
f.t.Errorf("StartCommit: invalid dtid: got %s expected %s", dtid, Dtid)
}
return nil
return querypb.StartCommitState_Success, nil
}

// SetRollback is part of the queryservice.QueryService interface
Expand Down
21 changes: 13 additions & 8 deletions go/vt/vttablet/tabletconntest/tabletconntest.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,26 +294,31 @@ func testStartCommit(t *testing.T, conn queryservice.QueryService, f *FakeQueryS
t.Log("testStartCommit")
ctx := context.Background()
ctx = callerid.NewContext(ctx, TestCallerID, TestVTGateCallerID)
err := conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid)
if err != nil {
t.Fatalf("StartCommit failed: %v", err)
}
state, err := conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid)
assert.Equal(t, querypb.StartCommitState_Success, state, "Unexpected state from StartCommit")
assert.NoError(t, err, "StartCommit failed")
}

func testStartCommitError(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testStartCommitError")
f.HasError = true
testErrorHelper(t, f, "StartCommit", func(ctx context.Context) error {
return conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid)
var state querypb.StartCommitState
testErrorHelper(t, f, "StartCommit", func(ctx context.Context) (err error) {
state, err = conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid)
return err
})
f.HasError = false
assert.Equal(t, querypb.StartCommitState_Unknown, state, "Unexpected state from StartCommit")
}

func testStartCommitPanics(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testStartCommitPanics")
testPanicHelper(t, f, "StartCommit", func(ctx context.Context) error {
return conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid)
var state querypb.StartCommitState
testPanicHelper(t, f, "StartCommit", func(ctx context.Context) (err error) {
state, err = conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid)
return err
})
assert.Equal(t, querypb.StartCommitState_Unknown, state, "Unexpected state from StartCommit")
}

func testSetRollback(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ func (ftc *fakeTabletConn) CreateTransaction(ctx context.Context, target *queryp
}

// fakeTabletConn implements the QueryService interface.
func (ftc *fakeTabletConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) {
return nil
func (ftc *fakeTabletConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) {
return querypb.StartCommitState_Unknown, nil
}

// fakeTabletConn implements the QueryService interface.
Expand Down
16 changes: 9 additions & 7 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,16 @@ func (dte *DTExecutor) CreateTransaction(dtid string, participants []*querypb.Ta

// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error {
func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) (querypb.StartCommitState, error) {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
return querypb.StartCommitState_Fail, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer dte.te.env.Stats().QueryTimings.Record("START_COMMIT", time.Now())
dte.logStats.TransactionID = transactionID

conn, err := dte.te.txPool.GetAndLock(transactionID, "for 2pc commit")
if err != nil {
return err
return querypb.StartCommitState_Fail, err
}
defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn)

Expand All @@ -254,15 +254,17 @@ func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error {
return dte.te.twoPC.Transition(dte.ctx, conn, dtid, DTStateRollback)
})
// return the error, defer call above will roll back the transaction.
return vterrors.VT10002("cannot commit the transaction on a reserved connection")
return querypb.StartCommitState_Fail, vterrors.VT10002("cannot commit the transaction on a reserved connection")
}

err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, DTStateCommit)
if err != nil {
return err
return querypb.StartCommitState_Fail, err
}
_, err = dte.te.txPool.Commit(dte.ctx, conn)
return err
if _, err = dte.te.txPool.Commit(dte.ctx, conn); err != nil {
return querypb.StartCommitState_Unknown, err
}
return querypb.StartCommitState_Success, nil
}

// SetRollback transitions the 2pc transaction to the Rollback state.
Expand Down
31 changes: 18 additions & 13 deletions go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,21 @@ import (
"testing"
"time"

"vitess.io/vitess/go/event/syslogger"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/event/syslogger"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
)

func TestTxExecutorEmptyPrepare(t *testing.T) {
Expand Down Expand Up @@ -349,14 +348,16 @@ func TestExecutorStartCommit(t *testing.T) {
commitTransition := fmt.Sprintf("update _vt.dt_state set state = %d where dtid = _binary'aa' and state = %d", int(querypb.TransactionState_COMMIT), int(querypb.TransactionState_PREPARE))
db.AddQuery(commitTransition, &sqltypes.Result{RowsAffected: 1})
txid := newTxForPrep(ctx, tsv)
err := txe.StartCommit(txid, "aa")
state, err := txe.StartCommit(txid, "aa")
require.NoError(t, err)
assert.Equal(t, querypb.StartCommitState_Success, state)

db.AddQuery(commitTransition, &sqltypes.Result{})
txid = newTxForPrep(ctx, tsv)
err = txe.StartCommit(txid, "aa")
state, err = txe.StartCommit(txid, "aa")
require.Error(t, err)
require.Contains(t, err.Error(), "could not transition to COMMIT: aa")
assert.Equal(t, querypb.StartCommitState_Fail, state)
}

func TestExecutorStartCommitFailure(t *testing.T) {
Expand All @@ -379,8 +380,9 @@ func TestExecutorStartCommitFailure(t *testing.T) {
db.AddQuery(rollbackTransition, sqltypes.MakeTestResult(nil))

// try 2pc commit of Metadata Manager.
err = txe.StartCommit(txid, "aa")
state, err := txe.StartCommit(txid, "aa")
require.EqualError(t, err, "VT10002: atomic distributed transaction not allowed: cannot commit the transaction on a reserved connection")
assert.Equal(t, querypb.StartCommitState_Fail, state)
}

func TestExecutorSetRollback(t *testing.T) {
Expand Down Expand Up @@ -671,7 +673,10 @@ func TestNoTwopc(t *testing.T) {
fun: func() error { return txe.CreateTransaction("aa", nil) },
}, {
desc: "StartCommit",
fun: func() error { return txe.StartCommit(1, "aa") },
fun: func() error {
_, err := txe.StartCommit(1, "aa")
return err
},
}, {
desc: "SetRollback",
fun: func() error { return txe.SetRollback("aa", 1) },
Expand Down
8 changes: 5 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,16 +773,18 @@ func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.

// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
func (tsv *TabletServer) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) {
return tsv.execRequest(
func (tsv *TabletServer) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) {
err = tsv.execRequest(
ctx, tsv.loadQueryTimeout(),
"StartCommit", "start_commit", nil,
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard)
return txe.StartCommit(transactionID, dtid)
state, err = txe.StartCommit(transactionID, dtid)
return err
},
)
return state, err
}

// SetRollback transitions the 2pc transaction to the Rollback state.
Expand Down
6 changes: 4 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,15 @@ func TestTabletServerStartCommit(t *testing.T) {
commitTransition := fmt.Sprintf("update _vt.dt_state set state = %d where dtid = _binary'aa' and state = %d", int(querypb.TransactionState_COMMIT), int(querypb.TransactionState_PREPARE))
db.AddQuery(commitTransition, &sqltypes.Result{RowsAffected: 1})
txid := newTxForPrep(ctx, tsv)
err := tsv.StartCommit(ctx, &target, txid, "aa")
state, err := tsv.StartCommit(ctx, &target, txid, "aa")
require.NoError(t, err)
assert.Equal(t, querypb.StartCommitState_Success, state, "StartCommit state")

db.AddQuery(commitTransition, &sqltypes.Result{})
txid = newTxForPrep(ctx, tsv)
err = tsv.StartCommit(ctx, &target, txid, "aa")
state, err = tsv.StartCommit(ctx, &target, txid, "aa")
assert.EqualError(t, err, "could not transition to COMMIT: aa", "Prepare err")
assert.Equal(t, querypb.StartCommitState_Unknown, state, "StartCommit state")
}

func TestTabletserverSetRollback(t *testing.T) {
Expand Down

0 comments on commit d9db2f4

Please sign in to comment.