Skip to content

Commit

Permalink
chore: batch blocks when syncing the reorg blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
0x6e616d committed Aug 10, 2024
1 parent 3cd1b8d commit ea29ff4
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 35 deletions.
6 changes: 2 additions & 4 deletions internal/pkg/bcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Client struct {

func New(ctx context.Context, rpcURL string) (*Client, error) {
httpClient := &http.Client{
Timeout: 3 * time.Second,
Timeout: 10 * time.Second,
}
rpcClient, err := rpc.DialOptions(ctx, rpcURL, rpc.WithHTTPClient(httpClient))
if err != nil {
Expand Down Expand Up @@ -71,15 +71,13 @@ func (c *Client) HeaderAtBlockNumber(ctx context.Context, blockNo uint64) (*ethe
}

func (c *Client) GetLogs(ctx context.Context, blockHash common.Hash) ([]ethereumTypes.Log, error) {
timeOutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

query := ethereum.FilterQuery{
BlockHash: &blockHash,
}

// Get the logs
logs, err := c.defaultClient.FilterLogs(timeOutCtx, query)
logs, err := c.defaultClient.FilterLogs(ctx, query)
if err != nil {
log.GetLogger().Errorw("Failed to retrieve logs", "blockHash", blockHash.Hex(), "err", err)
return nil, err
Expand Down
51 changes: 26 additions & 25 deletions internal/pkg/listener/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (s *EventService) syncOldBlocks(ctx context.Context, headCh chan *types.New

blocksNeedToConsume := onchainBlockNo - consumedBlockNo

totalBatches := int(math.Ceil(float64(blocksNeedToConsume) / float64(MaxBatchBlocksSize)))
totalBatches := calculateBatchBlocks(int(blocksNeedToConsume))

s.l.Infow("Total batches", "total", totalBatches)
skip := consumedBlockNo + 1
Expand Down Expand Up @@ -358,35 +358,32 @@ func (s *EventService) handleReorgBlocks(ctx context.Context, newHeader *ethereu
return nil, fmt.Errorf("reorged block numbers don't match")
}

var g errgroup.Group
reorgedBlocks := make([]*types.NewBlock, 0)
totalBatches := calculateBatchBlocks(len(newBlocks))

reorgedBlocks := make([]*types.NewBlock, len(newBlocks))
for i, newBlock := range newBlocks {
s.l.Infow("Detect reorg block", "block", newBlock.Number.Uint64())
i := i
newBlock := newBlock
s.l.Infow("Total batches", "total", totalBatches)
skip := newBlocks[0].Number.Uint64()
to := newBlocks[len(newBlocks)-1].Number.Uint64()
idx := uint64(0)
for i := 0; i < totalBatches; i++ {
fromBlock := skip
toBlock := skip + MaxBatchBlocksSize - 1

g.Go(func() error {
blockHash := newBlock.Hash()
reorgedLogs, errLogs := s.bcClient.GetLogs(ctx, blockHash)
if errLogs != nil {
s.l.Errorw("Failed to get logs", "err", errLogs)
return errLogs
}
if toBlock > to {
toBlock = to
}

reorgedBlocks[i] = &types.NewBlock{
Header: newBlock,
Logs: reorgedLogs,
ReorgedBlockHash: reorgedBlockHashes[i],
}
blocks, err := s.bcClient.GetBlocks(ctx, true, fromBlock, toBlock)
if err != nil {
return nil, err
}

return nil
})
}
for j, block := range blocks {
block.ReorgedBlockHash = reorgedBlockHashes[int(idx)+j]
}

err = g.Wait()
if err != nil {
return nil, err
idx += toBlock - fromBlock + 1
reorgedBlocks = append(reorgedBlocks, blocks...)
}

return reorgedBlocks, nil
Expand All @@ -396,3 +393,7 @@ func serializeEventRequestWithAddressAndABI(address common.Address, hashedABI co
result := fmt.Sprintf("%s:%s", address.String(), hashedABI)
return result
}

func calculateBatchBlocks(blocks int) int {
return int(math.Ceil(float64(blocks) / float64(MaxBatchBlocksSize)))
}
10 changes: 4 additions & 6 deletions internal/pkg/listener/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,19 @@ func Test_handleReorgBlock(t *testing.T) {

listenerSrv, err := MakeService("test-event-listener", bcClient, keeper)
require.NoError(t, err)
block, err := bcClient.GetHeader(ctx)
header, err := bcClient.GetHeader(ctx)
require.NoError(t, err)

blocks, err := listenerSrv.handleReorgBlocks(ctx, block)
err = keeper.SetHead(ctx, header, constant.ZeroHash)
require.NoError(t, err)

assert.Len(t, blocks, 0)

// pass through two blocks
time.Sleep(24 * time.Second)

block, err = bcClient.GetHeader(ctx)
header, err = bcClient.GetHeader(ctx)
require.NoError(t, err)

blocks, err = listenerSrv.handleReorgBlocks(ctx, block)
blocks, err := listenerSrv.handleReorgBlocks(ctx, header)
require.NoError(t, err)

assert.Equal(t, true, len(blocks) > 0)
Expand Down

0 comments on commit ea29ff4

Please sign in to comment.