Skip to content

Commit

Permalink
btc scanner merge handle block logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Lazar955 committed Aug 26, 2024
1 parent bdf4f9e commit f7335e2
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 73 deletions.
102 changes: 30 additions & 72 deletions monitor/btcscanner/block_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,12 @@ import (
"errors"
"fmt"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainntnfs"
)

// blockEventHandler handles connected and disconnected blocks from the BTC client.
func (bs *BtcScanner) blockEventHandler() {
func (bs *BtcScanner) blockEventHandler(blockNotifier *chainntnfs.BlockEpochEvent) {
defer bs.wg.Done()

if err := bs.btcNotifier.Start(); err != nil {
bs.logger.Errorf("Failed starting notifier")
return
}

blockNotifier, err := bs.btcNotifier.RegisterBlockEpochNtfn(nil)
if err != nil {
bs.logger.Errorf("Failed registering block epoch notifier")
return
}

defer blockNotifier.Cancel()

for {
Expand All @@ -34,67 +23,57 @@ func (bs *BtcScanner) blockEventHandler() {
return // channel closed
}

tip := bs.UnconfirmedBlockCache.Tip()

// Determine if a reorg happened to we know which flow to continue
// if the new block has the same height but a different hash.
reorg := false
if tip != nil {
if epoch.Height < tip.Height ||
(epoch.Height == tip.Height && epoch.BlockHeader.BlockHash() != tip.Header.BlockHash()) {
reorg = true
}
}

if !reorg {
if err := bs.handleConnectedBlocks(epoch.BlockHeader); err != nil {
bs.logger.Warnf("failed to handle a connected block at height %d: %s, "+
"need to restart the bootstrapping process", epoch.Height, err.Error())
if bs.Synced.Swap(false) {
bs.Bootstrap()
}
}
} else {
if err := bs.handleDisconnectedBlocks(epoch.BlockHeader); err != nil {
bs.logger.Warnf("failed to handle a disconnected block at height %d: %s,"+
"need to restart the bootstrapping process", epoch.Height, err.Error())
if bs.Synced.Swap(false) {
bs.Bootstrap()
}
if err := bs.handleNewBlock(epoch.Height, epoch.BlockHeader); err != nil {
bs.logger.Warnf("failed to handle block at height %d: %s, "+
"need to restart the bootstrapping process", epoch.Height, err.Error())
if bs.Synced.Swap(false) {
bs.Bootstrap()
}
}
}
}
}

// handleConnectedBlocks handles connected blocks from the BTC client
// handleNewBlock handles blocks from the BTC client
// if new confirmed blocks are found, send them through the channel
func (bs *BtcScanner) handleConnectedBlocks(header *wire.BlockHeader) error {
func (bs *BtcScanner) handleNewBlock(height int32, header *wire.BlockHeader) error {
if !bs.Synced.Load() {
return errors.New("the btc scanner is not synced")
}

// get the block from hash
blockHash := header.BlockHash()
ib, _, err := bs.BtcClient.GetBlockByHash(&blockHash)
if err != nil {
// failing to request the block, which means a bug
panic(err)
}

// get cache tip
cacheTip := bs.UnconfirmedBlockCache.Tip()
if cacheTip == nil {
return errors.New("no unconfirmed blocks found")
}

parentHash := ib.Header.PrevBlock
if cacheTip.Height >= height {
bs.logger.Debugf(
"the connecting block (height: %d, hash: %s) is too early, skipping the block",
height,
header.BlockHash().String(),
)
return nil
}

if cacheTip.Height+1 < height {
return fmt.Errorf("missing blocks, expected block height: %d, got: %d", cacheTip.Height+1, height)
}

parentHash := header.PrevBlock
// if the parent of the block is not the tip of the cache, then the cache is not up-to-date
if parentHash != cacheTip.BlockHash() {
return errors.New("cache is not up-to-date")
}

// get the block from hash
blockHash := header.BlockHash()
ib, _, err := bs.BtcClient.GetBlockByHash(&blockHash)
if err != nil {
// failing to request the block, which means a bug
panic(err)
}

// otherwise, add the block to the cache
bs.UnconfirmedBlockCache.Add(ib)

Expand All @@ -117,24 +96,3 @@ func (bs *BtcScanner) handleConnectedBlocks(header *wire.BlockHeader) error {

return nil
}

// handleDisconnectedBlocks handles disconnected blocks from the BTC client.
func (bs *BtcScanner) handleDisconnectedBlocks(header *wire.BlockHeader) error {
// get cache tip
cacheTip := bs.UnconfirmedBlockCache.Tip()
if cacheTip == nil {
return errors.New("cache is empty")
}

// if the block to be disconnected is not the tip of the cache, then the cache is not up-to-date,
if header.BlockHash() != cacheTip.BlockHash() {
return errors.New("cache is out-of-sync")
}

// otherwise, remove the block from the cache
if err := bs.UnconfirmedBlockCache.RemoveLast(); err != nil {
return fmt.Errorf("failed to remove last block from cache: %v", err)
}

return nil
}
13 changes: 12 additions & 1 deletion monitor/btcscanner/btc_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,20 @@ func (bs *BtcScanner) Start() {
bs.Started.Store(true)
bs.logger.Info("the BTC scanner is started")

if err := bs.btcNotifier.Start(); err != nil {
bs.logger.Errorf("Failed starting notifier")
return
}

blockNotifier, err := bs.btcNotifier.RegisterBlockEpochNtfn(nil)
if err != nil {
bs.logger.Errorf("Failed registering block epoch notifier")
return
}

// start handling new blocks
bs.wg.Add(1)
go bs.blockEventHandler()
go bs.blockEventHandler(blockNotifier)

for bs.Started.Load() {
select {
Expand Down

0 comments on commit f7335e2

Please sign in to comment.