Skip to content

Commit

Permalink
Merge pull request btcsuite#887 from Crypt-iQ/bitcoind24
Browse files Browse the repository at this point in the history
chain: detect and use gettxspendingprevout RPC instead of mempool
  • Loading branch information
Roasbeef authored and buck54321 committed Apr 20, 2024
1 parent bf58743 commit 236f11e
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 23 deletions.
37 changes: 36 additions & 1 deletion chain/bitcoind_events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chain

import (
"encoding/json"
"fmt"

"github.com/ltcsuite/ltcd/chaincfg/chainhash"
Expand Down Expand Up @@ -62,5 +63,39 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
"rpcpolling is disabled")
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client)
// Check if the bitcoind node is on a version that has the
// gettxspendingprevout RPC. If it does, then we don't need to maintain
// a mempool for ZMQ clients.
hasRPC, err := hasSpendingPrevoutRPC(client)
if err != nil {
return nil, err
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client, hasRPC)
}

// hasSpendingPrevoutRPC returns whether or not the bitcoind has the newer
// gettxspendingprevout RPC.
func hasSpendingPrevoutRPC(client *rpcclient.Client) (bool, error) {
// Fetch the bitcoind version.
resp, err := client.RawRequest("getnetworkinfo", nil)
if err != nil {
return false, err
}

info := struct {
Version int64 `json:"version"`
}{}

if err := json.Unmarshal(resp, &info); err != nil {
return false, err
}

// Bitcoind returns a single value representing the semantic version:
// 10000 * CLIENT_VERSION_MAJOR + 100 * CLIENT_VERSION_MINOR
// + 1 * CLIENT_VERSION_BUILD
//
// The gettxspendingprevout call was added in version 24.0.0, so we
// return for versions >= 240000.
return info.Version >= 240000, nil
}
133 changes: 113 additions & 20 deletions chain/bitcoind_zmq_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chain

import (
"bytes"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -68,9 +69,17 @@ type bitcoindZMQEvents struct {

// mempool holds all the transactions that we currently see as being in
// the mempool. This is used so that we know which transactions we have
// already sent notifications for.
// already sent notifications for. This will be nil if we are using the
// gettxspendingprevout endpoint.
mempool *mempool

// client is an rpc client to the bitcoind backend.
client *rpcclient.Client

// hasPrevoutRPC is set when the bitcoind version is >= 24.0.0 and
// doesn't need to maintain its own mempool.
hasPrevoutRPC bool

wg sync.WaitGroup
quit chan struct{}
}
Expand All @@ -80,8 +89,10 @@ type bitcoindZMQEvents struct {
var _ BitcoindEvents = (*bitcoindZMQEvents)(nil)

// newBitcoindZMQEvents initialises the necessary zmq connections to bitcoind.
// If bitcoind is on a version with the gettxspendingprevout RPC, we can omit
// the mempool.
func newBitcoindZMQEvents(cfg *ZMQConfig,
client *rpcclient.Client) (*bitcoindZMQEvents, error) {
client *rpcclient.Client, hasRPC bool) (*bitcoindZMQEvents, error) {

// Check polling config.
if cfg.MempoolPollingInterval == 0 {
Expand Down Expand Up @@ -122,22 +133,29 @@ func newBitcoindZMQEvents(cfg *ZMQConfig,
"events: %v", err)
}

return &bitcoindZMQEvents{
cfg: cfg,
blockConn: zmqBlockConn,
txConn: zmqTxConn,
blockNtfns: make(chan *wire.MsgBlock),
txNtfns: make(chan *wire.MsgTx),
mempool: newMempool(client),
quit: make(chan struct{}),
}, nil
zmqEvents := &bitcoindZMQEvents{
cfg: cfg,
client: client,
blockConn: zmqBlockConn,
txConn: zmqTxConn,
hasPrevoutRPC: hasRPC,
blockNtfns: make(chan *wire.MsgBlock),
txNtfns: make(chan *wire.MsgTx),
mempool: newMempool(client),
quit: make(chan struct{}),
}

return zmqEvents, nil
}

// Start spins off the bitcoindZMQEvent goroutines.
func (b *bitcoindZMQEvents) Start() error {
// Load the mempool so we don't miss transactions.
if err := b.mempool.LoadMempool(); err != nil {
return err
// Load the mempool so we don't miss transactions, but only if we need
// one.
if !b.hasPrevoutRPC {
if err := b.mempool.LoadMempool(); err != nil {
return err
}
}

b.wg.Add(3)
Expand Down Expand Up @@ -174,16 +192,84 @@ func (b *bitcoindZMQEvents) BlockNotifications() <-chan *wire.MsgBlock {
return b.blockNtfns
}

// getTxSpendingPrevOutReq is the rpc request format for bitcoind's
// gettxspendingprevout call.
type getTxSpendingPrevOutReq struct {
Txid string `json:"txid"`
Vout uint32 `json:"vout"`
}

// getTxSpendingPrevOutResp is the rpc response format for bitcoind's
// gettxspendingprevout call. It returns the "spendingtxid" if one exists in
// the mempool.
type getTxSpendingPrevOutResp struct {
Txid string `json:"txid"`
Vout float64 `json:"vout"`
SpendingTxid string `json:"spendingtxid"`
}

// LookupInputSpend returns the transaction that spends the given outpoint
// found in the mempool.
func (b *bitcoindZMQEvents) LookupInputSpend(
op wire.OutPoint) (chainhash.Hash, bool) {

b.mempool.RLock()
defer b.mempool.RUnlock()
if !b.hasPrevoutRPC {
b.mempool.RLock()
defer b.mempool.RUnlock()

// Check whether the input is in mempool.
return b.mempool.containsInput(op)
// Check whether the input is in mempool.
return b.mempool.containsInput(op)
}

// Otherwise, we aren't maintaining a mempool and can use the
// gettxspendingprevout RPC. Create an RPC-style prevout that will be
// the lone item in an array.
prevoutReq := &getTxSpendingPrevOutReq{
Txid: op.Hash.String(), Vout: op.Index,
}

// The RPC takes an array of prevouts so we have an array with a single
// item since we don't yet batch calls to LookupInputSpend.
prevoutArr := []*getTxSpendingPrevOutReq{prevoutReq}

req, err := json.Marshal(prevoutArr)
if err != nil {
return chainhash.Hash{}, false
}

resp, err := b.client.RawRequest(
"gettxspendingprevout", []json.RawMessage{req},
)
if err != nil {
return chainhash.Hash{}, false
}

var prevoutResps []getTxSpendingPrevOutResp
err = json.Unmarshal(resp, &prevoutResps)
if err != nil {
return chainhash.Hash{}, false
}

// We should only get a single item back since we only requested with a
// single item.
if len(prevoutResps) != 1 {
return chainhash.Hash{}, false
}

result := prevoutResps[0]

// If the "spendingtxid" field is empty, then the utxo has no spend in
// the mempool at the moment.
if result.SpendingTxid == "" {
return chainhash.Hash{}, false
}

spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid)
if err != nil {
return chainhash.Hash{}, false
}

return *spendHash, true
}

// blockEventHandler reads raw blocks events from the ZMQ block socket and
Expand Down Expand Up @@ -358,8 +444,10 @@ func (b *bitcoindZMQEvents) txEventHandler() {
continue
}

// Add the tx to mempool.
b.mempool.Add(tx)
// Add the tx to mempool if we're using one.
if !b.hasPrevoutRPC {
b.mempool.Add(tx)
}

select {
case b.txNtfns <- tx:
Expand All @@ -386,6 +474,11 @@ func (b *bitcoindZMQEvents) txEventHandler() {
func (b *bitcoindZMQEvents) mempoolPoller() {
defer b.wg.Done()

if b.hasPrevoutRPC {
// Exit if we're not using a mempool.
return
}

// We'll wait to start the main reconciliation loop until we're doing
// the initial mempool load.
b.mempool.WaitForInit()
Expand Down
16 changes: 15 additions & 1 deletion chain/pruned_block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (

if _, ok := d.blocksQueried[*block]; !ok {
log.Debugf("Queuing new block %v for request", *block)
inv := wire.NewInvVect(wire.InvTypeBlock, block)
inv := wire.NewInvVect(wire.InvTypeWitnessBlock, block)
if err := getData.AddInvVect(inv); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -632,6 +632,20 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,
}
}

err = blockchain.ValidateWitnessCommitment(btcutil.NewBlock(block))
if err != nil {
d.blockMtx.Unlock()

log.Warnf("Received invalid block %v from peer %v: %v",
blockHash, peer, err)
d.banPeer(peer)

return query.Progress{
Progressed: false,
Finished: false,
}
}

// Once validated, we can safely remove it.
delete(d.blocksQueried, blockHash)

Expand Down
2 changes: 1 addition & 1 deletion chain/pruned_block_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (h *prunedBlockDispatcherHarness) newPeer() *peer.Peer {

for _, inv := range msg.InvList {
// Invs should always be for blocks.
require.Equal(h.t, wire.InvTypeBlock, inv.Type)
require.Equal(h.t, wire.InvTypeWitnessBlock, inv.Type)

// Invs should always be for known blocks.
block, ok := h.blocks[inv.Hash]
Expand Down

0 comments on commit 236f11e

Please sign in to comment.