Skip to content

Commit

Permalink
chain: make getRawTxBatchSize and batchWaitInterval configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
yyforyongyu committed Sep 20, 2023
1 parent cc438ec commit 0c87dca
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 26 deletions.
24 changes: 23 additions & 1 deletion chain/bitcoind_rpc_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ type PollingConfig struct {
// jitter by scaling TxPollingInterval with it. This value must be no
// less than 0. Default to 0, meaning no jitter will be applied.
TxPollingIntervalJitter float64

// RPCBatchSize defines the number of RPC requests to be batches before
// sending them to the bitcoind node.
RPCBatchSize uint32

// RPCBatchInterval defines the time to wait before attempting the next
// batch when the current one finishes.
RPCBatchInterval time.Duration
}

// bitcoindRPCPollingEvents delivers block and transaction notifications that
Expand Down Expand Up @@ -86,12 +94,26 @@ func newBitcoindRPCPollingEvents(cfg *PollingConfig,
cfg.TxPollingIntervalJitter = 0
}

// Create the config for mempool and attach default values if not
// configed.
mCfg := &mempoolConfig{
client: batchClient,
}

if cfg.RPCBatchSize == 0 {
mCfg.getRawTxBatchSize = defaultGetRawTxBatchSize
}

if cfg.RPCBatchInterval == 0 {
mCfg.batchWaitInterval = defaultBatchWaitInterval
}

return &bitcoindRPCPollingEvents{
cfg: cfg,
client: client,
txNtfns: make(chan *wire.MsgTx),
blockNtfns: make(chan *wire.MsgBlock),
mempool: newMempool(batchClient),
mempool: newMempool(mCfg),
quit: make(chan struct{}),
}
}
Expand Down
24 changes: 23 additions & 1 deletion chain/bitcoind_zmq_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ type ZMQConfig struct {
//
// TODO(yy): replace this temp config with SEQUENCE check.
PollingIntervalJitter float64

// RPCBatchSize defines the number of RPC requests to be batches before
// sending them to the bitcoind node.
RPCBatchSize uint32

// RPCBatchInterval defines the time to wait before attempting the next
// batch when the current one finishes.
RPCBatchInterval time.Duration
}

// bitcoindZMQEvents delivers block and transaction notifications that it gets
Expand Down Expand Up @@ -122,13 +130,27 @@ func newBitcoindZMQEvents(cfg *ZMQConfig,
"events: %v", err)
}

// Create the config for mempool and attach default values if not
// configed.
mCfg := &mempoolConfig{
client: batchClient,
}

if cfg.RPCBatchSize == 0 {
mCfg.getRawTxBatchSize = defaultGetRawTxBatchSize
}

if cfg.RPCBatchInterval == 0 {
mCfg.batchWaitInterval = defaultBatchWaitInterval
}

return &bitcoindZMQEvents{
cfg: cfg,
blockConn: zmqBlockConn,
txConn: zmqTxConn,
blockNtfns: make(chan *wire.MsgBlock),
txNtfns: make(chan *wire.MsgTx),
mempool: newMempool(batchClient),
mempool: newMempool(mCfg),
quit: make(chan struct{}),
}, nil
}
Expand Down
56 changes: 35 additions & 21 deletions chain/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ const (
// `getrawtransaction` RPC when the requested txid cannot be found.
txNotFoundErr = "-5: No such mempool or blockchain transaction"

// getRawTxBatchSize specifies the number of requests to be batched
// before sending them to the bitcoind client.
getRawTxBatchSize = 10_000
// defaultGetRawTxBatchSize specifies the default number of requests to
// be batched before sending them to the bitcoind client.
defaultGetRawTxBatchSize = 10_000

// batchWaitInterval defines the time to sleep between each batched
// calls.
batchWaitInterval = 1 * time.Second
// defaultBatchWaitInterval defines the default time to sleep between
// each batched calls.
defaultBatchWaitInterval = 1 * time.Second
)

// cachedInputs caches the inputs of the transactions in the mempool. This is
Expand Down Expand Up @@ -104,6 +104,9 @@ func (c *cachedInputs) removeInputsFromTx(txid chainhash.Hash) {
type mempool struct {
sync.RWMutex

// cfg specifies the config for the mempool.
cfg *mempoolConfig

// txs stores the txids in the mempool.
txs map[chainhash.Hash]bool

Expand All @@ -114,9 +117,6 @@ type mempool struct {
// scripts.
inputs *cachedInputs

// client is the rpc client that we'll use to query for the mempool.
client batchClient

// initFin is a channel that will be closed once the mempool has been
// initialized.
initFin chan struct{}
Expand All @@ -125,14 +125,28 @@ type mempool struct {
quit chan struct{}
}

// mempoolConfig holds a list of config values specified by the callers.
type mempoolConfig struct {
// client is the rpc client that we'll use to query for the mempool.
client batchClient

// getRawTxBatchSize specifies the number of getrawtransaction requests
// to be batched before sending them to the bitcoind client.
getRawTxBatchSize int

// batchWaitInterval defines the default time to sleep between each
// batched calls.
batchWaitInterval time.Duration
}

// newMempool creates a new mempool object.
func newMempool(client batchClient) *mempool {
func newMempool(cfg *mempoolConfig) *mempool {
return &mempool{
cfg: cfg,
txs: make(map[chainhash.Hash]bool),
inputs: newCachedInputs(),
initFin: make(chan struct{}),
quit: make(chan struct{}),
client: client,
}
}

Expand Down Expand Up @@ -410,9 +424,9 @@ func (m *mempool) UpdateMempoolTxes() []*wire.MsgTx {
// getRawMempool returns all the raw transactions found in mempool.
func (m *mempool) getRawMempool() ([]*chainhash.Hash, error) {
// Create an async request and send it immediately.
result := m.client.GetRawMempoolAsync()
result := m.cfg.client.GetRawMempoolAsync()

err := m.client.Send()
err := m.cfg.client.Send()
if err != nil {
log.Errorf("Unable to send GetRawMempool: %v", err)
return nil, err
Expand All @@ -436,12 +450,12 @@ func (m *mempool) batchGetRawTxes(txids []*chainhash.Hash,
returnNew bool) ([]*wire.MsgTx, error) {

log.Debugf("Batching GetRawTransaction in %v batches...",
len(txids)/getRawTxBatchSize)
len(txids)/m.cfg.getRawTxBatchSize)
defer log.Debugf("Finished batch GetRawTransaction")

// respReceivers stores a list of response receivers returned from
// batch calling `GetRawTransactionAsync`.
respReceivers := make([]getRawTxReceiver, 0, getRawTxBatchSize)
respReceivers := make([]getRawTxReceiver, 0, m.cfg.getRawTxBatchSize)

// Conditionally init a newTxes slice.
var newTxes []*wire.MsgTx
Expand All @@ -455,7 +469,7 @@ func (m *mempool) batchGetRawTxes(txids []*chainhash.Hash,
// state and conditionally saved to a slice that will be returned.
processBatch := func(results []getRawTxReceiver) error {
// Ask the client to send all the batched requests.
err := m.client.Send()
err := m.cfg.client.Send()
if err != nil {
return fmt.Errorf("Send GetRawTransaction got %v", err)
}
Expand Down Expand Up @@ -489,14 +503,14 @@ func (m *mempool) batchGetRawTxes(txids []*chainhash.Hash,
}

// Create the async request and save it to txRespReceivers.
resp := m.client.GetRawTransactionAsync(txHash)
resp := m.cfg.client.GetRawTransactionAsync(txHash)
respReceivers = append(respReceivers, resp)

// When getRawTxBatchSize is reached, we'd ask the batch client
// to send the requests and process the responses.
if i != 0 && i%getRawTxBatchSize == 0 {
if i != 0 && i%m.cfg.getRawTxBatchSize == 0 {
log.Debugf("Processing GetRawTransaction for batch "+
"%v...", i/getRawTxBatchSize)
"%v...", i/m.cfg.getRawTxBatchSize)

if err := processBatch(respReceivers); err != nil {
return nil, err
Expand All @@ -505,14 +519,14 @@ func (m *mempool) batchGetRawTxes(txids []*chainhash.Hash,
// We now pause the duration defined in
// `batchWaitInterval` or exit on quit signal.
select {
case <-time.After(batchWaitInterval):
case <-time.After(m.cfg.batchWaitInterval):
case <-m.quit:
return nil, nil
}

// Empty the slice for next batch iteration.
respReceivers = make(
[]getRawTxReceiver, 0, getRawTxBatchSize,
[]getRawTxReceiver, 0, m.cfg.getRawTxBatchSize,
)
}
}
Expand Down
18 changes: 15 additions & 3 deletions chain/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,11 @@ func TestUpdateMempoolTxes(t *testing.T) {

// Create a mock client and init our mempool.
mockRPC := &mockRPCClient{}
m := newMempool(mockRPC)
m := newMempool(&mempoolConfig{
client: mockRPC,
batchWaitInterval: 0,
getRawTxBatchSize: 1,
})

// Create a normal transaction that has two inputs.
op1 := wire.OutPoint{Hash: chainhash.Hash{1}}
Expand Down Expand Up @@ -529,7 +533,11 @@ func TestUpdateMempoolTxesOnShutdown(t *testing.T) {

// Create a mock client and init our mempool.
mockRPC := &mockRPCClient{}
m := newMempool(mockRPC)
m := newMempool(&mempoolConfig{
client: mockRPC,
batchWaitInterval: 0,
getRawTxBatchSize: 1,
})

// Create a normal transaction.
op1 := wire.OutPoint{Hash: chainhash.Hash{1}}
Expand Down Expand Up @@ -569,7 +577,11 @@ func TestGetRawTxIgnoreErr(t *testing.T) {

// Create a mock client and init our mempool.
mockRPC := &mockRPCClient{}
m := newMempool(mockRPC)
m := newMempool(&mempoolConfig{
client: mockRPC,
batchWaitInterval: 0,
getRawTxBatchSize: 1,
})

// Create a normal transaction that has two inputs.
op := wire.OutPoint{Hash: chainhash.Hash{1}}
Expand Down

0 comments on commit 0c87dca

Please sign in to comment.