Skip to content

Commit

Permalink
feat: also share block heights when listening for new blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Oct 8, 2024
1 parent dc10ff9 commit ba5dc9f
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 134 deletions.
162 changes: 108 additions & 54 deletions proto/tendermint/rpc/grpc/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/tendermint/rpc/grpc/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ message ValidatorSetResponse {

message NewHeightEvent {
int64 height = 1;
bytes hash = 2;
}

//----------------------------------------
Expand Down
49 changes: 23 additions & 26 deletions rpc/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,22 @@ func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcast
}, nil
}

type blockAPI struct {
type BlockAPI struct {
sync.Mutex
ctx context.Context
heightListeners map[chan int64]struct{}
heightListeners map[chan NewHeightEvent]struct{}
newBlockSubscription types2.Subscription
}

func NewBlockAPI(ctx context.Context) *blockAPI {
return &blockAPI{
func NewBlockAPI(ctx context.Context) *BlockAPI {
return &BlockAPI{
ctx: ctx,
// TODO(rach-id) make 1000 configurable if there is a need for it
heightListeners: make(map[chan int64]struct{}, 1000),
heightListeners: make(map[chan NewHeightEvent]struct{}, 1000),
}
}

func (blockAPI *blockAPI) StartNewBlockEventListener() {
func (blockAPI *BlockAPI) StartNewBlockEventListener() {
env := core.GetEnvironment()
if blockAPI.newBlockSubscription == nil {
var err error
Expand Down Expand Up @@ -85,38 +85,38 @@ func (blockAPI *blockAPI) StartNewBlockEventListener() {
env.Logger.Debug("couldn't cast event data to new block")
continue
}
blockAPI.broadcastToListeners(data.Block.Height)
blockAPI.broadcastToListeners(data.Block.Height, data.Block.DataHash)
}
}

}

func (blockAPI *blockAPI) broadcastToListeners(height int64) {
func (blockAPI *BlockAPI) broadcastToListeners(height int64, hash []byte) {
for ch := range blockAPI.heightListeners {
select {
case <-blockAPI.ctx.Done():
return
case ch <- height:
case ch <- NewHeightEvent{Height: height, Hash: hash}:
}
}
}

func (blockAPI *blockAPI) addHeightListener() chan int64 {
func (blockAPI *BlockAPI) addHeightListener() chan NewHeightEvent {
blockAPI.Lock()
defer blockAPI.Unlock()
ch := make(chan int64, 50)
ch := make(chan NewHeightEvent, 50)
blockAPI.heightListeners[ch] = struct{}{}
return ch
}

func (blockAPI *blockAPI) removeHeightListener(ch chan int64) {
func (blockAPI *BlockAPI) removeHeightListener(ch chan NewHeightEvent) {
blockAPI.Lock()
defer blockAPI.Unlock()
delete(blockAPI.heightListeners, ch)
close(ch)
}

func (blockAPI *blockAPI) BlockByHash(req *BlockByHashRequest, stream BlockAPI_BlockByHashServer) error {
func (blockAPI *BlockAPI) BlockByHash(req *BlockByHashRequest, stream BlockAPI_BlockByHashServer) error {
blockStore := core.GetEnvironment().BlockStore
blockMeta := blockStore.LoadBlockMetaByHash(req.Hash)
for i := 0; i < int(blockMeta.BlockID.PartSetHeader.Total); i++ {
Expand All @@ -136,7 +136,7 @@ func (blockAPI *blockAPI) BlockByHash(req *BlockByHashRequest, stream BlockAPI_B
return nil
}

func (blockAPI *blockAPI) BlockByHeight(req *BlockByHeightRequest, stream BlockAPI_BlockByHeightServer) error {
func (blockAPI *BlockAPI) BlockByHeight(req *BlockByHeightRequest, stream BlockAPI_BlockByHeightServer) error {
blockStore := core.GetEnvironment().BlockStore
blockMeta := blockStore.LoadBlockMeta(req.Height)
for i := 0; i < int(blockMeta.BlockID.PartSetHeader.Total); i++ {
Expand All @@ -156,21 +156,21 @@ func (blockAPI *blockAPI) BlockByHeight(req *BlockByHeightRequest, stream BlockA
return nil
}

func (blockAPI *blockAPI) BlockMetaByHash(ctx context.Context, req *BlockMetaByHashRequest) (*BlockMetaByHashResponse, error) {
func (blockAPI *BlockAPI) BlockMetaByHash(ctx context.Context, req *BlockMetaByHashRequest) (*BlockMetaByHashResponse, error) {
blockMeta := core.GetEnvironment().BlockStore.LoadBlockMetaByHash(req.Hash).ToProto()
return &BlockMetaByHashResponse{
BlockMeta: blockMeta,
}, nil
}

func (blockAPI *blockAPI) BlockMetaByHeight(ctx context.Context, req *BlockMetaByHeightRequest) (*BlockMetaByHeightResponse, error) {
func (blockAPI *BlockAPI) BlockMetaByHeight(ctx context.Context, req *BlockMetaByHeightRequest) (*BlockMetaByHeightResponse, error) {
blockMeta := core.GetEnvironment().BlockStore.LoadBlockMeta(req.Height).ToProto()
return &BlockMetaByHeightResponse{
BlockMeta: blockMeta,
}, nil
}

func (blockAPI *blockAPI) Commit(_ context.Context, req *CommitRequest) (*CommitResponse, error) {
func (blockAPI *BlockAPI) Commit(_ context.Context, req *CommitRequest) (*CommitResponse, error) {
commit := core.GetEnvironment().BlockStore.LoadBlockCommit(req.Height).ToProto()

return &CommitResponse{
Expand All @@ -181,7 +181,7 @@ func (blockAPI *blockAPI) Commit(_ context.Context, req *CommitRequest) (*Commit
}, nil
}

func (blockAPI *blockAPI) ValidatorSet(_ context.Context, req *ValidatorSetRequest) (*ValidatorSetResponse, error) {
func (blockAPI *BlockAPI) ValidatorSet(_ context.Context, req *ValidatorSetRequest) (*ValidatorSetResponse, error) {
validatorSet, err := core.GetEnvironment().StateStore.LoadValidators(req.Height)
if err != nil {
return nil, err
Expand All @@ -205,19 +205,16 @@ func (blockAPI *blockAPI) ValidatorSet(_ context.Context, req *ValidatorSetReque
}, nil
}

func (blockAPI *blockAPI) SubscribeNewHeights(_ *SubscribeNewHeightsRequest, stream BlockAPI_SubscribeNewHeightsServer) error {
heightChan := blockAPI.addHeightListener()
defer blockAPI.removeHeightListener(heightChan)
func (blockAPI *BlockAPI) SubscribeNewHeights(_ *SubscribeNewHeightsRequest, stream BlockAPI_SubscribeNewHeightsServer) error {
eventsChan := blockAPI.addHeightListener()
defer blockAPI.removeHeightListener(eventsChan)

for {
select {
case <-blockAPI.ctx.Done():
return nil
case height := <-heightChan:
event := &NewHeightEvent{
Height: height,
}
if err := stream.Send(event); err != nil {
case event := <-eventsChan:
if err := stream.Send(&event); err != nil {
return err
}
case <-stream.Context().Done():
Expand Down
Loading

0 comments on commit ba5dc9f

Please sign in to comment.