Skip to content

Commit

Permalink
lnd: start blockbeat service and register subsystems
Browse files Browse the repository at this point in the history
  • Loading branch information
yyforyongyu committed Jun 27, 2024
1 parent 569a7f7 commit f638ef2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
2 changes: 2 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanacceptor"
Expand Down Expand Up @@ -164,6 +165,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor)
AddSubLogger(root, "CHFD", interceptor, chanfunding.UseLogger)
AddSubLogger(root, "PEER", interceptor, peer.UseLogger)
AddSubLogger(root, "CHCL", interceptor, chancloser.UseLogger)
AddSubLogger(root, "CHIO", interceptor, chainio.UseLogger)

AddSubLogger(root, routing.Subsystem, interceptor, routing.UseLogger)
AddSubLogger(root, routerrpc.Subsystem, interceptor, routerrpc.UseLogger)
Expand Down
45 changes: 44 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/lightningnetwork/lnd/aliasmgr"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/chanbackup"
Expand Down Expand Up @@ -330,6 +331,10 @@ type server struct {
// txPublisher is a publisher with fee-bumping capability.
txPublisher *sweep.TxPublisher

// blockbeatDispatcher is a block dispatcher that notifies subscribers
// of new blocks.
blockbeatDispatcher *chainio.BlockbeatDispatcher

quit chan struct{}

wg sync.WaitGroup
Expand Down Expand Up @@ -579,6 +584,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
readPool: readPool,
chansToRestore: chansToRestore,

blockbeatDispatcher: chainio.NewBlockbeatDispatcher(
cc.ChainNotifier,
),
channelNotifier: channelnotifier.New(
dbs.ChanStateDB.ChannelStateDB(),
),
Expand Down Expand Up @@ -608,9 +616,10 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
peersByPub: make(map[string]*peer.Brontide),
inboundPeers: make(map[string]*peer.Brontide),
outboundPeers: make(map[string]*peer.Brontide),
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
peerDisconnectedListeners: make(map[string][]chan<- struct{}),

peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),

customMessageServer: subscribe.NewServer(),

tlsManager: tlsManager,
Expand Down Expand Up @@ -1677,9 +1686,31 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}
s.connMgr = cmgr

// Finally, register the subsystems in blockbeat.
s.registerBlockConsumers()

return s, nil
}

// registerBlockConsumers registers the subsystems that consume block events.
// By calling `RegisterQueue`, a list of subsystems are registered in the
// blockbeat for block notifications. When a new block arrives, the subsystems
// in the same queue are notified sequentially, and different queues are
// notified concurrently.
//
// NOTE: To put a subsystem in a different queue, create a slice and pass it to
// a new `RegisterQueue` call.
func (s *server) registerBlockConsumers() {
// In this queue, when a new block arrives, it will be received and
// processed in this order: chainArb -> sweeper -> txPublisher.
consumers := []chainio.Consumer{
s.chainArb,
s.sweeper,
s.txPublisher,
}
s.blockbeatDispatcher.RegisterQueue(consumers)
}

// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
// used for option_scid_alias channels where the ChannelUpdate to be sent back
// may differ from what is on disk.
Expand Down Expand Up @@ -2110,6 +2141,17 @@ func (s *server) Start() error {
return nil
})

// Start the blockbeat after all other subsystems have been
// started so they are ready to receive new blocks.
if err := s.blockbeatDispatcher.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(func() error {
s.blockbeatDispatcher.Stop()
return nil
})

// If peers are specified as a config option, we'll add those
// peers first.
for _, peerAddrCfg := range s.cfg.AddPeers {
Expand Down Expand Up @@ -2297,6 +2339,7 @@ func (s *server) Stop() error {
}

s.txPublisher.Stop()
s.blockbeatDispatcher.Stop()

if err := s.channelNotifier.Stop(); err != nil {
srvrLog.Warnf("failed to stop channelNotifier: %v", err)
Expand Down

0 comments on commit f638ef2

Please sign in to comment.