From f638ef22e74fb0f6658fc6f73123776ae60e130c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 26 Apr 2024 18:28:25 +0800 Subject: [PATCH] lnd: start blockbeat service and register subsystems --- log.go | 2 ++ server.go | 45 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/log.go b/log.go index f6da0235a92..fc3238b509e 100644 --- a/log.go +++ b/log.go @@ -7,6 +7,7 @@ import ( sphinx "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/build" + "github.com/lightningnetwork/lnd/chainio" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/chanacceptor" @@ -164,6 +165,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor) AddSubLogger(root, "CHFD", interceptor, chanfunding.UseLogger) AddSubLogger(root, "PEER", interceptor, peer.UseLogger) AddSubLogger(root, "CHCL", interceptor, chancloser.UseLogger) + AddSubLogger(root, "CHIO", interceptor, chainio.UseLogger) AddSubLogger(root, routing.Subsystem, interceptor, routing.UseLogger) AddSubLogger(root, routerrpc.Subsystem, interceptor, routerrpc.UseLogger) diff --git a/server.go b/server.go index 6ab197f86a1..b192ba30a6d 100644 --- a/server.go +++ b/server.go @@ -27,6 +27,7 @@ import ( "github.com/lightningnetwork/lnd/aliasmgr" "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/brontide" + "github.com/lightningnetwork/lnd/chainio" "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/chanacceptor" "github.com/lightningnetwork/lnd/chanbackup" @@ -330,6 +331,10 @@ type server struct { // txPublisher is a publisher with fee-bumping capability. txPublisher *sweep.TxPublisher + // blockbeatDispatcher is a block dispatcher that notifies subscribers + // of new blocks. + blockbeatDispatcher *chainio.BlockbeatDispatcher + quit chan struct{} wg sync.WaitGroup @@ -579,6 +584,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, readPool: readPool, chansToRestore: chansToRestore, + blockbeatDispatcher: chainio.NewBlockbeatDispatcher( + cc.ChainNotifier, + ), channelNotifier: channelnotifier.New( dbs.ChanStateDB.ChannelStateDB(), ), @@ -608,9 +616,10 @@ func newServer(cfg *Config, listenAddrs []net.Addr, peersByPub: make(map[string]*peer.Brontide), inboundPeers: make(map[string]*peer.Brontide), outboundPeers: make(map[string]*peer.Brontide), - peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), peerDisconnectedListeners: make(map[string][]chan<- struct{}), + peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), + customMessageServer: subscribe.NewServer(), tlsManager: tlsManager, @@ -1677,9 +1686,31 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } s.connMgr = cmgr + // Finally, register the subsystems in blockbeat. + s.registerBlockConsumers() + return s, nil } +// registerBlockConsumers registers the subsystems that consume block events. +// By calling `RegisterQueue`, a list of subsystems are registered in the +// blockbeat for block notifications. When a new block arrives, the subsystems +// in the same queue are notified sequentially, and different queues are +// notified concurrently. +// +// NOTE: To put a subsystem in a different queue, create a slice and pass it to +// a new `RegisterQueue` call. +func (s *server) registerBlockConsumers() { + // In this queue, when a new block arrives, it will be received and + // processed in this order: chainArb -> sweeper -> txPublisher. + consumers := []chainio.Consumer{ + s.chainArb, + s.sweeper, + s.txPublisher, + } + s.blockbeatDispatcher.RegisterQueue(consumers) +} + // signAliasUpdate takes a ChannelUpdate and returns the signature. This is // used for option_scid_alias channels where the ChannelUpdate to be sent back // may differ from what is on disk. @@ -2110,6 +2141,17 @@ func (s *server) Start() error { return nil }) + // Start the blockbeat after all other subsystems have been + // started so they are ready to receive new blocks. + if err := s.blockbeatDispatcher.Start(); err != nil { + startErr = err + return + } + cleanup = cleanup.add(func() error { + s.blockbeatDispatcher.Stop() + return nil + }) + // If peers are specified as a config option, we'll add those // peers first. for _, peerAddrCfg := range s.cfg.AddPeers { @@ -2297,6 +2339,7 @@ func (s *server) Stop() error { } s.txPublisher.Stop() + s.blockbeatDispatcher.Stop() if err := s.channelNotifier.Stop(); err != nil { srvrLog.Warnf("failed to stop channelNotifier: %v", err)