Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chain: use batch rpc client in mempool poller #888

Merged
merged 9 commits into from
Oct 12, 2023
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ clean:
@$(call print, "Cleaning source.$(NC)")
$(RM) coverage.txt

tidy-module:
echo "Running 'go mod tidy' for all modules"
scripts/tidy_modules.sh

tidy-module-check: tidy-module
if test -n "$$(git status --porcelain)"; then echo "modules not updated, please run `make tidy-module` again!"; git status; exit 1; fi

.PHONY: all \
default \
build \
Expand Down
7 changes: 6 additions & 1 deletion chain/bitcoind_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) {
return nil, err
}

batchClient, err := rpcclient.NewBatch(clientCfg)
if err != nil {
return nil, err
}

// Verify that the node is running on the expected network.
net, err := getCurrentNet(client)
if err != nil {
Expand Down Expand Up @@ -182,7 +187,7 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) {
quit: make(chan struct{}),
}

bc.events, err = NewBitcoindEventSubscriber(cfg, client)
bc.events, err = NewBitcoindEventSubscriber(cfg, client, batchClient)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions chain/bitcoind_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ type BitcoindEvents interface {
}

// Ensure rpcclient.Client implements the rpcClient interface at compile time.
var _ rpcClient = (*rpcclient.Client)(nil)
var _ batchClient = (*rpcclient.Client)(nil)

// NewBitcoindEventSubscriber initialises a new BitcoinEvents object impl
// depending on the config passed.
func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
client *rpcclient.Client) (BitcoindEvents, error) {
func NewBitcoindEventSubscriber(cfg *BitcoindConfig, client *rpcclient.Client,
bClient batchClient) (BitcoindEvents, error) {

if cfg.PollingConfig != nil && cfg.ZMQConfig != nil {
return nil, fmt.Errorf("either PollingConfig or ZMQConfig " +
Expand All @@ -52,7 +52,7 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
}

pollingEvents := newBitcoindRPCPollingEvents(
cfg.PollingConfig, client,
cfg.PollingConfig, client, bClient,
)

return pollingEvents, nil
Expand All @@ -71,7 +71,7 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
return nil, err
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client, hasRPC)
return newBitcoindZMQEvents(cfg.ZMQConfig, client, bClient, hasRPC)
}

// hasSpendingPrevoutRPC returns whether or not the bitcoind has the newer
Expand Down
45 changes: 32 additions & 13 deletions chain/bitcoind_rpc_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ type PollingConfig struct {
// jitter by scaling TxPollingInterval with it. This value must be no
// less than 0. Default to 0, meaning no jitter will be applied.
TxPollingIntervalJitter float64

// RPCBatchSize defines the number of RPC requests to be batches before
// sending them to the bitcoind node.
RPCBatchSize uint32

// RPCBatchInterval defines the time to wait before attempting the next
// batch when the current one finishes.
RPCBatchInterval time.Duration
}

// bitcoindRPCPollingEvents delivers block and transaction notifications that
Expand Down Expand Up @@ -68,8 +76,8 @@ var _ BitcoindEvents = (*bitcoindRPCPollingEvents)(nil)

// newBitcoindRPCPollingEvents instantiates a new bitcoindRPCPollingEvents
// object.
func newBitcoindRPCPollingEvents(cfg *PollingConfig,
client *rpcclient.Client) *bitcoindRPCPollingEvents {
func newBitcoindRPCPollingEvents(cfg *PollingConfig, client *rpcclient.Client,
bClient batchClient) *bitcoindRPCPollingEvents {

if cfg.BlockPollingInterval == 0 {
cfg.BlockPollingInterval = defaultBlockPollInterval
Expand All @@ -86,12 +94,28 @@ func newBitcoindRPCPollingEvents(cfg *PollingConfig,
cfg.TxPollingIntervalJitter = 0
}

// Create the config for mempool and attach default values if not
// configed.
mCfg := &mempoolConfig{
client: bClient,
getRawTxBatchSize: cfg.RPCBatchSize,
batchWaitInterval: cfg.RPCBatchInterval,
}

if cfg.RPCBatchSize == 0 {
mCfg.getRawTxBatchSize = DefaultGetRawTxBatchSize
}

if cfg.RPCBatchInterval == 0 {
mCfg.batchWaitInterval = DefaultBatchWaitInterval
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getRawTxBatchSize & batchWaitInterval members of mCfg don't get set if a non-default value is used

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops fixed

return &bitcoindRPCPollingEvents{
cfg: cfg,
client: client,
txNtfns: make(chan *wire.MsgTx),
blockNtfns: make(chan *wire.MsgBlock),
mempool: newMempool(client),
mempool: newMempool(mCfg),
quit: make(chan struct{}),
}
}
Expand All @@ -116,6 +140,8 @@ func (b *bitcoindRPCPollingEvents) Start() error {

// Stop cleans up all the bitcoindRPCPollingEvents resources and goroutines.
func (b *bitcoindRPCPollingEvents) Stop() error {
b.mempool.Shutdown()

close(b.quit)
b.wg.Wait()
return nil
Expand Down Expand Up @@ -247,16 +273,9 @@ func (b *bitcoindRPCPollingEvents) txEventHandlerRPC() {
now := time.Now()

// After each ticker interval, we poll the mempool to
// check for transactions we haven't seen yet.
txs, err := b.client.GetRawMempool()
if err != nil {
log.Errorf("Unable to retrieve mempool txs: "+
"%v", err)
continue
}

// Update our local mempool with the new mempool.
newTxs := b.mempool.UpdateMempoolTxes(txs)
// check for transactions we haven't seen yet and
// update our local mempool with the new mempool.
newTxs := b.mempool.UpdateMempoolTxes()

log.Tracef("Reconciled mempool spends in %v",
time.Since(now))
Expand Down
46 changes: 33 additions & 13 deletions chain/bitcoind_zmq_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ type ZMQConfig struct {
//
// TODO(yy): replace this temp config with SEQUENCE check.
PollingIntervalJitter float64

// RPCBatchSize defines the number of RPC requests to be batches before
// sending them to the bitcoind node.
RPCBatchSize uint32

// RPCBatchInterval defines the time to wait before attempting the next
// batch when the current one finishes.
RPCBatchInterval time.Duration
}

// bitcoindZMQEvents delivers block and transaction notifications that it gets
Expand Down Expand Up @@ -91,8 +99,8 @@ var _ BitcoindEvents = (*bitcoindZMQEvents)(nil)
// newBitcoindZMQEvents initialises the necessary zmq connections to bitcoind.
// If bitcoind is on a version with the gettxspendingprevout RPC, we can omit
// the mempool.
func newBitcoindZMQEvents(cfg *ZMQConfig,
client *rpcclient.Client, hasRPC bool) (*bitcoindZMQEvents, error) {
func newBitcoindZMQEvents(cfg *ZMQConfig, client *rpcclient.Client,
bClient batchClient, hasRPC bool) (*bitcoindZMQEvents, error) {

// Check polling config.
if cfg.MempoolPollingInterval == 0 {
Expand Down Expand Up @@ -133,6 +141,22 @@ func newBitcoindZMQEvents(cfg *ZMQConfig,
"events: %v", err)
}

// Create the config for mempool and attach default values if not
// configed.
mCfg := &mempoolConfig{
client: bClient,
getRawTxBatchSize: cfg.RPCBatchSize,
batchWaitInterval: cfg.RPCBatchInterval,
}

if cfg.RPCBatchSize == 0 {
mCfg.getRawTxBatchSize = DefaultGetRawTxBatchSize
}

if cfg.RPCBatchInterval == 0 {
mCfg.batchWaitInterval = DefaultBatchWaitInterval
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as in the RPC case

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


zmqEvents := &bitcoindZMQEvents{
cfg: cfg,
client: client,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client needs to be set here so that gettxspendingprevout can be called

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added back

Expand All @@ -141,11 +165,12 @@ func newBitcoindZMQEvents(cfg *ZMQConfig,
hasPrevoutRPC: hasRPC,
blockNtfns: make(chan *wire.MsgBlock),
txNtfns: make(chan *wire.MsgTx),
mempool: newMempool(client),
mempool: newMempool(mCfg),
quit: make(chan struct{}),
}

return zmqEvents, nil

}

// Start spins off the bitcoindZMQEvent goroutines.
Expand All @@ -168,6 +193,8 @@ func (b *bitcoindZMQEvents) Start() error {

// Stop cleans up any of the resources and goroutines held by bitcoindZMQEvents.
func (b *bitcoindZMQEvents) Stop() error {
b.mempool.Shutdown()

var returnErr error
if err := b.txConn.Close(); err != nil {
returnErr = err
Expand Down Expand Up @@ -502,16 +529,9 @@ func (b *bitcoindZMQEvents) mempoolPoller() {
now := time.Now()

// After each ticker interval, we poll the mempool to
// check for transactions we haven't seen yet.
txs, err := b.mempool.client.GetRawMempool()
if err != nil {
log.Errorf("Unable to retrieve mempool txs: "+
"%v", err)
continue
}

// Update our local mempool with the new mempool.
b.mempool.UpdateMempoolTxes(txs)
// check for transactions we haven't seen yet and
// update our local mempool with the new mempool.
b.mempool.UpdateMempoolTxes()

log.Tracef("Reconciled mempool spends in %v",
time.Since(now))
Expand Down
36 changes: 32 additions & 4 deletions chain/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wtxmgr"
Expand Down Expand Up @@ -124,8 +125,35 @@ type (
}
)

// rpcClient defines an interface that is used to interact with the RPC client.
type rpcClient interface {
GetRawMempool() ([]*chainhash.Hash, error)
GetRawTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error)
// batchClient defines an interface that is used to interact with the RPC
// client.
//
// NOTE: the client returned from `rpcclient.NewBatch` will implement this
// interface. Unlike the client from `rpcclient.New`, calling `GetRawMempool`
// on this client will block and won't return.
//
// TODO(yy): create a new type BatchClient in `rpcclient`.
type batchClient interface {
// GetRawMempoolAsync returns an instance of a type that can be used to
// get the result of the RPC at some future time by invoking the
// Receive function on the returned instance.
GetRawMempoolAsync() rpcclient.FutureGetRawMempoolResult

// GetRawTransactionAsync returns an instance of a type that can be
// used to get the result of the RPC at some future time by invoking
// the Receive function on the returned instance.
GetRawTransactionAsync(
txHash *chainhash.Hash) rpcclient.FutureGetRawTransactionResult

// Send marshalls bulk requests and sends to the server creates a
// response channel to receive the response
Send() error
}

// getRawTxReceiver defines an interface that's used to receive response from
// `GetRawTransactionAsync`.
type getRawTxReceiver interface {
// Receive waits for the Response promised by the future and returns a
// transaction given its hash.
Receive() (*btcutil.Tx, error)
}
Loading
Loading