diff --git a/internal/pkg/bcclient/client.go b/internal/pkg/bcclient/client.go index aa61b3f..220c52c 100644 --- a/internal/pkg/bcclient/client.go +++ b/internal/pkg/bcclient/client.go @@ -25,7 +25,7 @@ type Client struct { func New(ctx context.Context, rpcURL string) (*Client, error) { httpClient := &http.Client{ - Timeout: 3 * time.Second, + Timeout: 10 * time.Second, } rpcClient, err := rpc.DialOptions(ctx, rpcURL, rpc.WithHTTPClient(httpClient)) if err != nil { @@ -71,15 +71,13 @@ func (c *Client) HeaderAtBlockNumber(ctx context.Context, blockNo uint64) (*ethe } func (c *Client) GetLogs(ctx context.Context, blockHash common.Hash) ([]ethereumTypes.Log, error) { - timeOutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() query := ethereum.FilterQuery{ BlockHash: &blockHash, } // Get the logs - logs, err := c.defaultClient.FilterLogs(timeOutCtx, query) + logs, err := c.defaultClient.FilterLogs(ctx, query) if err != nil { log.GetLogger().Errorw("Failed to retrieve logs", "blockHash", blockHash.Hex(), "err", err) return nil, err diff --git a/internal/pkg/listener/service.go b/internal/pkg/listener/service.go index a1d5866..2cf9a52 100644 --- a/internal/pkg/listener/service.go +++ b/internal/pkg/listener/service.go @@ -317,7 +317,7 @@ func (s *EventService) syncOldBlocks(ctx context.Context, headCh chan *types.New blocksNeedToConsume := onchainBlockNo - consumedBlockNo - totalBatches := int(math.Ceil(float64(blocksNeedToConsume) / float64(MaxBatchBlocksSize))) + totalBatches := calculateBatchBlocks(int(blocksNeedToConsume)) s.l.Infow("Total batches", "total", totalBatches) skip := consumedBlockNo + 1 @@ -358,35 +358,32 @@ func (s *EventService) handleReorgBlocks(ctx context.Context, newHeader *ethereu return nil, fmt.Errorf("reorged block numbers don't match") } - var g errgroup.Group + reorgedBlocks := make([]*types.NewBlock, 0) + totalBatches := calculateBatchBlocks(len(newBlocks)) - reorgedBlocks := make([]*types.NewBlock, len(newBlocks)) - for i, newBlock := range newBlocks { - s.l.Infow("Detect reorg block", "block", newBlock.Number.Uint64()) - i := i - newBlock := newBlock + s.l.Infow("Total batches", "total", totalBatches) + skip := newBlocks[0].Number.Uint64() + to := newBlocks[len(newBlocks)-1].Number.Uint64() + idx := uint64(0) + for i := 0; i < totalBatches; i++ { + fromBlock := skip + toBlock := skip + MaxBatchBlocksSize - 1 - g.Go(func() error { - blockHash := newBlock.Hash() - reorgedLogs, errLogs := s.bcClient.GetLogs(ctx, blockHash) - if errLogs != nil { - s.l.Errorw("Failed to get logs", "err", errLogs) - return errLogs - } + if toBlock > to { + toBlock = to + } - reorgedBlocks[i] = &types.NewBlock{ - Header: newBlock, - Logs: reorgedLogs, - ReorgedBlockHash: reorgedBlockHashes[i], - } + blocks, err := s.bcClient.GetBlocks(ctx, true, fromBlock, toBlock) + if err != nil { + return nil, err + } - return nil - }) - } + for j, block := range blocks { + block.ReorgedBlockHash = reorgedBlockHashes[int(idx)+j] + } - err = g.Wait() - if err != nil { - return nil, err + idx += toBlock - fromBlock + 1 + reorgedBlocks = append(reorgedBlocks, blocks...) } return reorgedBlocks, nil @@ -396,3 +393,7 @@ func serializeEventRequestWithAddressAndABI(address common.Address, hashedABI co result := fmt.Sprintf("%s:%s", address.String(), hashedABI) return result } + +func calculateBatchBlocks(blocks int) int { + return int(math.Ceil(float64(blocks) / float64(MaxBatchBlocksSize))) +} diff --git a/internal/pkg/listener/service_test.go b/internal/pkg/listener/service_test.go index af1f6ca..e1a686e 100644 --- a/internal/pkg/listener/service_test.go +++ b/internal/pkg/listener/service_test.go @@ -80,21 +80,19 @@ func Test_handleReorgBlock(t *testing.T) { listenerSrv, err := MakeService("test-event-listener", bcClient, keeper) require.NoError(t, err) - block, err := bcClient.GetHeader(ctx) + header, err := bcClient.GetHeader(ctx) require.NoError(t, err) - blocks, err := listenerSrv.handleReorgBlocks(ctx, block) + err = keeper.SetHead(ctx, header, constant.ZeroHash) require.NoError(t, err) - assert.Len(t, blocks, 0) - // pass through two blocks time.Sleep(24 * time.Second) - block, err = bcClient.GetHeader(ctx) + header, err = bcClient.GetHeader(ctx) require.NoError(t, err) - blocks, err = listenerSrv.handleReorgBlocks(ctx, block) + blocks, err := listenerSrv.handleReorgBlocks(ctx, header) require.NoError(t, err) assert.Equal(t, true, len(blocks) > 0)