chain: PrunedBlockDispatcher bugfix. #903

Merged
merged 2 commits on Jan 27, 2024
28 changes: 27 additions & 1 deletion chain/bitcoind_conn.go
@@ -469,10 +469,16 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
return nil, err
}

// cancelChan is needed in case a block request with the same hash is
// already registered and fails via the returned errChan. Because we
// don't register a new request in that case, this cancelChan is used
// to signal the failure of the dependent block request.
cancelChan := make(chan error, 1)

// Now that we know the block has been pruned for sure, request it from
// our backend peers.
blockChan, errChan := c.prunedBlockDispatcher.Query(
[]*chainhash.Hash{hash},
[]*chainhash.Hash{hash}, cancelChan,
)

for {
@@ -482,12 +488,32 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {

case err := <-errChan:
if err != nil {
// An error was returned for this block request.
// We have to make sure we remove the block hash
// from the list of queried blocks.
// Moreover, when a block is requested more than
// once, no redundant block request is registered;
// instead, a reply channel is added to the
// pending block request. This means we need to
// cancel all dependent `GetBlock` calls via the
// cancel channel when fetching the block was NOT
// successful.
c.prunedBlockDispatcher.CancelRequest(
*hash, err,
)

return nil, err
}

// errChan fired before blockChan with a nil error, wait
// for the block now.

// The cancelChan is only used when there is already a pending
// block request for this hash and that block request fails via
// the error channel above.
case err := <-cancelChan:
return nil, err

case <-c.quit:
return nil, ErrBitcoindClientShuttingDown
}
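
To make the intent of the new cancelChan concrete, the following is a minimal, self-contained Go sketch of the channel pattern this diff introduces; it does not use the real BitcoindConn or PrunedBlockDispatcher types, and all names in it are illustrative only. The first GetBlock call for a hash owns the errChan, a later call for the same hash only registers a buffered cancel channel, and an error on the in-flight request is forwarded to the dependent waiter so it does not block forever.

package main

import (
	"errors"
	"fmt"
)

func main() {
	// blockChan would carry *wire.MsgBlock in the real code; a struct{}
	// stand-in keeps this sketch dependency-free.
	blockChan := make(chan struct{}, 1)

	// errChan is owned by the first request for a given block hash.
	errChan := make(chan error, 1)

	// cancelChan is registered by a second request for the same hash;
	// it must be buffered so the sender never blocks.
	cancelChan := make(chan error, 1)

	// Simulate the backend failing the in-flight block request.
	errChan <- errors.New("peer disconnected")

	// First caller: observes the error and cancels dependent requests,
	// mirroring what CancelRequest does for registered cancel channels.
	select {
	case err := <-errChan:
		cancelChan <- err
	case <-blockChan:
	}

	// Second caller: is woken up via its cancel channel instead of
	// waiting forever on a block that will never arrive.
	select {
	case <-blockChan:
		fmt.Println("got block")
	case err := <-cancelChan:
		fmt.Println("dependent request cancelled:", err)
	}
}

The buffer of size one on cancelChan matches the note in newRequest below that the cancel channel must be buffered, so the sender is never blocked.
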
109 changes: 98 additions & 11 deletions chain/pruned_block_dispatcher.go
@@ -7,6 +7,7 @@ import (
"fmt"
"math/rand"
"net"
"sort"
"sync"
"time"

@@ -143,7 +144,14 @@ type PrunedBlockDispatcher struct {
// NOTE: The blockMtx lock must always be held when accessing this
// field.
blocksQueried map[chainhash.Hash][]chan *wire.MsgBlock
blockMtx sync.Mutex

// blockQueryCancel signals the cancellation of a `GetBlock` request.
//
// NOTE: The blockMtx lock must always be held when accessing this
// field.
blockQueryCancel map[chainhash.Hash][]chan<- error

blockMtx sync.Mutex

// currentPeers represents the set of peers we're currently connected
// to. Each peer found here will have a worker spawned within the
@@ -198,12 +206,13 @@ func NewPrunedBlockDispatcher(cfg *PrunedBlockDispatcherConfig) (
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
}),
blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock),
currentPeers: make(map[string]*peer.Peer),
bannedPeers: make(map[string]struct{}),
peersConnected: peersConnected,
timeSource: blockchain.NewMedianTime(),
quit: make(chan struct{}),
blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock),
blockQueryCancel: make(map[chainhash.Hash][]chan<- error),
currentPeers: make(map[string]*peer.Peer),
bannedPeers: make(map[string]struct{}),
peersConnected: peersConnected,
timeSource: blockchain.NewMedianTime(),
quit: make(chan struct{}),
}, nil
}

@@ -387,7 +396,25 @@ func (d *PrunedBlockDispatcher) connectToPeer(addr string) (bool, error) {
// "full-node".
func filterPeers(peers []btcjson.GetPeerInfoResult) ([]string, error) {
var eligible []string // nolint:prealloc

// First, sort the peers by their measured ping time so that we choose
// the best peers to fetch blocks from.
sort.Slice(peers, func(i, j int) bool {
return peers[i].PingTime < peers[j].PingTime
})

for _, peer := range peers {
// We cannot use the inbound peers here because the referenced
// port in the `addr` field is not the listen port for the p2p
// connection but a random outgoing port of the peer.
if peer.Inbound {
log.Debugf("Inbound peer %v not considering for "+
"outbound connection to fetch pruned blocks",
peer)

continue
}

rawServices, err := hex.DecodeString(peer.Services)
if err != nil {
return nil, err
@@ -502,9 +529,10 @@ func (d *PrunedBlockDispatcher) banPeer(peer string) {

// Query submits a request to query the information of the given blocks.
func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash,
cancelChan chan<- error,
opts ...query.QueryOption) (<-chan *wire.MsgBlock, <-chan error) {

reqs, blockChan, err := d.newRequest(blocks)
reqs, blockChan, err := d.newRequest(blocks, cancelChan)
if err != nil {
errChan := make(chan error, 1)
errChan <- err
@@ -515,14 +543,18 @@ func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash,
if len(reqs) > 0 {
errChan = d.workManager.Query(reqs, opts...)
}

return blockChan, errChan
}

// newRequest constructs a new query request for the given blocks to submit
// to the internal workManager. A channel is also returned through which the
// requested blocks are sent.
func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (
[]*query.Request, <-chan *wire.MsgBlock, error) {
//
// NOTE: The cancelChan must be buffered.
func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash,
cancelChan chan<- error) ([]*query.Request, <-chan *wire.MsgBlock,
error) {

// Make sure the channel is buffered enough to handle all blocks.
blockChan := make(chan *wire.MsgBlock, len(blocks))
@@ -550,6 +582,10 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (
} else {
log.Debugf("Received new request for pending query of "+
"block %v", *block)

d.blockQueryCancel[*block] = append(
d.blockQueryCancel[*block], cancelChan,
)
}

d.blocksQueried[*block] = append(
@@ -614,6 +650,8 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,
Finished: false,
}
}
copyblockChans := make([]chan *wire.MsgBlock, len(blockChans))
copy(copyblockChans, blockChans)

err := blockchain.CheckBlockSanity(
btcutil.NewBlock(block), d.cfg.ChainParams.PowLimit,
@@ -667,7 +705,7 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,
go func() {
defer d.wg.Done()

for _, blockChan := range blockChans {
for _, blockChan := range copyblockChans {
select {
case blockChan <- block:
case <-d.quit:
@@ -678,3 +716,52 @@

return progress
}

// CancelRequest removes all information regarding a failed block request.
// When, for example, the peer disconnects or the request times out, we
// make sure that all related information is deleted so that a new request
// for this block can be registered. It also cancels all dependent
// goroutines via their cancel channels.
func (d *PrunedBlockDispatcher) CancelRequest(blockHash chainhash.Hash,
err error) {

// failDependant is a helper function which fails all dependent
// goroutines via their cancel channels.
failDependant := func(cancelChans []chan<- error) {
defer d.wg.Done()

for _, cancel := range cancelChans {
select {
case cancel <- err:
case <-d.quit:
return
}
}
}

d.blockMtx.Lock()

// Before removing the block hash, we grab the cancelChans that were
// registered by block requests which already had a pending request in
// flight.
cancelChans, ok := d.blockQueryCancel[blockHash]
var copycancelChans []chan<- error
if ok {
copycancelChans = make([]chan<- error, len(cancelChans))
copy(copycancelChans, cancelChans)
}

// Remove all data related to this block request to make sure the same
// block can be registered again in the future.
delete(d.blocksQueried, blockHash)
delete(d.blockQueryCancel, blockHash)

d.blockMtx.Unlock()

// In case there are goroutines depending on this block request, make
// sure we cancel them. We do this in a goroutine so that we don't block
// the initial request.
if ok {
d.wg.Add(1)
go failDependant(copycancelChans)
}
}
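
As an illustration of the locking pattern used by CancelRequest above, here is a trimmed-down, hypothetical Go sketch; the dispatcher type and cancelRequest function below are stand-ins and not part of the repository. Cancel channels are copied while the mutex is held, the map entry is deleted so the same hash can be queried again, and the error is delivered from a separate goroutine so the canceling caller is never blocked by slow receivers.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// dispatcher is a hypothetical, trimmed-down stand-in for the
// PrunedBlockDispatcher bookkeeping shown in the diff.
type dispatcher struct {
	mtx     sync.Mutex
	cancels map[string][]chan<- error // keyed by block hash (hex string here)
	wg      sync.WaitGroup
	quit    chan struct{}
}

// cancelRequest copies the registered cancel channels while the mutex is
// held, deletes the map entry so the same hash can be queried again, and
// notifies the waiters from a separate goroutine so the caller never
// blocks on slow receivers.
func (d *dispatcher) cancelRequest(hash string, err error) {
	d.mtx.Lock()
	chans := d.cancels[hash]
	copied := make([]chan<- error, len(chans))
	copy(copied, chans)
	delete(d.cancels, hash)
	d.mtx.Unlock()

	if len(copied) == 0 {
		return
	}

	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		for _, c := range copied {
			select {
			case c <- err:
			case <-d.quit:
				return
			}
		}
	}()
}

func main() {
	d := &dispatcher{
		cancels: make(map[string][]chan<- error),
		quit:    make(chan struct{}),
	}

	// A second request for the same (hypothetical) block registered this
	// buffered channel instead of issuing a redundant query.
	waiter := make(chan error, 1)
	d.cancels["deadbeef"] = append(d.cancels["deadbeef"], waiter)

	d.cancelRequest("deadbeef", errors.New("block fetch failed"))
	d.wg.Wait()

	fmt.Println("waiter received:", <-waiter)
}

Copying the slice under the lock and sending outside of it mirrors the diff's choice of not holding blockMtx while writing to the registered cancel channels.
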