From 92d2cedb2e5ff5b43c05167943959c071dad7179 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 26 Apr 2024 18:28:25 +0800 Subject: [PATCH] lnd: start blockbeat service and register subsystems --- chainio/dispatcher.go | 37 +++++++++++++++++++++++++++++ log.go | 2 ++ server.go | 54 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 92 insertions(+), 1 deletion(-) diff --git a/chainio/dispatcher.go b/chainio/dispatcher.go index 7e1f97d02db..88d01cbaea9 100644 --- a/chainio/dispatcher.go +++ b/chainio/dispatcher.go @@ -181,3 +181,40 @@ func (b *BlockbeatDispatcher) notifyQueues() { } } } + +// SetInitialBeat sets the current beat during the startup. +// +// NOTE: Must be called before `Start`. +func (b *BlockbeatDispatcher) SetInitialBeat() error { + // We need to register for block epochs and retry sweeping every block. + // We should get a notification with the current best block immediately + // if we don't provide any epoch. We'll wait for that in the collector. + blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + return fmt.Errorf("register block epoch ntfn: %w", err) + } + + // We registered for the block epochs with a nil request. The notifier + // should send us the current best block immediately. So we need to + // wait for it here because we need to know the current best height. + select { + case bestBlock := <-blockEpochs.Epochs: + clog.Infof("Received initial block %v at height %d", + bestBlock.Hash, bestBlock.Height) + + // Update the current blockbeat. + b.beat = NewBeat(*bestBlock) + + case <-b.quit: + clog.Debug("Sweeper shutting down") + } + + // Set the initial height for the consumer. + for _, queue := range b.consumerQueues { + for _, c := range queue { + c.SetCurrentBeat(b.beat) + } + } + + return nil +} diff --git a/log.go b/log.go index f6da0235a92..fc3238b509e 100644 --- a/log.go +++ b/log.go @@ -7,6 +7,7 @@ import ( sphinx "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/build" + "github.com/lightningnetwork/lnd/chainio" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/chanacceptor" @@ -164,6 +165,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor) AddSubLogger(root, "CHFD", interceptor, chanfunding.UseLogger) AddSubLogger(root, "PEER", interceptor, peer.UseLogger) AddSubLogger(root, "CHCL", interceptor, chancloser.UseLogger) + AddSubLogger(root, "CHIO", interceptor, chainio.UseLogger) AddSubLogger(root, routing.Subsystem, interceptor, routing.UseLogger) AddSubLogger(root, routerrpc.Subsystem, interceptor, routerrpc.UseLogger) diff --git a/server.go b/server.go index 6ab197f86a1..84a0ceaa7af 100644 --- a/server.go +++ b/server.go @@ -27,6 +27,7 @@ import ( "github.com/lightningnetwork/lnd/aliasmgr" "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/brontide" + "github.com/lightningnetwork/lnd/chainio" "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/chanacceptor" "github.com/lightningnetwork/lnd/chanbackup" @@ -330,6 +331,10 @@ type server struct { // txPublisher is a publisher with fee-bumping capability. txPublisher *sweep.TxPublisher + // blockbeatDispatcher is a block dispatcher that notifies subscribers + // of new blocks. + blockbeatDispatcher *chainio.BlockbeatDispatcher + quit chan struct{} wg sync.WaitGroup @@ -579,6 +584,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, readPool: readPool, chansToRestore: chansToRestore, + blockbeatDispatcher: chainio.NewBlockbeatDispatcher( + cc.ChainNotifier, + ), channelNotifier: channelnotifier.New( dbs.ChanStateDB.ChannelStateDB(), ), @@ -608,9 +616,10 @@ func newServer(cfg *Config, listenAddrs []net.Addr, peersByPub: make(map[string]*peer.Brontide), inboundPeers: make(map[string]*peer.Brontide), outboundPeers: make(map[string]*peer.Brontide), - peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), peerDisconnectedListeners: make(map[string][]chan<- struct{}), + peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), + customMessageServer: subscribe.NewServer(), tlsManager: tlsManager, @@ -1677,9 +1686,31 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } s.connMgr = cmgr + // Finally, register the subsystems in blockbeat. + s.registerBlockConsumers() + return s, nil } +// registerBlockConsumers registers the subsystems that consume block events. +// By calling `RegisterQueue`, a list of subsystems are registered in the +// blockbeat for block notifications. When a new block arrives, the subsystems +// in the same queue are notified sequentially, and different queues are +// notified concurrently. +// +// NOTE: To put a subsystem in a different queue, create a slice and pass it to +// a new `RegisterQueue` call. +func (s *server) registerBlockConsumers() { + // In this queue, when a new block arrives, it will be received and + // processed in this order: chainArb -> sweeper -> txPublisher. + consumers := []chainio.Consumer{ + s.chainArb, + s.sweeper, + s.txPublisher, + } + s.blockbeatDispatcher.RegisterQueue(consumers) +} + // signAliasUpdate takes a ChannelUpdate and returns the signature. This is // used for option_scid_alias channels where the ChannelUpdate to be sent back // may differ from what is on disk. @@ -1920,6 +1951,13 @@ func (s *server) Start() error { } cleanup = cleanup.add(s.cc.ChainNotifier.Stop) + // Once ChainNotifier is started, we can set the initial + // blockbeat for the consumers. + if err := s.blockbeatDispatcher.SetInitialBeat(); err != nil { + startErr = err + return + } + if err := s.cc.BestBlockTracker.Start(); err != nil { startErr = err return @@ -2110,6 +2148,17 @@ func (s *server) Start() error { return nil }) + // Start the blockbeat after all other subsystems have been + // started so they are ready to receive new blocks. + if err := s.blockbeatDispatcher.Start(); err != nil { + startErr = err + return + } + cleanup = cleanup.add(func() error { + s.blockbeatDispatcher.Stop() + return nil + }) + // If peers are specified as a config option, we'll add those // peers first. for _, peerAddrCfg := range s.cfg.AddPeers { @@ -2260,6 +2309,9 @@ func (s *server) Stop() error { // Shutdown connMgr first to prevent conns during shutdown. s.connMgr.Stop() + // Stop dispatching blocks to other systems immediately. + s.blockbeatDispatcher.Stop() + // Shutdown the wallet, funding manager, and the rpc server. if err := s.chanStatusMgr.Stop(); err != nil { srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)