Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chain: detect and use gettxspendingprevout RPC instead of mempool #887

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion chain/bitcoind_events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chain

import (
"encoding/json"
"fmt"

"github.com/btcsuite/btcd/chaincfg/chainhash"
Expand Down Expand Up @@ -62,5 +63,39 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
"rpcpolling is disabled")
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client)
// Check if the bitcoind node is on a version that has the
// gettxspendingprevout RPC. If it does, then we don't need to maintain
// a mempool for ZMQ clients.
hasRPC, err := hasSpendingPrevoutRPC(client)
if err != nil {
return nil, err
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client, hasRPC)
}

// hasSpendingPrevoutRPC returns whether or not the bitcoind has the newer
// gettxspendingprevout RPC.
func hasSpendingPrevoutRPC(client *rpcclient.Client) (bool, error) {
// Fetch the bitcoind version.
resp, err := client.RawRequest("getnetworkinfo", nil)
if err != nil {
return false, err
}

info := struct {
Version int64 `json:"version"`
}{}

if err := json.Unmarshal(resp, &info); err != nil {
return false, err
}

// Bitcoind returns a single value representing the semantic version:
// 10000 * CLIENT_VERSION_MAJOR + 100 * CLIENT_VERSION_MINOR
// + 1 * CLIENT_VERSION_BUILD
//
// The gettxspendingprevout call was added in version 24.0.0, so we
// return for versions >= 240000.
return info.Version >= 240000, nil
}
133 changes: 113 additions & 20 deletions chain/bitcoind_zmq_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chain

import (
"bytes"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -68,9 +69,17 @@ type bitcoindZMQEvents struct {

// mempool holds all the transactions that we currently see as being in
// the mempool. This is used so that we know which transactions we have
// already sent notifications for.
// already sent notifications for. This will be nil if we are using the
// gettxspendingprevout endpoint.
mempool *mempool

// client is an rpc client to the bitcoind backend.
client *rpcclient.Client

// hasPrevoutRPC is set when the bitcoind version is >= 24.0.0 and
// doesn't need to maintain its own mempool.
hasPrevoutRPC bool

wg sync.WaitGroup
quit chan struct{}
}
Expand All @@ -80,8 +89,10 @@ type bitcoindZMQEvents struct {
var _ BitcoindEvents = (*bitcoindZMQEvents)(nil)

// newBitcoindZMQEvents initialises the necessary zmq connections to bitcoind.
// If bitcoind is on a version with the gettxspendingprevout RPC, we can omit
// the mempool.
func newBitcoindZMQEvents(cfg *ZMQConfig,
client *rpcclient.Client) (*bitcoindZMQEvents, error) {
client *rpcclient.Client, hasRPC bool) (*bitcoindZMQEvents, error) {

// Check polling config.
if cfg.MempoolPollingInterval == 0 {
Expand Down Expand Up @@ -122,22 +133,29 @@ func newBitcoindZMQEvents(cfg *ZMQConfig,
"events: %v", err)
}

return &bitcoindZMQEvents{
cfg: cfg,
blockConn: zmqBlockConn,
txConn: zmqTxConn,
blockNtfns: make(chan *wire.MsgBlock),
txNtfns: make(chan *wire.MsgTx),
mempool: newMempool(client),
quit: make(chan struct{}),
}, nil
zmqEvents := &bitcoindZMQEvents{
cfg: cfg,
client: client,
blockConn: zmqBlockConn,
txConn: zmqTxConn,
hasPrevoutRPC: hasRPC,
blockNtfns: make(chan *wire.MsgBlock),
txNtfns: make(chan *wire.MsgTx),
mempool: newMempool(client),
quit: make(chan struct{}),
}

return zmqEvents, nil
}

// Start spins off the bitcoindZMQEvent goroutines.
func (b *bitcoindZMQEvents) Start() error {
// Load the mempool so we don't miss transactions.
if err := b.mempool.LoadMempool(); err != nil {
return err
// Load the mempool so we don't miss transactions, but only if we need
// one.
if !b.hasPrevoutRPC {
if err := b.mempool.LoadMempool(); err != nil {
return err
}
}

b.wg.Add(3)
Expand Down Expand Up @@ -174,16 +192,84 @@ func (b *bitcoindZMQEvents) BlockNotifications() <-chan *wire.MsgBlock {
return b.blockNtfns
}

// getTxSpendingPrevOutReq is the rpc request format for bitcoind's
// gettxspendingprevout call.
type getTxSpendingPrevOutReq struct {
Txid string `json:"txid"`
Vout uint32 `json:"vout"`
}

// getTxSpendingPrevOutResp is the rpc response format for bitcoind's
// gettxspendingprevout call. It returns the "spendingtxid" if one exists in
// the mempool.
type getTxSpendingPrevOutResp struct {
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
Txid string `json:"txid"`
Vout float64 `json:"vout"`
SpendingTxid string `json:"spendingtxid"`
}

// LookupInputSpend returns the transaction that spends the given outpoint
// found in the mempool.
func (b *bitcoindZMQEvents) LookupInputSpend(
op wire.OutPoint) (chainhash.Hash, bool) {

b.mempool.RLock()
defer b.mempool.RUnlock()
if !b.hasPrevoutRPC {
b.mempool.RLock()
defer b.mempool.RUnlock()

// Check whether the input is in mempool.
return b.mempool.containsInput(op)
// Check whether the input is in mempool.
return b.mempool.containsInput(op)
}

// Otherwise, we aren't maintaining a mempool and can use the
// gettxspendingprevout RPC. Create an RPC-style prevout that will be
// the lone item in an array.
prevoutReq := &getTxSpendingPrevOutReq{
Txid: op.Hash.String(), Vout: op.Index,
}

// The RPC takes an array of prevouts so we have an array with a single
// item since we don't yet batch calls to LookupInputSpend.
prevoutArr := []*getTxSpendingPrevOutReq{prevoutReq}

req, err := json.Marshal(prevoutArr)
if err != nil {
return chainhash.Hash{}, false
}

resp, err := b.client.RawRequest(
"gettxspendingprevout", []json.RawMessage{req},
)
if err != nil {
return chainhash.Hash{}, false
}

var prevoutResps []getTxSpendingPrevOutResp
err = json.Unmarshal(resp, &prevoutResps)
if err != nil {
return chainhash.Hash{}, false
}

// We should only get a single item back since we only requested with a
// single item.
if len(prevoutResps) != 1 {
return chainhash.Hash{}, false
}

result := prevoutResps[0]

// If the "spendingtxid" field is empty, then the utxo has no spend in
// the mempool at the moment.
if result.SpendingTxid == "" {
return chainhash.Hash{}, false
}

spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid)
if err != nil {
return chainhash.Hash{}, false
}

return *spendHash, true
}

// blockEventHandler reads raw blocks events from the ZMQ block socket and
Expand Down Expand Up @@ -358,8 +444,10 @@ func (b *bitcoindZMQEvents) txEventHandler() {
continue
}

// Add the tx to mempool.
b.mempool.Add(tx)
// Add the tx to mempool if we're using one.
if !b.hasPrevoutRPC {
b.mempool.Add(tx)
}

select {
case b.txNtfns <- tx:
Expand All @@ -386,6 +474,11 @@ func (b *bitcoindZMQEvents) txEventHandler() {
func (b *bitcoindZMQEvents) mempoolPoller() {
defer b.wg.Done()

if b.hasPrevoutRPC {
// Exit if we're not using a mempool.
return
}

// We'll wait to start the main reconciliation loop until we're doing
// the initial mempool load.
b.mempool.WaitForInit()
Expand Down
16 changes: 15 additions & 1 deletion chain/pruned_block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (

if _, ok := d.blocksQueried[*block]; !ok {
log.Debugf("Queuing new block %v for request", *block)
inv := wire.NewInvVect(wire.InvTypeBlock, block)
inv := wire.NewInvVect(wire.InvTypeWitnessBlock, block)
if err := getData.AddInvVect(inv); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -632,6 +632,20 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,
}
}

err = blockchain.ValidateWitnessCommitment(btcutil.NewBlock(block))
if err != nil {
d.blockMtx.Unlock()

log.Warnf("Received invalid block %v from peer %v: %v",
blockHash, peer, err)
d.banPeer(peer)

return query.Progress{
Progressed: false,
Finished: false,
}
}

// Once validated, we can safely remove it.
delete(d.blocksQueried, blockHash)

Expand Down
2 changes: 1 addition & 1 deletion chain/pruned_block_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (h *prunedBlockDispatcherHarness) newPeer() *peer.Peer {

for _, inv := range msg.InvList {
// Invs should always be for blocks.
require.Equal(h.t, wire.InvTypeBlock, inv.Type)
require.Equal(h.t, wire.InvTypeWitnessBlock, inv.Type)

// Invs should always be for known blocks.
block, ok := h.blocks[inv.Hash]
Expand Down
Loading