From 64d91e20069e4db0902ab1fe1d90fe5980df4721 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 27 Sep 2024 14:37:51 +0300 Subject: [PATCH] Changed endpoint to return the whole tx result, added msg index check --- access/grpc/client.go | 2 +- access/grpc/grpc.go | 27 +++++++++++++++++++------- access/grpc/grpc_test.go | 42 ++++++++++++++++++++++++++-------------- 3 files changed, 48 insertions(+), 23 deletions(-) diff --git a/access/grpc/client.go b/access/grpc/client.go index e5fcebb32..6cfcff0c2 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -207,7 +207,7 @@ func (c *Client) GetTransactionResultsByBlockID(ctx context.Context, blockID flo func (c *Client) SendAndSubscribeTransactionStatuses( ctx context.Context, tx flow.Transaction, -) (<-chan flow.TransactionStatus, <-chan error, error) { +) (<-chan flow.TransactionResult, <-chan error, error) { return c.grpc.SendAndSubscribeTransactionStatuses(ctx, tx) } diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 8ca586b5d..d3bfe99e7 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1133,7 +1133,7 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( ctx context.Context, tx flow.Transaction, opts ...grpc.CallOption, -) (<-chan flow.TransactionStatus, <-chan error, error) { +) (<-chan flow.TransactionResult, <-chan error, error) { txMsg, err := convert.TransactionToMessage(tx) if err != nil { return nil, nil, newEntityToMessageError(entityTransaction, err) @@ -1149,7 +1149,7 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( return nil, nil, newRPCError(err) } - txStatusChan := make(chan flow.TransactionStatus) + txStatusChan := make(chan flow.TransactionResult) errChan := make(chan error) sendErr := func(err error) { @@ -1163,24 +1163,37 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( defer close(txStatusChan) defer close(errChan) + messageIndex := uint64(0) + for { - // Receive the next txStatus response - txStatusResponse, err := subscribeClient.Recv() + // Receive the next txResult response + txResultsResponse, err := subscribeClient.Recv() if err != nil { if err == io.EOF { // End of stream, return gracefully return } - sendErr(fmt.Errorf("error receiving blockHeader: %w", err)) + sendErr(fmt.Errorf("error receiving transaction result: %w", err)) + return + } + + if messageIndex != txResultsResponse.GetMessageIndex() { + sendErr(fmt.Errorf("tx result response was lost")) + return + } + + txResult, err := convert.MessageToTransactionResult(txResultsResponse.GetTransactionResults(), c.jsonOptions) + if err != nil { + sendErr(fmt.Errorf("error converting transaction result: %w", err)) return } - txStatus := flow.TransactionStatus(txStatusResponse.GetTransactionResults().Status) + messageIndex++ select { case <-ctx.Done(): return - case txStatusChan <- txStatus: + case txStatusChan <- txResult: } } }() diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index c596f378a..e67052de0 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -2154,7 +2154,7 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { transactions := test.TransactionGenerator() generateTransactionStatusResponses := func(count uint64, encodingVersion flow.EventEncodingVersion) []*access.SendAndSubscribeTransactionStatusesResponse { - var resTransactionStatuses []*access.SendAndSubscribeTransactionStatusesResponse + var resTransactionResults []*access.SendAndSubscribeTransactionStatusesResponse results := test.TransactionResultGenerator(encodingVersion) for i := uint64(0); i < count; i++ { @@ -2163,12 +2163,13 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { response := &access.SendAndSubscribeTransactionStatusesResponse{ TransactionResults: transactionResult, + MessageIndex: i, } - resTransactionStatuses = append(resTransactionStatuses, response) + resTransactionResults = append(resTransactionResults, response) } - return resTransactionStatuses + return resTransactionResults } t.Run("Happy Path - CCF", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { @@ -2183,17 +2184,23 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil) - txStatusesCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) + txResultCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) require.NoError(t, err) wg := sync.WaitGroup{} wg.Add(1) go assertNoErrors(t, errCh, wg.Done) + expectedCounter := uint64(0) + for i := uint64(0); i < responseCount; i++ { - actualTxStatus := <-txStatusesCh - expectedTxStatus := flow.TransactionStatus(stream.responses[i].GetTransactionResults().Status) - require.Equal(t, expectedTxStatus, actualTxStatus) + actualTxResult := <-txResultCh + expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions) + require.NoError(t, err) + require.Equal(t, expectedTxResult, actualTxResult) + require.Equal(t, expectedCounter, stream.responses[i].MessageIndex) + + expectedCounter++ } cancel() @@ -2212,17 +2219,22 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil) - txStatusesCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) + txResultCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) require.NoError(t, err) wg := sync.WaitGroup{} wg.Add(1) go assertNoErrors(t, errCh, wg.Done) + expectedCounter := uint64(0) for i := uint64(0); i < responseCount; i++ { - actualTxStatus := <-txStatusesCh - expectedTxStatus := flow.TransactionStatus(stream.responses[i].GetTransactionResults().Status) - require.Equal(t, expectedTxStatus, actualTxStatus) + actualTxResult := <-txResultCh + expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions) + require.NoError(t, err) + require.Equal(t, expectedTxResult, actualTxResult) + require.Equal(t, expectedCounter, stream.responses[i].MessageIndex) + + expectedCounter++ } cancel() @@ -2240,12 +2252,12 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything). Return(stream, nil) - txStatusChan, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, flow.Transaction{}) + txResultChan, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, flow.Transaction{}) require.NoError(t, err) wg := sync.WaitGroup{} wg.Add(1) - go assertNoTxStatuses(t, txStatusChan, wg.Done) + go assertNoTxResults(t, txResultChan, wg.Done) errorCount := 0 for e := range errCh { @@ -2285,9 +2297,9 @@ func (m *mockTransactionStatusesClientStream) Recv() (*access.SendAndSubscribeTr return m.responses[m.offset], nil } -func assertNoTxStatuses[TxStatus any](t *testing.T, txStatusChan <-chan TxStatus, done func()) { +func assertNoTxResults[TxStatus any](t *testing.T, txResultChan <-chan TxStatus, done func()) { defer done() - for range txStatusChan { + for range txResultChan { require.FailNow(t, "should not receive txStatus") } }