Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribe block digest endpoints #764

Merged
merged 15 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,29 @@ func (c *Client) SubscribeEventsByBlockHeight(
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}

func (c *Client) SubscribeBlockDigestsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlockDigestsFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlockDigestsFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromLatest(ctx, blockStatus)
}

func (c *Client) Close() error {
return c.grpc.Close()
}
Expand Down
27 changes: 27 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,33 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {
}, nil
}

func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) flow.BlockDigest {
return flow.BlockDigest{
BlockID: flow.BytesToID(m.GetBlockId()),
Height: m.GetBlockHeight(),
Timestamp: m.GetBlockTimestamp().AsTime(),
}
}

func BlockDigestToMessage(blockDigest flow.BlockDigest) *access.SubscribeBlockDigestsResponse {
return &access.SubscribeBlockDigestsResponse{
BlockId: IdentifierToMessage(blockDigest.BlockID),
BlockHeight: blockDigest.Height,
BlockTimestamp: timestamppb.New(blockDigest.Timestamp),
}
}

func BlockStatusToEntity(blockStatus flow.BlockStatus) entities.BlockStatus {
switch blockStatus {
case flow.BlockStatusFinalized:
return entities.BlockStatus_BLOCK_FINALIZED
case flow.BlockStatusSealed:
return entities.BlockStatus_BLOCK_SEALED
default:
return entities.BlockStatus_BLOCK_UNKNOWN
}
}

func CadenceValueToMessage(value cadence.Value, encodingVersion flow.EventEncodingVersion) ([]byte, error) {
switch encodingVersion {
case flow.EventEncodingVersionCCF:
Expand Down
120 changes: 120 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,3 +1155,123 @@ func (c *BaseClient) subscribeEvents(

return sub, errChan, nil
}

func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockDigest, <-chan error, error) {
request := &access.SubscribeBlockDigestsFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
BlockStatus: convert.BlockStatusToEntity(blockStatus),
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
}

subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.BlockDigest)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlockDigestsFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockDigest, <-chan error, error) {
request := &access.SubscribeBlockDigestsFromStartHeightRequest{
StartBlockHeight: startHeight,
BlockStatus: convert.BlockStatusToEntity(blockStatus),
}

subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.BlockDigest)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlockDigestsFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockDigest, <-chan error, error) {
request := &access.SubscribeBlockDigestsFromLatestRequest{
BlockStatus: convert.BlockStatusToEntity(blockStatus),
}

subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromLatest(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.BlockDigest)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func receiveBlockDigestFromClient[Client interface {
Recv() (*access.SubscribeBlockDigestsResponse, error)
}](
ctx context.Context,
client Client,
blockDigestsChan chan<- flow.BlockDigest,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
// Receive the next blockDigest response
blockDigestResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving blockDigest: %w", err))
return
}

blockDigest := convert.MessageToBlockDigest(blockDigestResponse)

select {
case <-ctx.Done():
return
case blockDigestsChan <- blockDigest:
}
}
}
175 changes: 175 additions & 0 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2098,6 +2098,151 @@ func TestClient_SubscribeEvents(t *testing.T) {
}))
}

func TestClient_SubscribeBlockDigest(t *testing.T) {
blockHeaders := test.BlockHeaderGenerator()

generateBlockDigestResponses := func(count uint64) []*access.SubscribeBlockDigestsResponse {
var resBlockDigests []*access.SubscribeBlockDigestsResponse

for i := uint64(0); i < count; i++ {
blockHeader := blockHeaders.New()

digest := flow.BlockDigest{
BlockID: blockHeader.ID,
Height: blockHeader.Height,
Timestamp: blockHeader.Timestamp,
}

resBlockDigests = append(resBlockDigests, convert.BlockDigestToMessage(digest))
}

return resBlockDigests
}

t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
startHeight := uint64(1)
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
responses: generateBlockDigestResponses(responseCount),
}

rpc.
On("SubscribeBlockDigestsFromStartHeight", ctx, mock.Anything).
Return(stream, nil)

blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualDigest := <-blockDigestsCh
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
require.Equal(t, expectedDigest, actualDigest)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
responses: generateBlockDigestResponses(responseCount),
}

rpc.
On("SubscribeBlockDigestsFromStartBlockID", ctx, mock.Anything).
Return(stream, nil)

startBlockID := convert.MessageToIdentifier(stream.responses[0].BlockId)
blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualDigest := <-blockDigestsCh
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
require.Equal(t, expectedDigest, actualDigest)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
responses: generateBlockDigestResponses(responseCount),
}

rpc.
On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualDigest := <-blockDigestsCh
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
require.Equal(t, expectedDigest, actualDigest)
}
cancel()

wg.Wait()
}))

t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
err: status.Error(codes.Internal, "internal error"),
}

rpc.
On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoBlockDigests(t, blockDigestsCh, wg.Done)

errorCount := 0
for e := range errCh {
require.Error(t, e)
require.ErrorIs(t, e, stream.err)
errorCount += 1
}
cancel()

require.Equalf(t, 1, errorCount, "only 1 error is expected")

wg.Wait()
}))

}
func generateEventResponse(t *testing.T, blockID flow.Identifier, height uint64, events []flow.Event, encoding flow.EventEncodingVersion) *executiondata.SubscribeEventsResponse {
responseEvents := make([]*entities.Event, 0, len(events))
for _, e := range events {
Expand Down Expand Up @@ -2207,3 +2352,33 @@ func (m *mockExecutionDataStream) Recv() (*executiondata.SubscribeExecutionDataR

return m.responses[m.offset], nil
}

type mockBlockDigestClientStream[SubscribeBlockDigestsResponse any] struct {
grpc.ClientStream

ctx context.Context
err error
offset int
responses []*SubscribeBlockDigestsResponse
}

func (s *mockBlockDigestClientStream[SubscribeBlockDigestsResponse]) Recv() (*SubscribeBlockDigestsResponse, error) {
if s.err != nil {
return nil, s.err
}

if s.offset >= len(s.responses) {
<-s.ctx.Done()
return nil, io.EOF
}
defer func() { s.offset++ }()

return s.responses[s.offset], nil
}

func assertNoBlockDigests[BlockDigest any](t *testing.T, blockDigestsChan <-chan BlockDigest, done func()) {
defer done()
for range blockDigestsChan {
require.FailNow(t, "should not receive block digests")
}
}
7 changes: 7 additions & 0 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,10 @@ type BlockSeal struct {
// block produces the same receipt among all verifying nodes
ExecutionReceiptID Identifier
}

// BlockDigest holds lightweight block information which includes only block id, block height and block timestamp
type BlockDigest struct {
BlockID Identifier
Height uint64
Timestamp time.Time
}
Loading