From 51ca19d270f3415a004a14e0b83b89ae78e5a3e9 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 28 Nov 2023 16:49:26 +0800 Subject: [PATCH 1/2] chain: optionally create `inputs` map based on `hasPrevoutRPC` If we can use `gettxspendingprevout` to check the mempool spend of an input, there's no need to maintain the large `inputs` map anymore. --- chain/mempool.go | 28 ++++++++++- chain/mempool_test.go | 114 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 1 deletion(-) diff --git a/chain/mempool.go b/chain/mempool.go index db6cd67f70..88a116177d 100644 --- a/chain/mempool.go +++ b/chain/mempool.go @@ -155,6 +155,12 @@ type mempoolConfig struct { // TODO(yy): interface rpcclient.FutureGetRawTransactionResult so we // can remove this hack. rawTxReceiver func(chainhash.Hash, getRawTxReceiver) *btcutil.Tx + + // hasPrevoutRPC is set when the bitcoind version is >= 24.0.0, in + // which `gettxspendingprevout` can be used to fetch mempool spent for + // a given input so there's no need to create the `inputs` map used in + // `mempool` here. + hasPrevoutRPC bool } // newMempool creates a new mempool object. @@ -162,11 +168,16 @@ func newMempool(cfg *mempoolConfig) *mempool { m := &mempool{ cfg: cfg, txs: make(map[chainhash.Hash]bool), - inputs: newCachedInputs(), initFin: make(chan struct{}), quit: make(chan struct{}), } + // Init the `inputs` map if the bitcoind version doesn't support + // `gettxspendingprevout`. + if !cfg.hasPrevoutRPC { + m.inputs = newCachedInputs() + } + // Mount the default methods. m.cfg.rawMempoolGetter = m.getRawMempool m.cfg.rawTxReceiver = getRawTxIgnoreErr @@ -236,6 +247,11 @@ func (m *mempool) containsTx(hash chainhash.Hash) bool { // // NOTE: must be used inside a lock. func (m *mempool) containsInput(op wire.OutPoint) (chainhash.Hash, bool) { + // TODO(yy): port `getprevout` to bitcoind and use it here? + if m.inputs == nil { + return chainhash.Hash{}, false + } + return m.inputs.hasInput(op) } @@ -332,6 +348,11 @@ func (m *mempool) deleteUnmarked() { // // NOTE: must be used inside a lock. func (m *mempool) removeInputs(tx chainhash.Hash) { + // We won't have the `inputs` map if `hasPrevoutRPC` is true. + if m.inputs == nil { + return + } + m.inputs.removeInputsFromTx(tx) } @@ -340,6 +361,11 @@ func (m *mempool) removeInputs(tx chainhash.Hash) { // // NOTE: must be used inside a lock. func (m *mempool) updateInputs(tx *wire.MsgTx) { + // We won't have the `inputs` map if `hasPrevoutRPC` is true. + if m.inputs == nil { + return + } + // Iterate the tx's inputs. for _, input := range tx.TxIn { outpoint := input.PreviousOutPoint diff --git a/chain/mempool_test.go b/chain/mempool_test.go index 2f2f786a65..503781758d 100644 --- a/chain/mempool_test.go +++ b/chain/mempool_test.go @@ -893,3 +893,117 @@ func TestBatchGetRawTxesOnWait(t *testing.T) { // Assert the mock methods are called as expected. mockRPC.AssertExpectations(t) } + +// TestNewMempool tests that `newMempool` behaves as expected. +func TestNewMempool(t *testing.T) { + // Create a new mempool with an empty config. + cfg := &mempoolConfig{} + m := newMempool(cfg) + + // Validate that the mempool is initialized as expected. + require.Equal(t, cfg, m.cfg) + require.NotNil(t, m.cfg.rawMempoolGetter) + require.NotNil(t, m.cfg.rawTxReceiver) + require.NotNil(t, m.txs) + require.NotNil(t, m.initFin) + require.NotNil(t, m.quit) + require.NotNil(t, m.inputs) + + // Create a new config to check that the mempool is initialized without + // the `inputs` map when `hasPrevoutRPC` is true. + cfg = &mempoolConfig{hasPrevoutRPC: true} + m = newMempool(cfg) + + // Validate that the mempool is initialized as expected. + require.Equal(t, cfg, m.cfg) + require.NotNil(t, m.cfg.rawMempoolGetter) + require.NotNil(t, m.cfg.rawTxReceiver) + require.NotNil(t, m.txs) + require.NotNil(t, m.initFin) + require.NotNil(t, m.quit) + require.Nil(t, m.inputs) +} + +// TestMempoolAddNoInputs adds a coinbase tx, a normal tx, and a replacement tx +// to the mempool and checks the mempool's internal state is updated as +// expected when the `hasPrevoutRPC` is set. +func TestMempoolAddNoInputs(t *testing.T) { + require := require.New(t) + + m := newMempool(&mempoolConfig{ + batchWaitInterval: 0, + getRawTxBatchSize: 1, + hasPrevoutRPC: true, + }) + + // Create a coinbase transaction. + tx0 := &wire.MsgTx{ + TxIn: []*wire.TxIn{ + { + PreviousOutPoint: wire.OutPoint{ + Hash: chainhash.Hash{}, + Index: math.MaxUint32, + }, + }, + }, + } + + // Create a normal transaction that has two inputs. + op1 := wire.OutPoint{Hash: chainhash.Hash{1}} + op2 := wire.OutPoint{Hash: chainhash.Hash{2}} + tx1 := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op1}, + {PreviousOutPoint: op2}, + }, + } + + // Create a replacement transaction that spends one of the inputs as + // tx1. + op3 := wire.OutPoint{Hash: chainhash.Hash{3}} + tx2 := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op2}, + {PreviousOutPoint: op3}, + }, + } + + // Now add all the transactions. + m.add(tx0) + m.add(tx1) + m.add(tx2) + + // Check transactions are updated, mempool should now contain two + // transactions. + require.False(m.containsTx(tx0.TxHash())) + require.True(m.containsTx(tx1.TxHash())) + require.True(m.containsTx(tx2.TxHash())) + + // Check inputs are NOT updated here because we don't track them when + // `hasPrevoutRPC` is true. + // + // Mempool should NOT contain op1. + txid, found := m.containsInput(op1) + require.False(found) + require.Empty(txid) + + // Mempool should NOT contain op2. + txid, found = m.containsInput(op2) + require.False(found) + require.Empty(txid) + + // Mempool should NOT contain op3. + txid, found = m.containsInput(op3) + require.False(found) + require.Empty(txid) + + // Check the mempool's internal state. + // + // We should see two transactions in the mempool, tx1 and tx2. + require.Len(m.txs, 2) + + // The mempool's inputs should be nil. + require.Nil(m.inputs) +} From f2da461b735517b3144bb611b20a74e068f4e670 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 28 Nov 2023 17:34:54 +0800 Subject: [PATCH 2/2] chain: use `gettxspendingprevout` in RPC clients This commit creates a new method `getTxSpendingPrevOut` that's used by both ZMQ and RPC clients to lookup mempool spend for a given input. --- chain/bitcoind_events.go | 19 ++++--- chain/bitcoind_rpc_events.go | 15 ++++- chain/bitcoind_zmq_events.go | 103 +++++++++++++++++++---------------- 3 files changed, 77 insertions(+), 60 deletions(-) diff --git a/chain/bitcoind_events.go b/chain/bitcoind_events.go index dcce473cb2..6675e9476a 100644 --- a/chain/bitcoind_events.go +++ b/chain/bitcoind_events.go @@ -44,6 +44,15 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client, "should be specified, not both") } + // Check if the bitcoind node is on a version that has the + // gettxspendingprevout RPC. If it does, then we don't need to maintain + // a mempool for ZMQ clients and can maintain a smaller mempool for RPC + // clients. + hasRPC, err := hasSpendingPrevoutRPC(client) + if err != nil { + return nil, err + } + if cfg.PollingConfig != nil { if client == nil { return nil, fmt.Errorf("rpc client must be given " + @@ -52,7 +61,7 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client, } pollingEvents := newBitcoindRPCPollingEvents( - cfg.PollingConfig, client, bClient, + cfg.PollingConfig, client, bClient, hasRPC, ) return pollingEvents, nil @@ -63,14 +72,6 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client, "rpcpolling is disabled") } - // Check if the bitcoind node is on a version that has the - // gettxspendingprevout RPC. If it does, then we don't need to maintain - // a mempool for ZMQ clients. - hasRPC, err := hasSpendingPrevoutRPC(client) - if err != nil { - return nil, err - } - return newBitcoindZMQEvents(cfg.ZMQConfig, client, bClient, hasRPC) } diff --git a/chain/bitcoind_rpc_events.go b/chain/bitcoind_rpc_events.go index 3f9804f5ee..10fe1beb9d 100644 --- a/chain/bitcoind_rpc_events.go +++ b/chain/bitcoind_rpc_events.go @@ -77,7 +77,7 @@ var _ BitcoindEvents = (*bitcoindRPCPollingEvents)(nil) // newBitcoindRPCPollingEvents instantiates a new bitcoindRPCPollingEvents // object. func newBitcoindRPCPollingEvents(cfg *PollingConfig, client *rpcclient.Client, - bClient batchClient) *bitcoindRPCPollingEvents { + bClient batchClient, hasRPC bool) *bitcoindRPCPollingEvents { if cfg.BlockPollingInterval == 0 { cfg.BlockPollingInterval = defaultBlockPollInterval @@ -100,6 +100,7 @@ func newBitcoindRPCPollingEvents(cfg *PollingConfig, client *rpcclient.Client, client: bClient, getRawTxBatchSize: cfg.RPCBatchSize, batchWaitInterval: cfg.RPCBatchInterval, + hasPrevoutRPC: hasRPC, } if cfg.RPCBatchSize == 0 { @@ -165,8 +166,16 @@ func (b *bitcoindRPCPollingEvents) LookupInputSpend( b.mempool.RLock() defer b.mempool.RUnlock() - // Check whether the input is in mempool. - return b.mempool.containsInput(op) + // If `gettxspendingprevout` is not supported, we need to loop it up in + // our local mempool. + if !b.mempool.cfg.hasPrevoutRPC { + // Check whether the input is in mempool. + return b.mempool.containsInput(op) + } + + // Otherwise, we can use the `gettxspendingprevout` RPC to look up the + // input. + return getTxSpendingPrevOut(op, b.client) } // blockEventHandlerRPC is a goroutine that uses the rpc client to check if we diff --git a/chain/bitcoind_zmq_events.go b/chain/bitcoind_zmq_events.go index f7b31ff82c..0f8b2f5b91 100644 --- a/chain/bitcoind_zmq_events.go +++ b/chain/bitcoind_zmq_events.go @@ -249,54 +249,8 @@ func (b *bitcoindZMQEvents) LookupInputSpend( } // Otherwise, we aren't maintaining a mempool and can use the - // gettxspendingprevout RPC. Create an RPC-style prevout that will be - // the lone item in an array. - prevoutReq := &getTxSpendingPrevOutReq{ - Txid: op.Hash.String(), Vout: op.Index, - } - - // The RPC takes an array of prevouts so we have an array with a single - // item since we don't yet batch calls to LookupInputSpend. - prevoutArr := []*getTxSpendingPrevOutReq{prevoutReq} - - req, err := json.Marshal(prevoutArr) - if err != nil { - return chainhash.Hash{}, false - } - - resp, err := b.client.RawRequest( - "gettxspendingprevout", []json.RawMessage{req}, - ) - if err != nil { - return chainhash.Hash{}, false - } - - var prevoutResps []getTxSpendingPrevOutResp - err = json.Unmarshal(resp, &prevoutResps) - if err != nil { - return chainhash.Hash{}, false - } - - // We should only get a single item back since we only requested with a - // single item. - if len(prevoutResps) != 1 { - return chainhash.Hash{}, false - } - - result := prevoutResps[0] - - // If the "spendingtxid" field is empty, then the utxo has no spend in - // the mempool at the moment. - if result.SpendingTxid == "" { - return chainhash.Hash{}, false - } - - spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid) - if err != nil { - return chainhash.Hash{}, false - } - - return *spendHash, true + // gettxspendingprevout RPC. + return getTxSpendingPrevOut(op, b.client) } // blockEventHandler reads raw blocks events from the ZMQ block socket and @@ -541,3 +495,56 @@ func (b *bitcoindZMQEvents) mempoolPoller() { } } } + +// getTxSpendingPrevOut makes an RPC call to `gettxspendingprevout` and returns +// the result. +func getTxSpendingPrevOut(op wire.OutPoint, + client *rpcclient.Client) (chainhash.Hash, bool) { + + prevoutReq := &getTxSpendingPrevOutReq{ + Txid: op.Hash.String(), Vout: op.Index, + } + + // The RPC takes an array of prevouts so we have an array with a single + // item since we don't yet batch calls to LookupInputSpend. + prevoutArr := []*getTxSpendingPrevOutReq{prevoutReq} + + req, err := json.Marshal(prevoutArr) + if err != nil { + return chainhash.Hash{}, false + } + + resp, err := client.RawRequest( + "gettxspendingprevout", []json.RawMessage{req}, + ) + if err != nil { + return chainhash.Hash{}, false + } + + var prevoutResps []getTxSpendingPrevOutResp + err = json.Unmarshal(resp, &prevoutResps) + if err != nil { + return chainhash.Hash{}, false + } + + // We should only get a single item back since we only requested with a + // single item. + if len(prevoutResps) != 1 { + return chainhash.Hash{}, false + } + + result := prevoutResps[0] + + // If the "spendingtxid" field is empty, then the utxo has no spend in + // the mempool at the moment. + if result.SpendingTxid == "" { + return chainhash.Hash{}, false + } + + spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid) + if err != nil { + return chainhash.Hash{}, false + } + + return *spendHash, true +}