Skip to content

Commit

Permalink
separate poll by status
Browse files Browse the repository at this point in the history
  • Loading branch information
Lazar955 committed Oct 3, 2024
1 parent 50f222b commit 207ee09
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type Delegation struct {
}

type BabylonNodeAdapter interface {
BtcDelegations(offset uint64, limit uint64) ([]Delegation, error)
DelegationsByStatus(status btcstakingtypes.BTCDelegationStatus, offset uint64, limit uint64) ([]Delegation, error)
IsDelegationActive(stakingTxHash chainhash.Hash) (bool, error)
IsDelegationVerified(stakingTxHash chainhash.Hash) (bool, error)
Expand Down
15 changes: 0 additions & 15 deletions btcstaking-tracker/stakingeventwatcher/mock_babylon_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 62 additions & 29 deletions btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
btcctypes "github.com/babylonlabs-io/babylon/x/btccheckpoint/types"
btcstakingtypes "github.com/babylonlabs-io/babylon/x/btcstaking/types"
"github.com/babylonlabs-io/vigilante/btcclient"
"github.com/babylonlabs-io/vigilante/types"
"sync"
Expand Down Expand Up @@ -166,10 +167,10 @@ func (sew *StakingEventWatcher) handleNewBlocks(blockNotifier *notifier.BlockEpo

// checkBabylonDelegations iterates over all active babylon delegations, and reports not already
// tracked delegations to the unbondingDelegationChan
func (sew *StakingEventWatcher) checkBabylonDelegations() error {
func (sew *StakingEventWatcher) checkBabylonDelegations(status btcstakingtypes.BTCDelegationStatus, addDel func(del Delegation)) error {
var i = uint64(0)
for {
delegations, err := sew.babylonNodeAdapter.BtcDelegations(i, sew.cfg.NewDelegationsBatchSize)
delegations, err := sew.babylonNodeAdapter.DelegationsByStatus(status, i, sew.cfg.NewDelegationsBatchSize)

if err != nil {
return fmt.Errorf("error fetching active delegations from babylon: %v", err)
Expand All @@ -178,33 +179,11 @@ func (sew *StakingEventWatcher) checkBabylonDelegations() error {
sew.logger.Debugf("fetched %d delegations from babylon", len(delegations))

for _, delegation := range delegations {
stakingTxHash := delegation.StakingTx.TxHash()

del := &newDelegation{
stakingTxHash: stakingTxHash,
stakingTx: delegation.StakingTx,
stakingOutputIdx: delegation.StakingOutputIdx,
delegationStartHeight: delegation.DelegationStartHeight,
unbondingOutput: delegation.UnbondingOutput,
}

// if we already have this delegation, skip it
// we should track both verified and active status for unbonding
if sew.unbondingTracker.GetDelegation(stakingTxHash) == nil {
utils.PushOrQuit(sew.unbondingDelegationChan, del, sew.quit)
}

if sew.pendingTracker.GetDelegation(stakingTxHash) == nil && !delegation.HasProof {
_, _ = sew.pendingTracker.AddDelegation(
del.stakingTx,
del.stakingOutputIdx,
del.unbondingOutput,
)
}
addDel(delegation)
}

if len(delegations) < int(sew.cfg.NewDelegationsBatchSize) {
// we received less delegations than we asked for, it means went through all of them
// we received fewer delegations than we asked for; it means went through all of them
return nil
}

Expand Down Expand Up @@ -238,11 +217,65 @@ func (sew *StakingEventWatcher) fetchDelegations() {
continue
}

if err = sew.checkBabylonDelegations(); err != nil {
sew.logger.Errorf("error checking babylon delegations: %v", err)
continue
addToUnbonding := func(delegation Delegation) {
del := &newDelegation{
stakingTxHash: delegation.StakingTx.TxHash(),
stakingTx: delegation.StakingTx,
stakingOutputIdx: delegation.StakingOutputIdx,
delegationStartHeight: delegation.DelegationStartHeight,
unbondingOutput: delegation.UnbondingOutput,
}

// if we already have this delegation, skip it
// we should track both verified and active status for unbonding
if sew.unbondingTracker.GetDelegation(delegation.StakingTx.TxHash()) == nil {
utils.PushOrQuit(sew.unbondingDelegationChan, del, sew.quit)
}
}

addToPending := func(delegation Delegation) {
del := &newDelegation{
stakingTxHash: delegation.StakingTx.TxHash(),
stakingTx: delegation.StakingTx,
stakingOutputIdx: delegation.StakingOutputIdx,
delegationStartHeight: delegation.DelegationStartHeight,
unbondingOutput: delegation.UnbondingOutput,
}

if sew.pendingTracker.GetDelegation(delegation.StakingTx.TxHash()) == nil && !delegation.HasProof {
_, _ = sew.pendingTracker.AddDelegation(
del.stakingTx,
del.stakingOutputIdx,
del.unbondingOutput,
)
}
}

var wg sync.WaitGroup
wg.Add(3)

go func() {
defer wg.Done()
if err = sew.checkBabylonDelegations(btcstakingtypes.BTCDelegationStatus_ACTIVE, addToUnbonding); err != nil {
sew.logger.Errorf("error checking babylon delegations: %v", err)
}
}()

go func() {
defer wg.Done()
if err = sew.checkBabylonDelegations(btcstakingtypes.BTCDelegationStatus_VERIFIED, addToUnbonding); err != nil {
sew.logger.Errorf("error checking babylon delegations: %v", err)
}
}()

go func() {
defer wg.Done()
if err = sew.checkBabylonDelegations(btcstakingtypes.BTCDelegationStatus_VERIFIED, addToPending); err != nil {
sew.logger.Errorf("error checking babylon delegations: %v", err)
}
}()

wg.Wait()
case <-sew.quit:
sew.logger.Debug("fetch delegations loop quit")
return
Expand Down

0 comments on commit 207ee09

Please sign in to comment.