Skip to content

Commit

Permalink
rpc: retrieve unresolved transactions (#16356)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Jul 11, 2024
1 parent cd0c2b5 commit ba5297d
Show file tree
Hide file tree
Showing 23 changed files with 2,525 additions and 877 deletions.
1,516 changes: 839 additions & 677 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

433 changes: 433 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Large diffs are not rendered by default.

350 changes: 180 additions & 170 deletions go/vt/proto/queryservice/queryservice.pb.go

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions go/vt/proto/queryservice/queryservice_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,12 @@ func (itc *internalTabletConn) ReadTransaction(ctx context.Context, target *quer
return metadata, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

// UnresolvedTransactions is part of queryservice.QueryService
func (itc *internalTabletConn) UnresolvedTransactions(ctx context.Context, target *querypb.Target) (transactions []*querypb.TransactionMetadata, err error) {
transactions, err = itc.tablet.qsc.QueryService().UnresolvedTransactions(ctx, target)
return transactions, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

// BeginExecute is part of queryservice.QueryService
func (itc *internalTabletConn) BeginExecute(
ctx context.Context,
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ func (client *QueryClient) ReadTransaction(dtid string) (*querypb.TransactionMet
return client.server.ReadTransaction(client.ctx, client.target, dtid)
}

// UnresolvedTransactions invokes the UnresolvedTransactions API of TabletServer.
func (client *QueryClient) UnresolvedTransactions() ([]*querypb.TransactionMetadata, error) {
return client.server.UnresolvedTransactions(client.ctx, client.target)
}

// SetServingType is for testing transitions.
// It currently supports only primary->replica and back.
func (client *QueryClient) SetServingType(tabletType topodatapb.TabletType) error {
Expand Down
29 changes: 29 additions & 0 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,3 +737,32 @@ func TestManualTwopcz(t *testing.T) {
fmt.Print("Sleeping for 30 seconds\n")
time.Sleep(30 * time.Second)
}

// TestUnresolvedTransactions tests the UnresolvedTransactions API.
func TestUnresolvedTransactions(t *testing.T) {
client := framework.NewClient()

participants := []*querypb.Target{
{Keyspace: "ks1", Shard: "-80"},
{Keyspace: "ks1", Shard: "80-"},
}
err := client.CreateTransaction("dtid01", participants)
require.NoError(t, err)

// expected no transaction to show here, as 1 second not passed.
transactions, err := client.UnresolvedTransactions()
require.NoError(t, err)
require.Empty(t, transactions)

// abandon age is 1 second.
time.Sleep(2 * time.Second)

transactions, err = client.UnresolvedTransactions()
require.NoError(t, err)
want := []*querypb.TransactionMetadata{{
Dtid: "dtid01",
State: querypb.TransactionState_PREPARE,
Participants: participants,
}}
utils.MustMatch(t, want, transactions)
}
15 changes: 15 additions & 0 deletions go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,21 @@ func (q *query) ReadTransaction(ctx context.Context, request *querypb.ReadTransa
return &querypb.ReadTransactionResponse{Metadata: result}, nil
}

// UnresolvedTransactions is part of the queryservice.QueryServer interface
func (q *query) UnresolvedTransactions(ctx context.Context, request *querypb.UnresolvedTransactionsRequest) (response *querypb.UnresolvedTransactionsResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx),
request.EffectiveCallerId,
request.ImmediateCallerId,
)
transactions, err := q.server.UnresolvedTransactions(ctx, request.Target)
if err != nil {
return nil, vterrors.ToGRPC(err)
}

return &querypb.UnresolvedTransactionsResponse{Transactions: transactions}, nil
}

// BeginExecute is part of the queryservice.QueryServer interface
func (q *query) BeginExecute(ctx context.Context, request *querypb.BeginExecuteRequest) (response *querypb.BeginExecuteResponse, err error) {
defer q.server.HandlePanic(&err)
Expand Down
20 changes: 20 additions & 0 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,26 @@ func (conn *gRPCQueryClient) ReadTransaction(ctx context.Context, target *queryp
return response.Metadata, nil
}

// UnresolvedTransactions returns all unresolved distributed transactions.
func (conn *gRPCQueryClient) UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
return nil, tabletconn.ConnClosed
}

req := &querypb.UnresolvedTransactionsRequest{
Target: target,
EffectiveCallerId: callerid.EffectiveCallerIDFromContext(ctx),
ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx),
}
response, err := conn.c.UnresolvedTransactions(ctx, req)
if err != nil {
return nil, tabletconn.ErrorFromGRPC(err)
}
return response.Transactions, nil
}

// BeginExecute starts a transaction and runs an Execute.
func (conn *gRPCQueryClient) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (state queryservice.TransactionState, result *sqltypes.Result, err error) {
conn.mu.RLock()
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/queryservice/queryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type QueryService interface {
// ReadTransaction returns the metadata for the specified dtid.
ReadTransaction(ctx context.Context, target *querypb.Target, dtid string) (metadata *querypb.TransactionMetadata, err error)

// UnresolvedTransactions returns the list of unresolved distributed transactions.
UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error)

// Execute for query execution
Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, error)
// StreamExecute for query execution with streaming
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vttablet/queryservice/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ func (ws *wrappedService) ReadTransaction(ctx context.Context, target *querypb.T
return metadata, err
}

func (ws *wrappedService) UnresolvedTransactions(ctx context.Context, target *querypb.Target) (transactions []*querypb.TransactionMetadata, err error) {
err = ws.wrapper(ctx, target, ws.impl, "UnresolvedTransactions", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var innerErr error
transactions, innerErr = conn.UnresolvedTransactions(ctx, target)
return canRetry(ctx, innerErr), innerErr
})
return transactions, err
}

func (ws *wrappedService) Execute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (qr *sqltypes.Result, err error) {
inDedicatedConn := transactionID != 0 || reservedID != 0
err = ws.wrapper(ctx, target, ws.impl, "Execute", inDedicatedConn, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,12 @@ func (sbc *SandboxConn) ReadTransaction(ctx context.Context, target *querypb.Tar
return nil, nil
}

// UnresolvedTransactions is part of the QueryService interface.
func (sbc *SandboxConn) UnresolvedTransactions(context.Context, *querypb.Target) ([]*querypb.TransactionMetadata, error) {
// TODO implement me
panic("implement me")
}

// BeginExecute is part of the QueryService interface.
func (sbc *SandboxConn) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (queryservice.TransactionState, *sqltypes.Result, error) {
sbc.panicIfNeeded()
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,18 @@ func (f *FakeQueryService) ReadTransaction(ctx context.Context, target *querypb.
return Metadata, nil
}

// UnresolvedTransactions is part of the queryservice.QueryService interface
func (f *FakeQueryService) UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error) {
if f.HasError {
return nil, f.TabletError
}
if f.Panics {
panic(fmt.Errorf("test-triggered panic"))
}
f.checkTargetCallerID(ctx, "UnresolvedTransactions", target)
return []*querypb.TransactionMetadata{Metadata}, nil
}

// ExecuteQuery is a fake test query.
const ExecuteQuery = "executeQuery"

Expand Down
30 changes: 30 additions & 0 deletions go/vt/vttablet/tabletconntest/tabletconntest.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,33 @@ func testReadTransactionPanics(t *testing.T, conn queryservice.QueryService, f *
})
}

func testUnresolvedTransactions(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testUnresolvedTransactions")
ctx := context.Background()
ctx = callerid.NewContext(ctx, TestCallerID, TestVTGateCallerID)
transactions, err := conn.UnresolvedTransactions(ctx, TestTarget)
require.NoError(t, err)
require.True(t, proto.Equal(transactions[0], Metadata))
}

func testUnresolvedTransactionsError(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testUnresolvedTransactionsError")
f.HasError = true
testErrorHelper(t, f, "UnresolvedTransactions", func(ctx context.Context) error {
_, err := conn.UnresolvedTransactions(ctx, TestTarget)
return err
})
f.HasError = false
}

func testUnresolvedTransactionsPanics(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testUnresolvedTransactionsPanics")
testPanicHelper(t, f, "UnresolvedTransactions", func(ctx context.Context) error {
_, err := conn.UnresolvedTransactions(ctx, TestTarget)
return err
})
}

func testExecute(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testExecute")
f.ExpectedTransactionID = ExecuteTransactionID
Expand Down Expand Up @@ -936,6 +963,7 @@ func TestSuite(ctx context.Context, t *testing.T, protocol string, tablet *topod
testSetRollback,
testConcludeTransaction,
testReadTransaction,
testUnresolvedTransactions,
testExecute,
testBeginExecute,
testStreamExecute,
Expand All @@ -956,6 +984,7 @@ func TestSuite(ctx context.Context, t *testing.T, protocol string, tablet *topod
testSetRollbackError,
testConcludeTransactionError,
testReadTransactionError,
testUnresolvedTransactionsError,
testExecuteError,
testBeginExecuteErrorInBegin,
testBeginExecuteErrorInExecute,
Expand All @@ -979,6 +1008,7 @@ func TestSuite(ctx context.Context, t *testing.T, protocol string, tablet *topod
testSetRollbackPanics,
testConcludeTransactionPanics,
testReadTransactionPanics,
testUnresolvedTransactionsPanics,
testExecutePanics,
testBeginExecutePanics,
testStreamExecutePanics,
Expand Down
9 changes: 6 additions & 3 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ import (
"context"
"time"

"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
)

// DTExecutor is used for executing a distributed transactional request.
Expand Down Expand Up @@ -290,3 +288,8 @@ func (dte *DTExecutor) inTransaction(f func(*StatefulConnection) error) error {
}
return nil
}

// UnresolvedTransactions returns the list of unresolved distributed transactions.
func (dte *DTExecutor) UnresolvedTransactions() ([]*querypb.TransactionMetadata, error) {
return dte.te.twoPC.UnresolvedTransactions(dte.ctx, time.Now().Add(-dte.te.abandonAge))
}
24 changes: 19 additions & 5 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
Expand All @@ -67,11 +71,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// logPoolFull is for throttling transaction / query pool full messages in the log.
Expand Down Expand Up @@ -748,6 +747,21 @@ func (tsv *TabletServer) ReadTransaction(ctx context.Context, target *querypb.Ta
return metadata, err
}

// UnresolvedTransactions returns the unresolved distributed transaction record.
func (tsv *TabletServer) UnresolvedTransactions(ctx context.Context, target *querypb.Target) (transactions []*querypb.TransactionMetadata, err error) {
err = tsv.execRequest(
ctx, tsv.loadQueryTimeout(),
"UnresolvedTransactions", "unresolved_transaction", nil,
target, nil, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
transactions, err = txe.UnresolvedTransactions()
return err
},
)
return
}

// Execute executes the query and returns the result as response.
func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (result *sqltypes.Result, err error) {
span, ctx := trace.NewSpan(ctx, "TabletServer.Execute")
Expand Down
Loading

0 comments on commit ba5297d

Please sign in to comment.