Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(btcslasher): uses semaphore to limit btc node requests #40

Merged
merged 5 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions btcstaking-tracker/btcslasher/bootstrapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func FuzzSlasher_Bootstrapping(f *testing.F) {
commonCfg.RetrySleepTime,
commonCfg.MaxRetrySleepTime,
commonCfg.MaxRetryTimes,
config.MaxSlashingConcurrency,
slashedFPSKChan,
metrics.NewBTCStakingTrackerMetrics().SlasherMetrics,
)
Expand Down
52 changes: 39 additions & 13 deletions btcstaking-tracker/btcslasher/slasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/babylonlabs-io/vigilante/types"
"github.com/decred/dcrd/dcrec/secp256k1/v4"
"golang.org/x/sync/semaphore"
"sync"
"time"

Expand Down Expand Up @@ -46,6 +47,8 @@ type BTCSlasher struct {
// channel for receiving the slash result of each BTC delegation
slashResultChan chan *SlashResult

maxSlashingConcurrency int64

metrics *metrics.SlasherMetrics

startOnce sync.Once
Expand All @@ -62,23 +65,25 @@ func New(
retrySleepTime time.Duration,
maxRetrySleepTime time.Duration,
maxRetryTimes uint,
maxSlashingConcurrency uint8,
slashedFPSKChan chan *btcec.PrivateKey,
metrics *metrics.SlasherMetrics,
) (*BTCSlasher, error) {
logger := parentLogger.With(zap.String("module", "slasher")).Sugar()

return &BTCSlasher{
logger: logger,
BTCClient: btcClient,
BBNQuerier: bbnQuerier,
netParams: netParams,
retrySleepTime: retrySleepTime,
maxRetrySleepTime: maxRetrySleepTime,
maxRetryTimes: maxRetryTimes,
slashedFPSKChan: slashedFPSKChan,
slashResultChan: make(chan *SlashResult, 1000),
quit: make(chan struct{}),
metrics: metrics,
logger: logger,
BTCClient: btcClient,
BBNQuerier: bbnQuerier,
netParams: netParams,
retrySleepTime: retrySleepTime,
maxRetrySleepTime: maxRetrySleepTime,
maxRetryTimes: maxRetryTimes,
maxSlashingConcurrency: int64(maxSlashingConcurrency),
slashedFPSKChan: slashedFPSKChan,
slashResultChan: make(chan *SlashResult, 1000),
quit: make(chan struct{}),
metrics: metrics,
}, nil
}

Expand Down Expand Up @@ -251,24 +256,45 @@ func (bs *BTCSlasher) SlashFinalityProvider(extractedFpBTCSK *btcec.PrivateKey)
// Initialize a mutex protected *btcec.PrivateKey
safeExtractedFpBTCSK := types.NewPrivateKeyWithMutex(extractedFpBTCSK)

// Initialize a semaphore to control the number of concurrent operations
sem := semaphore.NewWeighted(bs.maxSlashingConcurrency)

// try to slash both staking and unbonding txs for each BTC delegation
// sign and submit slashing tx for each active delegation
// TODO: use semaphore to prevent spamming BTC node
for _, del := range activeBTCDels {
Lazar955 marked this conversation as resolved.
Show resolved Hide resolved
bs.wg.Add(1)
go func(d *bstypes.BTCDelegationResponse) {
defer bs.wg.Done()
ctx, cancel := bs.quitContext()
defer cancel()

// Acquire the semaphore before interacting with the BTC node
if err := sem.Acquire(ctx, 1); err != nil {
bs.logger.Errorf("failed to acquire semaphore: %v", err)
return
}
defer sem.Release(1)

safeExtractedFpBTCSK.UseKey(func(key *secp256k1.PrivateKey) {
bs.slashBTCDelegation(fpBTCPK, key, d)
})
}(del)
}
// sign and submit slashing tx for each unbonded delegation
// TODO: use semaphore to prevent spamming BTC node
for _, del := range unbondedBTCDels {
bs.wg.Add(1)
go func(d *bstypes.BTCDelegationResponse) {
defer bs.wg.Done()
ctx, cancel := bs.quitContext()
defer cancel()

// Acquire the semaphore before interacting with the BTC node
if err := sem.Acquire(ctx, 1); err != nil {
bs.logger.Errorf("failed to acquire semaphore: %v", err)
return
}
defer sem.Release(1)

safeExtractedFpBTCSK.UseKey(func(key *secp256k1.PrivateKey) {
bs.slashBTCDelegation(fpBTCPK, key, d)
})
Expand Down
1 change: 1 addition & 0 deletions btcstaking-tracker/btcslasher/slasher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func FuzzSlasher(f *testing.F) {
commonCfg.RetrySleepTime,
commonCfg.MaxRetrySleepTime,
commonCfg.MaxRetryTimes,
config.MaxSlashingConcurrency,
slashedFPSKChan,
metrics.NewBTCStakingTrackerMetrics().SlasherMetrics,
)
Expand Down
1 change: 1 addition & 0 deletions btcstaking-tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func NewBTCSTakingTracker(
commonCfg.RetrySleepTime,
commonCfg.MaxRetrySleepTime,
commonCfg.MaxRetryTimes,
cfg.MaxSlashingConcurrency,
slashedFPSKChan,
metrics.SlasherMetrics,
)
Expand Down
18 changes: 14 additions & 4 deletions config/btcstaking_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"github.com/babylonlabs-io/vigilante/types"
)

const maxBatchSize = 10000
const (
maxBatchSize = 10000
MaxSlashingConcurrency = 20
)

type BTCStakingTrackerConfig struct {
CheckDelegationsInterval time.Duration `mapstructure:"check-delegations-interval"`
Expand All @@ -18,6 +21,8 @@ type BTCStakingTrackerConfig struct {
RetryJitter time.Duration `mapstructure:"max-jitter-interval"`
// the BTC network
BTCNetParams string `mapstructure:"btcnetparams"` // should be mainnet|testnet|simnet|signet|regtest
// number of concurrent requests that when slashing
MaxSlashingConcurrency uint8 `mapstructure:"max-slashing-concurrency"`
}

func DefaultBTCStakingTrackerConfig() BTCStakingTrackerConfig {
Expand All @@ -26,11 +31,12 @@ func DefaultBTCStakingTrackerConfig() BTCStakingTrackerConfig {
NewDelegationsBatchSize: 100,
// This can be quite large to avoid wasting resources on checking if delegation is active
CheckDelegationActiveInterval: 5 * time.Minute,
// This schould be small, as we want to report unbonding tx as soon as possible even if we initialy failed
// This should be small, as we want to report unbonding tx as soon as possible even if we initially failed
RetrySubmitUnbondingTxInterval: 1 * time.Minute,
// pretty large jitter to avoid spamming babylon with requests
RetryJitter: 30 * time.Second,
BTCNetParams: types.BtcSimnet.String(),
RetryJitter: 30 * time.Second,
BTCNetParams: types.BtcSimnet.String(),
MaxSlashingConcurrency: MaxSlashingConcurrency,
}
}

Expand Down Expand Up @@ -58,5 +64,9 @@ func (cfg *BTCStakingTrackerConfig) Validate() error {
return fmt.Errorf("invalid net params %s", cfg.BTCNetParams)
}

if cfg.MaxSlashingConcurrency == 0 {
return errors.New("max-slashing-concurrency cannot be 0")
}

return nil
}
Loading