diff --git a/clientcontroller/api/interface.go b/clientcontroller/api/interface.go index 7af55c3a..551f9f94 100644 --- a/clientcontroller/api/interface.go +++ b/clientcontroller/api/interface.go @@ -71,3 +71,24 @@ type ConsumerController interface { Close() error } + +// ChainPoller is a interface that got block info from the consumer chain. +type ConsumerChainPoller interface { + // Start the poller, will begin with the `startHeight` + Start(startHeight uint64) error + + // SkipToHeight make poller skip blocks in `GetBlockInfoChan` until the `height`. + SkipToHeight(height uint64) error + + // Return read only channel for incoming blocks + // TODO: Handle the case when there is more than one consumer. Currently with more than + // one consumer blocks most probably will be received out of order to those consumers. + GetBlockInfoChan() <-chan *types.BlockInfo + + Stop() error +} + +// ConsumerChainPollerFactory is a factory for creating chain pollers by fp manager +type ConsumerChainPollerFactory interface { + CreateChainPoller() (ConsumerChainPoller, error) +} diff --git a/finality-provider/service/app.go b/finality-provider/service/app.go index ddb4a1e1..a921db24 100644 --- a/finality-provider/service/app.go +++ b/finality-provider/service/app.go @@ -77,15 +77,21 @@ func NewFinalityProviderAppFromConfig( logger.Info("successfully connected to a remote EOTS manager", zap.String("address", cfg.EOTSManagerAddress)) - return NewFinalityProviderApp(cfg, cc, consumerCon, em, db, logger) + fpMetrics := metrics.NewFpMetrics() + + pollerFactory := NewChainPollerFactory(logger, cfg.PollerConfig, cc, consumerCon, fpMetrics) + + return NewFinalityProviderApp(cfg, cc, consumerCon, pollerFactory, em, db, fpMetrics, logger) } func NewFinalityProviderApp( config *fpcfg.Config, cc ccapi.ClientController, // TODO: this should be renamed as client controller is always going to be babylon consumerCon ccapi.ConsumerController, + pollerFactory ccapi.ConsumerChainPollerFactory, em eotsmanager.EOTSManager, db kvdb.Backend, + fpMetrics *metrics.FpMetrics, logger *zap.Logger, ) (*FinalityProviderApp, error) { fpStore, err := store.NewFinalityProviderStore(db) @@ -108,9 +114,7 @@ func NewFinalityProviderApp( return nil, fmt.Errorf("failed to create keyring: %w", err) } - fpMetrics := metrics.NewFpMetrics() - - fpm, err := NewFinalityProviderManager(fpStore, pubRandStore, config, cc, consumerCon, em, fpMetrics, logger) + fpm, err := NewFinalityProviderManager(fpStore, pubRandStore, config, cc, consumerCon, pollerFactory, em, fpMetrics, logger) if err != nil { return nil, fmt.Errorf("failed to create finality-provider manager: %w", err) } diff --git a/finality-provider/service/app_test.go b/finality-provider/service/app_test.go index c6126474..5eb262a4 100644 --- a/finality-provider/service/app_test.go +++ b/finality-provider/service/app_test.go @@ -22,6 +22,7 @@ import ( "github.com/babylonlabs-io/finality-provider/finality-provider/config" "github.com/babylonlabs-io/finality-provider/finality-provider/proto" "github.com/babylonlabs-io/finality-provider/finality-provider/service" + "github.com/babylonlabs-io/finality-provider/metrics" "github.com/babylonlabs-io/finality-provider/testutil" "github.com/babylonlabs-io/finality-provider/types" ) @@ -68,7 +69,10 @@ func FuzzRegisterFinalityProvider(f *testing.F) { fpCfg.PollerConfig.StaticChainScanningStartHeight = randomStartingHeight fpdb, err := fpCfg.DatabaseConfig.GetDbBackend() require.NoError(t, err) - app, err := service.NewFinalityProviderApp(&fpCfg, mockBabylonController, mockConsumerController, em, fpdb, logger) + + fpMetrics := metrics.NewFpMetrics() + pollerFactory := service.NewChainPollerFactory(logger, fpCfg.PollerConfig, mockBabylonController, mockConsumerController, fpMetrics) + app, err := service.NewFinalityProviderApp(&fpCfg, mockBabylonController, mockConsumerController, pollerFactory, em, fpdb, fpMetrics, logger) require.NoError(t, err) defer func() { err = fpdb.Close() @@ -203,7 +207,10 @@ func FuzzSyncFinalityProviderStatus(f *testing.F) { mockConsumerController.EXPECT().QueryActivatedHeight().Return(currentHeight, nil).AnyTimes() } - app, err := service.NewFinalityProviderApp(&fpCfg, mockBabylonController, mockConsumerController, em, fpdb, logger) + fpMetrics := metrics.NewFpMetrics() + pollerFactory := service.NewChainPollerFactory(logger, fpCfg.PollerConfig, mockBabylonController, mockConsumerController, fpMetrics) + + app, err := service.NewFinalityProviderApp(&fpCfg, mockBabylonController, mockConsumerController, pollerFactory, em, fpdb, fpMetrics, logger) require.NoError(t, err) err = app.Start() diff --git a/finality-provider/service/chain_poller.go b/finality-provider/service/chain_poller.go index ec969ffe..55a63dc4 100644 --- a/finality-provider/service/chain_poller.go +++ b/finality-provider/service/chain_poller.go @@ -37,6 +37,36 @@ type skipHeightResponse struct { err error } +type ChainPollerFactory struct { + logger *zap.Logger + cfg *cfg.ChainPollerConfig + cc ccapi.ClientController + consumerCon ccapi.ConsumerController + metrics *metrics.FpMetrics +} + +var _ ccapi.ConsumerChainPollerFactory = &ChainPollerFactory{} + +func NewChainPollerFactory( + logger *zap.Logger, + cfg *cfg.ChainPollerConfig, + cc ccapi.ClientController, + consumerCon ccapi.ConsumerController, + metrics *metrics.FpMetrics, +) *ChainPollerFactory { + return &ChainPollerFactory{ + logger: logger, + cfg: cfg, + cc: cc, + consumerCon: consumerCon, + metrics: metrics, + } +} + +func (f *ChainPollerFactory) CreateChainPoller() (ccapi.ConsumerChainPoller, error) { + return NewChainPoller(f.logger, f.cfg, f.cc, f.consumerCon, f.metrics), nil +} + type ChainPoller struct { isStarted *atomic.Bool wg sync.WaitGroup @@ -52,6 +82,8 @@ type ChainPoller struct { logger *zap.Logger } +var _ ccapi.ConsumerChainPoller = &ChainPoller{} + func NewChainPoller( logger *zap.Logger, cfg *cfg.ChainPollerConfig, diff --git a/finality-provider/service/fp_instance.go b/finality-provider/service/fp_instance.go index b15d066d..d03bdcbc 100644 --- a/finality-provider/service/fp_instance.go +++ b/finality-provider/service/fp_instance.go @@ -39,7 +39,7 @@ type FinalityProviderInstance struct { em eotsmanager.EOTSManager cc ccapi.ClientController consumerCon ccapi.ConsumerController - poller *ChainPoller + poller ccapi.ConsumerChainPoller metrics *metrics.FpMetrics // passphrase is used to unlock private keys @@ -65,6 +65,7 @@ func NewFinalityProviderInstance( prStore *store.PubRandProofStore, cc ccapi.ClientController, consumerCon ccapi.ConsumerController, + poller ccapi.ConsumerChainPoller, em eotsmanager.EOTSManager, metrics *metrics.FpMetrics, passphrase string, @@ -95,6 +96,7 @@ func NewFinalityProviderInstance( em: em, cc: cc, consumerCon: consumerCon, + poller: poller, metrics: metrics, }, nil } @@ -114,14 +116,10 @@ func (fp *FinalityProviderInstance) Start() error { fp.logger.Info("the finality-provider has been bootstrapped", zap.String("pk", fp.GetBtcPkHex()), zap.Uint64("height", startHeight)) - poller := NewChainPoller(fp.logger, fp.cfg.PollerConfig, fp.cc, fp.consumerCon, fp.metrics) - - if err := poller.Start(startHeight); err != nil { + if err := fp.poller.Start(startHeight); err != nil { return fmt.Errorf("failed to start the poller: %w", err) } - fp.poller = poller - fp.laggingTargetChan = make(chan uint64, 1) fp.quit = make(chan struct{}) diff --git a/finality-provider/service/fp_instance_test.go b/finality-provider/service/fp_instance_test.go index d5dfb17b..70ff64d8 100644 --- a/finality-provider/service/fp_instance_test.go +++ b/finality-provider/service/fp_instance_test.go @@ -118,7 +118,12 @@ func startFinalityProviderAppWithRegisteredFp(t *testing.T, r *rand.Rand, cc cca fpCfg.PollerConfig.StaticChainScanningStartHeight = startingHeight db, err := fpCfg.DatabaseConfig.GetDbBackend() require.NoError(t, err) - app, err := service.NewFinalityProviderApp(&fpCfg, cc, consumerCon, em, db, logger) + + // TODO: use mock metrics + fpMetrics := metrics.NewFpMetrics() + pollerFactory := service.NewChainPollerFactory(logger, fpCfg.PollerConfig, cc, consumerCon, fpMetrics) + + app, err := service.NewFinalityProviderApp(&fpCfg, cc, consumerCon, pollerFactory, em, db, fpMetrics, logger) require.NoError(t, err) err = app.Start() require.NoError(t, err) @@ -131,9 +136,11 @@ func startFinalityProviderAppWithRegisteredFp(t *testing.T, r *rand.Rand, cc cca fpStore := app.GetFinalityProviderStore() err = fpStore.SetFpStatus(fp.BtcPk, proto.FinalityProviderStatus_REGISTERED) require.NoError(t, err) - // TODO: use mock metrics - m := metrics.NewFpMetrics() - fpIns, err := service.NewFinalityProviderInstance(fp.GetBIP340BTCPK(), &fpCfg, app.GetFinalityProviderStore(), pubRandProofStore, cc, consumerCon, em, m, passphrase, make(chan *service.CriticalError), logger) + + poller, err := pollerFactory.CreateChainPoller() + require.NoError(t, err) + + fpIns, err := service.NewFinalityProviderInstance(fp.GetBIP340BTCPK(), &fpCfg, app.GetFinalityProviderStore(), pubRandProofStore, cc, consumerCon, poller, em, fpMetrics, passphrase, make(chan *service.CriticalError), logger) require.NoError(t, err) cleanUp := func() { diff --git a/finality-provider/service/fp_manager.go b/finality-provider/service/fp_manager.go index 7a37ca6f..502ae18c 100644 --- a/finality-provider/service/fp_manager.go +++ b/finality-provider/service/fp_manager.go @@ -42,13 +42,14 @@ type FinalityProviderManager struct { fpis map[string]*FinalityProviderInstance // needed for initiating finality-provider instances - fps *store.FinalityProviderStore - pubRandStore *store.PubRandProofStore - config *fpcfg.Config - cc ccapi.ClientController - consumerCon ccapi.ConsumerController - em eotsmanager.EOTSManager - logger *zap.Logger + fps *store.FinalityProviderStore + pubRandStore *store.PubRandProofStore + config *fpcfg.Config + cc ccapi.ClientController + consumerCon ccapi.ConsumerController + pollerFactory ccapi.ConsumerChainPollerFactory + em eotsmanager.EOTSManager + logger *zap.Logger metrics *metrics.FpMetrics @@ -63,6 +64,7 @@ func NewFinalityProviderManager( config *fpcfg.Config, cc ccapi.ClientController, consumerCon ccapi.ConsumerController, + pollerFactory ccapi.ConsumerChainPollerFactory, em eotsmanager.EOTSManager, metrics *metrics.FpMetrics, logger *zap.Logger, @@ -76,6 +78,7 @@ func NewFinalityProviderManager( config: config, cc: cc, consumerCon: consumerCon, + pollerFactory: pollerFactory, em: em, metrics: metrics, logger: logger, @@ -433,7 +436,25 @@ func (fpm *FinalityProviderManager) addFinalityProviderInstance( return fmt.Errorf("finality-provider instance already exists") } - fpIns, err := NewFinalityProviderInstance(pk, fpm.config, fpm.fps, fpm.pubRandStore, fpm.cc, fpm.consumerCon, fpm.em, fpm.metrics, passphrase, fpm.criticalErrChan, fpm.logger) + poller, err := fpm.pollerFactory.CreateChainPoller() + if err != nil { + return fmt.Errorf("failed to create chain poller %s instance: %v", pkHex, err) + } + + fpIns, err := NewFinalityProviderInstance( + pk, + fpm.config, + fpm.fps, + fpm.pubRandStore, + fpm.cc, + fpm.consumerCon, + poller, + fpm.em, + fpm.metrics, + passphrase, + fpm.criticalErrChan, + fpm.logger, + ) if err != nil { return fmt.Errorf("failed to create finality-provider %s instance: %w", pkHex, err) } diff --git a/finality-provider/service/fp_manager_test.go b/finality-provider/service/fp_manager_test.go index 0c6c78b9..9cf93a83 100644 --- a/finality-provider/service/fp_manager_test.go +++ b/finality-provider/service/fp_manager_test.go @@ -135,7 +135,9 @@ func newFinalityProviderManagerWithRegisteredFp(t *testing.T, r *rand.Rand, cc c require.NoError(t, err) metricsCollectors := metrics.NewFpMetrics() - vm, err := service.NewFinalityProviderManager(fpStore, pubRandStore, &fpCfg, cc, consumerCon, em, metricsCollectors, logger) + pollerFactory := service.NewChainPollerFactory(logger, fpCfg.PollerConfig, cc, consumerCon, metricsCollectors) + + vm, err := service.NewFinalityProviderManager(fpStore, pubRandStore, &fpCfg, cc, consumerCon, pollerFactory, em, metricsCollectors, logger) require.NoError(t, err) // create registered finality-provider diff --git a/itest/babylon/babylon_test_manager.go b/itest/babylon/babylon_test_manager.go index 20de332c..e75f2535 100644 --- a/itest/babylon/babylon_test_manager.go +++ b/itest/babylon/babylon_test_manager.go @@ -14,6 +14,7 @@ import ( "github.com/babylonlabs-io/finality-provider/clientcontroller" e2eutils "github.com/babylonlabs-io/finality-provider/itest" base_test_manager "github.com/babylonlabs-io/finality-provider/itest/test-manager" + "github.com/babylonlabs-io/finality-provider/metrics" "github.com/btcsuite/btcd/btcec/v2" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -79,8 +80,12 @@ func StartManager(t *testing.T) *TestManager { // 4. prepare finality-provider fpdb, err := cfg.DatabaseConfig.GetDbBackend() require.NoError(t, err) - fpApp, err := service.NewFinalityProviderApp(cfg, bc, bcc, eotsCli, fpdb, logger) + + fpMetrics := metrics.NewFpMetrics() + pollerFactory := service.NewChainPollerFactory(logger, cfg.PollerConfig, bc, bcc, fpMetrics) + fpApp, err := service.NewFinalityProviderApp(cfg, bc, bcc, pollerFactory, eotsCli, fpdb, fpMetrics, logger) require.NoError(t, err) + err = fpApp.Start() require.NoError(t, err) diff --git a/itest/cosmwasm/wasmd/wasmd_test_manager.go b/itest/cosmwasm/wasmd/wasmd_test_manager.go index 2c0a83c6..2f646f03 100644 --- a/itest/cosmwasm/wasmd/wasmd_test_manager.go +++ b/itest/cosmwasm/wasmd/wasmd_test_manager.go @@ -23,6 +23,7 @@ import ( fpcfg "github.com/babylonlabs-io/finality-provider/finality-provider/config" "github.com/babylonlabs-io/finality-provider/finality-provider/service" e2eutils "github.com/babylonlabs-io/finality-provider/itest" + "github.com/babylonlabs-io/finality-provider/metrics" "github.com/babylonlabs-io/finality-provider/types" "github.com/btcsuite/btcd/btcec/v2" dbm "github.com/cosmos/cosmos-db" @@ -107,7 +108,10 @@ func StartWasmdTestManager(t *testing.T) *WasmdTestManager { // 5. prepare finality-provider fpdb, err := cfg.DatabaseConfig.GetDbBackend() require.NoError(t, err) - fpApp, err := service.NewFinalityProviderApp(cfg, bc, wcc, eotsCli, fpdb, logger) + + fpMetrics := metrics.NewFpMetrics() + pollerFactory := service.NewChainPollerFactory(logger, cfg.PollerConfig, bc, wcc, fpMetrics) + fpApp, err := service.NewFinalityProviderApp(cfg, bc, wcc, pollerFactory, eotsCli, fpdb, fpMetrics, logger) require.NoError(t, err) err = fpApp.Start() require.NoError(t, err)