From a2fbd45d1a78136c328d40e3c2436284a636f240 Mon Sep 17 00:00:00 2001 From: Lazar <12626340+Lazar955@users.noreply.github.com> Date: Wed, 11 Sep 2024 15:44:51 +0200 Subject: [PATCH] chore(btcslasher): uses semaphore to limit btc node requests (#40) Limits the number of concurrent requests managed by a semaphore [References issue](https://github.com/babylonchain/vigilante-private/issues/48) --- .../btcslasher/bootstrapping_test.go | 1 + btcstaking-tracker/btcslasher/slasher.go | 57 +++++++++++-------- btcstaking-tracker/btcslasher/slasher_test.go | 1 + btcstaking-tracker/tracker.go | 1 + config/btcstaking_tracker.go | 18 ++++-- 5 files changed, 49 insertions(+), 29 deletions(-) diff --git a/btcstaking-tracker/btcslasher/bootstrapping_test.go b/btcstaking-tracker/btcslasher/bootstrapping_test.go index c8a238b6..dff7061b 100644 --- a/btcstaking-tracker/btcslasher/bootstrapping_test.go +++ b/btcstaking-tracker/btcslasher/bootstrapping_test.go @@ -69,6 +69,7 @@ func FuzzSlasher_Bootstrapping(f *testing.F) { commonCfg.RetrySleepTime, commonCfg.MaxRetrySleepTime, commonCfg.MaxRetryTimes, + config.MaxSlashingConcurrency, slashedFPSKChan, metrics.NewBTCStakingTrackerMetrics().SlasherMetrics, ) diff --git a/btcstaking-tracker/btcslasher/slasher.go b/btcstaking-tracker/btcslasher/slasher.go index 8c284bf9..095e15bc 100644 --- a/btcstaking-tracker/btcslasher/slasher.go +++ b/btcstaking-tracker/btcslasher/slasher.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/babylonlabs-io/vigilante/types" "github.com/decred/dcrd/dcrec/secp256k1/v4" + "golang.org/x/sync/semaphore" "sync" "time" @@ -46,6 +47,8 @@ type BTCSlasher struct { // channel for receiving the slash result of each BTC delegation slashResultChan chan *SlashResult + maxSlashingConcurrency int64 + metrics *metrics.SlasherMetrics startOnce sync.Once @@ -62,23 +65,25 @@ func New( retrySleepTime time.Duration, maxRetrySleepTime time.Duration, maxRetryTimes uint, + maxSlashingConcurrency uint8, slashedFPSKChan chan *btcec.PrivateKey, metrics *metrics.SlasherMetrics, ) (*BTCSlasher, error) { logger := parentLogger.With(zap.String("module", "slasher")).Sugar() return &BTCSlasher{ - logger: logger, - BTCClient: btcClient, - BBNQuerier: bbnQuerier, - netParams: netParams, - retrySleepTime: retrySleepTime, - maxRetrySleepTime: maxRetrySleepTime, - maxRetryTimes: maxRetryTimes, - slashedFPSKChan: slashedFPSKChan, - slashResultChan: make(chan *SlashResult, 1000), - quit: make(chan struct{}), - metrics: metrics, + logger: logger, + BTCClient: btcClient, + BBNQuerier: bbnQuerier, + netParams: netParams, + retrySleepTime: retrySleepTime, + maxRetrySleepTime: maxRetrySleepTime, + maxRetryTimes: maxRetryTimes, + maxSlashingConcurrency: int64(maxSlashingConcurrency), + slashedFPSKChan: slashedFPSKChan, + slashResultChan: make(chan *SlashResult, 1000), + quit: make(chan struct{}), + metrics: metrics, }, nil } @@ -251,24 +256,26 @@ func (bs *BTCSlasher) SlashFinalityProvider(extractedFpBTCSK *btcec.PrivateKey) // Initialize a mutex protected *btcec.PrivateKey safeExtractedFpBTCSK := types.NewPrivateKeyWithMutex(extractedFpBTCSK) + // Initialize a semaphore to control the number of concurrent operations + sem := semaphore.NewWeighted(bs.maxSlashingConcurrency) + delegations := append(activeBTCDels, unbondedBTCDels...) + // try to slash both staking and unbonding txs for each BTC delegation - // sign and submit slashing tx for each active delegation - // TODO: use semaphore to prevent spamming BTC node - for _, del := range activeBTCDels { - bs.wg.Add(1) - go func(d *bstypes.BTCDelegationResponse) { - defer bs.wg.Done() - safeExtractedFpBTCSK.UseKey(func(key *secp256k1.PrivateKey) { - bs.slashBTCDelegation(fpBTCPK, key, d) - }) - }(del) - } - // sign and submit slashing tx for each unbonded delegation - // TODO: use semaphore to prevent spamming BTC node - for _, del := range unbondedBTCDels { + // sign and submit slashing tx for each active and unbonded delegation + for _, del := range delegations { bs.wg.Add(1) go func(d *bstypes.BTCDelegationResponse) { defer bs.wg.Done() + ctx, cancel := bs.quitContext() + defer cancel() + + // Acquire the semaphore before interacting with the BTC node + if err := sem.Acquire(ctx, 1); err != nil { + bs.logger.Errorf("failed to acquire semaphore: %v", err) + return + } + defer sem.Release(1) + safeExtractedFpBTCSK.UseKey(func(key *secp256k1.PrivateKey) { bs.slashBTCDelegation(fpBTCPK, key, d) }) diff --git a/btcstaking-tracker/btcslasher/slasher_test.go b/btcstaking-tracker/btcslasher/slasher_test.go index b8e23fab..e0fb37e3 100644 --- a/btcstaking-tracker/btcslasher/slasher_test.go +++ b/btcstaking-tracker/btcslasher/slasher_test.go @@ -79,6 +79,7 @@ func FuzzSlasher(f *testing.F) { commonCfg.RetrySleepTime, commonCfg.MaxRetrySleepTime, commonCfg.MaxRetryTimes, + config.MaxSlashingConcurrency, slashedFPSKChan, metrics.NewBTCStakingTrackerMetrics().SlasherMetrics, ) diff --git a/btcstaking-tracker/tracker.go b/btcstaking-tracker/tracker.go index ca6a696b..e8a1fb4e 100644 --- a/btcstaking-tracker/tracker.go +++ b/btcstaking-tracker/tracker.go @@ -88,6 +88,7 @@ func NewBTCSTakingTracker( commonCfg.RetrySleepTime, commonCfg.MaxRetrySleepTime, commonCfg.MaxRetryTimes, + cfg.MaxSlashingConcurrency, slashedFPSKChan, metrics.SlasherMetrics, ) diff --git a/config/btcstaking_tracker.go b/config/btcstaking_tracker.go index 3ed3ff0a..5d846b33 100644 --- a/config/btcstaking_tracker.go +++ b/config/btcstaking_tracker.go @@ -8,7 +8,10 @@ import ( "github.com/babylonlabs-io/vigilante/types" ) -const maxBatchSize = 10000 +const ( + maxBatchSize = 10000 + MaxSlashingConcurrency = 20 +) type BTCStakingTrackerConfig struct { CheckDelegationsInterval time.Duration `mapstructure:"check-delegations-interval"` @@ -18,6 +21,8 @@ type BTCStakingTrackerConfig struct { RetryJitter time.Duration `mapstructure:"max-jitter-interval"` // the BTC network BTCNetParams string `mapstructure:"btcnetparams"` // should be mainnet|testnet|simnet|signet|regtest + // number of concurrent requests that when slashing + MaxSlashingConcurrency uint8 `mapstructure:"max-slashing-concurrency"` } func DefaultBTCStakingTrackerConfig() BTCStakingTrackerConfig { @@ -26,11 +31,12 @@ func DefaultBTCStakingTrackerConfig() BTCStakingTrackerConfig { NewDelegationsBatchSize: 100, // This can be quite large to avoid wasting resources on checking if delegation is active CheckDelegationActiveInterval: 5 * time.Minute, - // This schould be small, as we want to report unbonding tx as soon as possible even if we initialy failed + // This should be small, as we want to report unbonding tx as soon as possible even if we initially failed RetrySubmitUnbondingTxInterval: 1 * time.Minute, // pretty large jitter to avoid spamming babylon with requests - RetryJitter: 30 * time.Second, - BTCNetParams: types.BtcSimnet.String(), + RetryJitter: 30 * time.Second, + BTCNetParams: types.BtcSimnet.String(), + MaxSlashingConcurrency: MaxSlashingConcurrency, } } @@ -58,5 +64,9 @@ func (cfg *BTCStakingTrackerConfig) Validate() error { return fmt.Errorf("invalid net params %s", cfg.BTCNetParams) } + if cfg.MaxSlashingConcurrency == 0 { + return errors.New("max-slashing-concurrency cannot be 0") + } + return nil }