Skip to content

Commit

Permalink
Syncer stuck fix (#255)
Browse files Browse the repository at this point in the history
* ping best peer after some time

* small fix

* small reorg

* fetch peer statuses if timeout occurs

* scale blockTimeout relative to the round timeout
  • Loading branch information
goran-ethernal authored May 30, 2024
1 parent bd0ec06 commit f1f36f9
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 9 deletions.
4 changes: 4 additions & 0 deletions consensus/polybft/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ func (tp *syncerMock) Sync(func(*types.FullBlock) bool) error {
return args.Error(0)
}

// UpdateBlockTimeout is the mock implementation of the block timeout setter.
// It invokes tp.Called with no arguments, so the supplied duration is not
// part of the recorded call and the result of Called is discarded.
func (tp *syncerMock) UpdateBlockTimeout(_ time.Duration) {
	_ = tp.Called()
}

func init() {
// setup custom hash header func
setupHeaderHashFunc()
Expand Down
10 changes: 4 additions & 6 deletions consensus/polybft/polybft.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ type Polybft struct {
// runtime handles consensus runtime features like epoch, state and event management
runtime *consensusRuntime

// block time duration
blockTime time.Duration

// dataDir is the data directory to store the info
dataDir string

Expand Down Expand Up @@ -489,9 +486,6 @@ func (p *Polybft) Initialize() error {
return fmt.Errorf("cannot create topics: %w", err)
}

// set block time
p.blockTime = time.Duration(p.config.BlockTime)

// initialize polybft consensus data directory
p.dataDir = filepath.Join(p.config.Config.Path, "polybft")
// create the data dir if not exists
Expand Down Expand Up @@ -776,9 +770,13 @@ func (p *Polybft) GetValidatorsWithTx(blockNumber uint64, parents []*types.Heade
// SetBlockTime derives the consensus and syncer timeouts from the given block time.
//
// The syncer block timeout starts out as three block times. When the block time
// is greater than or equal to core.DefaultBaseRoundTimeout, the IBFT base round
// timeout is set to blockTime * baseRoundTimeoutScaleFactor and the syncer block
// timeout is multiplied by the same factor. The resulting syncer timeout is
// always handed to the syncer, whether or not the round timeout was changed.
func (p *Polybft) SetBlockTime(blockTime time.Duration) {
	syncTimeout := 3 * blockTime

	if blockTime >= core.DefaultBaseRoundTimeout {
		// long block times need a proportionally longer round timeout,
		// and the syncer timeout is scaled along with it
		roundTimeout := blockTime * baseRoundTimeoutScaleFactor
		p.ibft.SetBaseRoundTimeout(roundTimeout)

		syncTimeout = syncTimeout * baseRoundTimeoutScaleFactor
	}

	p.syncer.UpdateBlockTimeout(syncTimeout)
}

// ProcessHeaders updates the snapshot based on the verified headers
Expand Down
31 changes: 28 additions & 3 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package syncer
import (
"errors"
"fmt"
"sync"
"time"

"github.com/0xPolygon/polygon-edge/helper/progress"
Expand Down Expand Up @@ -38,6 +39,8 @@ type syncer struct {

// Channel to notify Sync that a new status arrived
newStatusCh chan struct{}

lock sync.RWMutex
}

func NewSyncer(
Expand All @@ -58,6 +61,14 @@ func NewSyncer(
}
}

// UpdateBlockTimeout replaces the syncer's block timeout with the given value.
// The write is performed while holding the syncer's write lock.
func (s *syncer) UpdateBlockTimeout(timeout time.Duration) {
	s.lock.Lock()
	s.blockTimeout = timeout
	s.lock.Unlock()
}

// Start starts goroutine processes
func (s *syncer) Start() error {
if err := s.syncPeerClient.Start(); err != nil {
Expand Down Expand Up @@ -165,8 +176,18 @@ func (s *syncer) Sync(callback func(*types.FullBlock) bool) error {
skipList := make(map[peer.ID]bool)

for {
s.lock.RLock()
blockTimeout := s.blockTimeout
s.lock.RUnlock()

// Wait for a new event to arrive
<-s.newStatusCh
select {
case <-s.newStatusCh:
s.logger.Debug("new peer status arrived, start syncing")
case <-time.After(blockTimeout):
s.logger.Debug("timeout while waiting for new peer status, start manual syncing")
s.initializePeerMap() // fetch peer statuses just in case
}

// fetch local latest block
if header := s.blockchain.Header(); header != nil {
Expand Down Expand Up @@ -214,7 +235,11 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64,
localLatest := s.blockchain.Header().Number
shouldTerminate := false

blockCh, err := s.syncPeerClient.GetBlocks(peerID, localLatest+1, s.blockTimeout)
s.lock.RLock()
blockTimeout := s.blockTimeout
s.lock.RUnlock()

blockCh, err := s.syncPeerClient.GetBlocks(peerID, localLatest+1, blockTimeout)
if err != nil {
return 0, false, err
}
Expand Down Expand Up @@ -266,7 +291,7 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64,
shouldTerminate = newBlockCallback(fullBlock)

lastReceivedNumber = block.Number()
case <-time.After(s.blockTimeout):
case <-time.After(blockTimeout):
return lastReceivedNumber, shouldTerminate, errTimeout
}
}
Expand Down
2 changes: 2 additions & 0 deletions syncer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type Syncer interface {
HasSyncPeer() bool
// Sync starts routine to sync blocks
Sync(func(*types.FullBlock) bool) error
// UpdateBlockTimeout updates block timeout in syncer
UpdateBlockTimeout(time.Duration)
}

type Progression interface {
Expand Down

0 comments on commit f1f36f9

Please sign in to comment.