diff --git a/access/grpc/client.go b/access/grpc/client.go index 149b47375..667bfcc67 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -204,6 +204,13 @@ func (c *Client) GetTransactionResultsByBlockID(ctx context.Context, blockID flo return c.grpc.GetTransactionResultsByBlockID(ctx, blockID) } +func (c *Client) SendAndSubscribeTransactionStatuses( + ctx context.Context, + tx flow.Transaction, +) (<-chan flow.TransactionResult, <-chan error, error) { + return c.grpc.SendAndSubscribeTransactionStatuses(ctx, tx) +} + func (c *Client) GetAccount(ctx context.Context, address flow.Address) (*flow.Account, error) { return c.grpc.GetAccount(ctx, address) } diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 9b22179ea..f84212df2 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1255,6 +1255,78 @@ func (c *BaseClient) SubscribeBlocksFromLatest( return blocksChan, errChan, nil } +func (c *BaseClient) SendAndSubscribeTransactionStatuses( + ctx context.Context, + tx flow.Transaction, + opts ...grpc.CallOption, +) (<-chan flow.TransactionResult, <-chan error, error) { + txMsg, err := convert.TransactionToMessage(tx) + if err != nil { + return nil, nil, newEntityToMessageError(entityTransaction, err) + } + + req := &access.SendAndSubscribeTransactionStatusesRequest{ + Transaction: txMsg, + EventEncodingVersion: c.eventEncoding, + } + + subscribeClient, err := c.rpcClient.SendAndSubscribeTransactionStatuses(ctx, req, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + txStatusChan := make(chan flow.TransactionResult) + errChan := make(chan error) + + sendErr := func(err error) { + select { + case <-ctx.Done(): + case errChan <- err: + } + } + + go func() { + defer close(txStatusChan) + defer close(errChan) + + messageIndex := uint64(0) + + for { + // Receive the next txResult response + txResultsResponse, err := subscribeClient.Recv() + if err != nil { + if err == io.EOF { + // End of stream, return gracefully + return + } + sendErr(fmt.Errorf("error receiving transaction result: %w", err)) + return + } + + if messageIndex != txResultsResponse.GetMessageIndex() { + sendErr(fmt.Errorf("tx result response was lost")) + return + } + + txResult, err := convert.MessageToTransactionResult(txResultsResponse.GetTransactionResults(), c.jsonOptions) + if err != nil { + sendErr(fmt.Errorf("error converting transaction result: %w", err)) + return + } + + messageIndex++ + + select { + case <-ctx.Done(): + return + case txStatusChan <- txResult: + } + } + }() + + return txStatusChan, errChan, nil +} + func receiveBlocksFromClient[Client interface { Recv() (*access.SubscribeBlocksResponse, error) }]( diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 6dc541a30..a5991e4df 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -2381,3 +2381,157 @@ func assertNoBlocks[T any](t *testing.T, blocksCh <-chan T, done func()) { require.FailNow(t, "should not receive blocks") } } + +func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { + transactions := test.TransactionGenerator() + + generateTransactionStatusResponses := func(count uint64, encodingVersion flow.EventEncodingVersion) []*access.SendAndSubscribeTransactionStatusesResponse { + var resTransactionResults []*access.SendAndSubscribeTransactionStatusesResponse + results := test.TransactionResultGenerator(encodingVersion) + + for i := uint64(0); i < count; i++ { + expectedResult := results.New() + transactionResult, _ := convert.TransactionResultToMessage(expectedResult, encodingVersion) + + response := &access.SendAndSubscribeTransactionStatusesResponse{ + TransactionResults: transactionResult, + MessageIndex: i, + } + + resTransactionResults = append(resTransactionResults, response) + } + + return resTransactionResults + } + + t.Run("Happy Path - CCF", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + responseCount := uint64(100) + tx := transactions.New() + + ctx, cancel := context.WithCancel(ctx) + stream := &mockTransactionStatusesClientStream{ + ctx: ctx, + responses: generateTransactionStatusResponses(responseCount, flow.EventEncodingVersionCCF), + } + + rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil) + + txResultCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + expectedCounter := uint64(0) + + for i := uint64(0); i < responseCount; i++ { + actualTxResult := <-txResultCh + expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions) + require.NoError(t, err) + require.Equal(t, expectedTxResult, actualTxResult) + require.Equal(t, expectedCounter, stream.responses[i].MessageIndex) + + expectedCounter++ + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - JSON-CDC", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + responseCount := uint64(100) + tx := transactions.New() + + ctx, cancel := context.WithCancel(ctx) + stream := &mockTransactionStatusesClientStream{ + ctx: ctx, + responses: generateTransactionStatusResponses(responseCount, flow.EventEncodingVersionJSONCDC), + } + + rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil) + + txResultCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + expectedCounter := uint64(0) + for i := uint64(0); i < responseCount; i++ { + actualTxResult := <-txResultCh + expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions) + require.NoError(t, err) + require.Equal(t, expectedTxResult, actualTxResult) + require.Equal(t, expectedCounter, stream.responses[i].MessageIndex) + + expectedCounter++ + } + cancel() + + wg.Wait() + })) + + t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + ctx, cancel := context.WithCancel(ctx) + stream := &mockTransactionStatusesClientStream{ + ctx: ctx, + err: status.Error(codes.Internal, "internal error"), + } + + rpc. + On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything). + Return(stream, nil) + + txResultChan, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, flow.Transaction{}) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoTxResults(t, txResultChan, wg.Done) + + errorCount := 0 + for e := range errCh { + require.Error(t, e) + require.ErrorIs(t, e, stream.err) + errorCount += 1 + } + cancel() + + require.Equalf(t, 1, errorCount, "only 1 error is expected") + + wg.Wait() + })) + +} + +type mockTransactionStatusesClientStream struct { + grpc.ClientStream + + ctx context.Context + err error + offset int + responses []*access.SendAndSubscribeTransactionStatusesResponse +} + +func (m *mockTransactionStatusesClientStream) Recv() (*access.SendAndSubscribeTransactionStatusesResponse, error) { + if m.err != nil { + return nil, m.err + } + + if m.offset >= len(m.responses) { + <-m.ctx.Done() + return nil, io.EOF + } + defer func() { m.offset++ }() + + return m.responses[m.offset], nil +} + +func assertNoTxResults[TxStatus any](t *testing.T, txResultChan <-chan TxStatus, done func()) { + defer done() + for range txResultChan { + require.FailNow(t, "should not receive txStatus") + } +} diff --git a/examples/go.mod b/examples/go.mod index 6fd2b9c50..255c1752a 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -7,7 +7,7 @@ toolchain go1.22.4 replace github.com/onflow/flow-go-sdk => ../ require ( - github.com/onflow/cadence v1.0.0-preview.52 + github.com/onflow/cadence v1.0.0 github.com/onflow/flow-cli/flowkit v1.11.0 github.com/onflow/flow-go-sdk v0.41.17 github.com/spf13/afero v1.11.0 diff --git a/examples/go.sum b/examples/go.sum index 1a3ba9dca..78cf35e3b 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -117,6 +117,7 @@ github.com/onflow/cadence v1.0.0-preview.35/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmp github.com/onflow/cadence v1.0.0-preview.36/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0= github.com/onflow/cadence v1.0.0-preview.38/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0= github.com/onflow/cadence v1.0.0-preview.52/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU= +github.com/onflow/cadence v1.0.0/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU= github.com/onflow/crypto v0.25.0 h1:BeWbLsh3ZD13Ej+Uky6kg1PL1ZIVBDVX+2MVBNwqddg= github.com/onflow/crypto v0.25.0/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= github.com/onflow/crypto v0.25.1/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI=