-
Notifications
You must be signed in to change notification settings - Fork 593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chain: use batch rpc client in mempool poller #888
Changes from all commits
3b15bc3
0efe36f
9ef4483
a82ce7c
7f81f41
adadc86
d9466fa
026bbea
4acd62b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,14 @@ type ZMQConfig struct { | |
// | ||
// TODO(yy): replace this temp config with SEQUENCE check. | ||
PollingIntervalJitter float64 | ||
|
||
// RPCBatchSize defines the number of RPC requests to be batches before | ||
// sending them to the bitcoind node. | ||
RPCBatchSize uint32 | ||
|
||
// RPCBatchInterval defines the time to wait before attempting the next | ||
// batch when the current one finishes. | ||
RPCBatchInterval time.Duration | ||
} | ||
|
||
// bitcoindZMQEvents delivers block and transaction notifications that it gets | ||
|
@@ -91,8 +99,8 @@ var _ BitcoindEvents = (*bitcoindZMQEvents)(nil) | |
// newBitcoindZMQEvents initialises the necessary zmq connections to bitcoind. | ||
// If bitcoind is on a version with the gettxspendingprevout RPC, we can omit | ||
// the mempool. | ||
func newBitcoindZMQEvents(cfg *ZMQConfig, | ||
client *rpcclient.Client, hasRPC bool) (*bitcoindZMQEvents, error) { | ||
func newBitcoindZMQEvents(cfg *ZMQConfig, client *rpcclient.Client, | ||
bClient batchClient, hasRPC bool) (*bitcoindZMQEvents, error) { | ||
|
||
// Check polling config. | ||
if cfg.MempoolPollingInterval == 0 { | ||
|
@@ -133,6 +141,22 @@ func newBitcoindZMQEvents(cfg *ZMQConfig, | |
"events: %v", err) | ||
} | ||
|
||
// Create the config for mempool and attach default values if not | ||
// configed. | ||
mCfg := &mempoolConfig{ | ||
client: bClient, | ||
getRawTxBatchSize: cfg.RPCBatchSize, | ||
batchWaitInterval: cfg.RPCBatchInterval, | ||
} | ||
|
||
if cfg.RPCBatchSize == 0 { | ||
mCfg.getRawTxBatchSize = DefaultGetRawTxBatchSize | ||
} | ||
|
||
if cfg.RPCBatchInterval == 0 { | ||
mCfg.batchWaitInterval = DefaultBatchWaitInterval | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as in the RPC case There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
|
||
zmqEvents := &bitcoindZMQEvents{ | ||
cfg: cfg, | ||
client: client, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added back |
||
|
@@ -141,11 +165,12 @@ func newBitcoindZMQEvents(cfg *ZMQConfig, | |
hasPrevoutRPC: hasRPC, | ||
blockNtfns: make(chan *wire.MsgBlock), | ||
txNtfns: make(chan *wire.MsgTx), | ||
mempool: newMempool(client), | ||
mempool: newMempool(mCfg), | ||
quit: make(chan struct{}), | ||
} | ||
|
||
return zmqEvents, nil | ||
|
||
} | ||
|
||
// Start spins off the bitcoindZMQEvent goroutines. | ||
|
@@ -168,6 +193,8 @@ func (b *bitcoindZMQEvents) Start() error { | |
|
||
// Stop cleans up any of the resources and goroutines held by bitcoindZMQEvents. | ||
func (b *bitcoindZMQEvents) Stop() error { | ||
b.mempool.Shutdown() | ||
|
||
var returnErr error | ||
if err := b.txConn.Close(); err != nil { | ||
returnErr = err | ||
|
@@ -502,16 +529,9 @@ func (b *bitcoindZMQEvents) mempoolPoller() { | |
now := time.Now() | ||
|
||
// After each ticker interval, we poll the mempool to | ||
// check for transactions we haven't seen yet. | ||
txs, err := b.mempool.client.GetRawMempool() | ||
if err != nil { | ||
log.Errorf("Unable to retrieve mempool txs: "+ | ||
"%v", err) | ||
continue | ||
} | ||
|
||
// Update our local mempool with the new mempool. | ||
b.mempool.UpdateMempoolTxes(txs) | ||
// check for transactions we haven't seen yet and | ||
// update our local mempool with the new mempool. | ||
b.mempool.UpdateMempoolTxes() | ||
|
||
log.Tracef("Reconciled mempool spends in %v", | ||
time.Since(now)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
getRawTxBatchSize
&batchWaitInterval
members ofmCfg
don't get set if a non-default value is usedThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whoops fixed