Skip to content

Commit

Permalink
lnd: start blockbeat service and register subsystems
Browse files Browse the repository at this point in the history
  • Loading branch information
yyforyongyu committed Jul 2, 2024
1 parent af06828 commit 92d2ced
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 1 deletion.
37 changes: 37 additions & 0 deletions chainio/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,40 @@ func (b *BlockbeatDispatcher) notifyQueues() {
}
}
}

// SetInitialBeat sets the current beat during the startup.
//
// NOTE: Must be called before `Start`.
func (b *BlockbeatDispatcher) SetInitialBeat() error {
// We need to register for block epochs and retry sweeping every block.
// We should get a notification with the current best block immediately
// if we don't provide any epoch. We'll wait for that in the collector.
blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return fmt.Errorf("register block epoch ntfn: %w", err)
}

// We registered for the block epochs with a nil request. The notifier
// should send us the current best block immediately. So we need to
// wait for it here because we need to know the current best height.
select {
case bestBlock := <-blockEpochs.Epochs:
clog.Infof("Received initial block %v at height %d",
bestBlock.Hash, bestBlock.Height)

// Update the current blockbeat.
b.beat = NewBeat(*bestBlock)

case <-b.quit:
clog.Debug("Sweeper shutting down")
}

// Set the initial height for the consumer.
for _, queue := range b.consumerQueues {
for _, c := range queue {
c.SetCurrentBeat(b.beat)
}
}

return nil
}
2 changes: 2 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanacceptor"
Expand Down Expand Up @@ -164,6 +165,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor)
AddSubLogger(root, "CHFD", interceptor, chanfunding.UseLogger)
AddSubLogger(root, "PEER", interceptor, peer.UseLogger)
AddSubLogger(root, "CHCL", interceptor, chancloser.UseLogger)
AddSubLogger(root, "CHIO", interceptor, chainio.UseLogger)

AddSubLogger(root, routing.Subsystem, interceptor, routing.UseLogger)
AddSubLogger(root, routerrpc.Subsystem, interceptor, routerrpc.UseLogger)
Expand Down
54 changes: 53 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/lightningnetwork/lnd/aliasmgr"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/chanbackup"
Expand Down Expand Up @@ -330,6 +331,10 @@ type server struct {
// txPublisher is a publisher with fee-bumping capability.
txPublisher *sweep.TxPublisher

// blockbeatDispatcher is a block dispatcher that notifies subscribers
// of new blocks.
blockbeatDispatcher *chainio.BlockbeatDispatcher

quit chan struct{}

wg sync.WaitGroup
Expand Down Expand Up @@ -579,6 +584,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
readPool: readPool,
chansToRestore: chansToRestore,

blockbeatDispatcher: chainio.NewBlockbeatDispatcher(
cc.ChainNotifier,
),
channelNotifier: channelnotifier.New(
dbs.ChanStateDB.ChannelStateDB(),
),
Expand Down Expand Up @@ -608,9 +616,10 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
peersByPub: make(map[string]*peer.Brontide),
inboundPeers: make(map[string]*peer.Brontide),
outboundPeers: make(map[string]*peer.Brontide),
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
peerDisconnectedListeners: make(map[string][]chan<- struct{}),

peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),

customMessageServer: subscribe.NewServer(),

tlsManager: tlsManager,
Expand Down Expand Up @@ -1677,9 +1686,31 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}
s.connMgr = cmgr

// Finally, register the subsystems in blockbeat.
s.registerBlockConsumers()

return s, nil
}

// registerBlockConsumers registers the subsystems that consume block events.
// By calling `RegisterQueue`, a list of subsystems are registered in the
// blockbeat for block notifications. When a new block arrives, the subsystems
// in the same queue are notified sequentially, and different queues are
// notified concurrently.
//
// NOTE: To put a subsystem in a different queue, create a slice and pass it to
// a new `RegisterQueue` call.
func (s *server) registerBlockConsumers() {
// In this queue, when a new block arrives, it will be received and
// processed in this order: chainArb -> sweeper -> txPublisher.
consumers := []chainio.Consumer{
s.chainArb,
s.sweeper,
s.txPublisher,
}
s.blockbeatDispatcher.RegisterQueue(consumers)
}

// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
// used for option_scid_alias channels where the ChannelUpdate to be sent back
// may differ from what is on disk.
Expand Down Expand Up @@ -1920,6 +1951,13 @@ func (s *server) Start() error {
}
cleanup = cleanup.add(s.cc.ChainNotifier.Stop)

// Once ChainNotifier is started, we can set the initial
// blockbeat for the consumers.
if err := s.blockbeatDispatcher.SetInitialBeat(); err != nil {
startErr = err
return
}

if err := s.cc.BestBlockTracker.Start(); err != nil {
startErr = err
return
Expand Down Expand Up @@ -2110,6 +2148,17 @@ func (s *server) Start() error {
return nil
})

// Start the blockbeat after all other subsystems have been
// started so they are ready to receive new blocks.
if err := s.blockbeatDispatcher.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(func() error {
s.blockbeatDispatcher.Stop()
return nil
})

// If peers are specified as a config option, we'll add those
// peers first.
for _, peerAddrCfg := range s.cfg.AddPeers {
Expand Down Expand Up @@ -2260,6 +2309,9 @@ func (s *server) Stop() error {
// Shutdown connMgr first to prevent conns during shutdown.
s.connMgr.Stop()

// Stop dispatching blocks to other systems immediately.
s.blockbeatDispatcher.Stop()

// Shutdown the wallet, funding manager, and the rpc server.
if err := s.chanStatusMgr.Stop(); err != nil {
srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)
Expand Down

0 comments on commit 92d2ced

Please sign in to comment.