diff --git a/go/vt/vtcombo/tablet_map.go b/go/vt/vtcombo/tablet_map.go index 81b157704b6..a6c07ece524 100644 --- a/go/vt/vtcombo/tablet_map.go +++ b/go/vt/vtcombo/tablet_map.go @@ -539,9 +539,9 @@ func (itc *internalTabletConn) CreateTransaction( } // StartCommit is part of queryservice.QueryService -func (itc *internalTabletConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) error { - err := itc.tablet.qsc.QueryService().StartCommit(ctx, target, transactionID, dtid) - return tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err)) +func (itc *internalTabletConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (querypb.StartCommitState, error) { + state, err := itc.tablet.qsc.QueryService().StartCommit(ctx, target, transactionID, dtid) + return state, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err)) } // SetRollback is part of queryservice.QueryService diff --git a/go/vt/vtexplain/vtexplain_vttablet.go b/go/vt/vtexplain/vtexplain_vttablet.go index ed977e7bcb0..644135f1e3b 100644 --- a/go/vt/vtexplain/vtexplain_vttablet.go +++ b/go/vt/vtexplain/vtexplain_vttablet.go @@ -232,7 +232,7 @@ func (t *explainTablet) CreateTransaction(ctx context.Context, target *querypb.T } // StartCommit is part of the QueryService interface. -func (t *explainTablet) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) { +func (t *explainTablet) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) { t.mu.Lock() t.currentTime = t.vte.batchTime.Wait() t.mu.Unlock() diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 968b41d38d3..82ec72a694a 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -259,7 +259,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err } txPhase = Commit2pcStartCommit - err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid) + _, err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid) if err != nil { return err } diff --git a/go/vt/vttablet/endtoend/framework/client.go b/go/vt/vttablet/endtoend/framework/client.go index 04342d57aa4..59def25ab61 100644 --- a/go/vt/vttablet/endtoend/framework/client.go +++ b/go/vt/vttablet/endtoend/framework/client.go @@ -158,7 +158,7 @@ func (client *QueryClient) CreateTransaction(dtid string, participants []*queryp } // StartCommit issues a StartCommit to TabletServer for the current transaction. -func (client *QueryClient) StartCommit(dtid string) error { +func (client *QueryClient) StartCommit(dtid string) (querypb.StartCommitState, error) { defer func() { client.transactionID = 0 }() return client.server.StartCommit(client.ctx, client.target, client.transactionID, dtid) } diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index 58a1cacbe10..44dde2280a4 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -529,8 +529,9 @@ func TestMMCommitFlow(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "Duplicate entry") - err = client.StartCommit("aa") + state, err := client.StartCommit("aa") require.NoError(t, err) + assert.Equal(t, querypb.StartCommitState_Success, state) err = client.SetRollback("aa", 0) require.EqualError(t, err, "could not transition to ROLLBACK: aa (CallerID: dev)") diff --git a/go/vt/vttablet/grpcqueryservice/server.go b/go/vt/vttablet/grpcqueryservice/server.go index a1a1283b4ad..e3c179ce856 100644 --- a/go/vt/vttablet/grpcqueryservice/server.go +++ b/go/vt/vttablet/grpcqueryservice/server.go @@ -183,11 +183,8 @@ func (q *query) StartCommit(ctx context.Context, request *querypb.StartCommitReq request.EffectiveCallerId, request.ImmediateCallerId, ) - if err := q.server.StartCommit(ctx, request.Target, request.TransactionId, request.Dtid); err != nil { - return nil, vterrors.ToGRPC(err) - } - - return &querypb.StartCommitResponse{}, nil + state, err := q.server.StartCommit(ctx, request.Target, request.TransactionId, request.Dtid) + return &querypb.StartCommitResponse{State: state}, vterrors.ToGRPC(err) } // SetRollback is part of the queryservice.QueryServer interface diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go index 16f93a44372..d2d5604d808 100644 --- a/go/vt/vttablet/grpctabletconn/conn.go +++ b/go/vt/vttablet/grpctabletconn/conn.go @@ -351,11 +351,12 @@ func (conn *gRPCQueryClient) CreateTransaction(ctx context.Context, target *quer // StartCommit atomically commits the transaction along with the // decision to commit the associated 2pc transaction. -func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) error { +func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (querypb.StartCommitState, error) { conn.mu.RLock() defer conn.mu.RUnlock() if conn.cc == nil { - return tabletconn.ConnClosed + // This can be marked as fail as not other process will try to commit this transaction. + return querypb.StartCommitState_Fail, tabletconn.ConnClosed } req := &querypb.StartCommitRequest{ @@ -365,11 +366,12 @@ func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Ta TransactionId: transactionID, Dtid: dtid, } - _, err := conn.c.StartCommit(ctx, req) - if err != nil { - return tabletconn.ErrorFromGRPC(err) + resp, err := conn.c.StartCommit(ctx, req) + err = tabletconn.ErrorFromGRPC(err) + if resp != nil { + return resp.State, err } - return nil + return querypb.StartCommitState_Unknown, err } // SetRollback transitions the 2pc transaction to the Rollback state. diff --git a/go/vt/vttablet/queryservice/queryservice.go b/go/vt/vttablet/queryservice/queryservice.go index 1e48c9b7b1d..d6972bfb6a3 100644 --- a/go/vt/vttablet/queryservice/queryservice.go +++ b/go/vt/vttablet/queryservice/queryservice.go @@ -63,7 +63,7 @@ type QueryService interface { // StartCommit atomically commits the transaction along with the // decision to commit the associated 2pc transaction. - StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) + StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) // SetRollback transitions the 2pc transaction to the Rollback state. // If a transaction id is provided, that transaction is also rolled back. diff --git a/go/vt/vttablet/queryservice/wrapped.go b/go/vt/vttablet/queryservice/wrapped.go index 2bc8346dac2..c72a472a5cb 100644 --- a/go/vt/vttablet/queryservice/wrapped.go +++ b/go/vt/vttablet/queryservice/wrapped.go @@ -146,11 +146,13 @@ func (ws *wrappedService) CreateTransaction(ctx context.Context, target *querypb }) } -func (ws *wrappedService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) { - return ws.wrapper(ctx, target, ws.impl, "StartCommit", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { - innerErr := conn.StartCommit(ctx, target, transactionID, dtid) +func (ws *wrappedService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) { + err = ws.wrapper(ctx, target, ws.impl, "StartCommit", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { + var innerErr error + state, innerErr = conn.StartCommit(ctx, target, transactionID, dtid) return canRetry(ctx, innerErr), innerErr }) + return state, err } func (ws *wrappedService) SetRollback(ctx context.Context, target *querypb.Target, dtid string, transactionID int64) (err error) { diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 6fbf0032b2a..148c630543e 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -404,14 +404,18 @@ func (sbc *SandboxConn) CreateTransaction(ctx context.Context, target *querypb.T // StartCommit atomically commits the transaction along with the // decision to commit the associated 2pc transaction. -func (sbc *SandboxConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) { +func (sbc *SandboxConn) StartCommit(context.Context, *querypb.Target, int64, string) (state querypb.StartCommitState, err error) { sbc.panicIfNeeded() sbc.StartCommitCount.Add(1) if sbc.MustFailStartCommit > 0 { sbc.MustFailStartCommit-- - return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "error: err") + return querypb.StartCommitState_Fail, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "error: err") } - return sbc.getError() + err = sbc.getError() + if err != nil { + return querypb.StartCommitState_Unknown, err + } + return querypb.StartCommitState_Success, nil } // SetRollback transitions the 2pc transaction to the Rollback state. diff --git a/go/vt/vttablet/tabletconntest/fakequeryservice.go b/go/vt/vttablet/tabletconntest/fakequeryservice.go index ac08e095f4d..2d62b017433 100644 --- a/go/vt/vttablet/tabletconntest/fakequeryservice.go +++ b/go/vt/vttablet/tabletconntest/fakequeryservice.go @@ -286,9 +286,9 @@ func (f *FakeQueryService) CreateTransaction(ctx context.Context, target *queryp } // StartCommit is part of the queryservice.QueryService interface -func (f *FakeQueryService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) { +func (f *FakeQueryService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) { if f.HasError { - return f.TabletError + return querypb.StartCommitState_Fail, f.TabletError } if f.Panics { panic(fmt.Errorf("test-triggered panic")) @@ -300,7 +300,7 @@ func (f *FakeQueryService) StartCommit(ctx context.Context, target *querypb.Targ if dtid != Dtid { f.t.Errorf("StartCommit: invalid dtid: got %s expected %s", dtid, Dtid) } - return nil + return querypb.StartCommitState_Success, nil } // SetRollback is part of the queryservice.QueryService interface diff --git a/go/vt/vttablet/tabletconntest/tabletconntest.go b/go/vt/vttablet/tabletconntest/tabletconntest.go index 0674de3f663..416d4de9d00 100644 --- a/go/vt/vttablet/tabletconntest/tabletconntest.go +++ b/go/vt/vttablet/tabletconntest/tabletconntest.go @@ -294,26 +294,31 @@ func testStartCommit(t *testing.T, conn queryservice.QueryService, f *FakeQueryS t.Log("testStartCommit") ctx := context.Background() ctx = callerid.NewContext(ctx, TestCallerID, TestVTGateCallerID) - err := conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid) - if err != nil { - t.Fatalf("StartCommit failed: %v", err) - } + state, err := conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid) + assert.Equal(t, querypb.StartCommitState_Success, state, "Unexpected state from StartCommit") + assert.NoError(t, err, "StartCommit failed") } func testStartCommitError(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) { t.Log("testStartCommitError") f.HasError = true - testErrorHelper(t, f, "StartCommit", func(ctx context.Context) error { - return conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid) + var state querypb.StartCommitState + testErrorHelper(t, f, "StartCommit", func(ctx context.Context) (err error) { + state, err = conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid) + return err }) f.HasError = false + assert.Equal(t, querypb.StartCommitState_Unknown, state, "Unexpected state from StartCommit") } func testStartCommitPanics(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) { t.Log("testStartCommitPanics") - testPanicHelper(t, f, "StartCommit", func(ctx context.Context) error { - return conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid) + var state querypb.StartCommitState + testPanicHelper(t, f, "StartCommit", func(ctx context.Context) (err error) { + state, err = conn.StartCommit(ctx, TestTarget, commitTransactionID, Dtid) + return err }) + assert.Equal(t, querypb.StartCommitState_Unknown, state, "Unexpected state from StartCommit") } func testSetRollback(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) { diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 628b79d55f2..35aa7a08b46 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -254,8 +254,8 @@ func (ftc *fakeTabletConn) CreateTransaction(ctx context.Context, target *queryp } // fakeTabletConn implements the QueryService interface. -func (ftc *fakeTabletConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) { - return nil +func (ftc *fakeTabletConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) { + return querypb.StartCommitState_Unknown, nil } // fakeTabletConn implements the QueryService interface. diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 126c99814b8..895293ba3c2 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -235,16 +235,16 @@ func (dte *DTExecutor) CreateTransaction(dtid string, participants []*querypb.Ta // StartCommit atomically commits the transaction along with the // decision to commit the associated 2pc transaction. -func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error { +func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) (querypb.StartCommitState, error) { if !dte.te.twopcEnabled { - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") + return querypb.StartCommitState_Fail, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer dte.te.env.Stats().QueryTimings.Record("START_COMMIT", time.Now()) dte.logStats.TransactionID = transactionID conn, err := dte.te.txPool.GetAndLock(transactionID, "for 2pc commit") if err != nil { - return err + return querypb.StartCommitState_Fail, err } defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn) @@ -254,15 +254,17 @@ func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error { return dte.te.twoPC.Transition(dte.ctx, conn, dtid, DTStateRollback) }) // return the error, defer call above will roll back the transaction. - return vterrors.VT10002("cannot commit the transaction on a reserved connection") + return querypb.StartCommitState_Fail, vterrors.VT10002("cannot commit the transaction on a reserved connection") } err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, DTStateCommit) if err != nil { - return err + return querypb.StartCommitState_Fail, err } - _, err = dte.te.txPool.Commit(dte.ctx, conn) - return err + if _, err = dte.te.txPool.Commit(dte.ctx, conn); err != nil { + return querypb.StartCommitState_Unknown, err + } + return querypb.StartCommitState_Success, nil } // SetRollback transitions the 2pc transaction to the Rollback state. diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index 103112bd7c7..d5322f352f9 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -27,22 +27,21 @@ import ( "testing" "time" - "vitess.io/vitess/go/event/syslogger" - "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/vt/vtenv" - "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" - "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" - + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/event/syslogger" "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vtenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" + "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" ) func TestTxExecutorEmptyPrepare(t *testing.T) { @@ -349,14 +348,16 @@ func TestExecutorStartCommit(t *testing.T) { commitTransition := fmt.Sprintf("update _vt.dt_state set state = %d where dtid = _binary'aa' and state = %d", int(querypb.TransactionState_COMMIT), int(querypb.TransactionState_PREPARE)) db.AddQuery(commitTransition, &sqltypes.Result{RowsAffected: 1}) txid := newTxForPrep(ctx, tsv) - err := txe.StartCommit(txid, "aa") + state, err := txe.StartCommit(txid, "aa") require.NoError(t, err) + assert.Equal(t, querypb.StartCommitState_Success, state) db.AddQuery(commitTransition, &sqltypes.Result{}) txid = newTxForPrep(ctx, tsv) - err = txe.StartCommit(txid, "aa") + state, err = txe.StartCommit(txid, "aa") require.Error(t, err) require.Contains(t, err.Error(), "could not transition to COMMIT: aa") + assert.Equal(t, querypb.StartCommitState_Fail, state) } func TestExecutorStartCommitFailure(t *testing.T) { @@ -379,8 +380,9 @@ func TestExecutorStartCommitFailure(t *testing.T) { db.AddQuery(rollbackTransition, sqltypes.MakeTestResult(nil)) // try 2pc commit of Metadata Manager. - err = txe.StartCommit(txid, "aa") + state, err := txe.StartCommit(txid, "aa") require.EqualError(t, err, "VT10002: atomic distributed transaction not allowed: cannot commit the transaction on a reserved connection") + assert.Equal(t, querypb.StartCommitState_Fail, state) } func TestExecutorSetRollback(t *testing.T) { @@ -671,7 +673,10 @@ func TestNoTwopc(t *testing.T) { fun: func() error { return txe.CreateTransaction("aa", nil) }, }, { desc: "StartCommit", - fun: func() error { return txe.StartCommit(1, "aa") }, + fun: func() error { + _, err := txe.StartCommit(1, "aa") + return err + }, }, { desc: "SetRollback", fun: func() error { return txe.SetRollback("aa", 1) }, diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index f96911971be..5fbf13f2c5e 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -773,16 +773,18 @@ func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb. // StartCommit atomically commits the transaction along with the // decision to commit the associated 2pc transaction. -func (tsv *TabletServer) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) { - return tsv.execRequest( +func (tsv *TabletServer) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) { + err = tsv.execRequest( ctx, tsv.loadQueryTimeout(), "StartCommit", "start_commit", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) - return txe.StartCommit(transactionID, dtid) + state, err = txe.StartCommit(transactionID, dtid) + return err }, ) + return state, err } // SetRollback transitions the 2pc transaction to the Rollback state. diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index be842e8e76e..1ae65cac070 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -323,13 +323,15 @@ func TestTabletServerStartCommit(t *testing.T) { commitTransition := fmt.Sprintf("update _vt.dt_state set state = %d where dtid = _binary'aa' and state = %d", int(querypb.TransactionState_COMMIT), int(querypb.TransactionState_PREPARE)) db.AddQuery(commitTransition, &sqltypes.Result{RowsAffected: 1}) txid := newTxForPrep(ctx, tsv) - err := tsv.StartCommit(ctx, &target, txid, "aa") + state, err := tsv.StartCommit(ctx, &target, txid, "aa") require.NoError(t, err) + assert.Equal(t, querypb.StartCommitState_Success, state, "StartCommit state") db.AddQuery(commitTransition, &sqltypes.Result{}) txid = newTxForPrep(ctx, tsv) - err = tsv.StartCommit(ctx, &target, txid, "aa") + state, err = tsv.StartCommit(ctx, &target, txid, "aa") assert.EqualError(t, err, "could not transition to COMMIT: aa", "Prepare err") + assert.Equal(t, querypb.StartCommitState_Unknown, state, "StartCommit state") } func TestTabletserverSetRollback(t *testing.T) {