Skip to content

Commit

Permalink
Merge pull request btcsuite#903 from ziggie1984/prune-node-deadlock-b…
Browse files Browse the repository at this point in the history
…locksync

Plus lightninglabs/neutrino#273
Author: Elle Mouton <[email protected]>
Date:   Mon May 22 15:53:49 2023 +0200

chain: PrunedBlockDispatcher bugfix.
  • Loading branch information
Roasbeef authored and buck54321 committed Apr 20, 2024
1 parent 0cd9e68 commit aa8640b
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 39 deletions.
28 changes: 27 additions & 1 deletion chain/bitcoind_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,16 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
return nil, err
}

// cancelChan is needed in case a block request with the same hash is
// already registered and fails via the returned errChan. Because we
// don't register a new request in this case this cancelChan is used
// to signal the failure of the dependant block request.
cancelChan := make(chan error, 1)

// Now that we know the block has been pruned for sure, request it from
// our backend peers.
blockChan, errChan := c.prunedBlockDispatcher.Query(
[]*chainhash.Hash{hash},
[]*chainhash.Hash{hash}, cancelChan,
)

for {
Expand All @@ -482,12 +488,32 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {

case err := <-errChan:
if err != nil {
// An error was returned for this block request.
// We have to make sure we remove the blockhash
// from the list of queried blocks.
// Moreover because in case a block is requested
// more than once no redundant block requests
// are registered but rather a reply channel is
// added to the pending block request. This
// means we need to cancel all dependent
// `GetBlock` calls via the cancel channel when
// the fetching of the block was NOT successful.
c.prunedBlockDispatcher.CancelRequest(
*hash, err,
)

return nil, err
}

// errChan fired before blockChan with a nil error, wait
// for the block now.

// The cancelChan is only used when there is already a pending
// block request for this hash and that block request fails via
// the error channel above.
case err := <-cancelChan:
return nil, err

case <-c.quit:
return nil, ErrBitcoindClientShuttingDown
}
Expand Down
111 changes: 99 additions & 12 deletions chain/pruned_block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/rand"
"net"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -143,7 +144,14 @@ type PrunedBlockDispatcher struct {
// NOTE: The blockMtx lock must always be held when accessing this
// field.
blocksQueried map[chainhash.Hash][]chan *wire.MsgBlock
blockMtx sync.Mutex

// blockQueryCancel signals the cancellation of a `GetBlock` request.
//
// NOTE: The blockMtx lock must always be held when accessing this
// field.
blockQueryCancel map[chainhash.Hash][]chan<- error

blockMtx sync.Mutex

// currentPeers represents the set of peers we're currently connected
// to. Each peer found here will have a worker spawned within the
Expand Down Expand Up @@ -198,12 +206,13 @@ func NewPrunedBlockDispatcher(cfg *PrunedBlockDispatcherConfig) (
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
}),
blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock),
currentPeers: make(map[string]*peer.Peer),
bannedPeers: make(map[string]struct{}),
peersConnected: peersConnected,
timeSource: blockchain.NewMedianTime(),
quit: make(chan struct{}),
blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock),
blockQueryCancel: make(map[chainhash.Hash][]chan<- error),
currentPeers: make(map[string]*peer.Peer),
bannedPeers: make(map[string]struct{}),
peersConnected: peersConnected,
timeSource: blockchain.NewMedianTime(),
quit: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -386,8 +395,26 @@ func (d *PrunedBlockDispatcher) connectToPeer(addr string) (bool, error) {
// requests, i.e., any peer which is not considered a segwit-enabled
// "full-node".
func filterPeers(peers []btcjson.GetPeerInfoResult) ([]string, error) {
var eligible []string
var eligible []string // nolint:prealloc

// First we sort the peers by the measured ping time, to choose the best
// peers to fetch blocks from.
sort.Slice(peers, func(i, j int) bool {
return peers[i].PingTime < peers[j].PingTime
})

for _, peer := range peers {
// We cannot use the inbound peers here because the referenced
// port in the `addr` field is not the listen port for the p2p
// connection but a random outgoing port of the peer.
if peer.Inbound {
log.Debugf("Inbound peer %v not considering for "+
"outbound connection to fetch pruned blocks",
peer)

continue
}

rawServices, err := hex.DecodeString(peer.Services)
if err != nil {
return nil, err
Expand Down Expand Up @@ -502,9 +529,10 @@ func (d *PrunedBlockDispatcher) banPeer(peer string) {

// Query submits a request to query the information of the given blocks.
func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash,
cancelChan chan<- error,
opts ...query.QueryOption) (<-chan *wire.MsgBlock, <-chan error) {

reqs, blockChan, err := d.newRequest(blocks)
reqs, blockChan, err := d.newRequest(blocks, cancelChan)
if err != nil {
errChan := make(chan error, 1)
errChan <- err
Expand All @@ -515,14 +543,18 @@ func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash,
if len(reqs) > 0 {
errChan = d.workManager.Query(reqs, opts...)
}

return blockChan, errChan
}

// newRequest construct a new query request for the given blocks to submit to
// the internal workManager. A channel is also returned through which the
// requested blocks are sent through.
func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (
[]*query.Request, <-chan *wire.MsgBlock, error) {
//
// NOTE: The cancelChan must be buffered.
func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash,
cancelChan chan<- error) ([]*query.Request, <-chan *wire.MsgBlock,
error) {

// Make sure the channel is buffered enough to handle all blocks.
blockChan := make(chan *wire.MsgBlock, len(blocks))
Expand Down Expand Up @@ -550,6 +582,10 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (
} else {
log.Debugf("Received new request for pending query of "+
"block %v", *block)

d.blockQueryCancel[*block] = append(
d.blockQueryCancel[*block], cancelChan,
)
}

d.blocksQueried[*block] = append(
Expand Down Expand Up @@ -614,6 +650,8 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,
Finished: false,
}
}
copyblockChans := make([]chan *wire.MsgBlock, len(blockChans))
copy(copyblockChans, blockChans)

err := blockchain.CheckBlockSanity(
ltcutil.NewBlock(block), d.cfg.ChainParams.PowLimit,
Expand Down Expand Up @@ -667,7 +705,7 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,
go func() {
defer d.wg.Done()

for _, blockChan := range blockChans {
for _, blockChan := range copyblockChans {
select {
case blockChan <- block:
case <-d.quit:
Expand All @@ -678,3 +716,52 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,

return progress
}

// CancelRequest removes all information regarding a failed block request.
// When for example the Peer disconnects or runs in a timeout we make sure
// that all related information is deleted and a new request for this block
// can be registered. Moreover will also cancel all depending goroutines.
func (d *PrunedBlockDispatcher) CancelRequest(blockHash chainhash.Hash,
err error) {

// failDependant is a helper function which fails all dependant
// goroutines via their cancel channels.
failDependant := func(cancelChans []chan<- error) {
defer d.wg.Done()

for _, cancel := range cancelChans {
select {
case cancel <- err:
case <-d.quit:
return
}
}
}

d.blockMtx.Lock()

// Before removing the block hash we get the cancelChans which were
// registered for block requests that had already an ongoing pending
// request.
cancelChans, ok := d.blockQueryCancel[blockHash]
var copycancelChans []chan<- error
if ok {
copycancelChans = make([]chan<- error, len(cancelChans))
copy(copycancelChans, cancelChans)
}

// Remove all data related to this block request to make sure the same
// block can be registered again in the future.
delete(d.blocksQueried, blockHash)
delete(d.blockQueryCancel, blockHash)

d.blockMtx.Unlock()

// In case there are goroutines depending on this block request we
// make sure we cancel them.
// We do this in a goroutine to not block the initial request.
if ok {
d.wg.Add(1)
go failDependant(copycancelChans)
}
}
Loading

0 comments on commit aa8640b

Please sign in to comment.