Skip to content

Commit

Permalink
Merge pull request #896 from yyforyongyu/optimize-mempool
Browse files Browse the repository at this point in the history
chain: optimize mempool memory usage for RPC clients
  • Loading branch information
Roasbeef authored Nov 29, 2023
2 parents 9c13542 + f2da461 commit 5df09dd
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 61 deletions.
19 changes: 10 additions & 9 deletions chain/bitcoind_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client,
"should be specified, not both")
}

// Check if the bitcoind node is on a version that has the
// gettxspendingprevout RPC. If it does, then we don't need to maintain
// a mempool for ZMQ clients and can maintain a smaller mempool for RPC
// clients.
hasRPC, err := hasSpendingPrevoutRPC(client)
if err != nil {
return nil, err
}

if cfg.PollingConfig != nil {
if client == nil {
return nil, fmt.Errorf("rpc client must be given " +
Expand All @@ -52,7 +61,7 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client,
}

pollingEvents := newBitcoindRPCPollingEvents(
cfg.PollingConfig, client, bClient,
cfg.PollingConfig, client, bClient, hasRPC,
)

return pollingEvents, nil
Expand All @@ -63,14 +72,6 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client,
"rpcpolling is disabled")
}

// Check if the bitcoind node is on a version that has the
// gettxspendingprevout RPC. If it does, then we don't need to maintain
// a mempool for ZMQ clients.
hasRPC, err := hasSpendingPrevoutRPC(client)
if err != nil {
return nil, err
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client, bClient, hasRPC)
}

Expand Down
15 changes: 12 additions & 3 deletions chain/bitcoind_rpc_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ BitcoindEvents = (*bitcoindRPCPollingEvents)(nil)
// newBitcoindRPCPollingEvents instantiates a new bitcoindRPCPollingEvents
// object.
func newBitcoindRPCPollingEvents(cfg *PollingConfig, client *rpcclient.Client,
bClient batchClient) *bitcoindRPCPollingEvents {
bClient batchClient, hasRPC bool) *bitcoindRPCPollingEvents {

if cfg.BlockPollingInterval == 0 {
cfg.BlockPollingInterval = defaultBlockPollInterval
Expand All @@ -100,6 +100,7 @@ func newBitcoindRPCPollingEvents(cfg *PollingConfig, client *rpcclient.Client,
client: bClient,
getRawTxBatchSize: cfg.RPCBatchSize,
batchWaitInterval: cfg.RPCBatchInterval,
hasPrevoutRPC: hasRPC,
}

if cfg.RPCBatchSize == 0 {
Expand Down Expand Up @@ -165,8 +166,16 @@ func (b *bitcoindRPCPollingEvents) LookupInputSpend(
b.mempool.RLock()
defer b.mempool.RUnlock()

// Check whether the input is in mempool.
return b.mempool.containsInput(op)
// If `gettxspendingprevout` is not supported, we need to loop it up in
// our local mempool.
if !b.mempool.cfg.hasPrevoutRPC {
// Check whether the input is in mempool.
return b.mempool.containsInput(op)
}

// Otherwise, we can use the `gettxspendingprevout` RPC to look up the
// input.
return getTxSpendingPrevOut(op, b.client)
}

// blockEventHandlerRPC is a goroutine that uses the rpc client to check if we
Expand Down
103 changes: 55 additions & 48 deletions chain/bitcoind_zmq_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,54 +249,8 @@ func (b *bitcoindZMQEvents) LookupInputSpend(
}

// Otherwise, we aren't maintaining a mempool and can use the
// gettxspendingprevout RPC. Create an RPC-style prevout that will be
// the lone item in an array.
prevoutReq := &getTxSpendingPrevOutReq{
Txid: op.Hash.String(), Vout: op.Index,
}

// The RPC takes an array of prevouts so we have an array with a single
// item since we don't yet batch calls to LookupInputSpend.
prevoutArr := []*getTxSpendingPrevOutReq{prevoutReq}

req, err := json.Marshal(prevoutArr)
if err != nil {
return chainhash.Hash{}, false
}

resp, err := b.client.RawRequest(
"gettxspendingprevout", []json.RawMessage{req},
)
if err != nil {
return chainhash.Hash{}, false
}

var prevoutResps []getTxSpendingPrevOutResp
err = json.Unmarshal(resp, &prevoutResps)
if err != nil {
return chainhash.Hash{}, false
}

// We should only get a single item back since we only requested with a
// single item.
if len(prevoutResps) != 1 {
return chainhash.Hash{}, false
}

result := prevoutResps[0]

// If the "spendingtxid" field is empty, then the utxo has no spend in
// the mempool at the moment.
if result.SpendingTxid == "" {
return chainhash.Hash{}, false
}

spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid)
if err != nil {
return chainhash.Hash{}, false
}

return *spendHash, true
// gettxspendingprevout RPC.
return getTxSpendingPrevOut(op, b.client)
}

// blockEventHandler reads raw blocks events from the ZMQ block socket and
Expand Down Expand Up @@ -541,3 +495,56 @@ func (b *bitcoindZMQEvents) mempoolPoller() {
}
}
}

// getTxSpendingPrevOut makes an RPC call to `gettxspendingprevout` and returns
// the result.
func getTxSpendingPrevOut(op wire.OutPoint,
client *rpcclient.Client) (chainhash.Hash, bool) {

prevoutReq := &getTxSpendingPrevOutReq{
Txid: op.Hash.String(), Vout: op.Index,
}

// The RPC takes an array of prevouts so we have an array with a single
// item since we don't yet batch calls to LookupInputSpend.
prevoutArr := []*getTxSpendingPrevOutReq{prevoutReq}

req, err := json.Marshal(prevoutArr)
if err != nil {
return chainhash.Hash{}, false
}

resp, err := client.RawRequest(
"gettxspendingprevout", []json.RawMessage{req},
)
if err != nil {
return chainhash.Hash{}, false
}

var prevoutResps []getTxSpendingPrevOutResp
err = json.Unmarshal(resp, &prevoutResps)
if err != nil {
return chainhash.Hash{}, false
}

// We should only get a single item back since we only requested with a
// single item.
if len(prevoutResps) != 1 {
return chainhash.Hash{}, false
}

result := prevoutResps[0]

// If the "spendingtxid" field is empty, then the utxo has no spend in
// the mempool at the moment.
if result.SpendingTxid == "" {
return chainhash.Hash{}, false
}

spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid)
if err != nil {
return chainhash.Hash{}, false
}

return *spendHash, true
}
28 changes: 27 additions & 1 deletion chain/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,29 @@ type mempoolConfig struct {
// TODO(yy): interface rpcclient.FutureGetRawTransactionResult so we
// can remove this hack.
rawTxReceiver func(chainhash.Hash, getRawTxReceiver) *btcutil.Tx

// hasPrevoutRPC is set when the bitcoind version is >= 24.0.0, in
// which `gettxspendingprevout` can be used to fetch mempool spent for
// a given input so there's no need to create the `inputs` map used in
// `mempool` here.
hasPrevoutRPC bool
}

// newMempool creates a new mempool object.
func newMempool(cfg *mempoolConfig) *mempool {
m := &mempool{
cfg: cfg,
txs: make(map[chainhash.Hash]bool),
inputs: newCachedInputs(),
initFin: make(chan struct{}),
quit: make(chan struct{}),
}

// Init the `inputs` map if the bitcoind version doesn't support
// `gettxspendingprevout`.
if !cfg.hasPrevoutRPC {
m.inputs = newCachedInputs()
}

// Mount the default methods.
m.cfg.rawMempoolGetter = m.getRawMempool
m.cfg.rawTxReceiver = getRawTxIgnoreErr
Expand Down Expand Up @@ -236,6 +247,11 @@ func (m *mempool) containsTx(hash chainhash.Hash) bool {
//
// NOTE: must be used inside a lock.
func (m *mempool) containsInput(op wire.OutPoint) (chainhash.Hash, bool) {
// TODO(yy): port `getprevout` to bitcoind and use it here?
if m.inputs == nil {
return chainhash.Hash{}, false
}

return m.inputs.hasInput(op)
}

Expand Down Expand Up @@ -332,6 +348,11 @@ func (m *mempool) deleteUnmarked() {
//
// NOTE: must be used inside a lock.
func (m *mempool) removeInputs(tx chainhash.Hash) {
// We won't have the `inputs` map if `hasPrevoutRPC` is true.
if m.inputs == nil {
return
}

m.inputs.removeInputsFromTx(tx)
}

Expand All @@ -340,6 +361,11 @@ func (m *mempool) removeInputs(tx chainhash.Hash) {
//
// NOTE: must be used inside a lock.
func (m *mempool) updateInputs(tx *wire.MsgTx) {
// We won't have the `inputs` map if `hasPrevoutRPC` is true.
if m.inputs == nil {
return
}

// Iterate the tx's inputs.
for _, input := range tx.TxIn {
outpoint := input.PreviousOutPoint
Expand Down
114 changes: 114 additions & 0 deletions chain/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,3 +893,117 @@ func TestBatchGetRawTxesOnWait(t *testing.T) {
// Assert the mock methods are called as expected.
mockRPC.AssertExpectations(t)
}

// TestNewMempool tests that `newMempool` behaves as expected.
func TestNewMempool(t *testing.T) {
// Create a new mempool with an empty config.
cfg := &mempoolConfig{}
m := newMempool(cfg)

// Validate that the mempool is initialized as expected.
require.Equal(t, cfg, m.cfg)
require.NotNil(t, m.cfg.rawMempoolGetter)
require.NotNil(t, m.cfg.rawTxReceiver)
require.NotNil(t, m.txs)
require.NotNil(t, m.initFin)
require.NotNil(t, m.quit)
require.NotNil(t, m.inputs)

// Create a new config to check that the mempool is initialized without
// the `inputs` map when `hasPrevoutRPC` is true.
cfg = &mempoolConfig{hasPrevoutRPC: true}
m = newMempool(cfg)

// Validate that the mempool is initialized as expected.
require.Equal(t, cfg, m.cfg)
require.NotNil(t, m.cfg.rawMempoolGetter)
require.NotNil(t, m.cfg.rawTxReceiver)
require.NotNil(t, m.txs)
require.NotNil(t, m.initFin)
require.NotNil(t, m.quit)
require.Nil(t, m.inputs)
}

// TestMempoolAddNoInputs adds a coinbase tx, a normal tx, and a replacement tx
// to the mempool and checks the mempool's internal state is updated as
// expected when the `hasPrevoutRPC` is set.
func TestMempoolAddNoInputs(t *testing.T) {
require := require.New(t)

m := newMempool(&mempoolConfig{
batchWaitInterval: 0,
getRawTxBatchSize: 1,
hasPrevoutRPC: true,
})

// Create a coinbase transaction.
tx0 := &wire.MsgTx{
TxIn: []*wire.TxIn{
{
PreviousOutPoint: wire.OutPoint{
Hash: chainhash.Hash{},
Index: math.MaxUint32,
},
},
},
}

// Create a normal transaction that has two inputs.
op1 := wire.OutPoint{Hash: chainhash.Hash{1}}
op2 := wire.OutPoint{Hash: chainhash.Hash{2}}
tx1 := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op1},
{PreviousOutPoint: op2},
},
}

// Create a replacement transaction that spends one of the inputs as
// tx1.
op3 := wire.OutPoint{Hash: chainhash.Hash{3}}
tx2 := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op2},
{PreviousOutPoint: op3},
},
}

// Now add all the transactions.
m.add(tx0)
m.add(tx1)
m.add(tx2)

// Check transactions are updated, mempool should now contain two
// transactions.
require.False(m.containsTx(tx0.TxHash()))
require.True(m.containsTx(tx1.TxHash()))
require.True(m.containsTx(tx2.TxHash()))

// Check inputs are NOT updated here because we don't track them when
// `hasPrevoutRPC` is true.
//
// Mempool should NOT contain op1.
txid, found := m.containsInput(op1)
require.False(found)
require.Empty(txid)

// Mempool should NOT contain op2.
txid, found = m.containsInput(op2)
require.False(found)
require.Empty(txid)

// Mempool should NOT contain op3.
txid, found = m.containsInput(op3)
require.False(found)
require.Empty(txid)

// Check the mempool's internal state.
//
// We should see two transactions in the mempool, tx1 and tx2.
require.Len(m.txs, 2)

// The mempool's inputs should be nil.
require.Nil(m.inputs)
}

0 comments on commit 5df09dd

Please sign in to comment.