Skip to content

Commit

Permalink
contractcourt: remove block subscription in arbitrators
Browse files Browse the repository at this point in the history
This commit removes the block subscriptions used in `ChainArbitrator`
and `ChannelArbitrator`, replaced them with the blockbeat managed by
`BlockbeatDispatcher`.
  • Loading branch information
yyforyongyu committed Jul 2, 2024
1 parent a3be1ac commit e01589e
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 121 deletions.
2 changes: 1 addition & 1 deletion contractcourt/breach_arbitrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

var (
defaultTimeout = 30 * time.Second
defaultTimeout = 10 * time.Second

breachOutPoints = []wire.OutPoint{
{
Expand Down
115 changes: 33 additions & 82 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,113 +756,37 @@ func (c *ChainArbitrator) Start() error {
}
}

// Subscribe to a single stream of block epoch notifications that we
// will dispatch to all active arbitrators.
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
}

// Start our goroutine which will dispatch blocks to each arbitrator.
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.dispatchBlocks(blockEpoch)
c.dispatchBlocks()
}()

// TODO(roasbeef): eventually move all breach watching here

return nil
}

// blockRecipient contains the information we need to dispatch a block to a
// channel arbitrator.
type blockRecipient struct {
// chanPoint is the funding outpoint of the channel.
chanPoint wire.OutPoint

// blocks is the channel that new block heights are sent into. This
// channel should be sufficiently buffered as to not block the sender.
blocks chan<- int32

// quit is closed if the receiving entity is shutting down.
quit chan struct{}
}

// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks(
blockEpoch *chainntnfs.BlockEpochEvent) {

// getRecipients is a helper function which acquires the chain arb
// lock and returns a set of block recipients which can be used to
// dispatch blocks.
getRecipients := func() []blockRecipient {
c.Lock()
blocks := make([]blockRecipient, 0, len(c.activeChannels))
for _, channel := range c.activeChannels {
blocks = append(blocks, blockRecipient{
chanPoint: channel.cfg.ChanPoint,
blocks: channel.blocks,
quit: channel.quit,
})
}
c.Unlock()

return blocks
}

// On exit, cancel our blocks subscription and close each block channel
// so that the arbitrators know they will no longer be receiving blocks.
defer func() {
blockEpoch.Cancel()

recipients := getRecipients()
for _, recipient := range recipients {
close(recipient.blocks)
}
}()

func (c *ChainArbitrator) dispatchBlocks() {
// Consume block epochs until we receive the instruction to shutdown.
for {
select {
// Consume block epochs, exiting if our subscription is
// terminated.
case block, ok := <-blockEpoch.Epochs:
case beat, ok := <-c.BlockbeatChan:
if !ok {
log.Trace("dispatchBlocks block epoch " +
"cancelled")
return
}

// Get the set of currently active channels block
// subscription channels and dispatch the block to
// each.
for _, recipient := range getRecipients() {
select {
// Deliver the block to the arbitrator.
case recipient.blocks <- block.Height:

// If the recipient is shutting down, exit
// without delivering the block. This may be
// the case when two blocks are mined in quick
// succession, and the arbitrator resolves
// after the first block, and does not need to
// consume the second block.
case <-recipient.quit:
log.Debugf("channel: %v exit without "+
"receiving block: %v",
recipient.chanPoint,
block.Height)

// If the chain arb is shutting down, we don't
// need to deliver any more blocks (everything
// will be shutting down).
case <-c.quit:
return
}
}
// Send this blockbeat to all the active channels and
// wait for them to finish processing it.
c.handleBlockbeat(beat)

// Exit if the chain arbitrator is shutting down.
case <-c.quit:
Expand All @@ -871,6 +795,33 @@ func (c *ChainArbitrator) dispatchBlocks(
}
}

// handleBlockbeat sends the blockbeat to all active channel arbitrator in
// parallel and wait for them to finish processing it.
func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
// Notify the chain arbitrator has processed the block.
defer beat.NotifyBlockProcessed(nil, c.quit)

// Read the active channels in a lock.
c.Lock()

// Create a slice to record active channel arbitrator.
channels := make([]chainio.Consumer, 0, len(c.activeChannels))

// Copy the active channels to the slice.
for _, channel := range c.activeChannels {
channels = append(channels, channel)
}

c.Unlock()

// Iterate all the copied channels and send the blockbeat to them.
err := beat.DispatchConcurrent(channels)
if err != nil {
// Shutdown lnd if there's an error processing the block.
log.Criticalf("Notify blockbeat failed: %v", err)
}
}

// republishClosingTxs will load any stored cooperative or unilateral closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.
Expand Down
2 changes: 0 additions & 2 deletions contractcourt/chain_arbitrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func TestChainArbitratorRepublishCloses(t *testing.T) {
ChainIO: &mock.ChainIO{},
Notifier: &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
},
PublishTx: func(tx *wire.MsgTx, _ string) error {
Expand Down Expand Up @@ -165,7 +164,6 @@ func TestResolveContract(t *testing.T) {
ChainIO: &mock.ChainIO{},
Notifier: &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
},
PublishTx: func(tx *wire.MsgTx, _ string) error {
Expand Down
58 changes: 30 additions & 28 deletions contractcourt/channel_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,6 @@ type ChannelArbitrator struct {
// to do its duty.
cfg ChannelArbitratorConfig

// blocks is a channel that the arbitrator will receive new blocks on.
// This channel should be buffered by so that it does not block the
// sender.
blocks chan int32

// signalUpdates is a channel that any new live signals for the channel
// we're watching over will be sent.
signalUpdates chan *signalUpdateMsg
Expand Down Expand Up @@ -403,7 +398,6 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,

c := &ChannelArbitrator{
log: log,
blocks: make(chan int32, arbitratorBlockBufferSize),
signalUpdates: make(chan *signalUpdateMsg),
resolutionSignal: make(chan struct{}),
forceCloseReqs: make(chan *forceCloseReq),
Expand Down Expand Up @@ -484,10 +478,7 @@ func (c *ChannelArbitrator) Start(state *chanArbStartState) error {
// Set our state from our starting state.
c.state = state.currentState

_, bestHeight, err := c.cfg.ChainIO.GetBestBlock()
if err != nil {
return err
}
bestHeight := c.CurrentBeat().Height()

// If the channel has been marked pending close in the database, and we
// haven't transitioned the state machine to StateContractClosed (or a
Expand Down Expand Up @@ -2801,31 +2792,21 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
// A new block has arrived, we'll examine all the active HTLC's
// to see if any of them have expired, and also update our
// track of the best current height.
case blockHeight, ok := <-c.blocks:
if !ok {
return
}
bestHeight = blockHeight
case beat := <-c.BlockbeatChan:
bestHeight = beat.Height()

// If we're not in the default state, then we can
// ignore this signal as we're waiting for contract
// resolution.
if c.state != StateDefault {
continue
}
log.Debugf("ChannelArbitrator(%v): new block height=%v",
c.cfg.ChanPoint, bestHeight)

// Now that a new block has arrived, we'll attempt to
// advance our state forward.
nextState, _, err := c.advanceState(
uint32(bestHeight), chainTrigger, nil,
)
err := c.handleBlockbeat(beat)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
log.Errorf("Handle block=%v got err: %v",
bestHeight, err)
}

// If as a result of this trigger, the contract is
// fully resolved, then well exit.
if nextState == StateFullyResolved {
if c.state == StateFullyResolved {
return
}

Expand Down Expand Up @@ -3151,6 +3132,27 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
}
}

// handleBlockbeat processes a newly received blockbeat by advancing the
// arbitrator's internal state using the received block height.
func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
// Notify we've processed the block.
defer beat.NotifyBlockProcessed(nil, c.quit)

// Try to advance the state if we are in StateDefault.
if c.state == StateDefault {
// Now that a new block has arrived, we'll attempt to advance
// our state forward.
_, _, err := c.advanceState(
uint32(beat.Height()), chainTrigger, nil,
)
if err != nil {
return fmt.Errorf("unable to advance state: %w", err)
}
}

return nil
}

// Name returns a human-readable string for this subsystem.
//
// NOTE: Part of chainio.Consumer interface.
Expand Down
Loading

0 comments on commit e01589e

Please sign in to comment.