From 207ee09936c8eeb4cb3f351fdc07071d9e0d8932 Mon Sep 17 00:00:00 2001 From: lazar Date: Thu, 3 Oct 2024 17:41:26 +0200 Subject: [PATCH] separate poll by status --- .../expected_babylon_client.go | 1 - .../mock_babylon_client.go | 15 --- .../stakingeventwatcher.go | 91 +++++++++++++------ 3 files changed, 62 insertions(+), 45 deletions(-) diff --git a/btcstaking-tracker/stakingeventwatcher/expected_babylon_client.go b/btcstaking-tracker/stakingeventwatcher/expected_babylon_client.go index 586c6e3..fef3f00 100644 --- a/btcstaking-tracker/stakingeventwatcher/expected_babylon_client.go +++ b/btcstaking-tracker/stakingeventwatcher/expected_babylon_client.go @@ -24,7 +24,6 @@ type Delegation struct { } type BabylonNodeAdapter interface { - BtcDelegations(offset uint64, limit uint64) ([]Delegation, error) DelegationsByStatus(status btcstakingtypes.BTCDelegationStatus, offset uint64, limit uint64) ([]Delegation, error) IsDelegationActive(stakingTxHash chainhash.Hash) (bool, error) IsDelegationVerified(stakingTxHash chainhash.Hash) (bool, error) diff --git a/btcstaking-tracker/stakingeventwatcher/mock_babylon_client.go b/btcstaking-tracker/stakingeventwatcher/mock_babylon_client.go index fcb0a4e..d38ea70 100644 --- a/btcstaking-tracker/stakingeventwatcher/mock_babylon_client.go +++ b/btcstaking-tracker/stakingeventwatcher/mock_babylon_client.go @@ -67,21 +67,6 @@ func (mr *MockBabylonNodeAdapterMockRecorder) BtcClientTipHeight() *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BtcClientTipHeight", reflect.TypeOf((*MockBabylonNodeAdapter)(nil).BtcClientTipHeight)) } -// BtcDelegations mocks base method. -func (m *MockBabylonNodeAdapter) BtcDelegations(offset, limit uint64) ([]Delegation, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BtcDelegations", offset, limit) - ret0, _ := ret[0].([]Delegation) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// BtcDelegations indicates an expected call of BtcDelegations. -func (mr *MockBabylonNodeAdapterMockRecorder) BtcDelegations(offset, limit interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BtcDelegations", reflect.TypeOf((*MockBabylonNodeAdapter)(nil).BtcDelegations), offset, limit) -} - // DelegationsByStatus mocks base method. func (m *MockBabylonNodeAdapter) DelegationsByStatus(status types0.BTCDelegationStatus, offset, limit uint64) ([]Delegation, error) { m.ctrl.T.Helper() diff --git a/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go b/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go index 8a16938..8935acc 100644 --- a/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go +++ b/btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" btcctypes "github.com/babylonlabs-io/babylon/x/btccheckpoint/types" + btcstakingtypes "github.com/babylonlabs-io/babylon/x/btcstaking/types" "github.com/babylonlabs-io/vigilante/btcclient" "github.com/babylonlabs-io/vigilante/types" "sync" @@ -166,10 +167,10 @@ func (sew *StakingEventWatcher) handleNewBlocks(blockNotifier *notifier.BlockEpo // checkBabylonDelegations iterates over all active babylon delegations, and reports not already // tracked delegations to the unbondingDelegationChan -func (sew *StakingEventWatcher) checkBabylonDelegations() error { +func (sew *StakingEventWatcher) checkBabylonDelegations(status btcstakingtypes.BTCDelegationStatus, addDel func(del Delegation)) error { var i = uint64(0) for { - delegations, err := sew.babylonNodeAdapter.BtcDelegations(i, sew.cfg.NewDelegationsBatchSize) + delegations, err := sew.babylonNodeAdapter.DelegationsByStatus(status, i, sew.cfg.NewDelegationsBatchSize) if err != nil { return fmt.Errorf("error fetching active delegations from babylon: %v", err) @@ -178,33 +179,11 @@ func (sew *StakingEventWatcher) checkBabylonDelegations() error { sew.logger.Debugf("fetched %d delegations from babylon", len(delegations)) for _, delegation := range delegations { - stakingTxHash := delegation.StakingTx.TxHash() - - del := &newDelegation{ - stakingTxHash: stakingTxHash, - stakingTx: delegation.StakingTx, - stakingOutputIdx: delegation.StakingOutputIdx, - delegationStartHeight: delegation.DelegationStartHeight, - unbondingOutput: delegation.UnbondingOutput, - } - - // if we already have this delegation, skip it - // we should track both verified and active status for unbonding - if sew.unbondingTracker.GetDelegation(stakingTxHash) == nil { - utils.PushOrQuit(sew.unbondingDelegationChan, del, sew.quit) - } - - if sew.pendingTracker.GetDelegation(stakingTxHash) == nil && !delegation.HasProof { - _, _ = sew.pendingTracker.AddDelegation( - del.stakingTx, - del.stakingOutputIdx, - del.unbondingOutput, - ) - } + addDel(delegation) } if len(delegations) < int(sew.cfg.NewDelegationsBatchSize) { - // we received less delegations than we asked for, it means went through all of them + // we received fewer delegations than we asked for; it means went through all of them return nil } @@ -238,11 +217,65 @@ func (sew *StakingEventWatcher) fetchDelegations() { continue } - if err = sew.checkBabylonDelegations(); err != nil { - sew.logger.Errorf("error checking babylon delegations: %v", err) - continue + addToUnbonding := func(delegation Delegation) { + del := &newDelegation{ + stakingTxHash: delegation.StakingTx.TxHash(), + stakingTx: delegation.StakingTx, + stakingOutputIdx: delegation.StakingOutputIdx, + delegationStartHeight: delegation.DelegationStartHeight, + unbondingOutput: delegation.UnbondingOutput, + } + + // if we already have this delegation, skip it + // we should track both verified and active status for unbonding + if sew.unbondingTracker.GetDelegation(delegation.StakingTx.TxHash()) == nil { + utils.PushOrQuit(sew.unbondingDelegationChan, del, sew.quit) + } + } + + addToPending := func(delegation Delegation) { + del := &newDelegation{ + stakingTxHash: delegation.StakingTx.TxHash(), + stakingTx: delegation.StakingTx, + stakingOutputIdx: delegation.StakingOutputIdx, + delegationStartHeight: delegation.DelegationStartHeight, + unbondingOutput: delegation.UnbondingOutput, + } + + if sew.pendingTracker.GetDelegation(delegation.StakingTx.TxHash()) == nil && !delegation.HasProof { + _, _ = sew.pendingTracker.AddDelegation( + del.stakingTx, + del.stakingOutputIdx, + del.unbondingOutput, + ) + } } + var wg sync.WaitGroup + wg.Add(3) + + go func() { + defer wg.Done() + if err = sew.checkBabylonDelegations(btcstakingtypes.BTCDelegationStatus_ACTIVE, addToUnbonding); err != nil { + sew.logger.Errorf("error checking babylon delegations: %v", err) + } + }() + + go func() { + defer wg.Done() + if err = sew.checkBabylonDelegations(btcstakingtypes.BTCDelegationStatus_VERIFIED, addToUnbonding); err != nil { + sew.logger.Errorf("error checking babylon delegations: %v", err) + } + }() + + go func() { + defer wg.Done() + if err = sew.checkBabylonDelegations(btcstakingtypes.BTCDelegationStatus_VERIFIED, addToPending); err != nil { + sew.logger.Errorf("error checking babylon delegations: %v", err) + } + }() + + wg.Wait() case <-sew.quit: sew.logger.Debug("fetch delegations loop quit") return