Skip to content

Commit

Permalink
chain: PrunedBlockDispatcher bugfix.
Browse files Browse the repository at this point in the history
In case we fail the request via the neutriono block peer fetcher
we make sure we fail all dependant `GetBlock` calls and remove
this block request completely so that a new request for this same
block hash can be made.
  • Loading branch information
ziggie1984 committed Jan 25, 2024
1 parent 5df09dd commit 577dd4a
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 30 deletions.
28 changes: 27 additions & 1 deletion chain/bitcoind_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,16 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
return nil, err
}

// cancelChan is needed in case a block request with the same hash is
// already registered and fails via the returned errChan. Because we
// don't register a new request in this case this cancelChan is used
// to signal the failure of the dependant block request.
cancelChan := make(chan error, 1)

// Now that we know the block has been pruned for sure, request it from
// our backend peers.
blockChan, errChan := c.prunedBlockDispatcher.Query(
[]*chainhash.Hash{hash},
[]*chainhash.Hash{hash}, cancelChan,
)

for {
Expand All @@ -482,12 +488,32 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {

case err := <-errChan:
if err != nil {
// An error was returned for this block request.
// We have to make sure we remove the blockhash
// from the list of queried blocks.
// Moreover because in case a block is requested
// more than once no redundant block requests
// are registered but rather a reply channel is
// added to the pending block request. This
// means we need to cancel all dependent
// `GetBlock` calls via the cancel channel when
// the fetching of the block was NOT successful.
c.prunedBlockDispatcher.CancelRequest(
*hash, err,
)

return nil, err
}

// errChan fired before blockChan with a nil error, wait
// for the block now.

// The cancelChan is only used when there is already a pending
// block request for this hash and that block request fails via
// the error channel above.
case err := <-cancelChan:
return nil, err

case <-c.quit:
return nil, ErrBitcoindClientShuttingDown
}
Expand Down
96 changes: 85 additions & 11 deletions chain/pruned_block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,14 @@ type PrunedBlockDispatcher struct {
// NOTE: The blockMtx lock must always be held when accessing this
// field.
blocksQueried map[chainhash.Hash][]chan *wire.MsgBlock
blockMtx sync.Mutex

// blockQueryCancel signals the cancelation of a `GetBlock` request.
//
// NOTE: The blockMtx lock must always be held when accessing this
// field.
blockQueryCancel map[chainhash.Hash][]chan<- error

blockMtx sync.Mutex

// currentPeers represents the set of peers we're currently connected
// to. Each peer found here will have a worker spawned within the
Expand Down Expand Up @@ -198,12 +205,13 @@ func NewPrunedBlockDispatcher(cfg *PrunedBlockDispatcherConfig) (
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
}),
blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock),
currentPeers: make(map[string]*peer.Peer),
bannedPeers: make(map[string]struct{}),
peersConnected: peersConnected,
timeSource: blockchain.NewMedianTime(),
quit: make(chan struct{}),
blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock),
blockQueryCancel: make(map[chainhash.Hash][]chan<- error),
currentPeers: make(map[string]*peer.Peer),
bannedPeers: make(map[string]struct{}),
peersConnected: peersConnected,
timeSource: blockchain.NewMedianTime(),
quit: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -502,9 +510,10 @@ func (d *PrunedBlockDispatcher) banPeer(peer string) {

// Query submits a request to query the information of the given blocks.
func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash,
cancelChan chan<- error,
opts ...query.QueryOption) (<-chan *wire.MsgBlock, <-chan error) {

reqs, blockChan, err := d.newRequest(blocks)
reqs, blockChan, err := d.newRequest(blocks, cancelChan)
if err != nil {
errChan := make(chan error, 1)
errChan <- err
Expand All @@ -515,14 +524,18 @@ func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash,
if len(reqs) > 0 {
errChan = d.workManager.Query(reqs, opts...)
}

return blockChan, errChan
}

// newRequest construct a new query request for the given blocks to submit to
// the internal workManager. A channel is also returned through which the
// requested blocks are sent through.
func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (
[]*query.Request, <-chan *wire.MsgBlock, error) {
//
// NOTE: The cancelChan must be a buffered.
func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash,
cancelChan chan<- error) ([]*query.Request, <-chan *wire.MsgBlock,
error) {

// Make sure the channel is buffered enough to handle all blocks.
blockChan := make(chan *wire.MsgBlock, len(blocks))
Expand Down Expand Up @@ -550,6 +563,10 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (
} else {
log.Debugf("Received new request for pending query of "+
"block %v", *block)

d.blockQueryCancel[*block] = append(
d.blockQueryCancel[*block], cancelChan,
)
}

d.blocksQueried[*block] = append(
Expand Down Expand Up @@ -614,6 +631,8 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,
Finished: false,
}
}
copyblockChans := make([]chan *wire.MsgBlock, len(blockChans))
copy(copyblockChans, blockChans)

err := blockchain.CheckBlockSanity(
btcutil.NewBlock(block), d.cfg.ChainParams.PowLimit,
Expand Down Expand Up @@ -667,7 +686,7 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,
go func() {
defer d.wg.Done()

for _, blockChan := range blockChans {
for _, blockChan := range copyblockChans {
select {
case blockChan <- block:
case <-d.quit:
Expand All @@ -678,3 +697,58 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,

return progress
}

// CancelRequest removes all information regarding a failed block request.
// When for example the Peer disconnects or runs in a timeout we make sure
// that all related information is deleted and a new request for this block
// can be registered. Moreover will also cancel all depending goroutines.
func (d *PrunedBlockDispatcher) CancelRequest(blockHash chainhash.Hash,
err error) {

// failDependant is a helper function which fails all dependant
// goroutines via their cancel channels.
failDependant := func(cancelChans []chan<- error) {
defer d.wg.Done()

for _, cancel := range cancelChans {
select {
case cancel <- err:
case <-d.quit:
return
}
}
}

d.blockMtx.Lock()

// Before removing the block hash we get the cancelChans which were
// registered for block requests that had already an ongoing pending
// request.
cancelChans, ok := d.blockQueryCancel[blockHash]
var copycancelChans []chan<- error
if ok {
copycancelChans = make([]chan<- error, len(cancelChans))
copy(copycancelChans, cancelChans)
}

// Remove all data related to this block request to make sure the same
// block can be registered again in the future.
delete(d.blocksQueried, blockHash)
delete(d.blockQueryCancel, blockHash)

d.blockMtx.Unlock()

// In case there are goroutines depending on this block request we
// make sure we cancel them.
// We do this in a goroutine to not block the initial request.
if copycancelChans != nil {
d.wg.Add(1)
go failDependant(cancelChans)
}
}

func deepCopySlice(original []int) []int {

Check failure on line 750 in chain/pruned_block_dispatcher.go

View workflow job for this annotation

GitHub Actions / Format, compilation and lint check

`deepCopySlice` is unused (deadcode)
copySlice := make([]int, len(original))
copy(copySlice, original)
return copySlice
}
Loading

0 comments on commit 577dd4a

Please sign in to comment.