diff --git a/chain/bitcoind_rpc_events.go b/chain/bitcoind_rpc_events.go index 71db1dcc9d..358e94529b 100644 --- a/chain/bitcoind_rpc_events.go +++ b/chain/bitcoind_rpc_events.go @@ -38,6 +38,14 @@ type PollingConfig struct { // jitter by scaling TxPollingInterval with it. This value must be no // less than 0. Default to 0, meaning no jitter will be applied. TxPollingIntervalJitter float64 + + // RPCBatchSize defines the number of RPC requests to be batches before + // sending them to the bitcoind node. + RPCBatchSize uint32 + + // RPCBatchInterval defines the time to wait before attempting the next + // batch when the current one finishes. + RPCBatchInterval time.Duration } // bitcoindRPCPollingEvents delivers block and transaction notifications that @@ -86,12 +94,26 @@ func newBitcoindRPCPollingEvents(cfg *PollingConfig, cfg.TxPollingIntervalJitter = 0 } + // Create the config for mempool and attach default values if not + // configed. + mCfg := &mempoolConfig{ + client: batchClient, + } + + if cfg.RPCBatchSize == 0 { + mCfg.getRawTxBatchSize = defaultGetRawTxBatchSize + } + + if cfg.RPCBatchInterval == 0 { + mCfg.batchWaitInterval = defaultBatchWaitInterval + } + return &bitcoindRPCPollingEvents{ cfg: cfg, client: client, txNtfns: make(chan *wire.MsgTx), blockNtfns: make(chan *wire.MsgBlock), - mempool: newMempool(batchClient), + mempool: newMempool(mCfg), quit: make(chan struct{}), } } diff --git a/chain/bitcoind_zmq_events.go b/chain/bitcoind_zmq_events.go index 5163370bc5..457f639877 100644 --- a/chain/bitcoind_zmq_events.go +++ b/chain/bitcoind_zmq_events.go @@ -46,6 +46,14 @@ type ZMQConfig struct { // // TODO(yy): replace this temp config with SEQUENCE check. PollingIntervalJitter float64 + + // RPCBatchSize defines the number of RPC requests to be batches before + // sending them to the bitcoind node. + RPCBatchSize uint32 + + // RPCBatchInterval defines the time to wait before attempting the next + // batch when the current one finishes. + RPCBatchInterval time.Duration } // bitcoindZMQEvents delivers block and transaction notifications that it gets @@ -122,13 +130,27 @@ func newBitcoindZMQEvents(cfg *ZMQConfig, "events: %v", err) } + // Create the config for mempool and attach default values if not + // configed. + mCfg := &mempoolConfig{ + client: batchClient, + } + + if cfg.RPCBatchSize == 0 { + mCfg.getRawTxBatchSize = defaultGetRawTxBatchSize + } + + if cfg.RPCBatchInterval == 0 { + mCfg.batchWaitInterval = defaultBatchWaitInterval + } + return &bitcoindZMQEvents{ cfg: cfg, blockConn: zmqBlockConn, txConn: zmqTxConn, blockNtfns: make(chan *wire.MsgBlock), txNtfns: make(chan *wire.MsgTx), - mempool: newMempool(batchClient), + mempool: newMempool(mCfg), quit: make(chan struct{}), }, nil } diff --git a/chain/mempool.go b/chain/mempool.go index 71de92b172..cc0e30d0ad 100644 --- a/chain/mempool.go +++ b/chain/mempool.go @@ -17,13 +17,13 @@ const ( // `getrawtransaction` RPC when the requested txid cannot be found. txNotFoundErr = "-5: No such mempool or blockchain transaction" - // getRawTxBatchSize specifies the number of requests to be batched - // before sending them to the bitcoind client. - getRawTxBatchSize = 10_000 + // defaultGetRawTxBatchSize specifies the default number of requests to + // be batched before sending them to the bitcoind client. + defaultGetRawTxBatchSize = 10_000 - // batchWaitInterval defines the time to sleep between each batched - // calls. - batchWaitInterval = 1 * time.Second + // defaultBatchWaitInterval defines the default time to sleep between + // each batched calls. + defaultBatchWaitInterval = 1 * time.Second ) // cachedInputs caches the inputs of the transactions in the mempool. This is @@ -104,6 +104,9 @@ func (c *cachedInputs) removeInputsFromTx(txid chainhash.Hash) { type mempool struct { sync.RWMutex + // cfg specifies the config for the mempool. + cfg *mempoolConfig + // txs stores the txids in the mempool. txs map[chainhash.Hash]bool @@ -114,9 +117,6 @@ type mempool struct { // scripts. inputs *cachedInputs - // client is the rpc client that we'll use to query for the mempool. - client batchClient - // initFin is a channel that will be closed once the mempool has been // initialized. initFin chan struct{} @@ -125,14 +125,28 @@ type mempool struct { quit chan struct{} } +// mempoolConfig holds a list of config values specified by the callers. +type mempoolConfig struct { + // client is the rpc client that we'll use to query for the mempool. + client batchClient + + // getRawTxBatchSize specifies the number of getrawtransaction requests + // to be batched before sending them to the bitcoind client. + getRawTxBatchSize int + + // batchWaitInterval defines the default time to sleep between each + // batched calls. + batchWaitInterval time.Duration +} + // newMempool creates a new mempool object. -func newMempool(client batchClient) *mempool { +func newMempool(cfg *mempoolConfig) *mempool { return &mempool{ + cfg: cfg, txs: make(map[chainhash.Hash]bool), inputs: newCachedInputs(), initFin: make(chan struct{}), quit: make(chan struct{}), - client: client, } } @@ -410,9 +424,9 @@ func (m *mempool) UpdateMempoolTxes() []*wire.MsgTx { // getRawMempool returns all the raw transactions found in mempool. func (m *mempool) getRawMempool() ([]*chainhash.Hash, error) { // Create an async request and send it immediately. - result := m.client.GetRawMempoolAsync() + result := m.cfg.client.GetRawMempoolAsync() - err := m.client.Send() + err := m.cfg.client.Send() if err != nil { log.Errorf("Unable to send GetRawMempool: %v", err) return nil, err @@ -436,12 +450,12 @@ func (m *mempool) batchGetRawTxes(txids []*chainhash.Hash, returnNew bool) ([]*wire.MsgTx, error) { log.Debugf("Batching GetRawTransaction in %v batches...", - len(txids)/getRawTxBatchSize) + len(txids)/m.cfg.getRawTxBatchSize) defer log.Debugf("Finished batch GetRawTransaction") // respReceivers stores a list of response receivers returned from // batch calling `GetRawTransactionAsync`. - respReceivers := make([]getRawTxReceiver, 0, getRawTxBatchSize) + respReceivers := make([]getRawTxReceiver, 0, m.cfg.getRawTxBatchSize) // Conditionally init a newTxes slice. var newTxes []*wire.MsgTx @@ -455,7 +469,7 @@ func (m *mempool) batchGetRawTxes(txids []*chainhash.Hash, // state and conditionally saved to a slice that will be returned. processBatch := func(results []getRawTxReceiver) error { // Ask the client to send all the batched requests. - err := m.client.Send() + err := m.cfg.client.Send() if err != nil { return fmt.Errorf("Send GetRawTransaction got %v", err) } @@ -489,14 +503,14 @@ func (m *mempool) batchGetRawTxes(txids []*chainhash.Hash, } // Create the async request and save it to txRespReceivers. - resp := m.client.GetRawTransactionAsync(txHash) + resp := m.cfg.client.GetRawTransactionAsync(txHash) respReceivers = append(respReceivers, resp) // When getRawTxBatchSize is reached, we'd ask the batch client // to send the requests and process the responses. - if i != 0 && i%getRawTxBatchSize == 0 { + if i != 0 && i%m.cfg.getRawTxBatchSize == 0 { log.Debugf("Processing GetRawTransaction for batch "+ - "%v...", i/getRawTxBatchSize) + "%v...", i/m.cfg.getRawTxBatchSize) if err := processBatch(respReceivers); err != nil { return nil, err @@ -505,14 +519,14 @@ func (m *mempool) batchGetRawTxes(txids []*chainhash.Hash, // We now pause the duration defined in // `batchWaitInterval` or exit on quit signal. select { - case <-time.After(batchWaitInterval): + case <-time.After(m.cfg.batchWaitInterval): case <-m.quit: return nil, nil } // Empty the slice for next batch iteration. respReceivers = make( - []getRawTxReceiver, 0, getRawTxBatchSize, + []getRawTxReceiver, 0, m.cfg.getRawTxBatchSize, ) } } diff --git a/chain/mempool_test.go b/chain/mempool_test.go index f9121c9d45..351a742ef8 100644 --- a/chain/mempool_test.go +++ b/chain/mempool_test.go @@ -413,7 +413,11 @@ func TestUpdateMempoolTxes(t *testing.T) { // Create a mock client and init our mempool. mockRPC := &mockRPCClient{} - m := newMempool(mockRPC) + m := newMempool(&mempoolConfig{ + client: mockRPC, + batchWaitInterval: 0, + getRawTxBatchSize: 1, + }) // Create a normal transaction that has two inputs. op1 := wire.OutPoint{Hash: chainhash.Hash{1}} @@ -529,7 +533,11 @@ func TestUpdateMempoolTxesOnShutdown(t *testing.T) { // Create a mock client and init our mempool. mockRPC := &mockRPCClient{} - m := newMempool(mockRPC) + m := newMempool(&mempoolConfig{ + client: mockRPC, + batchWaitInterval: 0, + getRawTxBatchSize: 1, + }) // Create a normal transaction. op1 := wire.OutPoint{Hash: chainhash.Hash{1}} @@ -569,7 +577,11 @@ func TestGetRawTxIgnoreErr(t *testing.T) { // Create a mock client and init our mempool. mockRPC := &mockRPCClient{} - m := newMempool(mockRPC) + m := newMempool(&mempoolConfig{ + client: mockRPC, + batchWaitInterval: 0, + getRawTxBatchSize: 1, + }) // Create a normal transaction that has two inputs. op := wire.OutPoint{Hash: chainhash.Hash{1}}