Skip to content

Commit

Permalink
chain: detect and use gettxspendingprevout RPC instead of mempool
Browse files Browse the repository at this point in the history
This change uses the gettxspendingprevout RPC if the bitcoind version
is greater or equal to version 24.0.0. In this case, the mempool is
no longer used for zmq clients. The original behavior must be kept
for rpc polling clients since they notify on new transactions and can
only learn about them from polling the bitcoind mempool and keeping
track of its own btcwallet mempool.
  • Loading branch information
Crypt-iQ committed Sep 7, 2023
1 parent 07be54b commit 1a77a35
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 16 deletions.
37 changes: 36 additions & 1 deletion chain/bitcoind_conn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chain

import (
"encoding/json"
"fmt"
"net"
"sync"
Expand Down Expand Up @@ -174,6 +175,14 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) {
}
}

// Check if the bitcoind node is on a version that has the
// gettxspendingprevout RPC. If it does, then we don't need to maintain
// a mempool for ZMQ clients.
hasRPC, err := hasSpendingPrevoutRPC(client)
if err != nil {
return nil, err
}

bc := &BitcoindConn{
cfg: *cfg,
client: client,
Expand All @@ -182,14 +191,40 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) {
quit: make(chan struct{}),
}

bc.events, err = NewBitcoindEventSubscriber(cfg, client)
bc.events, err = NewBitcoindEventSubscriber(cfg, client, hasRPC)
if err != nil {
return nil, err
}

return bc, nil
}

// hasSpendingPrevoutRPC returns whether or not the bitcoind has the newer
// gettxspendingprevout RPC.
func hasSpendingPrevoutRPC(client *rpcclient.Client) (bool, error) {
// Fetch the bitcoind version.
resp, err := client.RawRequest("getnetworkinfo", nil)
if err != nil {
return false, err
}

info := struct {
Version int64 `json:"version"`
}{}

if err := json.Unmarshal(resp, &info); err != nil {
return false, err
}

// Bitcoind returns a single value representing the semantic version:
// 10000 * CLIENT_VERSION_MAJOR + 100 * CLIENT_VERSION_MINOR
// + 1 * CLIENT_VERSION_BUILD
//
// The gettxspendingprevout call was added in version 24.0.0, so we
// return for versions >= 240000.
return info.Version >= 240000, nil
}

// Start attempts to establish a RPC and ZMQ connection to a bitcoind node. If
// successful, a goroutine is spawned to read events from the ZMQ connection.
// It's possible for this function to fail due to a limited number of connection
Expand Down
4 changes: 2 additions & 2 deletions chain/bitcoind_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var _ rpcClient = (*rpcclient.Client)(nil)
// NewBitcoindEventSubscriber initialises a new BitcoinEvents object impl
// depending on the config passed.
func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
client *rpcclient.Client) (BitcoindEvents, error) {
client *rpcclient.Client, hasRPC bool) (BitcoindEvents, error) {

if cfg.PollingConfig != nil && cfg.ZMQConfig != nil {
return nil, fmt.Errorf("either PollingConfig or ZMQConfig " +
Expand All @@ -62,5 +62,5 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
"rpcpolling is disabled")
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client)
return newBitcoindZMQEvents(cfg.ZMQConfig, client, hasRPC)
}
117 changes: 104 additions & 13 deletions chain/bitcoind_zmq_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chain

import (
"bytes"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -68,9 +69,13 @@ type bitcoindZMQEvents struct {

// mempool holds all the transactions that we currently see as being in
// the mempool. This is used so that we know which transactions we have
// already sent notifications for.
// already sent notifications for. This will be nil if we are using the
// gettxspendingprevout endpoint.
mempool *mempool

// client is an rpc client to the bitcoind backend.
client *rpcclient.Client

wg sync.WaitGroup
quit chan struct{}
}
Expand All @@ -80,8 +85,10 @@ type bitcoindZMQEvents struct {
var _ BitcoindEvents = (*bitcoindZMQEvents)(nil)

// newBitcoindZMQEvents initialises the necessary zmq connections to bitcoind.
// If bitcoind is on a version with the gettxspendingprevout RPC, we can omit
// the mempool.
func newBitcoindZMQEvents(cfg *ZMQConfig,
client *rpcclient.Client) (*bitcoindZMQEvents, error) {
client *rpcclient.Client, hasRPC bool) (*bitcoindZMQEvents, error) {

// Check polling config.
if cfg.MempoolPollingInterval == 0 {
Expand Down Expand Up @@ -122,22 +129,30 @@ func newBitcoindZMQEvents(cfg *ZMQConfig,
"events: %v", err)
}

return &bitcoindZMQEvents{
zmqEvents := &bitcoindZMQEvents{
cfg: cfg,
client: client,
blockConn: zmqBlockConn,
txConn: zmqTxConn,
blockNtfns: make(chan *wire.MsgBlock),
txNtfns: make(chan *wire.MsgTx),
mempool: newMempool(client),
quit: make(chan struct{}),
}, nil
}

if !hasRPC {
zmqEvents.mempool = newMempool(client)
}

return zmqEvents, nil
}

// Start spins off the bitcoindZMQEvent goroutines.
func (b *bitcoindZMQEvents) Start() error {
// Load the mempool so we don't miss transactions.
if err := b.mempool.LoadMempool(); err != nil {
return err
if b.mempool != nil {
if err := b.mempool.LoadMempool(); err != nil {
return err
}
}

b.wg.Add(3)
Expand Down Expand Up @@ -174,16 +189,85 @@ func (b *bitcoindZMQEvents) BlockNotifications() <-chan *wire.MsgBlock {
return b.blockNtfns
}

// getTxSpendingPrevOutReq is the rpc request format for bitcoind's
// gettxspendingprevout call.
type getTxSpendingPrevOutReq struct {
Txid string `json:"txid"`
Vout uint32 `json:"vout"`
}

// getTxSpendingPrevOutResp is the rpc response format for bitcoind's
// gettxspendingprevout call. It returns the "spendingtxid" if one exists in
// the mempool.
type getTxSpendingPrevOutResp struct {
Txid string `json:"txid"`
Vout float64 `json:"vout"`
SpendingTxid string `json:"spendingtxid"`
}

// LookupInputSpend returns the transaction that spends the given outpoint
// found in the mempool.
func (b *bitcoindZMQEvents) LookupInputSpend(
op wire.OutPoint) (chainhash.Hash, bool) {

b.mempool.RLock()
defer b.mempool.RUnlock()
if b.mempool != nil {
b.mempool.RLock()
defer b.mempool.RUnlock()

// Check whether the input is in mempool.
return b.mempool.containsInput(op)
}

// Otherwise, we aren't maintaining a mempool and can use the
// gettxspendingprevout RPC. Create an RPC-style prevout that will be
// the lone item in an array.
prevoutReq := &getTxSpendingPrevOutReq{
Txid: op.Hash.String(), Vout: op.Index,
}

// The RPC takes an array of prevouts so we have an array with a single
// item since we don't yet batch calls to LookupInputSpend.
var prevoutArr [1]*getTxSpendingPrevOutReq
prevoutArr[0] = prevoutReq

req, err := json.Marshal(prevoutArr)
if err != nil {
return chainhash.Hash{}, false
}

resp, err := b.client.RawRequest(
"gettxspendingprevout", []json.RawMessage{req},
)
if err != nil {
return chainhash.Hash{}, false
}

var prevoutResps []getTxSpendingPrevOutResp
err = json.Unmarshal(resp, &prevoutResps)
if err != nil {
return chainhash.Hash{}, false
}

// We should only get a single item back since we only requested with a
// single item.
if len(prevoutResps) != 1 {
return chainhash.Hash{}, false
}

result := prevoutResps[0]

// If the "spendingtxid" field is empty, then the utxo has no spend in
// the mempool at the moment.
if result.SpendingTxid == "" {
return chainhash.Hash{}, false
}

// Check whether the input is in mempool.
return b.mempool.containsInput(op)
spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid)
if err != nil {
return chainhash.Hash{}, false
}

return *spendHash, true
}

// blockEventHandler reads raw blocks events from the ZMQ block socket and
Expand Down Expand Up @@ -358,8 +442,10 @@ func (b *bitcoindZMQEvents) txEventHandler() {
continue
}

// Add the tx to mempool.
b.mempool.Add(tx)
// Add the tx to mempool if it exists.
if b.mempool != nil {
b.mempool.Add(tx)
}

select {
case b.txNtfns <- tx:
Expand All @@ -386,6 +472,11 @@ func (b *bitcoindZMQEvents) txEventHandler() {
func (b *bitcoindZMQEvents) mempoolPoller() {
defer b.wg.Done()

if b.mempool == nil {
// Exit if we're not using a mempool.
return
}

// We'll wait to start the main reconciliation loop until we're doing
// the initial mempool load.
b.mempool.WaitForInit()
Expand Down

0 comments on commit 1a77a35

Please sign in to comment.