From e9922316a01888f6f503d63fdb428394f998eb28 Mon Sep 17 00:00:00 2001 From: Lazar <12626340+Lazar955@users.noreply.github.com> Date: Tue, 27 Aug 2024 15:22:33 +0200 Subject: [PATCH] chore(btcclient): use notifier interface (#20) - Removes usage of btc client with subscription to zmq - Removes proprietary client implementation of zmq subscription as we now rely on lnd's References [issue](https://github.com/babylonchain/vigilante/issues/199) --- btcclient/client.go | 22 +-- btcclient/client_block_subscriber.go | 139 ------------------ btcclient/client_wallet.go | 12 +- btcclient/interface.go | 3 - btcclient/notifier.go | 16 +++ btcclient/testutils.go | 25 ---- cmd/vigilante/cmd/btcstaking_tracker.go | 18 +-- cmd/vigilante/cmd/monitor.go | 26 ++-- cmd/vigilante/cmd/reporter.go | 10 +- e2etest/reporter_e2e_test.go | 25 ++++ e2etest/test_manager.go | 2 +- go.mod | 9 +- go.sum | 8 +- monitor/btcscanner/block_handler.go | 112 +++++++-------- monitor/btcscanner/btc_scanner.go | 31 ++-- monitor/btcscanner/btc_scanner_test.go | 9 +- monitor/monitor.go | 3 + reporter/block_handler.go | 160 ++++++--------------- reporter/bootstrapping.go | 19 +-- reporter/reorg_list.go | 76 ---------- reporter/reporter.go | 21 ++- reporter/utils.go | 10 -- reporter/utils_test.go | 3 + testutil/mocks/btcclient.go | 41 ------ zmq/client.go | 129 ----------------- zmq/subscribe.go | 182 ------------------------ 26 files changed, 233 insertions(+), 878 deletions(-) delete mode 100644 btcclient/client_block_subscriber.go delete mode 100644 btcclient/testutils.go delete mode 100644 reporter/reorg_list.go delete mode 100644 zmq/client.go delete mode 100644 zmq/subscribe.go diff --git a/btcclient/client.go b/btcclient/client.go index 4cd3933f..8da20dcd 100644 --- a/btcclient/client.go +++ b/btcclient/client.go @@ -15,8 +15,6 @@ import ( "go.uber.org/zap" "github.com/babylonlabs-io/vigilante/config" - "github.com/babylonlabs-io/vigilante/types" - "github.com/babylonlabs-io/vigilante/zmq" ) var _ BTCClient = &Client{} @@ -25,18 +23,14 @@ var _ BTCClient = &Client{} // for information regarding the current best block chain. type Client struct { *rpcclient.Client - zmqClient *zmq.Client - Params *chaincfg.Params - Cfg *config.BTCConfig + params *chaincfg.Params + cfg *config.BTCConfig logger *zap.SugaredLogger // retry attributes retrySleepTime time.Duration maxRetrySleepTime time.Duration - - // channel for notifying new BTC blocks to reporter - blockEventChan chan *types.BlockEvent } func (c *Client) GetTipBlockVerbose() (*btcjson.GetBlockVerboseResult, error) { @@ -54,16 +48,4 @@ func (c *Client) GetTipBlockVerbose() (*btcjson.GetBlockVerboseResult, error) { func (c *Client) Stop() { c.Shutdown() - // NewWallet will create a client with nil blockEventChan, - // while NewWithBlockSubscriber will have a non-nil one, so - // we need to check here - if c.blockEventChan != nil { - close(c.blockEventChan) - } - - if c.zmqClient != nil { - if err := c.zmqClient.Close(); err != nil { - c.logger.Debug(err) - } - } } diff --git a/btcclient/client_block_subscriber.go b/btcclient/client_block_subscriber.go deleted file mode 100644 index c3245158..00000000 --- a/btcclient/client_block_subscriber.go +++ /dev/null @@ -1,139 +0,0 @@ -package btcclient - -import ( - "fmt" - "time" - - "github.com/babylonlabs-io/babylon/types/retry" - "github.com/btcsuite/btcd/btcutil" - "go.uber.org/zap" - - "github.com/babylonlabs-io/vigilante/config" - "github.com/babylonlabs-io/vigilante/netparams" - "github.com/babylonlabs-io/vigilante/types" - "github.com/babylonlabs-io/vigilante/zmq" - - "github.com/btcsuite/btcd/rpcclient" - "github.com/btcsuite/btcd/wire" -) - -// NewWithBlockSubscriber creates a new BTC client that subscribes to newly connected/disconnected blocks -// used by vigilant reporter -func NewWithBlockSubscriber(cfg *config.BTCConfig, retrySleepTime, maxRetrySleepTime time.Duration, parentLogger *zap.Logger) (*Client, error) { - client := &Client{} - params, err := netparams.GetBTCParams(cfg.NetParams) - if err != nil { - return nil, err - } - client.blockEventChan = make(chan *types.BlockEvent, 10000) // TODO: parameterise buffer size - client.Cfg = cfg - client.Params = params - logger := parentLogger.With(zap.String("module", "btcclient")) - client.logger = logger.Sugar() - - client.retrySleepTime = retrySleepTime - client.maxRetrySleepTime = maxRetrySleepTime - - switch cfg.BtcBackend { - case types.Bitcoind: - // TODO Currently we are not using Params field of rpcclient.ConnConfig due to bug in btcd - // when handling signet. - connCfg := &rpcclient.ConnConfig{ - Host: cfg.Endpoint, - HTTPPostMode: true, - User: cfg.Username, - Pass: cfg.Password, - DisableTLS: cfg.DisableClientTLS, - } - - rpcClient, err := rpcclient.New(connCfg, nil) - if err != nil { - return nil, err - } - - zmqClient, err := zmq.New(logger, cfg.ZmqSeqEndpoint, client.blockEventChan, rpcClient) - if err != nil { - return nil, err - } - - client.zmqClient = zmqClient - client.Client = rpcClient - case types.Btcd: - notificationHandlers := rpcclient.NotificationHandlers{ - OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, txs []*btcutil.Tx) { - client.logger.Debugf("Block %v at height %d has been connected at time %v", header.BlockHash(), height, header.Timestamp) - client.blockEventChan <- types.NewBlockEvent(types.BlockConnected, height, header) - }, - OnFilteredBlockDisconnected: func(height int32, header *wire.BlockHeader) { - client.logger.Debugf("Block %v at height %d has been disconnected at time %v", header.BlockHash(), height, header.Timestamp) - client.blockEventChan <- types.NewBlockEvent(types.BlockDisconnected, height, header) - }, - } - - // TODO Currently we are not using Params field of rpcclient.ConnConfig due to bug in btcd - // when handling signet. - connCfg := &rpcclient.ConnConfig{ - Host: cfg.Endpoint, - Endpoint: "ws", // websocket - User: cfg.Username, - Pass: cfg.Password, - DisableTLS: cfg.DisableClientTLS, - Certificates: cfg.ReadCAFile(), - } - - rpcClient, err := rpcclient.New(connCfg, ¬ificationHandlers) - if err != nil { - return nil, err - } - - // ensure we are using btcd as Bitcoin node, since Websocket-based subscriber is only available in btcd - backend, err := rpcClient.BackendVersion() - if err != nil { - return nil, fmt.Errorf("failed to get BTC backend: %v", err) - } - if backend != rpcclient.BtcdPost2401 { - return nil, fmt.Errorf("websocket is only supported by btcd, but got %v", backend) - } - - client.Client = rpcClient - } - - client.logger.Info("Successfully created the BTC client and connected to the BTC server") - - return client, nil -} - -func (c *Client) subscribeBlocksByWebSocket() error { - if err := c.NotifyBlocks(); err != nil { - return err - } - c.logger.Info("Successfully subscribed to newly connected/disconnected blocks via WebSocket") - return nil -} - -func (c *Client) mustSubscribeBlocksByWebSocket() { - if err := retry.Do(c.retrySleepTime, c.maxRetrySleepTime, func() error { - return c.subscribeBlocksByWebSocket() - }); err != nil { - panic(err) - } -} - -func (c *Client) mustSubscribeBlocksByZmq() { - if err := c.zmqClient.SubscribeSequence(); err != nil { - panic(err) - } -} - -func (c *Client) MustSubscribeBlocks() { - switch c.Cfg.BtcBackend { - case types.Btcd: - c.mustSubscribeBlocksByWebSocket() - case types.Bitcoind: - c.mustSubscribeBlocksByZmq() - } -} - -func (c *Client) BlockEventChan() <-chan *types.BlockEvent { - return c.blockEventChan -} diff --git a/btcclient/client_wallet.go b/btcclient/client_wallet.go index fe52e740..16c964bd 100644 --- a/btcclient/client_wallet.go +++ b/btcclient/client_wallet.go @@ -26,8 +26,8 @@ func NewWallet(cfg *config.BTCConfig, parentLogger *zap.Logger) (*Client, error) return nil, err } wallet := &Client{} - wallet.Cfg = cfg - wallet.Params = params + wallet.cfg = cfg + wallet.params = params wallet.logger = parentLogger.With(zap.String("module", "btcclient_wallet")).Sugar() connCfg := &rpcclient.ConnConfig{} @@ -69,15 +69,15 @@ func NewWallet(cfg *config.BTCConfig, parentLogger *zap.Logger) (*Client, error) } func (c *Client) GetWalletPass() string { - return c.Cfg.WalletPassword + return c.cfg.WalletPassword } func (c *Client) GetWalletLockTime() int64 { - return c.Cfg.WalletLockTime + return c.cfg.WalletLockTime } func (c *Client) GetNetParams() *chaincfg.Params { - net, err := netparams.GetBTCParams(c.Cfg.NetParams) + net, err := netparams.GetBTCParams(c.cfg.NetParams) if err != nil { panic(fmt.Errorf("failed to get BTC network params: %w", err)) } @@ -85,7 +85,7 @@ func (c *Client) GetNetParams() *chaincfg.Params { } func (c *Client) GetBTCConfig() *config.BTCConfig { - return c.Cfg + return c.cfg } func (c *Client) ListUnspent() ([]btcjson.ListUnspentResult, error) { diff --git a/btcclient/interface.go b/btcclient/interface.go index d3021412..71b9bca5 100644 --- a/btcclient/interface.go +++ b/btcclient/interface.go @@ -14,8 +14,6 @@ import ( type BTCClient interface { Stop() WaitForShutdown() - MustSubscribeBlocks() - BlockEventChan() <-chan *types.BlockEvent GetBestBlock() (*chainhash.Hash, uint64, error) GetBlockByHash(blockHash *chainhash.Hash) (*types.IndexedBlock, *wire.MsgBlock, error) FindTailBlocksByHeight(height uint64) ([]*types.IndexedBlock, error) @@ -37,7 +35,6 @@ type BTCWallet interface { SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) (*chainhash.Hash, error) GetRawChangeAddress(account string) (btcutil.Address, error) WalletPassphrase(passphrase string, timeoutSecs int64) error - DumpPrivKey(address btcutil.Address) (*btcutil.WIF, error) GetHighUTXOAndSum() (*btcjson.ListUnspentResult, float64, error) FundRawTransaction(tx *wire.MsgTx, opts btcjson.FundRawTransactionOpts, isWitness *bool) (*btcjson.FundRawTransactionResult, error) SignRawTransactionWithWallet(tx *wire.MsgTx) (*wire.MsgTx, bool, error) diff --git a/btcclient/notifier.go b/btcclient/notifier.go index 4bc73c71..2ecc10ca 100644 --- a/btcclient/notifier.go +++ b/btcclient/notifier.go @@ -3,6 +3,7 @@ package btcclient import ( "encoding/hex" "fmt" + "github.com/babylonlabs-io/vigilante/netparams" "net" "os" "time" @@ -248,3 +249,18 @@ func NewNodeBackend( return nil, fmt.Errorf("unknown node backend: %v", cfg.ActiveNodeBackend) } } + +// NewNodeBackendWithParams creates a new NodeBackend by incorporating parameter retrieval and config conversion. +func NewNodeBackendWithParams(cfg config.BTCConfig, rawCert string) (*NodeBackend, error) { + btcParams, err := netparams.GetBTCParams(cfg.NetParams) + if err != nil { + return nil, fmt.Errorf("failed to get BTC net params: %w", err) + } + + btcNotifier, err := NewNodeBackend(CfgToBtcNodeBackendConfig(cfg, rawCert), btcParams, &EmptyHintCache{}) + if err != nil { + return nil, fmt.Errorf("failed to initialize notifier: %w", err) + } + + return btcNotifier, nil +} diff --git a/btcclient/testutils.go b/btcclient/testutils.go deleted file mode 100644 index c0941e58..00000000 --- a/btcclient/testutils.go +++ /dev/null @@ -1,25 +0,0 @@ -package btcclient - -import ( - "time" - - "github.com/babylonlabs-io/vigilante/config" - "github.com/babylonlabs-io/vigilante/netparams" - "github.com/babylonlabs-io/vigilante/types" - "github.com/btcsuite/btcd/rpcclient" -) - -func NewTestClientWithWsSubscriber(rpcClient *rpcclient.Client, cfg *config.BTCConfig, retrySleepTime time.Duration, maxRetrySleepTime time.Duration, blockEventChan chan *types.BlockEvent) (*Client, error) { - net, err := netparams.GetBTCParams(cfg.NetParams) - if err != nil { - return nil, err - } - return &Client{ - Client: rpcClient, - Params: net, - Cfg: cfg, - retrySleepTime: retrySleepTime, - maxRetrySleepTime: maxRetrySleepTime, - blockEventChan: blockEventChan, - }, nil -} diff --git a/cmd/vigilante/cmd/btcstaking_tracker.go b/cmd/vigilante/cmd/btcstaking_tracker.go index f11bf804..d4794170 100644 --- a/cmd/vigilante/cmd/btcstaking_tracker.go +++ b/cmd/vigilante/cmd/btcstaking_tracker.go @@ -8,7 +8,6 @@ import ( bst "github.com/babylonlabs-io/vigilante/btcstaking-tracker" "github.com/babylonlabs-io/vigilante/config" "github.com/babylonlabs-io/vigilante/metrics" - "github.com/babylonlabs-io/vigilante/netparams" "github.com/babylonlabs-io/vigilante/rpcserver" "github.com/spf13/cobra" ) @@ -61,26 +60,17 @@ func GetBTCStakingTracker() *cobra.Command { // create BTC client and connect to BTC server // Note that monitor needs to subscribe to new BTC blocks - btcClient, err := btcclient.NewWithBlockSubscriber( - &cfg.BTC, - cfg.Common.RetrySleepTime, - cfg.Common.MaxRetrySleepTime, - rootLogger, - ) + btcClient, err := btcclient.NewWallet(&cfg.BTC, rootLogger) + if err != nil { panic(fmt.Errorf("failed to open BTC client: %w", err)) } // create BTC notifier // TODO: is it possible to merge BTC client and BTC notifier? - btcParams, err := netparams.GetBTCParams(cfg.BTC.NetParams) + btcNotifier, err := btcclient.NewNodeBackendWithParams(cfg.BTC, "") if err != nil { - panic(fmt.Errorf("failed to get BTC parameter: %w", err)) - } - btcCfg := btcclient.CfgToBtcNodeBackendConfig(cfg.BTC, "") // we will read certifcates from file - btcNotifier, err := btcclient.NewNodeBackend(btcCfg, btcParams, &btcclient.EmptyHintCache{}) - if err != nil { - panic(fmt.Errorf("failed to create btc chain notifier: %w", err)) + panic(err) } bsMetrics := metrics.NewBTCStakingTrackerMetrics() diff --git a/cmd/vigilante/cmd/monitor.go b/cmd/vigilante/cmd/monitor.go index 2b66c15b..49259c50 100644 --- a/cmd/vigilante/cmd/monitor.go +++ b/cmd/vigilante/cmd/monitor.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - bbnqccfg "github.com/babylonlabs-io/babylon/client/config" bbnqc "github.com/babylonlabs-io/babylon/client/query" "github.com/spf13/cobra" @@ -62,13 +61,7 @@ func GetMonitorCmd() *cobra.Command { } // create BTC client and connect to BTC server - // Note that monitor needs to subscribe to new BTC blocks - btcClient, err = btcclient.NewWithBlockSubscriber( - &cfg.BTC, - cfg.Common.RetrySleepTime, - cfg.Common.MaxRetrySleepTime, - rootLogger, - ) + btcClient, err = btcclient.NewWallet(&cfg.BTC, rootLogger) if err != nil { panic(fmt.Errorf("failed to open BTC client: %w", err)) } @@ -80,8 +73,23 @@ func GetMonitorCmd() *cobra.Command { // register monitor metrics monitorMetrics := metrics.NewMonitorMetrics() + // create the chain notifier + btcNotifier, err := btcclient.NewNodeBackendWithParams(cfg.BTC, "") + if err != nil { + panic(err) + } + // create monitor - vigilanteMonitor, err = monitor.New(&cfg.Monitor, &cfg.Common, rootLogger, genesisInfo, bbnQueryClient, btcClient, monitorMetrics) + vigilanteMonitor, err = monitor.New( + &cfg.Monitor, + &cfg.Common, + rootLogger, + genesisInfo, + bbnQueryClient, + btcClient, + btcNotifier, + monitorMetrics, + ) if err != nil { panic(fmt.Errorf("failed to create vigilante monitor: %w", err)) } diff --git a/cmd/vigilante/cmd/reporter.go b/cmd/vigilante/cmd/reporter.go index bbaa9317..5c9a0b60 100644 --- a/cmd/vigilante/cmd/reporter.go +++ b/cmd/vigilante/cmd/reporter.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - bbnclient "github.com/babylonlabs-io/babylon/client/client" "github.com/spf13/cobra" @@ -48,7 +47,7 @@ func GetReporterCmd() *cobra.Command { // create BTC client and connect to BTC server // Note that vigilant reporter needs to subscribe to new BTC blocks - btcClient, err = btcclient.NewWithBlockSubscriber(&cfg.BTC, cfg.Common.RetrySleepTime, cfg.Common.MaxRetrySleepTime, rootLogger) + btcClient, err = btcclient.NewWallet(&cfg.BTC, rootLogger) if err != nil { panic(fmt.Errorf("failed to open BTC client: %w", err)) } @@ -62,12 +61,19 @@ func GetReporterCmd() *cobra.Command { // register reporter metrics reporterMetrics := metrics.NewReporterMetrics() + // create the chain notifier + btcNotifier, err := btcclient.NewNodeBackendWithParams(cfg.BTC, "") + if err != nil { + panic(err) + } + // create reporter vigilantReporter, err = reporter.New( &cfg.Reporter, rootLogger, btcClient, babylonClient, + btcNotifier, cfg.Common.RetrySleepTime, cfg.Common.MaxRetrySleepTime, reporterMetrics, diff --git a/e2etest/reporter_e2e_test.go b/e2etest/reporter_e2e_test.go index b269832c..9fec7d97 100644 --- a/e2etest/reporter_e2e_test.go +++ b/e2etest/reporter_e2e_test.go @@ -4,6 +4,8 @@ package e2etest import ( + "github.com/babylonlabs-io/vigilante/btcclient" + "github.com/babylonlabs-io/vigilante/netparams" "sync" "testing" "time" @@ -60,11 +62,19 @@ func TestReporter_BoostrapUnderFrequentBTCHeaders(t *testing.T) { reporterMetrics := metrics.NewReporterMetrics() + // create the chain notifier + btcParams, err := netparams.GetBTCParams(tm.Config.BTC.NetParams) + require.NoError(t, err) + btcCfg := btcclient.CfgToBtcNodeBackendConfig(tm.Config.BTC, "") + btcNotifier, err := btcclient.NewNodeBackend(btcCfg, btcParams, &btcclient.EmptyHintCache{}) + require.NoError(t, err) + vigilantReporter, err := reporter.New( &tm.Config.Reporter, logger, tm.BTCClient, tm.BabylonClient, + btcNotifier, tm.Config.Common.RetrySleepTime, tm.Config.Common.MaxRetrySleepTime, reporterMetrics, @@ -116,11 +126,18 @@ func TestRelayHeadersAndHandleRollbacks(t *testing.T) { reporterMetrics := metrics.NewReporterMetrics() + btcParams, err := netparams.GetBTCParams(tm.Config.BTC.NetParams) + require.NoError(t, err) + btcCfg := btcclient.CfgToBtcNodeBackendConfig(tm.Config.BTC, "") + btcNotifier, err := btcclient.NewNodeBackend(btcCfg, btcParams, &btcclient.EmptyHintCache{}) + require.NoError(t, err) + vigilantReporter, err := reporter.New( &tm.Config.Reporter, logger, tm.BTCClient, tm.BabylonClient, + btcNotifier, tm.Config.Common.RetrySleepTime, tm.Config.Common.MaxRetrySleepTime, reporterMetrics, @@ -160,11 +177,18 @@ func TestHandleReorgAfterRestart(t *testing.T) { reporterMetrics := metrics.NewReporterMetrics() + btcParams, err := netparams.GetBTCParams(tm.Config.BTC.NetParams) + require.NoError(t, err) + btcCfg := btcclient.CfgToBtcNodeBackendConfig(tm.Config.BTC, "") + btcNotifier, err := btcclient.NewNodeBackend(btcCfg, btcParams, &btcclient.EmptyHintCache{}) + require.NoError(t, err) + vigilantReporter, err := reporter.New( &tm.Config.Reporter, logger, tm.BTCClient, tm.BabylonClient, + btcNotifier, tm.Config.Common.RetrySleepTime, tm.Config.Common.MaxRetrySleepTime, reporterMetrics, @@ -199,6 +223,7 @@ func TestHandleReorgAfterRestart(t *testing.T) { logger, btcClient, tm.BabylonClient, + btcNotifier, tm.Config.Common.RetrySleepTime, tm.Config.Common.MaxRetrySleepTime, reporterMetrics, diff --git a/e2etest/test_manager.go b/e2etest/test_manager.go index 7a82624e..e7e2b003 100644 --- a/e2etest/test_manager.go +++ b/e2etest/test_manager.go @@ -65,7 +65,7 @@ type TestManager struct { } func initBTCClientWithSubscriber(t *testing.T, cfg *config.Config) *btcclient.Client { - client, err := btcclient.NewWithBlockSubscriber(&cfg.BTC, cfg.Common.RetrySleepTime, cfg.Common.MaxRetrySleepTime, zap.NewNop()) + client, err := btcclient.NewWallet(&cfg.BTC, zap.NewNop()) require.NoError(t, err) // let's wait until chain rpc becomes available diff --git a/go.mod b/go.mod index 7456b18f..fb2d502d 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,6 @@ require ( github.com/jsternberg/zap-logfmt v1.3.0 github.com/lightningnetwork/lnd v0.16.4-beta.rc1 github.com/ory/dockertest/v3 v3.9.1 - github.com/pebbe/zmq4 v1.2.9 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 github.com/spf13/cobra v1.8.0 @@ -68,6 +67,7 @@ require ( github.com/DataDog/zstd v1.5.5 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect + github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect github.com/aead/siphash v1.0.1 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/aws/aws-sdk-go v1.44.312 // indirect @@ -75,7 +75,11 @@ require ( github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816 // indirect github.com/bits-and-blooms/bitset v1.10.0 // indirect + github.com/btcsuite/btcd/btcutil/psbt v1.1.8 // indirect github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect + github.com/btcsuite/btcwallet/wallet/txauthor v1.3.2 // indirect + github.com/btcsuite/btcwallet/wallet/txrules v1.2.0 // indirect + github.com/btcsuite/btcwallet/wallet/txsizes v1.2.3 // indirect github.com/btcsuite/btcwallet/walletdb v1.4.0 // indirect github.com/btcsuite/btcwallet/wtxmgr v1.5.0 // indirect github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect @@ -137,6 +141,7 @@ require ( github.com/fergusstrange/embedded-postgres v1.10.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect + github.com/go-errors/errors v1.4.2 // indirect github.com/go-kit/kit v0.12.0 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect @@ -209,6 +214,7 @@ require ( github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf // indirect github.com/lightninglabs/neutrino v0.15.0 // indirect github.com/lightninglabs/neutrino/cache v1.1.1 // indirect + github.com/lightningnetwork/lightning-onion v1.2.1-0.20221202012345-ca23184850a1 // indirect github.com/lightningnetwork/lnd/clock v1.1.0 // indirect github.com/lightningnetwork/lnd/healthcheck v1.2.2 // indirect github.com/lightningnetwork/lnd/kvdb v1.4.1 // indirect @@ -217,6 +223,7 @@ require ( github.com/lightningnetwork/lnd/tlv v1.1.0 // indirect github.com/lightningnetwork/lnd/tor v1.1.0 // indirect github.com/linxGnu/grocksdb v1.8.14 // indirect + github.com/ltcsuite/ltcd v0.0.0-20190101042124-f37f8bf35796 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/manifoldco/promptui v0.9.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect diff --git a/go.sum b/go.sum index ffc3ea90..8ff2ee2b 100644 --- a/go.sum +++ b/go.sum @@ -303,16 +303,19 @@ github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13P github.com/btcsuite/btcd v0.22.0-beta.0.20220111032746-97732e52810c/go.mod h1:tjmYdS6MLJ5/s0Fj4DbLgSbDHbEqLJrtnHecBFkdz5M= github.com/btcsuite/btcd v0.22.0-beta.0.20220204213055-eaf0459ff879/go.mod h1:osu7EoKiL36UThEgzYPqdRaxeo0NU8VoXqgcnwpey0g= github.com/btcsuite/btcd v0.22.0-beta.0.20220207191057-4dc4ff7963b4/go.mod h1:7alexyj/lHlOtr2PJK7L/+HDJZpcGDn/pAU98r7DY08= +github.com/btcsuite/btcd v0.23.1/go.mod h1:0QJIIN1wwIXF/3G/m87gIwGniDMDQqjVn4SZgnFpsYY= github.com/btcsuite/btcd v0.23.3/go.mod h1:0QJIIN1wwIXF/3G/m87gIwGniDMDQqjVn4SZgnFpsYY= github.com/btcsuite/btcd v0.23.5-0.20231215221805-96c9fd8078fd/go.mod h1:nm3Bko6zh6bWP60UxwoT5LzdGJsQJaPo6HjduXq9p6A= github.com/btcsuite/btcd v0.24.2 h1:aLmxPguqxza+4ag8R1I2nnJjSu2iFn/kqtHTIImswcY= github.com/btcsuite/btcd v0.24.2/go.mod h1:5C8ChTkl5ejr3WHj8tkQSCmydiMEPB0ZhQhehpq7Dgg= github.com/btcsuite/btcd/btcec/v2 v2.1.0/go.mod h1:2VzYrv4Gm4apmbVVsSq5bqf1Ec8v56E48Vt0Y/umPgA= +github.com/btcsuite/btcd/btcec/v2 v2.1.1/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE= github.com/btcsuite/btcd/btcec/v2 v2.1.3/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE= github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U= github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/btcutil v1.0.0/go.mod h1:Uoxwv0pqYWhD//tfTiipkxNfdhG9UrLwaeswfjfdF0A= github.com/btcsuite/btcd/btcutil v1.1.0/go.mod h1:5OapHB7A2hBBWLm48mmw4MOHNJCcUBTwmWH/0Jn8VHE= +github.com/btcsuite/btcd/btcutil v1.1.1/go.mod h1:nbKlBMNm9FGsdvKvu0essceubPiAcI57pYBNnsLAa34= github.com/btcsuite/btcd/btcutil v1.1.5 h1:+wER79R5670vs/ZusMTF1yTcRYE5GUsFbdjdisflzM8= github.com/btcsuite/btcd/btcutil v1.1.5/go.mod h1:PSZZ4UitpLBWzxGd5VGOrLnmOjtPP/a6HaFo12zMs00= github.com/btcsuite/btcd/btcutil/psbt v1.1.8 h1:4voqtT8UppT7nmKQkXV+T9K8UyQjKOn2z/ycpmJK8wg= @@ -330,6 +333,7 @@ github.com/btcsuite/btcwallet/wallet/txauthor v1.3.2 h1:etuLgGEojecsDOYTII8rYiGH github.com/btcsuite/btcwallet/wallet/txauthor v1.3.2/go.mod h1:Zpk/LOb2sKqwP2lmHjaZT9AdaKsHPSbNLm2Uql5IQ/0= github.com/btcsuite/btcwallet/wallet/txrules v1.2.0 h1:BtEN5Empw62/RVnZ0VcJaVtVlBijnLlJY+dwjAye2Bg= github.com/btcsuite/btcwallet/wallet/txrules v1.2.0/go.mod h1:AtkqiL7ccKWxuLYtZm8Bu8G6q82w4yIZdgq6riy60z0= +github.com/btcsuite/btcwallet/wallet/txsizes v1.2.2/go.mod h1:q08Rms52VyWyXcp5zDc4tdFRKkFgNsMQrv3/LvE1448= github.com/btcsuite/btcwallet/wallet/txsizes v1.2.3 h1:PszOub7iXVYbtGybym5TGCp9Dv1h1iX4rIC3HICZGLg= github.com/btcsuite/btcwallet/wallet/txsizes v1.2.3/go.mod h1:q08Rms52VyWyXcp5zDc4tdFRKkFgNsMQrv3/LvE1448= github.com/btcsuite/btcwallet/walletdb v1.3.5/go.mod h1:oJDxAEUHVtnmIIBaa22wSBPTVcs6hUp5NKWmI8xDwwU= @@ -339,6 +343,7 @@ github.com/btcsuite/btcwallet/wtxmgr v1.5.0 h1:WO0KyN4l6H3JWnlFxfGR7r3gDnlGT7W2c github.com/btcsuite/btcwallet/wtxmgr v1.5.0/go.mod h1:TQVDhFxseiGtZwEPvLgtfyxuNUDsIdaJdshvWzR0HJ4= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= +github.com/btcsuite/golangcrypto v0.0.0-20150304025918-53f62d9b43e8/go.mod h1:tYvUd8KLhm/oXvUeSEs2VlLghFjQt9+ZaF9ghH0JNjc= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= github.com/btcsuite/goleveldb v1.0.0/go.mod h1:QiK9vBlgftBg6rWQIj6wFzbPfRjiykIEhBH4obrXJ/I= github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= @@ -1025,6 +1030,7 @@ github.com/linxGnu/grocksdb v1.8.14 h1:HTgyYalNwBSG/1qCQUIott44wU5b2Y9Kr3z7SK5Of github.com/linxGnu/grocksdb v1.8.14/go.mod h1:QYiYypR2d4v63Wj1adOOfzglnoII0gLj3PNh4fZkcFA= github.com/ltcsuite/ltcd v0.0.0-20190101042124-f37f8bf35796 h1:sjOGyegMIhvgfq5oaue6Td+hxZuf3tDC8lAPrFldqFw= github.com/ltcsuite/ltcd v0.0.0-20190101042124-f37f8bf35796/go.mod h1:3p7ZTf9V1sNPI5H8P3NkTFF4LuwMdPl2DodF60qAKqY= +github.com/ltcsuite/ltcutil v0.0.0-20181217130922-17f3b04680b6/go.mod h1:8Vg/LTOO0KYa/vlHWJ6XZAevPQThGH5sufO0Hrou/lA= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -1160,8 +1166,6 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= -github.com/pebbe/zmq4 v1.2.9 h1:JlHcdgq6zpppNR1tH0wXJq0XK03pRUc4lBlHTD7aj/4= -github.com/pebbe/zmq4 v1.2.9/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= diff --git a/monitor/btcscanner/block_handler.go b/monitor/btcscanner/block_handler.go index 7054f9a4..704d1b9b 100644 --- a/monitor/btcscanner/block_handler.go +++ b/monitor/btcscanner/block_handler.go @@ -3,75 +3,90 @@ package btcscanner import ( "errors" "fmt" - - "github.com/babylonlabs-io/vigilante/types" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/chainntnfs" ) -// blockEventHandler handles connected and disconnected blocks from the BTC client. -func (bs *BtcScanner) blockEventHandler() { +// bootstrapAndBlockEventHandler handles connected and disconnected blocks from the BTC client. +func (bs *BtcScanner) bootstrapAndBlockEventHandler() { defer bs.wg.Done() + bs.Bootstrap() + + var blockEpoch *chainntnfs.BlockEpoch + bestKnownBlock := bs.UnconfirmedBlockCache.Tip() + if bestKnownBlock != nil { + hash := bestKnownBlock.BlockHash() + blockEpoch = &chainntnfs.BlockEpoch{ + Hash: &hash, + Height: bestKnownBlock.Height, + BlockHeader: bestKnownBlock.Header, + } + } + // register the notifier with the best known tip + blockNotifier, err := bs.btcNotifier.RegisterBlockEpochNtfn(blockEpoch) + if err != nil { + bs.logger.Errorf("Failed registering block epoch notifier") + return + } + defer blockNotifier.Cancel() + for { select { case <-bs.quit: bs.BtcClient.Stop() return - case event, open := <-bs.BtcClient.BlockEventChan(): + case epoch, open := <-blockNotifier.Epochs: if !open { bs.logger.Errorf("Block event channel is closed") return // channel closed } - if event.EventType == types.BlockConnected { - err := bs.handleConnectedBlocks(event) - if err != nil { - bs.logger.Warnf("failed to handle a connected block at height %d: %s, "+ - "need to restart the bootstrapping process", event.Height, err.Error()) - if bs.Synced.Swap(false) { - bs.Bootstrap() - } - } - } else if event.EventType == types.BlockDisconnected { - err := bs.handleDisconnectedBlocks(event) - if err != nil { - bs.logger.Warnf("failed to handle a disconnected block at height %d: %s,"+ - "need to restart the bootstrapping process", event.Height, err.Error()) - if bs.Synced.Swap(false) { - bs.Bootstrap() - } - } + + if err := bs.handleNewBlock(epoch.Height, epoch.BlockHeader); err != nil { + bs.logger.Warnf("failed to handle block at height %d: %s, "+ + "need to restart the bootstrapping process", epoch.Height, err.Error()) + bs.Bootstrap() } } } } -// handleConnectedBlocks handles connected blocks from the BTC client +// handleNewBlock handles blocks from the BTC client // if new confirmed blocks are found, send them through the channel -func (bs *BtcScanner) handleConnectedBlocks(event *types.BlockEvent) error { - if !bs.Synced.Load() { - return errors.New("the btc scanner is not synced") - } - - // get the block from hash - blockHash := event.Header.BlockHash() - ib, _, err := bs.BtcClient.GetBlockByHash(&blockHash) - if err != nil { - // failing to request the block, which means a bug - panic(err) - } - +func (bs *BtcScanner) handleNewBlock(height int32, header *wire.BlockHeader) error { // get cache tip cacheTip := bs.UnconfirmedBlockCache.Tip() if cacheTip == nil { return errors.New("no unconfirmed blocks found") } - parentHash := ib.Header.PrevBlock + if cacheTip.Height >= height { + bs.logger.Debugf( + "the connecting block (height: %d, hash: %s) is too early, skipping the block", + height, + header.BlockHash().String(), + ) + return nil + } + + if cacheTip.Height+1 < height { + return fmt.Errorf("missing blocks, expected block height: %d, got: %d", cacheTip.Height+1, height) + } + parentHash := header.PrevBlock // if the parent of the block is not the tip of the cache, then the cache is not up-to-date if parentHash != cacheTip.BlockHash() { return errors.New("cache is not up-to-date") } + // get the block from hash + blockHash := header.BlockHash() + ib, _, err := bs.BtcClient.GetBlockByHash(&blockHash) + if err != nil { + // failing to request the block, which means a bug + panic(fmt.Errorf("failed to request block by hash: %s", blockHash.String())) + } + // otherwise, add the block to the cache bs.UnconfirmedBlockCache.Add(ib) @@ -94,24 +109,3 @@ func (bs *BtcScanner) handleConnectedBlocks(event *types.BlockEvent) error { return nil } - -// handleDisconnectedBlocks handles disconnected blocks from the BTC client. -func (bs *BtcScanner) handleDisconnectedBlocks(event *types.BlockEvent) error { - // get cache tip - cacheTip := bs.UnconfirmedBlockCache.Tip() - if cacheTip == nil { - return errors.New("cache is empty") - } - - // if the block to be disconnected is not the tip of the cache, then the cache is not up-to-date, - if event.Header.BlockHash() != cacheTip.BlockHash() { - return errors.New("cache is out-of-sync") - } - - // otherwise, remove the block from the cache - if err := bs.UnconfirmedBlockCache.RemoveLast(); err != nil { - return fmt.Errorf("failed to remove last block from cache: %v", err) - } - - return nil -} diff --git a/monitor/btcscanner/btc_scanner.go b/monitor/btcscanner/btc_scanner.go index 5ed6ebe4..16da44ab 100644 --- a/monitor/btcscanner/btc_scanner.go +++ b/monitor/btcscanner/btc_scanner.go @@ -2,6 +2,7 @@ package btcscanner import ( "fmt" + notifier "github.com/lightningnetwork/lnd/chainntnfs" "sync" "github.com/babylonlabs-io/babylon/btctxformatter" @@ -19,7 +20,8 @@ type BtcScanner struct { logger *zap.SugaredLogger // connect to BTC node - BtcClient btcclient.BTCClient + BtcClient btcclient.BTCClient + btcNotifier notifier.ChainNotifier // the BTC height the scanner starts BaseHeight uint64 @@ -38,8 +40,6 @@ type BtcScanner struct { blockHeaderChan chan *wire.BlockHeader checkpointsChan chan *types.CheckpointRecord - Synced *atomic.Bool - wg sync.WaitGroup Started *atomic.Bool quit chan struct{} @@ -49,7 +49,8 @@ func New( monitorCfg *config.MonitorConfig, parentLogger *zap.Logger, btcClient btcclient.BTCClient, - btclightclientBaseHeight uint64, + btcNotifier notifier.ChainNotifier, + btcLightClientBaseHeight uint64, checkpointTag []byte, ) (*BtcScanner, error) { headersChan := make(chan *wire.BlockHeader, monitorCfg.BtcBlockBufferSize) @@ -64,14 +65,14 @@ func New( return &BtcScanner{ logger: parentLogger.With(zap.String("module", "btcscanner")).Sugar(), BtcClient: btcClient, - BaseHeight: btclightclientBaseHeight, + btcNotifier: btcNotifier, + BaseHeight: btcLightClientBaseHeight, K: monitorCfg.BtcConfirmationDepth, ckptCache: ckptCache, UnconfirmedBlockCache: unconfirmedBlockCache, ConfirmedBlocksChan: confirmedBlocksChan, blockHeaderChan: headersChan, checkpointsChan: ckptsChan, - Synced: atomic.NewBool(false), Started: atomic.NewBool(false), quit: make(chan struct{}), }, nil @@ -84,17 +85,17 @@ func (bs *BtcScanner) Start() { return } - // the bootstrapping should not block the main thread - go bs.Bootstrap() - - bs.BtcClient.MustSubscribeBlocks() - bs.Started.Store(true) bs.logger.Info("the BTC scanner is started") + if err := bs.btcNotifier.Start(); err != nil { + bs.logger.Errorf("Failed starting notifier") + return + } + // start handling new blocks bs.wg.Add(1) - go bs.blockEventHandler() + go bs.bootstrapAndBlockEventHandler() for bs.Started.Load() { select { @@ -128,12 +129,6 @@ func (bs *BtcScanner) Bootstrap() { err error ) - if bs.Synced.Load() { - // the scanner is already synced - return - } - defer bs.Synced.Store(true) - if bs.confirmedTipBlock != nil { firstUnconfirmedHeight = uint64(bs.confirmedTipBlock.Height + 1) } else { diff --git a/monitor/btcscanner/btc_scanner_test.go b/monitor/btcscanner/btc_scanner_test.go index 74b8f195..cb91d70d 100644 --- a/monitor/btcscanner/btc_scanner_test.go +++ b/monitor/btcscanner/btc_scanner_test.go @@ -5,15 +5,13 @@ import ( "testing" "github.com/babylonlabs-io/babylon/testutil/datagen" - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/require" - "go.uber.org/atomic" - "github.com/babylonlabs-io/vigilante/config" "github.com/babylonlabs-io/vigilante/monitor/btcscanner" vdatagen "github.com/babylonlabs-io/vigilante/testutil/datagen" "github.com/babylonlabs-io/vigilante/testutil/mocks" "github.com/babylonlabs-io/vigilante/types" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" ) func FuzzBootStrap(f *testing.F) { @@ -30,7 +28,6 @@ func FuzzBootStrap(f *testing.F) { ctl := gomock.NewController(t) mockBtcClient := mocks.NewMockBTCClient(ctl) confirmedBlocks := chainIndexedBlocks[:numBlocks-k] - mockBtcClient.EXPECT().MustSubscribeBlocks().Return().AnyTimes() mockBtcClient.EXPECT().GetBestBlock().Return(nil, uint64(bestHeight), nil) for i := 0; i < int(numBlocks); i++ { mockBtcClient.EXPECT().GetBlockByHeight(gomock.Eq(uint64(chainIndexedBlocks[i].Height))). @@ -45,7 +42,6 @@ func FuzzBootStrap(f *testing.F) { K: k, ConfirmedBlocksChan: make(chan *types.IndexedBlock), UnconfirmedBlockCache: cache, - Synced: atomic.NewBool(false), } logger, err := config.NewRootLogger("auto", "debug") require.NoError(t, err) @@ -58,6 +54,5 @@ func FuzzBootStrap(f *testing.F) { } }() btcScanner.Bootstrap() - require.True(t, btcScanner.Synced.Load()) }) } diff --git a/monitor/monitor.go b/monitor/monitor.go index e3fd7fff..e9199256 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -3,6 +3,7 @@ package monitor import ( "encoding/hex" "fmt" + notifier "github.com/lightningnetwork/lnd/chainntnfs" "sort" "sync" @@ -52,6 +53,7 @@ func New( genesisInfo *types.GenesisInfo, bbnQueryClient BabylonQueryClient, btcClient btcclient.BTCClient, + btcNotifier notifier.ChainNotifier, monitorMetrics *metrics.MonitorMetrics, ) (*Monitor, error) { logger := parentLogger.With(zap.String("module", "monitor")) @@ -64,6 +66,7 @@ func New( cfg, logger, btcClient, + btcNotifier, genesisInfo.GetBaseBTCHeight(), checkpointTagBytes, ) diff --git a/reporter/block_handler.go b/reporter/block_handler.go index 1927d78a..f14dea32 100644 --- a/reporter/block_handler.go +++ b/reporter/block_handler.go @@ -2,35 +2,30 @@ package reporter import ( "fmt" - "github.com/babylonlabs-io/vigilante/types" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/chainntnfs" ) // blockEventHandler handles connected and disconnected blocks from the BTC client. -func (r *Reporter) blockEventHandler() { +func (r *Reporter) blockEventHandler(blockNotifier *chainntnfs.BlockEpochEvent) { defer r.wg.Done() quit := r.quitChan() + defer blockNotifier.Cancel() + for { select { - case event, open := <-r.btcClient.BlockEventChan(): + case epoch, open := <-blockNotifier.Epochs: if !open { r.logger.Errorf("Block event channel is closed") return // channel closed } - var errorRequiringBootstrap error - if event.EventType == types.BlockConnected { - errorRequiringBootstrap = r.handleConnectedBlocks(event) - } else if event.EventType == types.BlockDisconnected { - errorRequiringBootstrap = r.handleDisconnectedBlocks(event) + if err := r.handleNewBlock(epoch.Height, epoch.BlockHeader); err != nil { + r.logger.Warnf("Due to error in event processing: %v, bootstrap process need to be restarted", err) + r.bootstrapWithRetries() } - - if errorRequiringBootstrap != nil { - r.logger.Warnf("Due to error in event processing: %v, bootstrap process need to be restarted", errorRequiringBootstrap) - r.bootstrapWithRetries(true) - } - case <-quit: // We have been asked to stop return @@ -38,136 +33,67 @@ func (r *Reporter) blockEventHandler() { } } -// handleConnectedBlocks handles connected blocks from the BTC client. -func (r *Reporter) handleConnectedBlocks(event *types.BlockEvent) error { - // if the header is too early, ignore it - // NOTE: this might happen when bootstrapping is triggered after the reporter - // has subscribed to the BTC blocks - firstCacheBlock := r.btcCache.First() - if firstCacheBlock == nil { +// handleNewBlock processes a new block, checking if it connects to the cache or requires bootstrapping. +func (r *Reporter) handleNewBlock(height int32, header *wire.BlockHeader) error { + cacheTip := r.btcCache.Tip() + if cacheTip == nil { return fmt.Errorf("cache is empty, restart bootstrap process") } - if event.Height < firstCacheBlock.Height { + + if cacheTip.Height >= height { r.logger.Debugf( "the connecting block (height: %d, hash: %s) is too early, skipping the block", - event.Height, - event.Header.BlockHash().String(), + height, + header.BlockHash().String(), ) return nil } - // if the received header is within the cache's region, then this means the events have - // an overlap with the cache. Then, perform a consistency check. If the block is duplicated, - // then ignore the block, otherwise there is an inconsistency and redo bootstrap - // NOTE: this might happen when bootstrapping is triggered after the reporter - // has subscribed to the BTC blocks - if b := r.btcCache.FindBlock(uint64(event.Height)); b != nil { - if b.BlockHash() == event.Header.BlockHash() { - r.logger.Debugf( - "the connecting block (height: %d, hash: %s) is known to cache, skipping the block", - b.Height, - b.BlockHash().String(), - ) - return nil - } - return fmt.Errorf( - "the connecting block (height: %d, hash: %s) is different from the header (height: %d, hash: %s) at the same height in cache", - event.Height, - event.Header.BlockHash().String(), - b.Height, - b.BlockHash().String(), - ) + if cacheTip.Height+1 < height { + return fmt.Errorf("missing blocks, expected block height: %d, got: %d", cacheTip.Height+1, height) } - // get the block from hash - blockHash := event.Header.BlockHash() - ib, mBlock, err := r.btcClient.GetBlockByHash(&blockHash) - if err != nil { - return fmt.Errorf("failed to get block %v with number %d ,from BTC client: %w", blockHash, event.Height, err) + // Check if the new block connects to the cache cacheTip + parentHash := header.PrevBlock + if parentHash != cacheTip.BlockHash() { + // If the block doesn't connect, clear the cache and bootstrap + r.btcCache.RemoveAll() + return fmt.Errorf("block does not connect to the cache, diff hash, bootstrap required") } - // if the parent of the block is not the tip of the cache, then the cache is not up-to-date, - // and we might have missed some blocks. In this case, restart the bootstrap process. - parentHash := mBlock.Header.PrevBlock - cacheTip := r.btcCache.Tip() // NOTE: cache is guaranteed to be non-empty at this stage - if parentHash != cacheTip.BlockHash() { - return fmt.Errorf("cache (tip %d) is not up-to-date while connecting block %d, restart bootstrap process", cacheTip.Height, ib.Height) + // Block connects to the current chain, add it to the cache + blockHash := header.BlockHash() + ib, _, err := r.btcClient.GetBlockByHash(&blockHash) + if err != nil { + return fmt.Errorf("failed to get block %v with height %d: %w", blockHash, height, err) } - // otherwise, add the block to the cache r.btcCache.Add(ib) - var headersToProcess []*types.IndexedBlock - - if r.reorgList.size() > 0 { - // we are in the middle of reorg, we need to check whether we already have all blocks of better chain - // as reorgs in btc nodes happen only when better chain is available. - // 1. First we get oldest header from our reorg branch - // 2. Then we get all headers from our cache starting the height of the oldest header of new branch - // 3. then we calculate if work on new branch starting from the first reorged height is larger - // than removed branch work. - oldestBlockFromOldBranch := r.reorgList.getLastRemovedBlock() - currentBranch, err := r.btcCache.GetLastBlocks(oldestBlockFromOldBranch.height) - if err != nil { - panic(fmt.Errorf("failed to get block from cache after reorg: %w", err)) - } - - currentBranchWork := calculateBranchWork(currentBranch) + // Process the new block (submit headers, checkpoints, etc.) + return r.processNewBlock(ib) +} - // if current branch is better than reorg branch, we can submit headers and clear reorg list - if currentBranchWork.GT(r.reorgList.removedBranchWork()) { - r.logger.Debugf("Current branch is better than reorg branch. Length of current branch: %d, work of branch: %s", len(currentBranch), currentBranchWork) - headersToProcess = append(headersToProcess, currentBranch...) - r.reorgList.clear() - } - } else { - headersToProcess = append(headersToProcess, ib) - } +// processNewBlock handles further processing of a newly added block. +func (r *Reporter) processNewBlock(ib *types.IndexedBlock) error { + var headersToProcess []*types.IndexedBlock + headersToProcess = append(headersToProcess, ib) if len(headersToProcess) == 0 { r.logger.Debug("No new headers to submit to Babylon") return nil } - // extracts and submits headers for each blocks in ibs signer := r.babylonClient.MustGetAddr() - _, err = r.ProcessHeaders(signer, headersToProcess) - if err != nil { - r.logger.Warnf("Failed to submit header: %v", err) - } - - // extracts and submits checkpoints for each blocks in ibs - _, _, err = r.ProcessCheckpoints(signer, headersToProcess) - if err != nil { - r.logger.Warnf("Failed to submit checkpoint: %v", err) - } - return nil -} - -// handleDisconnectedBlocks handles disconnected blocks from the BTC client. -func (r *Reporter) handleDisconnectedBlocks(event *types.BlockEvent) error { - // get cache tip - cacheTip := r.btcCache.Tip() - if cacheTip == nil { - return fmt.Errorf("cache is empty, restart bootstrap process") - } - // if the block to be disconnected is not the tip of the cache, then the cache is not up-to-date, - if event.Header.BlockHash() != cacheTip.BlockHash() { - return fmt.Errorf("cache is not up-to-date while disconnecting block, restart bootstrap process") + // Process headers + if _, err := r.ProcessHeaders(signer, headersToProcess); err != nil { + r.logger.Warnf("Failed to submit headers: %v", err) } - // at this point, the block to be disconnected is the tip of the cache so we can - // add it to our reorg list - r.reorgList.addRemovedBlock( - uint64(cacheTip.Height), - cacheTip.Header, - ) - - // otherwise, remove the block from the cache - if err := r.btcCache.RemoveLast(); err != nil { - r.logger.Warnf("Failed to remove last block from cache: %v, restart bootstrap process", err) - panic(err) + // Process checkpoints + if _, _, err := r.ProcessCheckpoints(signer, headersToProcess); err != nil { + r.logger.Warnf("Failed to submit checkpoints: %v", err) } return nil diff --git a/reporter/bootstrapping.go b/reporter/bootstrapping.go index 5e45c678..729fb1eb 100644 --- a/reporter/bootstrapping.go +++ b/reporter/bootstrapping.go @@ -60,16 +60,13 @@ func (r *Reporter) checkConsistency() (*consistencyCheckInfo, error) { }, nil } -func (r *Reporter) bootstrap(skipBlockSubscription bool) error { +func (r *Reporter) bootstrap() error { var ( btcLatestBlockHeight uint64 ibs []*types.IndexedBlock err error ) - // if we are bootstraping, we will definitely not handle reorgs - r.reorgList.clear() - // ensure BTC has caught up with BBN header chain if err := r.waitUntilBTCSync(); err != nil { return err @@ -81,14 +78,7 @@ func (r *Reporter) bootstrap(skipBlockSubscription bool) error { } r.logger.Debugf("BTC cache size: %d", r.btcCache.Size()) - // Subscribe new blocks right after initialising BTC cache, in order to ensure subscribed blocks and cached blocks do not have overlap. - // Otherwise, if we subscribe too early, then they will have overlap, leading to duplicated header/ckpt submissions. - if !skipBlockSubscription { - r.btcClient.MustSubscribeBlocks() - } - consistencyInfo, err := r.checkConsistency() - if err != nil { return err } @@ -132,6 +122,7 @@ func (r *Reporter) bootstrap(skipBlockSubscription bool) error { } r.logger.Info("Successfully finished bootstrapping") + return nil } @@ -153,12 +144,12 @@ func (r *Reporter) reporterQuitCtx() (context.Context, func()) { return ctx, cancel } -func (r *Reporter) bootstrapWithRetries(skipBlockSubscription bool) { +func (r *Reporter) bootstrapWithRetries() { // if we are exiting, we need to cancel this process ctx, cancel := r.reporterQuitCtx() defer cancel() if err := retry.Do(func() error { - return r.bootstrap(skipBlockSubscription) + return r.bootstrap() }, retry.Context(ctx), bootstrapAttemptsAtt, @@ -226,6 +217,7 @@ func (r *Reporter) initBTCCache() error { if err = r.btcCache.Init(ibs); err != nil { panic(err) } + return nil } @@ -316,5 +308,6 @@ func (r *Reporter) checkHeaderConsistency(consistencyCheckHeight uint64) error { err = fmt.Errorf("BTC main chain is inconsistent with BBN header chain: k-deep block in BBN header chain: %v", consistencyCheckHash) panic(err) } + return nil } diff --git a/reporter/reorg_list.go b/reporter/reorg_list.go deleted file mode 100644 index 6e24c938..00000000 --- a/reporter/reorg_list.go +++ /dev/null @@ -1,76 +0,0 @@ -package reporter - -import ( - "sync" - - sdkmath "cosmossdk.io/math" - btclightclienttypes "github.com/babylonlabs-io/babylon/x/btclightclient/types" - "github.com/btcsuite/btcd/wire" -) - -type removedBlock struct { - height uint64 - header *wire.BlockHeader -} - -// Help data structure to keep track of removed blocks. -// NOTE: This is not generic data structure, and must be used with conjunction with -// reporter and btc cache -type reorgList struct { - sync.Mutex - workOfRemovedBlocks sdkmath.Uint - removedBlocks []*removedBlock -} - -func newReorgList() *reorgList { - return &reorgList{ - removedBlocks: []*removedBlock{}, - workOfRemovedBlocks: sdkmath.ZeroUint(), - } -} - -// addRemovedBlock add currently removed block to the end of the list. Re-orgs -// are started from the tip of the chain and go backwards, this means -// that oldest removed block is at the end of the list. -func (r *reorgList) addRemovedBlock( - height uint64, - header *wire.BlockHeader) { - headerWork := btclightclienttypes.CalcHeaderWork(header) - r.Lock() - defer r.Unlock() - - newWork := btclightclienttypes.CumulativeWork(headerWork, r.workOfRemovedBlocks) - r.removedBlocks = append(r.removedBlocks, &removedBlock{height, header}) - r.workOfRemovedBlocks = newWork -} - -func (r *reorgList) getLastRemovedBlock() *removedBlock { - r.Lock() - defer r.Unlock() - if len(r.removedBlocks) == 0 { - return nil - } - - return r.removedBlocks[len(r.removedBlocks)-1] -} - -func (r *reorgList) clear() { - r.Lock() - defer r.Unlock() - - r.removedBlocks = []*removedBlock{} - r.workOfRemovedBlocks = sdkmath.ZeroUint() -} - -func (r *reorgList) size() int { - r.Lock() - defer r.Unlock() - - return len(r.removedBlocks) -} - -func (r *reorgList) removedBranchWork() sdkmath.Uint { - r.Lock() - defer r.Unlock() - return r.workOfRemovedBlocks -} diff --git a/reporter/reporter.go b/reporter/reporter.go index 5a5bfb64..5c2f757c 100644 --- a/reporter/reporter.go +++ b/reporter/reporter.go @@ -3,6 +3,7 @@ package reporter import ( "encoding/hex" "fmt" + notifier "github.com/lightningnetwork/lnd/chainntnfs" "sync" "time" @@ -22,6 +23,7 @@ type Reporter struct { btcClient btcclient.BTCClient babylonClient BabylonClient + btcNotifier notifier.ChainNotifier // retry attributes retrySleepTime time.Duration @@ -30,7 +32,6 @@ type Reporter struct { // Internal states of the reporter CheckpointCache *types.CheckpointCache btcCache *types.BTCCache - reorgList *reorgList btcConfirmationDepth uint64 checkpointFinalizationTimeout uint64 metrics *metrics.ReporterMetrics @@ -45,6 +46,7 @@ func New( parentLogger *zap.Logger, btcClient btcclient.BTCClient, babylonClient BabylonClient, + btcNotifier notifier.ChainNotifier, retrySleepTime, maxRetrySleepTime time.Duration, metrics *metrics.ReporterMetrics, @@ -81,8 +83,8 @@ func New( maxRetrySleepTime: maxRetrySleepTime, btcClient: btcClient, babylonClient: babylonClient, + btcNotifier: btcNotifier, CheckpointCache: ckptCache, - reorgList: newReorgList(), btcConfirmationDepth: k, checkpointFinalizationTimeout: w, metrics: metrics, @@ -108,10 +110,21 @@ func (r *Reporter) Start() { } r.quitMu.Unlock() - r.bootstrapWithRetries(false) + r.bootstrapWithRetries() + + if err := r.btcNotifier.Start(); err != nil { + r.logger.Errorf("Failed starting notifier") + return + } + + blockNotifier, err := r.btcNotifier.RegisterBlockEpochNtfn(nil) + if err != nil { + r.logger.Errorf("Failed registering block epoch notifier") + return + } r.wg.Add(1) - go r.blockEventHandler() + go r.blockEventHandler(blockNotifier) // start record time-related metrics r.metrics.RecordMetrics() diff --git a/reporter/utils.go b/reporter/utils.go index 59dae461..ce1ef528 100644 --- a/reporter/utils.go +++ b/reporter/utils.go @@ -7,7 +7,6 @@ import ( pv "github.com/cosmos/relayer/v2/relayer/provider" - sdkmath "cosmossdk.io/math" "github.com/babylonlabs-io/babylon/types/retry" btcctypes "github.com/babylonlabs-io/babylon/x/btccheckpoint/types" btclctypes "github.com/babylonlabs-io/babylon/x/btclightclient/types" @@ -224,15 +223,6 @@ func (r *Reporter) ProcessCheckpoints(signer string, ibs []*types.IndexedBlock) return numCkptSegs, numMatchedCkpts, err } -func calculateBranchWork(branch []*types.IndexedBlock) sdkmath.Uint { - var currenWork = sdkmath.ZeroUint() - for _, h := range branch { - headerWork := btclctypes.CalcHeaderWork(h.Header) - currenWork = btclctypes.CumulativeWork(headerWork, currenWork) - } - return currenWork -} - // push msg to channel c, or quit if quit channel is closed func PushOrQuit[T any](c chan<- T, msg T, quit <-chan struct{}) { select { diff --git a/reporter/utils_test.go b/reporter/utils_test.go index d575c5ff..ab32234e 100644 --- a/reporter/utils_test.go +++ b/reporter/utils_test.go @@ -1,6 +1,7 @@ package reporter_test import ( + "github.com/lightningnetwork/lnd/lntest/mock" "math/rand" "testing" @@ -32,12 +33,14 @@ func newMockReporter(t *testing.T, ctrl *gomock.Controller) ( mockBabylonClient.EXPECT().GetConfig().Return(&cfg.Babylon).AnyTimes() mockBabylonClient.EXPECT().BTCCheckpointParams().Return( &btcctypes.QueryParamsResponse{Params: btccParams}, nil).AnyTimes() + mockNotifier := mock.ChainNotifier{} r, err := reporter.New( &cfg.Reporter, logger, mockBTCClient, mockBabylonClient, + &mockNotifier, cfg.Common.RetrySleepTime, cfg.Common.MaxRetrySleepTime, metrics.NewReporterMetrics(), diff --git a/testutil/mocks/btcclient.go b/testutil/mocks/btcclient.go index 1bbdb6b9..76581b6c 100644 --- a/testutil/mocks/btcclient.go +++ b/testutil/mocks/btcclient.go @@ -40,20 +40,6 @@ func (m *MockBTCClient) EXPECT() *MockBTCClientMockRecorder { return m.recorder } -// BlockEventChan mocks base method. -func (m *MockBTCClient) BlockEventChan() <-chan *types.BlockEvent { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BlockEventChan") - ret0, _ := ret[0].(<-chan *types.BlockEvent) - return ret0 -} - -// BlockEventChan indicates an expected call of BlockEventChan. -func (mr *MockBTCClientMockRecorder) BlockEventChan() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockEventChan", reflect.TypeOf((*MockBTCClient)(nil).BlockEventChan)) -} - // FindTailBlocksByHeight mocks base method. func (m *MockBTCClient) FindTailBlocksByHeight(height uint64) ([]*types.IndexedBlock, error) { m.ctrl.T.Helper() @@ -162,18 +148,6 @@ func (mr *MockBTCClientMockRecorder) GetTxOut(txHash, index, mempool interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTxOut", reflect.TypeOf((*MockBTCClient)(nil).GetTxOut), txHash, index, mempool) } -// MustSubscribeBlocks mocks base method. -func (m *MockBTCClient) MustSubscribeBlocks() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "MustSubscribeBlocks") -} - -// MustSubscribeBlocks indicates an expected call of MustSubscribeBlocks. -func (mr *MockBTCClientMockRecorder) MustSubscribeBlocks() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MustSubscribeBlocks", reflect.TypeOf((*MockBTCClient)(nil).MustSubscribeBlocks)) -} - // SendRawTransaction mocks base method. func (m *MockBTCClient) SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) (*chainhash.Hash, error) { m.ctrl.T.Helper() @@ -236,21 +210,6 @@ func (m *MockBTCWallet) EXPECT() *MockBTCWalletMockRecorder { return m.recorder } -// DumpPrivKey mocks base method. -func (m *MockBTCWallet) DumpPrivKey(address btcutil.Address) (*btcutil.WIF, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DumpPrivKey", address) - ret0, _ := ret[0].(*btcutil.WIF) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// DumpPrivKey indicates an expected call of DumpPrivKey. -func (mr *MockBTCWalletMockRecorder) DumpPrivKey(address interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DumpPrivKey", reflect.TypeOf((*MockBTCWallet)(nil).DumpPrivKey), address) -} - // FundRawTransaction mocks base method. func (m *MockBTCWallet) FundRawTransaction(tx *wire.MsgTx, opts btcjson.FundRawTransactionOpts, isWitness *bool) (*btcjson.FundRawTransactionResult, error) { m.ctrl.T.Helper() diff --git a/zmq/client.go b/zmq/client.go deleted file mode 100644 index 51184f42..00000000 --- a/zmq/client.go +++ /dev/null @@ -1,129 +0,0 @@ -// Package zmq reference is taken from https://github.com/joakimofv/go-bitcoindclient which is a -// go wrapper around official zmq package https://github.com/pebbe/zmq4 -package zmq - -import ( - "errors" - "sync" - "sync/atomic" - - "github.com/babylonlabs-io/vigilante/types" - "github.com/btcsuite/btcd/rpcclient" - "github.com/pebbe/zmq4" - "go.uber.org/zap" -) - -var ( - ErrSubscribeDisabled = errors.New("subscribe disabled (ZmqEndpoint was not set)") - ErrSubscribeExited = errors.New("subscription backend has exited") - ErrSubscriptionAlreadyActive = errors.New("active subscription already exists") -) - -// Client is a client that provides methods for interacting with zmq4. -// Must be created with New and destroyed with Close. -// Clients are safe for concurrent use by multiple goroutines. -type Client struct { - rpcClient *rpcclient.Client - logger *zap.SugaredLogger - closed int32 // Set atomically. - wg sync.WaitGroup - quit chan struct{} - - zmqEndpoint string - blockEventChan chan *types.BlockEvent - - // ZMQ subscription related things. - zctx *zmq4.Context - zsub *zmq4.Socket - subs subscriptions - // subs.zfront --> zback is used like a channel to send messages to the zmqHandler goroutine. - // Have to use zmq4 sockets in place of native channels for communication from - // other functions to the goroutine, since it is constantly waiting on the zsub socket, - // it can't select on a channel at the same time but can poll on multiple sockets. - zback *zmq4.Socket -} - -// New returns an initiated client, or an error. -func New(parentLogger *zap.Logger, zmqEndpoint string, blockEventChan chan *types.BlockEvent, rpcClient *rpcclient.Client) (*Client, error) { - var ( - zctx *zmq4.Context - zsub *zmq4.Socket - zback *zmq4.Socket - err error - c = &Client{ - quit: make(chan struct{}), - rpcClient: rpcClient, - zmqEndpoint: zmqEndpoint, - logger: parentLogger.With(zap.String("module", "zmq")).Sugar(), - } - ) - - // ZMQ Subscribe. - zctx, err = zmq4.NewContext() - if err != nil { - return nil, err - } - - zsub, err = zctx.NewSocket(zmq4.SUB) - if err != nil { - return nil, err - } - if err = zsub.Connect(zmqEndpoint); err != nil { - return nil, err - } - - zback, err = zctx.NewSocket(zmq4.PAIR) - if err != nil { - return nil, err - } - if err = zback.Bind("inproc://channel"); err != nil { - return nil, err - } - - zfront, err := zctx.NewSocket(zmq4.PAIR) - if err != nil { - return nil, err - } - if err = zfront.Connect("inproc://channel"); err != nil { - return nil, err - } - - c.zctx = zctx - c.zsub = zsub - c.subs.exited = make(chan struct{}) - c.subs.zfront = zfront - c.zback = zback - c.blockEventChan = blockEventChan - - c.wg.Add(1) - go c.zmqHandler() - - return c, nil -} - -// Close terminates the client and releases resources. -func (c *Client) Close() (err error) { - if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) { - return errors.New("client already closed") - } - if c.zctx != nil { - c.zctx.SetRetryAfterEINTR(false) - c.subs.Lock() - select { - case <-c.subs.exited: - default: - if _, err = c.subs.zfront.SendMessage("term"); err != nil { - return err - } - } - c.subs.Unlock() - <-c.subs.exited - err = c.zctx.Term() - if err != nil { - return err - } - } - close(c.quit) - c.wg.Wait() - return nil -} diff --git a/zmq/subscribe.go b/zmq/subscribe.go deleted file mode 100644 index 2df13ba6..00000000 --- a/zmq/subscribe.go +++ /dev/null @@ -1,182 +0,0 @@ -package zmq - -import ( - "encoding/hex" - "sync" - "time" - - "github.com/babylonlabs-io/vigilante/types" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/wire" - zmq "github.com/pebbe/zmq4" -) - -// SequenceMsg is a subscription event coming from a "sequence" ZMQ message. -type SequenceMsg struct { - Hash [32]byte // use encoding/hex.EncodeToString() to get it into the RPC method string format. - Event types.EventType -} - -type subscriptions struct { - sync.RWMutex - - exited chan struct{} - zfront *zmq.Socket - latestEvent time.Time - active bool -} - -// SubscribeSequence subscribes to the ZMQ "sequence" messages as SequenceMsg items pushed onto the channel. -// Call cancel to cancel the subscription and let the client release the resources. The channel is closed -// when the subscription is canceled or when the client is closed. -func (c *Client) SubscribeSequence() (err error) { - if c.zsub == nil { - err = ErrSubscribeDisabled - return - } - c.subs.Lock() - select { - case <-c.subs.exited: - err = ErrSubscribeExited - c.subs.Unlock() - return - default: - } - - if c.subs.active { - err = ErrSubscriptionAlreadyActive - return - } - - _, err = c.subs.zfront.SendMessage("subscribe", "sequence") - if err != nil { - c.subs.Unlock() - return - } - c.subs.active = true - - c.subs.Unlock() - return -} - -func (c *Client) zmqHandler() { - defer c.wg.Done() - defer func(zsub *zmq.Socket) { - err := zsub.Close() - if err != nil { - c.logger.Errorf("Error closing ZMQ socket: %v", err) - } - }(c.zsub) - defer func(zback *zmq.Socket) { - err := zback.Close() - if err != nil { - c.logger.Errorf("Error closing ZMQ socket: %v", err) - } - }(c.zback) - - poller := zmq.NewPoller() - poller.Add(c.zsub, zmq.POLLIN) - poller.Add(c.zback, zmq.POLLIN) -OUTER: - for { - // Wait forever until a message can be received or the context was cancelled. - polled, err := poller.Poll(-1) - if err != nil { - break OUTER - } - - for _, p := range polled { - switch p.Socket { - case c.zsub: - msg, err := c.zsub.RecvMessage(0) - if err != nil { - break OUTER - } - c.subs.latestEvent = time.Now() - switch msg[0] { - case "sequence": - var sequenceMsg SequenceMsg - copy(sequenceMsg.Hash[:], msg[1]) - switch msg[1][32] { - case 'C': - sequenceMsg.Event = types.BlockConnected - case 'D': - sequenceMsg.Event = types.BlockDisconnected - default: - // not interested in other events - continue - } - - c.sendBlockEvent(sequenceMsg.Hash[:], sequenceMsg.Event) - } - - case c.zback: - msg, err := c.zback.RecvMessage(0) - if err != nil { - break OUTER - } - switch msg[0] { - case "subscribe": - if err := c.zsub.SetSubscribe(msg[1]); err != nil { - break OUTER - } - case "term": - break OUTER - } - } - } - } - - c.subs.Lock() - close(c.subs.exited) - err := c.subs.zfront.Close() - if err != nil { - c.logger.Errorf("Error closing zfront: %v", err) - return - } - // Close all subscriber channels. - if c.subs.active { - err = c.zsub.SetUnsubscribe("sequence") - if err != nil { - c.logger.Errorf("Error unsubscribing from sequence: %v", err) - return - } - } - - c.subs.Unlock() -} - -func (c *Client) sendBlockEvent(hash []byte, event types.EventType) { - blockHashStr := hex.EncodeToString(hash[:]) - blockHash, err := chainhash.NewHashFromStr(blockHashStr) - if err != nil { - c.logger.Errorf("Failed to parse block hash %v: %v", blockHashStr, err) - panic(err) - } - - c.logger.Infof("Received zmq sequence message for block %v", blockHashStr) - - ib, _, err := c.getBlockByHash(blockHash) - if err != nil { - c.logger.Errorf("Failed to get block %v from BTC client: %v", blockHash, err) - panic(err) - } - - c.blockEventChan <- types.NewBlockEvent(event, ib.Height, ib.Header) -} - -func (c *Client) getBlockByHash(blockHash *chainhash.Hash) (*types.IndexedBlock, *wire.MsgBlock, error) { - // TODO: ZMQ should not use BTC/RPC client, modify BlockEvent to include block hash - blockInfo, err := c.rpcClient.GetBlockVerbose(blockHash) - if err != nil { - return nil, nil, err - } - - mBlock, err := c.rpcClient.GetBlock(blockHash) - if err != nil { - return nil, nil, err - } - - btcTxs := types.GetWrappedTxs(mBlock) - return types.NewIndexedBlock(int32(blockInfo.Height), &mBlock.Header, btcTxs), mBlock, nil -}