Skip to content

Commit

Permalink
x - looks like it's fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
yyforyongyu committed Nov 9, 2024
1 parent d8e3687 commit 329a49e
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 10 deletions.
7 changes: 6 additions & 1 deletion chainntnfs/bitcoindnotify/bitcoind.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,14 @@ func (b *BitcoindNotifier) handleBlockConnected(
// satisfy any client requests based upon the new block.
b.bestBlock = epoch

err = b.txNotifier.NotifyHeight(uint32(epoch.Height))
if err != nil {
return fmt.Errorf("unable to notify height: %w", err)
}

b.notifyBlockEpochs(epoch)

return b.txNotifier.NotifyHeight(uint32(epoch.Height))
return nil
}

// notifyBlockEpochs notifies all registered block epoch clients of the newly
Expand Down
7 changes: 6 additions & 1 deletion chainntnfs/btcdnotify/btcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,9 +718,14 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
// satisfy any client requests based upon the new block.
b.bestBlock = epoch

err = b.txNotifier.NotifyHeight(uint32(epoch.Height))
if err != nil {
return fmt.Errorf("unable to notify height: %w", err)
}

b.notifyBlockEpochs(epoch)

return b.txNotifier.NotifyHeight(uint32(epoch.Height))
return nil
}

// notifyBlockEpochs notifies all registered block epoch clients of the newly
Expand Down
8 changes: 7 additions & 1 deletion chainntnfs/neutrinonotify/neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,8 +682,14 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
n.bestBlock.Height = int32(newBlock.height)
n.bestBlock.Block = rawBlock.MsgBlock()

err = n.txNotifier.NotifyHeight(newBlock.height)
if err != nil {
return fmt.Errorf("unable to notify height: %w", err)
}

n.notifyBlockEpochs(n.bestBlock)
return n.txNotifier.NotifyHeight(newBlock.height)

return nil
}

// getFilteredBlock is a utility to retrieve the full filtered block from a block epoch.
Expand Down
2 changes: 0 additions & 2 deletions chainntnfs/txnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1490,8 +1490,6 @@ func (n *TxNotifier) ConnectTip(block *btcutil.Block,
}
}

Log.Debugf("Deleting mature spend request %v at "+
"height=%d", spendRequest, blockHeight)
delete(n.spendNotifications, spendRequest)
}
delete(n.spendsByHeight, matureBlockHeight)
Expand Down
12 changes: 7 additions & 5 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,6 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
// Set the current beat.
c.beat = beat

log.Infof("ChainArbitrator starting at height %d with budget=[%v]",
&c.cfg.Budget, c.beat.Height())

// First, we'll fetch all the channels that are still open, in order to
// collect them within our set of active contracts.
if err := c.loadOpenChannels(); err != nil {
Expand Down Expand Up @@ -685,6 +682,11 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
c.dispatchBlocks()
}()

log.Infof("ChainArbitrator starting at height %d with %d chain "+
"watchers, %d channel arbitrators, and budget config=[%v]",
c.beat.Height(), len(c.activeWatchers), len(c.activeChannels),
&c.cfg.Budget)

// TODO(roasbeef): eventually move all breach watching here

return nil
Expand Down Expand Up @@ -1060,8 +1062,8 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error

chanPoint := newChan.FundingOutpoint

log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
newChan.FundingOutpoint)
log.Infof("Creating new Chainwatcher and ChannelArbitrator for "+
"ChannelPoint(%v)", newChan.FundingOutpoint)

// If we're already watching this channel, then we'll ignore this
// request.
Expand Down
18 changes: 18 additions & 0 deletions contractcourt/chain_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,9 +663,24 @@ func (c *chainWatcher) closeObserver() {
log.Tracef("ChainWatcher(%v) received blockbeat %v",
c.cfg.chanState.FundingOutpoint, beat.Height())

log.Debugf("ChainWatcher(%v) received blockbeat %v",
c.cfg.chanState.FundingOutpoint, beat.Height())

// Process the block.
c.handleBlockbeat(beat, fundingOutpoint, pkScript)

case spend, ok := <-c.fundingSpendNtfn.Spend:
// If the channel was closed, then this means that the notifier
// exited, so we will as well.
if !ok {
return
}

err := c.handleCommitSpend(spend)
if err != nil {
log.Errorf("Failed to handle commit spend: %v", err)
}

// The chainWatcher has been signalled to exit, so we'll do so
// now.
case <-c.quit:
Expand Down Expand Up @@ -698,6 +713,9 @@ func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat,
log.Tracef("No spend found for ChannelPoint(%v) in block %v",
c.cfg.chanState.FundingOutpoint, beat.Height())

log.Debugf("No spend found for ChannelPoint(%v) in block %v",
c.cfg.chanState.FundingOutpoint, beat.Height())

return
}

Expand Down
244 changes: 244 additions & 0 deletions contractcourt/channel_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3163,6 +3163,8 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
// Notify we've processed the block.
defer beat.NotifyBlockProcessed(nil, c.quit)

c.receiveCloseEvent()

// Try to advance the state if we are in StateDefault.
if c.state == StateDefault {
// Now that a new block has arrived, we'll attempt to advance
Expand All @@ -3181,6 +3183,248 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
return nil
}

func (c *ChannelArbitrator) receiveCloseEvent() {
select {
// We've cooperatively closed the channel, so we're no longer
// needed. We'll mark the channel as resolved and exit.
case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
log.Infof("ChannelArbitrator(%v) marking channel "+
"cooperatively closed at height %v", c.id(),
closeInfo.CloseHeight,
)

err := c.cfg.MarkChannelClosed(
closeInfo.ChannelCloseSummary,
channeldb.ChanStatusCoopBroadcasted,
)
if err != nil {
log.Errorf("Unable to mark channel closed: "+
"%v", err)
return
}

// We'll now advance our state machine until it reaches
// a terminal state, and the channel is marked resolved.
_, _, err = c.advanceState(
closeInfo.CloseHeight, coopCloseTrigger, nil,
)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
return
}

// We have broadcasted our commitment, and it is now confirmed
// on-chain.
case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
if c.state != StateCommitmentBroadcasted {
log.Errorf("ChannelArbitrator(%v): unexpected "+
"local on-chain channel close", c.id())
}

closeTx := closeInfo.CloseTx

log.Infof("ChannelArbitrator(%v): local force close "+
"tx=%v confirmed", c.id(), closeTx.TxHash())

contractRes := &ContractResolutions{
CommitHash: closeTx.TxHash(),
CommitResolution: closeInfo.CommitResolution,
HtlcResolutions: *closeInfo.HtlcResolutions,
AnchorResolution: closeInfo.AnchorResolution,
}

// When processing a unilateral close event, we'll
// transition to the ContractClosed state. We'll log
// out the set of resolutions such that they are
// available to fetch in that state, we'll also write
// the commit set so we can reconstruct our chain
// actions on restart.
err := c.log.LogContractResolutions(contractRes)
if err != nil {
log.Errorf("Unable to write resolutions: %v",
err)
return
}
err = c.log.InsertConfirmedCommitSet(
&closeInfo.CommitSet,
)
if err != nil {
log.Errorf("Unable to write commit set: %v",
err)
return
}

// After the set of resolutions are successfully
// logged, we can safely close the channel. After this
// succeeds we won't be getting chain events anymore,
// so we must make sure we can recover on restart after
// it is marked closed. If the next state transition
// fails, we'll start up in the prior state again, and
// we won't be longer getting chain events. In this
// case we must manually re-trigger the state
// transition into StateContractClosed based on the
// close status of the channel.
err = c.cfg.MarkChannelClosed(
closeInfo.ChannelCloseSummary,
channeldb.ChanStatusLocalCloseInitiator,
)
if err != nil {
log.Errorf("Unable to mark "+
"channel closed: %v", err)
return
}

// We'll now advance our state machine until it reaches
// a terminal state.
_, _, err = c.advanceState(
uint32(closeInfo.SpendingHeight),
localCloseTrigger, &closeInfo.CommitSet,
)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
}

// The remote party has broadcast the commitment on-chain.
// We'll examine our state to determine if we need to act at
// all.
case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
log.Infof("ChannelArbitrator(%v): remote party has "+
"force closed channel at height %v", c.id(),
uniClosure.SpendingHeight,
)

// If we don't have a self output, and there are no
// active HTLC's, then we can immediately mark the
// contract as fully resolved and exit.
contractRes := &ContractResolutions{
CommitHash: *uniClosure.SpenderTxHash,
CommitResolution: uniClosure.CommitResolution,
HtlcResolutions: *uniClosure.HtlcResolutions,
AnchorResolution: uniClosure.AnchorResolution,
}

// When processing a unilateral close event, we'll
// transition to the ContractClosed state. We'll log
// out the set of resolutions such that they are
// available to fetch in that state, we'll also write
// the commit set so we can reconstruct our chain
// actions on restart.
err := c.log.LogContractResolutions(contractRes)
if err != nil {
log.Errorf("Unable to write resolutions: %v",
err)
return
}
err = c.log.InsertConfirmedCommitSet(
&uniClosure.CommitSet,
)
if err != nil {
log.Errorf("Unable to write commit set: %v",
err)
return
}

// After the set of resolutions are successfully
// logged, we can safely close the channel. After this
// succeeds we won't be getting chain events anymore,
// so we must make sure we can recover on restart after
// it is marked closed. If the next state transition
// fails, we'll start up in the prior state again, and
// we won't be longer getting chain events. In this
// case we must manually re-trigger the state
// transition into StateContractClosed based on the
// close status of the channel.
closeSummary := &uniClosure.ChannelCloseSummary
err = c.cfg.MarkChannelClosed(
closeSummary,
channeldb.ChanStatusRemoteCloseInitiator,
)
if err != nil {
log.Errorf("Unable to mark channel closed: %v",
err)
return
}

// We'll now advance our state machine until it reaches
// a terminal state.
_, _, err = c.advanceState(
uint32(uniClosure.SpendingHeight),
remoteCloseTrigger, &uniClosure.CommitSet,
)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
}

// The remote has breached the channel. As this is handled by
// the ChainWatcher and BreachArbitrator, we don't have to do
// anything in particular, so just advance our state and
// gracefully exit.
case breachInfo := <-c.cfg.ChainEvents.ContractBreach:
closeSummary := &breachInfo.CloseSummary

log.Infof("ChannelArbitrator(%v): remote party has "+
"breached channel at height %v!", c.id(),
closeSummary.CloseHeight)

// In the breach case, we'll only have anchor and
// breach resolutions.
contractRes := &ContractResolutions{
CommitHash: breachInfo.CommitHash,
BreachResolution: breachInfo.BreachResolution,
AnchorResolution: breachInfo.AnchorResolution,
}

// We'll transition to the ContractClosed state and log
// the set of resolutions such that they can be turned
// into resolvers later on. We'll also insert the
// CommitSet of the latest set of commitments.
err := c.log.LogContractResolutions(contractRes)
if err != nil {
log.Errorf("Unable to write resolutions: %v",
err)
return
}
err = c.log.InsertConfirmedCommitSet(
&breachInfo.CommitSet,
)
if err != nil {
log.Errorf("Unable to write commit set: %v",
err)
return
}

// The channel is finally marked pending closed here as
// the BreachArbitrator and channel arbitrator have
// persisted the relevant states.
err = c.cfg.MarkChannelClosed(
closeSummary,
channeldb.ChanStatusRemoteCloseInitiator,
)
if err != nil {
log.Errorf("Unable to mark channel closed: %v",
err)
return
}

log.Infof("Breached channel=%v marked pending-closed",
breachInfo.BreachResolution.FundingOutPoint)

// We'll advance our state machine until it reaches a
// terminal state.
_, _, err = c.advanceState(
closeSummary.CloseHeight,
breachCloseTrigger, &breachInfo.CommitSet,
)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
}

default:
log.Infof("ChannelArbitrator(%v) no close event", c.id())
}

}

// Name returns a human-readable string for this subsystem.
//
// NOTE: Part of chainio.Consumer interface.
Expand Down

0 comments on commit 329a49e

Please sign in to comment.