Skip to content

Commit

Permalink
Add subscribe blocks enpoinds
Browse files Browse the repository at this point in the history
  • Loading branch information
illia-malachyn committed Sep 18, 2024
1 parent 160a1bc commit be4594f
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 2 deletions.
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,29 @@ func (c *Client) SubscribeEventsByBlockHeight(
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}

func (c *Client) SubscribeBlocksFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlocksFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlocksFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.Block, <-chan error, error) {
return c.grpc.SubscribeBlocksFromLatest(ctx, blockStatus)
}

func (c *Client) Close() error {
return c.grpc.Close()
}
Expand Down
11 changes: 11 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,17 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {
}, nil
}

func BlockStatusToEntity(blockStatus flow.BlockStatus) entities.BlockStatus {
switch blockStatus {
case flow.BlockStatusFinalized:
return entities.BlockStatus_BLOCK_FINALIZED
case flow.BlockStatusSealed:
return entities.BlockStatus_BLOCK_SEALED
default:
return entities.BlockStatus_BLOCK_UNKNOWN
}
}

func CadenceValueToMessage(value cadence.Value, encodingVersion flow.EventEncodingVersion) ([]byte, error) {
switch encodingVersion {
case flow.EventEncodingVersionCCF:
Expand Down
124 changes: 124 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,3 +929,127 @@ func (c *BaseClient) subscribeEvents(

return sub, errChan, nil
}

func (c *BaseClient) SubscribeBlocksFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.Block, <-chan error, error) {
request := &access.SubscribeBlocksFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
BlockStatus: convert.BlockStatusToEntity(blockStatus),
}

subscribeClient, err := c.rpcClient.SubscribeBlocksFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.Block)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlocksFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.Block, <-chan error, error) {
request := &access.SubscribeBlocksFromStartHeightRequest{
StartBlockHeight: startHeight,
BlockStatus: convert.BlockStatusToEntity(blockStatus),
}

subscribeClient, err := c.rpcClient.SubscribeBlocksFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.Block)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlocksFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.Block, <-chan error, error) {
request := &access.SubscribeBlocksFromLatestRequest{
BlockStatus: convert.BlockStatusToEntity(blockStatus),
}

subscribeClient, err := c.rpcClient.SubscribeBlocksFromLatest(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.Block)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func receiveBlocksFromClient[Client interface {
Recv() (*access.SubscribeBlocksResponse, error)
}](
ctx context.Context,
client Client,
blocksChan chan<- flow.Block,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
// Receive the next block response
blockResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving block: %w", err))
return
}

block, err := convert.MessageToBlock(blockResponse.GetBlock())
if err != nil {
sendErr(fmt.Errorf("error converting message to block: %w", err))
return
}

select {
case <-ctx.Done():
return
case blocksChan <- block:
}
}
}
175 changes: 175 additions & 0 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1720,3 +1720,178 @@ func (m *mockExecutionDataStream) Recv() (*executiondata.SubscribeExecutionDataR

return m.responses[m.offset], nil
}

func TestClient_SubscribeBlocks(t *testing.T) {
blocks := test.BlockGenerator()

generateBlockResponses := func(count uint64) []*access.SubscribeBlocksResponse {
var resBlocks []*access.SubscribeBlocksResponse

for i := uint64(0); i < count; i++ {
b, err := convert.BlockToMessage(*blocks.New())
require.NoError(t, err)

resBlocks = append(resBlocks, &access.SubscribeBlocksResponse{
Block: b,
})
}

return resBlocks
}

t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
startHeight := uint64(1)
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
responses: generateBlockResponses(responseCount),
}

rpc.
On("SubscribeBlocksFromStartHeight", ctx, mock.Anything).
Return(stream, nil)

blockCh, errCh, err := c.SubscribeBlocksFromStartHeight(ctx, startHeight, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualBlock := <-blockCh
expectedBlock, err := convert.MessageToBlock(stream.responses[i].GetBlock())
require.NoError(t, err)
require.Equal(t, expectedBlock, actualBlock)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
responses: generateBlockResponses(responseCount),
}

rpc.
On("SubscribeBlocksFromStartBlockID", ctx, mock.Anything).
Return(stream, nil)

startBlockID := convert.MessageToIdentifier(stream.responses[0].Block.Id)
blockCh, errCh, err := c.SubscribeBlocksFromStartBlockID(ctx, startBlockID, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualBlock := <-blockCh
expectedBlock, err := convert.MessageToBlock(stream.responses[i].GetBlock())
require.NoError(t, err)
require.Equal(t, expectedBlock, actualBlock)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
responses: generateBlockResponses(responseCount),
}

rpc.
On("SubscribeBlocksFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockCh, errCh, err := c.SubscribeBlocksFromLatest(ctx, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualBlock := <-blockCh
expectedBlock, err := convert.MessageToBlock(stream.responses[i].GetBlock())
require.NoError(t, err)
require.Equal(t, expectedBlock, actualBlock)
}
cancel()

wg.Wait()
}))

t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{
ctx: ctx,
err: status.Error(codes.Internal, "internal error"),
}

rpc.
On("SubscribeBlocksFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockCh, errCh, err := c.SubscribeBlocksFromLatest(ctx, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoBlocks(t, blockCh, wg.Done)

errorCount := 0
for e := range errCh {
require.Error(t, e)
require.ErrorIs(t, e, stream.err)
errorCount += 1
}
require.Equalf(t, 1, errorCount, "only 1 error is expected")

wg.Wait()
}))
}

type mockBlockClientStream[SubscribeBlocksResponse any] struct {
grpc.ClientStream

ctx context.Context
err error
offset int
responses []*SubscribeBlocksResponse
}

// TODO: wtf is offset doing?
func (s *mockBlockClientStream[SubscribeBlocksResponse]) Recv() (*SubscribeBlocksResponse, error) {
if s.err != nil {
return nil, s.err
}

if s.offset >= len(s.responses) {
<-s.ctx.Done()
return nil, io.EOF
}
defer func() { s.offset++ }()

return s.responses[s.offset], nil
}

func assertNoBlocks[T any](t *testing.T, blocksCh <-chan T, done func()) {
defer done()
for range blocksCh {
require.FailNow(t, "should not receive blocks")
}
}
4 changes: 2 additions & 2 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.22.4
replace github.com/onflow/flow-go-sdk => ../

require (
github.com/onflow/cadence v1.0.0-preview.38
github.com/onflow/cadence v1.0.0-preview.52
github.com/onflow/flow-cli/flowkit v1.11.0
github.com/onflow/flow-go-sdk v0.41.17
github.com/spf13/afero v1.11.0
Expand Down Expand Up @@ -41,7 +41,7 @@ require (
github.com/logrusorgru/aurora/v4 v4.0.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/onflow/atree v0.7.0-rc.2 // indirect
github.com/onflow/atree v0.8.0-rc.6 // indirect
github.com/onflow/crypto v0.25.1 // indirect
github.com/onflow/flow/protobuf/go/flow v0.4.3 // indirect
github.com/onflow/sdks v0.6.0-preview.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ github.com/onflow/atree v0.6.0/go.mod h1:gBHU0M05qCbv9NN0kijLWMgC47gHVNBIp4KmsVF
github.com/onflow/atree v0.6.1-0.20230711151834-86040b30171f/go.mod h1:xvP61FoOs95K7IYdIYRnNcYQGf4nbF/uuJ0tHf4DRuM=
github.com/onflow/atree v0.6.1-0.20240429171449-cb486ceb1f9c/go.mod h1:xvP61FoOs95K7IYdIYRnNcYQGf4nbF/uuJ0tHf4DRuM=
github.com/onflow/atree v0.7.0-rc.2/go.mod h1:xvP61FoOs95K7IYdIYRnNcYQGf4nbF/uuJ0tHf4DRuM=
github.com/onflow/atree v0.8.0-rc.6/go.mod h1:yccR+LR7xc1Jdic0mrjocbHvUD7lnVvg8/Ct1AA5zBo=
github.com/onflow/cadence v0.42.7 h1:Qp9VYX901saO7wPwF/rwV4cMS+0mfWxnm9EqbYElYy4=
github.com/onflow/cadence v0.42.7/go.mod h1:raU8va8QRyTa/eUbhej4mbyW2ETePfSaywoo36MddgE=
github.com/onflow/cadence v1.0.0-M3/go.mod h1:odXGZZ/wGNA5mwT8bC9v8u8EXACHllB2ABSZK65TGL8=
Expand All @@ -115,6 +116,7 @@ github.com/onflow/cadence v1.0.0-preview.31/go.mod h1:3LM1VgE9HkJ815whY/F0LYWULw
github.com/onflow/cadence v1.0.0-preview.35/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0=
github.com/onflow/cadence v1.0.0-preview.36/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0=
github.com/onflow/cadence v1.0.0-preview.38/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0=
github.com/onflow/cadence v1.0.0-preview.52/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU=
github.com/onflow/crypto v0.25.0 h1:BeWbLsh3ZD13Ej+Uky6kg1PL1ZIVBDVX+2MVBNwqddg=
github.com/onflow/crypto v0.25.0/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI=
github.com/onflow/crypto v0.25.1/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI=
Expand Down

0 comments on commit be4594f

Please sign in to comment.