diff --git a/access/grpc/client.go b/access/grpc/client.go index d02474fe9..a87b935ac 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -329,6 +329,29 @@ func (c *Client) SubscribeEventsByBlockHeight( return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval)) } +func (c *Client) SubscribeBlockDigestsFromStartBlockID( + ctx context.Context, + startBlockID flow.Identifier, + blockStatus flow.BlockStatus, +) (<-chan flow.BlockDigest, <-chan error, error) { + return c.grpc.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, blockStatus) +} + +func (c *Client) SubscribeBlockDigestsFromStartHeight( + ctx context.Context, + startHeight uint64, + blockStatus flow.BlockStatus, +) (<-chan flow.BlockDigest, <-chan error, error) { + return c.grpc.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, blockStatus) +} + +func (c *Client) SubscribeBlockDigestsFromLatest( + ctx context.Context, + blockStatus flow.BlockStatus, +) (<-chan flow.BlockDigest, <-chan error, error) { + return c.grpc.SubscribeBlockDigestsFromLatest(ctx, blockStatus) +} + func (c *Client) SubscribeBlocksFromStartBlockID( ctx context.Context, startBlockID flow.Identifier, diff --git a/access/grpc/convert/convert.go b/access/grpc/convert/convert.go index 93a89b841..5bdaf217e 100644 --- a/access/grpc/convert/convert.go +++ b/access/grpc/convert/convert.go @@ -207,6 +207,22 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) { }, nil } +func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) flow.BlockDigest { + return flow.BlockDigest{ + BlockID: flow.BytesToID(m.GetBlockId()), + Height: m.GetBlockHeight(), + Timestamp: m.GetBlockTimestamp().AsTime(), + } +} + +func BlockDigestToMessage(blockDigest flow.BlockDigest) *access.SubscribeBlockDigestsResponse { + return &access.SubscribeBlockDigestsResponse{ + BlockId: IdentifierToMessage(blockDigest.BlockID), + BlockHeight: blockDigest.Height, + BlockTimestamp: timestamppb.New(blockDigest.Timestamp), + } +} + func BlockStatusToEntity(blockStatus flow.BlockStatus) entities.BlockStatus { switch blockStatus { case flow.BlockStatusFinalized: diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 42bde2b70..dc21cb43d 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1516,3 +1516,138 @@ func receiveBlockHeadersFromClient[Client interface { } } } + +func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( + ctx context.Context, + startBlockID flow.Identifier, + blockStatus flow.BlockStatus, + opts ...grpc.CallOption, +) (<-chan flow.BlockDigest, <-chan error, error) { + status := convert.BlockStatusToEntity(blockStatus) + if status == entities.BlockStatus_BLOCK_UNKNOWN { + return nil, nil, newRPCError(errors.New("unknown block status")) + } + + request := &access.SubscribeBlockDigestsFromStartBlockIDRequest{ + StartBlockId: startBlockID.Bytes(), + BlockStatus: status, + } + + subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartBlockID(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + blocksChan := make(chan flow.BlockDigest) + errChan := make(chan error) + + go func() { + defer close(blocksChan) + defer close(errChan) + receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan) + }() + + return blocksChan, errChan, nil +} + +func (c *BaseClient) SubscribeBlockDigestsFromStartHeight( + ctx context.Context, + startHeight uint64, + blockStatus flow.BlockStatus, + opts ...grpc.CallOption, +) (<-chan flow.BlockDigest, <-chan error, error) { + status := convert.BlockStatusToEntity(blockStatus) + if status == entities.BlockStatus_BLOCK_UNKNOWN { + return nil, nil, newRPCError(errors.New("unknown block status")) + } + + request := &access.SubscribeBlockDigestsFromStartHeightRequest{ + StartBlockHeight: startHeight, + BlockStatus: status, + } + + subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartHeight(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + blocksChan := make(chan flow.BlockDigest) + errChan := make(chan error) + + go func() { + defer close(blocksChan) + defer close(errChan) + receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan) + }() + + return blocksChan, errChan, nil +} + +func (c *BaseClient) SubscribeBlockDigestsFromLatest( + ctx context.Context, + blockStatus flow.BlockStatus, + opts ...grpc.CallOption, +) (<-chan flow.BlockDigest, <-chan error, error) { + status := convert.BlockStatusToEntity(blockStatus) + if status == entities.BlockStatus_BLOCK_UNKNOWN { + return nil, nil, newRPCError(errors.New("unknown block status")) + } + + request := &access.SubscribeBlockDigestsFromLatestRequest{ + BlockStatus: status, + } + + subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromLatest(ctx, request, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + blocksChan := make(chan flow.BlockDigest) + errChan := make(chan error) + + go func() { + defer close(blocksChan) + defer close(errChan) + receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan) + }() + + return blocksChan, errChan, nil +} + +func receiveBlockDigestFromClient[Client interface { + Recv() (*access.SubscribeBlockDigestsResponse, error) +}]( + ctx context.Context, + client Client, + blockDigestsChan chan<- flow.BlockDigest, + errChan chan<- error, +) { + sendErr := func(err error) { + select { + case <-ctx.Done(): + case errChan <- err: + } + } + + for { + // Receive the next blockDigest response + blockDigestResponse, err := client.Recv() + if err != nil { + if err == io.EOF { + // End of stream, return gracefully + return + } + + sendErr(fmt.Errorf("error receiving blockDigest: %w", err)) + return + } + + blockDigest := convert.MessageToBlockDigest(blockDigestResponse) + + select { + case <-ctx.Done(): + return + case blockDigestsChan <- blockDigest: + } + } +} diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index f297ff5a2..0c1596292 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -2713,3 +2713,178 @@ func assertNoBlockHeaders[BlockHeader any](t *testing.T, blockHeadersChan <-chan require.FailNow(t, "should not receive block headers") } } + +func TestClient_SubscribeBlockDigest(t *testing.T) { + blockHeaders := test.BlockHeaderGenerator() + + generateBlockDigestResponses := func(count uint64) []*access.SubscribeBlockDigestsResponse { + var resBlockDigests []*access.SubscribeBlockDigestsResponse + + for i := uint64(0); i < count; i++ { + blockHeader := blockHeaders.New() + + digest := flow.BlockDigest{ + BlockID: blockHeader.ID, + Height: blockHeader.Height, + Timestamp: blockHeader.Timestamp, + } + + resBlockDigests = append(resBlockDigests, convert.BlockDigestToMessage(digest)) + } + + return resBlockDigests + } + + t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + startHeight := uint64(1) + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + ctx: ctx, + responses: generateBlockDigestResponses(responseCount), + } + + rpc. + On("SubscribeBlockDigestsFromStartHeight", ctx, mock.Anything). + Return(stream, nil) + + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, flow.BlockStatusSealed) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualDigest := <-blockDigestsCh + expectedDigest := convert.MessageToBlockDigest(stream.responses[i]) + require.Equal(t, expectedDigest, actualDigest) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + ctx: ctx, + responses: generateBlockDigestResponses(responseCount), + } + + rpc. + On("SubscribeBlockDigestsFromStartBlockID", ctx, mock.Anything). + Return(stream, nil) + + startBlockID := convert.MessageToIdentifier(stream.responses[0].BlockId) + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, flow.BlockStatusSealed) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualDigest := <-blockDigestsCh + expectedDigest := convert.MessageToBlockDigest(stream.responses[i]) + require.Equal(t, expectedDigest, actualDigest) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + responseCount := uint64(100) + + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + ctx: ctx, + responses: generateBlockDigestResponses(responseCount), + } + + rpc. + On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything). + Return(stream, nil) + + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusSealed) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualDigest := <-blockDigestsCh + expectedDigest := convert.MessageToBlockDigest(stream.responses[i]) + require.Equal(t, expectedDigest, actualDigest) + } + cancel() + + wg.Wait() + })) + + t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + ctx, cancel := context.WithCancel(ctx) + stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + ctx: ctx, + err: status.Error(codes.Internal, "internal error"), + } + + rpc. + On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything). + Return(stream, nil) + + blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusSealed) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoBlockDigests(t, blockDigestsCh, wg.Done) + + errorCount := 0 + for e := range errCh { + require.Error(t, e) + require.ErrorIs(t, e, stream.err) + errorCount += 1 + } + cancel() + + require.Equalf(t, 1, errorCount, "only 1 error is expected") + + wg.Wait() + })) +} + +type mockBlockDigestClientStream[SubscribeBlockDigestsResponse any] struct { + grpc.ClientStream + + ctx context.Context + err error + offset int + responses []*SubscribeBlockDigestsResponse +} + +func (s *mockBlockDigestClientStream[SubscribeBlockDigestsResponse]) Recv() (*SubscribeBlockDigestsResponse, error) { + if s.err != nil { + return nil, s.err + } + + if s.offset >= len(s.responses) { + <-s.ctx.Done() + return nil, io.EOF + } + defer func() { s.offset++ }() + + return s.responses[s.offset], nil +} + +func assertNoBlockDigests[BlockDigest any](t *testing.T, blockDigestsChan <-chan BlockDigest, done func()) { + defer done() + for range blockDigestsChan { + require.FailNow(t, "should not receive block digests") + } +} diff --git a/block.go b/block.go index 5ee39fcc9..3e9a0211e 100644 --- a/block.go +++ b/block.go @@ -76,3 +76,10 @@ type BlockSeal struct { // block produces the same receipt among all verifying nodes ExecutionReceiptID Identifier } + +// BlockDigest holds lightweight block information which includes only block id, block height and block timestamp +type BlockDigest struct { + BlockID Identifier + Height uint64 + Timestamp time.Time +}