diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index 88edf1b05a..44d0fdda2d 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -469,10 +469,16 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { return nil, err } + // cancelChan is needed in case a block request with the same hash is + // already registered and fails via the returned errChan. Because we + // don't register a new request in this case this cancelChan is used + // to signal the failure of the dependant block request. + cancelChan := make(chan error, 1) + // Now that we know the block has been pruned for sure, request it from // our backend peers. blockChan, errChan := c.prunedBlockDispatcher.Query( - []*chainhash.Hash{hash}, + []*chainhash.Hash{hash}, cancelChan, ) for { @@ -482,12 +488,32 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { case err := <-errChan: if err != nil { + // An error was returned for this block request. + // We have to make sure we remove the blockhash + // from the list of queried blocks. + // Moreover because in case a block is requested + // more than onced no redundant block requests + // are registered but rather a reply channel is + // added to the pending block request. This + // means we need to cancel all dependent + // `GetBlock` calls via the cancel channel when + // the fetching of the block was NOT successful. + c.prunedBlockDispatcher.CancelRequest( + *hash, err, + ) + return nil, err } // errChan fired before blockChan with a nil error, wait // for the block now. + // The cancelChan is only used when there is already a pending + // block request for this hash and that block request fails via + // the error channel above. + case err := <-cancelChan: + return nil, err + case <-c.quit: return nil, ErrBitcoindClientShuttingDown } diff --git a/chain/pruned_block_dispatcher.go b/chain/pruned_block_dispatcher.go index 9c2d5e6168..111f557cc7 100644 --- a/chain/pruned_block_dispatcher.go +++ b/chain/pruned_block_dispatcher.go @@ -143,7 +143,14 @@ type PrunedBlockDispatcher struct { // NOTE: The blockMtx lock must always be held when accessing this // field. blocksQueried map[chainhash.Hash][]chan *wire.MsgBlock - blockMtx sync.Mutex + + // blockQueryCancel signals the cancelation of a `GetBlock` request. + // + // NOTE: The blockMtx lock must always be held when accessing this + // field. + blockQueryCancel map[chainhash.Hash][]chan<- error + + blockMtx sync.Mutex // currentPeers represents the set of peers we're currently connected // to. Each peer found here will have a worker spawned within the @@ -501,10 +508,10 @@ func (d *PrunedBlockDispatcher) banPeer(peer string) { } // Query submits a request to query the information of the given blocks. -func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash, +func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash, chanelChan chan<- error, opts ...query.QueryOption) (<-chan *wire.MsgBlock, <-chan error) { - reqs, blockChan, err := d.newRequest(blocks) + reqs, blockChan, err := d.newRequest(blocks, chanelChan) if err != nil { errChan := make(chan error, 1) errChan <- err @@ -515,17 +522,20 @@ func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash, if len(reqs) > 0 { errChan = d.workManager.Query(reqs, opts...) } + return blockChan, errChan } // newRequest construct a new query request for the given blocks to submit to // the internal workManager. A channel is also returned through which the // requested blocks are sent through. -func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) ( - []*query.Request, <-chan *wire.MsgBlock, error) { +func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash, + chanelChan chan<- error) ([]*query.Request, <-chan *wire.MsgBlock, + error) { // Make sure the channel is buffered enough to handle all blocks. blockChan := make(chan *wire.MsgBlock, len(blocks)) + cancelChan := make(chan error) d.blockMtx.Lock() defer d.blockMtx.Unlock() @@ -550,6 +560,10 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) ( } else { log.Debugf("Received new request for pending query of "+ "block %v", *block) + + d.blockQueryCancel[*block] = append( + d.blockQueryCancel[*block], cancelChan, + ) } d.blocksQueried[*block] = append( @@ -678,3 +692,42 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message, return progress } + +// CancelRequest removes all information regarding a failed block request. +// When for example the Peer disconnects or runs in a timeout we make sure +// that all related information is deleted and a new request for this block +// can be registered. Moreover will also cancel all depending goroutines. +func (d *PrunedBlockDispatcher) CancelRequest(blockHash chainhash.Hash, + err error) { + + d.blockMtx.Lock() + + // Before removing the block hash we get the cancelChans which were + // registered for block requests which already had an ongoing pending + // request registered. + cancelChans, ok := d.blockQueryCancel[blockHash] + + // Remove all data related to this block request to make sure the same + // block can be registered again in the future. + delete(d.blocksQueried, blockHash) + delete(d.blockQueryCancel, blockHash) + + d.blockMtx.Unlock() + + // In case there are dependant goroutines depending on this block + // request we make sure we cancel them. + if ok { + d.wg.Add(1) + go func() { + defer d.wg.Done() + + for _, cancel := range cancelChans { + select { + case cancel <- err: + case <-d.quit: + return + } + } + }() + } +} diff --git a/chain/pruned_block_dispatcher_test.go b/chain/pruned_block_dispatcher_test.go index b7f801d8b5..d4c229b980 100644 --- a/chain/pruned_block_dispatcher_test.go +++ b/chain/pruned_block_dispatcher_test.go @@ -263,7 +263,9 @@ func (h *prunedBlockDispatcherHarness) query(blocks []*chainhash.Hash) ( h.t.Helper() - blockChan, errChan := h.dispatcher.Query(blocks) + cancelChan := make(chan error, 1) + + blockChan, errChan := h.dispatcher.Query(blocks, cancelChan) select { case err := <-errChan: require.NoError(h.t, err)