Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chain: optimize mempool memory usage for RPC clients #896

Merged
merged 2 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions chain/bitcoind_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client,
"should be specified, not both")
}

// Check if the bitcoind node is on a version that has the
// gettxspendingprevout RPC. If it does, then we don't need to maintain
// a mempool for ZMQ clients and can maintain a smaller mempool for RPC
// clients.
hasRPC, err := hasSpendingPrevoutRPC(client)
if err != nil {
return nil, err
}

if cfg.PollingConfig != nil {
if client == nil {
return nil, fmt.Errorf("rpc client must be given " +
Expand All @@ -52,7 +61,7 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client,
}

pollingEvents := newBitcoindRPCPollingEvents(
cfg.PollingConfig, client, bClient,
cfg.PollingConfig, client, bClient, hasRPC,
)

return pollingEvents, nil
Expand All @@ -63,14 +72,6 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client,
"rpcpolling is disabled")
}

// Check if the bitcoind node is on a version that has the
// gettxspendingprevout RPC. If it does, then we don't need to maintain
// a mempool for ZMQ clients.
hasRPC, err := hasSpendingPrevoutRPC(client)
if err != nil {
return nil, err
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client, bClient, hasRPC)
}

Expand Down
15 changes: 12 additions & 3 deletions chain/bitcoind_rpc_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ BitcoindEvents = (*bitcoindRPCPollingEvents)(nil)
// newBitcoindRPCPollingEvents instantiates a new bitcoindRPCPollingEvents
// object.
func newBitcoindRPCPollingEvents(cfg *PollingConfig, client *rpcclient.Client,
bClient batchClient) *bitcoindRPCPollingEvents {
bClient batchClient, hasRPC bool) *bitcoindRPCPollingEvents {

if cfg.BlockPollingInterval == 0 {
cfg.BlockPollingInterval = defaultBlockPollInterval
Expand All @@ -100,6 +100,7 @@ func newBitcoindRPCPollingEvents(cfg *PollingConfig, client *rpcclient.Client,
client: bClient,
getRawTxBatchSize: cfg.RPCBatchSize,
batchWaitInterval: cfg.RPCBatchInterval,
hasPrevoutRPC: hasRPC,
}

if cfg.RPCBatchSize == 0 {
Expand Down Expand Up @@ -165,8 +166,16 @@ func (b *bitcoindRPCPollingEvents) LookupInputSpend(
b.mempool.RLock()
defer b.mempool.RUnlock()

// Check whether the input is in mempool.
return b.mempool.containsInput(op)
// If `gettxspendingprevout` is not supported, we need to loop it up in
// our local mempool.
if !b.mempool.cfg.hasPrevoutRPC {
// Check whether the input is in mempool.
return b.mempool.containsInput(op)
}

// Otherwise, we can use the `gettxspendingprevout` RPC to look up the
// input.
return getTxSpendingPrevOut(op, b.client)
}

// blockEventHandlerRPC is a goroutine that uses the rpc client to check if we
Expand Down
103 changes: 55 additions & 48 deletions chain/bitcoind_zmq_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,54 +249,8 @@ func (b *bitcoindZMQEvents) LookupInputSpend(
}

// Otherwise, we aren't maintaining a mempool and can use the
// gettxspendingprevout RPC. Create an RPC-style prevout that will be
// the lone item in an array.
prevoutReq := &getTxSpendingPrevOutReq{
Txid: op.Hash.String(), Vout: op.Index,
}

// The RPC takes an array of prevouts so we have an array with a single
// item since we don't yet batch calls to LookupInputSpend.
prevoutArr := []*getTxSpendingPrevOutReq{prevoutReq}

req, err := json.Marshal(prevoutArr)
if err != nil {
return chainhash.Hash{}, false
}

resp, err := b.client.RawRequest(
"gettxspendingprevout", []json.RawMessage{req},
)
if err != nil {
return chainhash.Hash{}, false
}

var prevoutResps []getTxSpendingPrevOutResp
err = json.Unmarshal(resp, &prevoutResps)
if err != nil {
return chainhash.Hash{}, false
}

// We should only get a single item back since we only requested with a
// single item.
if len(prevoutResps) != 1 {
return chainhash.Hash{}, false
}

result := prevoutResps[0]

// If the "spendingtxid" field is empty, then the utxo has no spend in
// the mempool at the moment.
if result.SpendingTxid == "" {
return chainhash.Hash{}, false
}

spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid)
if err != nil {
return chainhash.Hash{}, false
}

return *spendHash, true
// gettxspendingprevout RPC.
return getTxSpendingPrevOut(op, b.client)
}

// blockEventHandler reads raw blocks events from the ZMQ block socket and
Expand Down Expand Up @@ -541,3 +495,56 @@ func (b *bitcoindZMQEvents) mempoolPoller() {
}
}
}

// getTxSpendingPrevOut makes an RPC call to `gettxspendingprevout` and returns
// the result.
func getTxSpendingPrevOut(op wire.OutPoint,
client *rpcclient.Client) (chainhash.Hash, bool) {

prevoutReq := &getTxSpendingPrevOutReq{
Txid: op.Hash.String(), Vout: op.Index,
}

// The RPC takes an array of prevouts so we have an array with a single
// item since we don't yet batch calls to LookupInputSpend.
prevoutArr := []*getTxSpendingPrevOutReq{prevoutReq}

req, err := json.Marshal(prevoutArr)
if err != nil {
return chainhash.Hash{}, false
}

resp, err := client.RawRequest(
"gettxspendingprevout", []json.RawMessage{req},
)
if err != nil {
return chainhash.Hash{}, false
}

var prevoutResps []getTxSpendingPrevOutResp
err = json.Unmarshal(resp, &prevoutResps)
if err != nil {
return chainhash.Hash{}, false
}

// We should only get a single item back since we only requested with a
// single item.
if len(prevoutResps) != 1 {
return chainhash.Hash{}, false
}

result := prevoutResps[0]

// If the "spendingtxid" field is empty, then the utxo has no spend in
// the mempool at the moment.
if result.SpendingTxid == "" {
return chainhash.Hash{}, false
}

spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid)
if err != nil {
return chainhash.Hash{}, false
}

return *spendHash, true
}
28 changes: 27 additions & 1 deletion chain/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,29 @@ type mempoolConfig struct {
// TODO(yy): interface rpcclient.FutureGetRawTransactionResult so we
// can remove this hack.
rawTxReceiver func(chainhash.Hash, getRawTxReceiver) *btcutil.Tx

// hasPrevoutRPC is set when the bitcoind version is >= 24.0.0, in
// which `gettxspendingprevout` can be used to fetch mempool spent for
// a given input so there's no need to create the `inputs` map used in
// `mempool` here.
hasPrevoutRPC bool
}

// newMempool creates a new mempool object.
func newMempool(cfg *mempoolConfig) *mempool {
m := &mempool{
cfg: cfg,
txs: make(map[chainhash.Hash]bool),
inputs: newCachedInputs(),
initFin: make(chan struct{}),
quit: make(chan struct{}),
}

// Init the `inputs` map if the bitcoind version doesn't support
// `gettxspendingprevout`.
if !cfg.hasPrevoutRPC {
m.inputs = newCachedInputs()
}

// Mount the default methods.
m.cfg.rawMempoolGetter = m.getRawMempool
m.cfg.rawTxReceiver = getRawTxIgnoreErr
Expand Down Expand Up @@ -236,6 +247,11 @@ func (m *mempool) containsTx(hash chainhash.Hash) bool {
//
// NOTE: must be used inside a lock.
func (m *mempool) containsInput(op wire.OutPoint) (chainhash.Hash, bool) {
// TODO(yy): port `getprevout` to bitcoind and use it here?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this occur on a higher level rn? Or you mean pulling down that check into the mempool logic itself?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so mempool already has an rpc client in m.cfg.client, which can be used to call the rpc method. If we use it here, then both zmq and rpc clients can use this containsInput to do the lookup without checking hasPrevRPC flag.

if m.inputs == nil {
return chainhash.Hash{}, false
}

return m.inputs.hasInput(op)
}

Expand Down Expand Up @@ -332,6 +348,11 @@ func (m *mempool) deleteUnmarked() {
//
// NOTE: must be used inside a lock.
func (m *mempool) removeInputs(tx chainhash.Hash) {
// We won't have the `inputs` map if `hasPrevoutRPC` is true.
if m.inputs == nil {
return
}

m.inputs.removeInputsFromTx(tx)
}

Expand All @@ -340,6 +361,11 @@ func (m *mempool) removeInputs(tx chainhash.Hash) {
//
// NOTE: must be used inside a lock.
func (m *mempool) updateInputs(tx *wire.MsgTx) {
// We won't have the `inputs` map if `hasPrevoutRPC` is true.
if m.inputs == nil {
return
}

// Iterate the tx's inputs.
for _, input := range tx.TxIn {
outpoint := input.PreviousOutPoint
Expand Down
114 changes: 114 additions & 0 deletions chain/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,3 +893,117 @@ func TestBatchGetRawTxesOnWait(t *testing.T) {
// Assert the mock methods are called as expected.
mockRPC.AssertExpectations(t)
}

// TestNewMempool tests that `newMempool` behaves as expected.
func TestNewMempool(t *testing.T) {
// Create a new mempool with an empty config.
cfg := &mempoolConfig{}
m := newMempool(cfg)

// Validate that the mempool is initialized as expected.
require.Equal(t, cfg, m.cfg)
require.NotNil(t, m.cfg.rawMempoolGetter)
require.NotNil(t, m.cfg.rawTxReceiver)
require.NotNil(t, m.txs)
require.NotNil(t, m.initFin)
require.NotNil(t, m.quit)
require.NotNil(t, m.inputs)

// Create a new config to check that the mempool is initialized without
// the `inputs` map when `hasPrevoutRPC` is true.
cfg = &mempoolConfig{hasPrevoutRPC: true}
m = newMempool(cfg)

// Validate that the mempool is initialized as expected.
require.Equal(t, cfg, m.cfg)
require.NotNil(t, m.cfg.rawMempoolGetter)
require.NotNil(t, m.cfg.rawTxReceiver)
require.NotNil(t, m.txs)
require.NotNil(t, m.initFin)
require.NotNil(t, m.quit)
require.Nil(t, m.inputs)
}

// TestMempoolAddNoInputs adds a coinbase tx, a normal tx, and a replacement tx
// to the mempool and checks the mempool's internal state is updated as
// expected when the `hasPrevoutRPC` is set.
func TestMempoolAddNoInputs(t *testing.T) {
require := require.New(t)

m := newMempool(&mempoolConfig{
batchWaitInterval: 0,
getRawTxBatchSize: 1,
hasPrevoutRPC: true,
})

// Create a coinbase transaction.
tx0 := &wire.MsgTx{
TxIn: []*wire.TxIn{
{
PreviousOutPoint: wire.OutPoint{
Hash: chainhash.Hash{},
Index: math.MaxUint32,
},
},
},
}

// Create a normal transaction that has two inputs.
op1 := wire.OutPoint{Hash: chainhash.Hash{1}}
op2 := wire.OutPoint{Hash: chainhash.Hash{2}}
tx1 := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op1},
{PreviousOutPoint: op2},
},
}

// Create a replacement transaction that spends one of the inputs as
// tx1.
op3 := wire.OutPoint{Hash: chainhash.Hash{3}}
tx2 := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op2},
{PreviousOutPoint: op3},
},
}

// Now add all the transactions.
m.add(tx0)
m.add(tx1)
m.add(tx2)

// Check transactions are updated, mempool should now contain two
// transactions.
require.False(m.containsTx(tx0.TxHash()))
require.True(m.containsTx(tx1.TxHash()))
require.True(m.containsTx(tx2.TxHash()))

// Check inputs are NOT updated here because we don't track them when
// `hasPrevoutRPC` is true.
//
// Mempool should NOT contain op1.
txid, found := m.containsInput(op1)
require.False(found)
require.Empty(txid)

// Mempool should NOT contain op2.
txid, found = m.containsInput(op2)
require.False(found)
require.Empty(txid)

// Mempool should NOT contain op3.
txid, found = m.containsInput(op3)
require.False(found)
require.Empty(txid)

// Check the mempool's internal state.
//
// We should see two transactions in the mempool, tx1 and tx2.
require.Len(m.txs, 2)

// The mempool's inputs should be nil.
require.Nil(m.inputs)
}
Loading