Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chain: optimize mempool memory usage for RPC clients #896

Merged
merged 2 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion chain/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,29 @@ type mempoolConfig struct {
// TODO(yy): interface rpcclient.FutureGetRawTransactionResult so we
// can remove this hack.
rawTxReceiver func(chainhash.Hash, getRawTxReceiver) *btcutil.Tx

// hasPrevoutRPC is set when the bitcoind version is >= 24.0.0, in
// which `gettxspendingprevout` can be used to fetch mempool spent for
// a given input so there's no need to create the `inputs` map used in
// `mempool` here.
hasPrevoutRPC bool
}

// newMempool creates a new mempool object.
func newMempool(cfg *mempoolConfig) *mempool {
m := &mempool{
cfg: cfg,
txs: make(map[chainhash.Hash]bool),
inputs: newCachedInputs(),
initFin: make(chan struct{}),
quit: make(chan struct{}),
}

// Init the `inputs` map if the bitcoind version doesn't support
// `gettxspendingprevout`.
if !cfg.hasPrevoutRPC {
m.inputs = newCachedInputs()
}

// Mount the default methods.
m.cfg.rawMempoolGetter = m.getRawMempool
m.cfg.rawTxReceiver = getRawTxIgnoreErr
Expand Down Expand Up @@ -236,6 +247,11 @@ func (m *mempool) containsTx(hash chainhash.Hash) bool {
//
// NOTE: must be used inside a lock.
func (m *mempool) containsInput(op wire.OutPoint) (chainhash.Hash, bool) {
// TODO(yy): port `getprevout` to bitcoind and use it here?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this occur on a higher level rn? Or you mean pulling down that check into the mempool logic itself?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so mempool already has an rpc client in m.cfg.client, which can be used to call the rpc method. If we use it here, then both zmq and rpc clients can use this containsInput to do the lookup without checking hasPrevRPC flag.

if m.inputs == nil {
return chainhash.Hash{}, false
}

return m.inputs.hasInput(op)
}

Expand Down Expand Up @@ -332,6 +348,11 @@ func (m *mempool) deleteUnmarked() {
//
// NOTE: must be used inside a lock.
func (m *mempool) removeInputs(tx chainhash.Hash) {
// We won't have the `inputs` map if `hasPrevoutRPC` is true.
if m.inputs == nil {
return
}

m.inputs.removeInputsFromTx(tx)
}

Expand All @@ -340,6 +361,11 @@ func (m *mempool) removeInputs(tx chainhash.Hash) {
//
// NOTE: must be used inside a lock.
func (m *mempool) updateInputs(tx *wire.MsgTx) {
// We won't have the `inputs` map if `hasPrevoutRPC` is true.
if m.inputs == nil {
return
}

// Iterate the tx's inputs.
for _, input := range tx.TxIn {
outpoint := input.PreviousOutPoint
Expand Down
114 changes: 114 additions & 0 deletions chain/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,3 +893,117 @@ func TestBatchGetRawTxesOnWait(t *testing.T) {
// Assert the mock methods are called as expected.
mockRPC.AssertExpectations(t)
}

// TestNewMempool tests that `newMempool` behaves as expected.
func TestNewMempool(t *testing.T) {
// Create a new mempool with an empty config.
cfg := &mempoolConfig{}
m := newMempool(cfg)

// Validate that the mempool is initialized as expected.
require.Equal(t, cfg, m.cfg)
require.NotNil(t, m.cfg.rawMempoolGetter)
require.NotNil(t, m.cfg.rawTxReceiver)
require.NotNil(t, m.txs)
require.NotNil(t, m.initFin)
require.NotNil(t, m.quit)
require.NotNil(t, m.inputs)

// Create a new config to check that the mempool is initialized without
// the `inputs` map when `hasPrevoutRPC` is true.
cfg = &mempoolConfig{hasPrevoutRPC: true}
m = newMempool(cfg)

// Validate that the mempool is initialized as expected.
require.Equal(t, cfg, m.cfg)
require.NotNil(t, m.cfg.rawMempoolGetter)
require.NotNil(t, m.cfg.rawTxReceiver)
require.NotNil(t, m.txs)
require.NotNil(t, m.initFin)
require.NotNil(t, m.quit)
require.Nil(t, m.inputs)
}

// TestMempoolAddNoInputs adds a coinbase tx, a normal tx, and a replacement tx
// to the mempool and checks the mempool's internal state is updated as
// expected when the `hasPrevoutRPC` is set.
func TestMempoolAddNoInputs(t *testing.T) {
require := require.New(t)

m := newMempool(&mempoolConfig{
batchWaitInterval: 0,
getRawTxBatchSize: 1,
hasPrevoutRPC: true,
})

// Create a coinbase transaction.
tx0 := &wire.MsgTx{
TxIn: []*wire.TxIn{
{
PreviousOutPoint: wire.OutPoint{
Hash: chainhash.Hash{},
Index: math.MaxUint32,
},
},
},
}

// Create a normal transaction that has two inputs.
op1 := wire.OutPoint{Hash: chainhash.Hash{1}}
op2 := wire.OutPoint{Hash: chainhash.Hash{2}}
tx1 := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op1},
{PreviousOutPoint: op2},
},
}

// Create a replacement transaction that spends one of the inputs as
// tx1.
op3 := wire.OutPoint{Hash: chainhash.Hash{3}}
tx2 := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op2},
{PreviousOutPoint: op3},
},
}

// Now add all the transactions.
m.add(tx0)
m.add(tx1)
m.add(tx2)

// Check transactions are updated, mempool should now contain two
// transactions.
require.False(m.containsTx(tx0.TxHash()))
require.True(m.containsTx(tx1.TxHash()))
require.True(m.containsTx(tx2.TxHash()))

// Check inputs are NOT updated here because we don't track them when
// `hasPrevoutRPC` is true.
//
// Mempool should NOT contain op1.
txid, found := m.containsInput(op1)
require.False(found)
require.Empty(txid)

// Mempool should NOT contain op2.
txid, found = m.containsInput(op2)
require.False(found)
require.Empty(txid)

// Mempool should NOT contain op3.
txid, found = m.containsInput(op3)
require.False(found)
require.Empty(txid)

// Check the mempool's internal state.
//
// We should see two transactions in the mempool, tx1 and tx2.
require.Len(m.txs, 2)

// The mempool's inputs should be nil.
require.Nil(m.inputs)
}