
Commit

x - looks like it's fixed
yyforyongyu committed Nov 6, 2024
1 parent dafca3d commit 2ecc6cd
Showing 6 changed files with 281 additions and 5 deletions.
7 changes: 6 additions & 1 deletion chainntnfs/bitcoindnotify/bitcoind.go
@@ -655,9 +655,14 @@ func (b *BitcoindNotifier) handleBlockConnected(
     // satisfy any client requests based upon the new block.
     b.bestBlock = epoch
 
+    err = b.txNotifier.NotifyHeight(uint32(epoch.Height))
+    if err != nil {
+        return fmt.Errorf("unable to notify height: %w", err)
+    }
+
     b.notifyBlockEpochs(epoch)
 
-    return b.txNotifier.NotifyHeight(uint32(epoch.Height))
+    return nil
 }
 
 // notifyBlockEpochs notifies all registered block epoch clients of the newly
7 changes: 6 additions & 1 deletion chainntnfs/btcdnotify/btcd.go
@@ -718,9 +718,14 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
     // satisfy any client requests based upon the new block.
     b.bestBlock = epoch
 
+    err = b.txNotifier.NotifyHeight(uint32(epoch.Height))
+    if err != nil {
+        return fmt.Errorf("unable to notify height: %w", err)
+    }
+
     b.notifyBlockEpochs(epoch)
 
-    return b.txNotifier.NotifyHeight(uint32(epoch.Height))
+    return nil
 }
 
 // notifyBlockEpochs notifies all registered block epoch clients of the newly
8 changes: 7 additions & 1 deletion chainntnfs/neutrinonotify/neutrino.go
@@ -682,8 +682,14 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
     n.bestBlock.Height = int32(newBlock.height)
     n.bestBlock.Block = rawBlock.MsgBlock()
 
+    err = n.txNotifier.NotifyHeight(newBlock.height)
+    if err != nil {
+        return fmt.Errorf("unable to notify height: %w", err)
+    }
+
     n.notifyBlockEpochs(n.bestBlock)
-    return n.txNotifier.NotifyHeight(newBlock.height)
+
+    return nil
 }
 
 // getFilteredBlock is a utility to retrieve the full filtered block from a block epoch.
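All three notifier backends (bitcoind, btcd, neutrino) receive the same reordering: the tx notifier is advanced to the new height before block-epoch clients are notified, and the NotifyHeight error is wrapped and returned early instead of serving as the function's final return value. Below is a minimal, self-contained sketch of that ordering; blockEpoch, txNotifier, and blockNotifier are illustrative stand-ins, not the lnd types.

package main

import "fmt"

// blockEpoch is a stand-in for the notifier's block epoch type.
type blockEpoch struct {
    Height int32
}

// txNotifier is a stand-in for chainntnfs.TxNotifier.
type txNotifier struct{}

// NotifyHeight mimics advancing the tx notifier to a new height.
func (t *txNotifier) NotifyHeight(height uint32) error {
    fmt.Printf("tx notifier caught up to height %d\n", height)
    return nil
}

// blockNotifier sketches the shared handleBlockConnected shape.
type blockNotifier struct {
    bestBlock  blockEpoch
    txNotifier *txNotifier
}

// notifyBlockEpochs stands in for fanning the epoch out to registered clients.
func (b *blockNotifier) notifyBlockEpochs(epoch blockEpoch) {
    fmt.Printf("dispatching block epoch %d to clients\n", epoch.Height)
}

// handleBlockConnected records the new best block, advances the tx notifier
// first, and only then notifies block-epoch clients.
func (b *blockNotifier) handleBlockConnected(epoch blockEpoch) error {
    b.bestBlock = epoch

    // Advance the tx notifier before any epoch client hears about the
    // block, wrapping the error so the failing step is visible.
    if err := b.txNotifier.NotifyHeight(uint32(epoch.Height)); err != nil {
        return fmt.Errorf("unable to notify height: %w", err)
    }

    b.notifyBlockEpochs(epoch)

    return nil
}

func main() {
    n := &blockNotifier{txNotifier: &txNotifier{}}
    if err := n.handleBlockConnected(blockEpoch{Height: 840000}); err != nil {
        fmt.Println("error:", err)
    }
}

A plausible reading of the change is that block-epoch clients should only hear about a block once the tx notifier has already caught up to that height.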
2 changes: 0 additions & 2 deletions chainntnfs/txnotifier.go
@@ -1490,8 +1490,6 @@ func (n *TxNotifier) ConnectTip(block *btcutil.Block,
            }
        }
 
-        Log.Debugf("Deleting mature spend request %v at "+
-            "height=%d", spendRequest, blockHeight)
        delete(n.spendNotifications, spendRequest)
    }
    delete(n.spendsByHeight, matureBlockHeight)
18 changes: 18 additions & 0 deletions contractcourt/chain_watcher.go
@@ -663,9 +663,24 @@ func (c *chainWatcher) closeObserver() {
log.Tracef("ChainWatcher(%v) received blockbeat %v",
c.cfg.chanState.FundingOutpoint, beat.Height())

log.Debugf("ChainWatcher(%v) received blockbeat %v",
c.cfg.chanState.FundingOutpoint, beat.Height())

// Process the block.
c.handleBlockbeat(beat, fundingOutpoint, pkScript)

// case spend, ok := <-c.fundingSpendNtfn.Spend:
// // If the channel was closed, then this means that the notifier
// // exited, so we will as well.
// if !ok {
// return
// }

// err := c.handleCommitSpend(spend)
// if err != nil {
// log.Errorf("Failed to handle commit spend: %v", err)
// }

// The chainWatcher has been signalled to exit, so we'll do so
// now.
case <-c.quit:
@@ -698,6 +713,9 @@ func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat,
log.Tracef("No spend found for ChannelPoint(%v) in block %v",
c.cfg.chanState.FundingOutpoint, beat.Height())

log.Debugf("No spend found for ChannelPoint(%v) in block %v",
c.cfg.chanState.FundingOutpoint, beat.Height())

return
}

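The closeObserver loop stays blockbeat-driven: each beat is logged and handed to handleBlockbeat, the old direct funding-spend case remains commented out, and the goroutine still exits on the quit channel. Below is a minimal sketch of that loop shape; blockbeat and watcher are illustrative stand-ins, not the lnd types.

package main

import (
    "fmt"
    "time"
)

// blockbeat is a stand-in for a per-block tick carrying its height.
type blockbeat struct {
    height int32
}

// watcher sketches the fields the loop needs: a beat channel and a quit signal.
type watcher struct {
    beats chan blockbeat
    quit  chan struct{}
}

// handleBlockbeat stands in for scanning the block for a funding spend;
// when nothing is found it just logs and returns.
func (w *watcher) handleBlockbeat(beat blockbeat) {
    fmt.Printf("no spend found in block %d\n", beat.height)
}

// closeObserver consumes blockbeats one at a time and exits on quit.
func (w *watcher) closeObserver() {
    for {
        select {
        case beat := <-w.beats:
            fmt.Printf("received blockbeat %d\n", beat.height)
            w.handleBlockbeat(beat)

        case <-w.quit:
            return
        }
    }
}

func main() {
    w := &watcher{
        beats: make(chan blockbeat, 1),
        quit:  make(chan struct{}),
    }
    go w.closeObserver()

    w.beats <- blockbeat{height: 840000}
    time.Sleep(10 * time.Millisecond)
    close(w.quit)
}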
244 changes: 244 additions & 0 deletions contractcourt/channel_arbitrator.go
@@ -3163,6 +3163,8 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
// Notify we've processed the block.
defer beat.NotifyBlockProcessed(nil, c.quit)

c.receiveCloseEvent()

// Try to advance the state if we are in StateDefault.
if c.state == StateDefault {
// Now that a new block has arrived, we'll attempt to advance
@@ -3181,6 +3183,248 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
return nil
}

// receiveCloseEvent performs a non-blocking check of the channel's chain
// event channels. If a cooperative close, a local or remote force close, or
// a breach has confirmed on-chain, the channel is marked closed and the
// state machine is advanced with the matching trigger; if no close event is
// pending, the method returns immediately.
func (c *ChannelArbitrator) receiveCloseEvent() {
select {
// We've cooperatively closed the channel, so we're no longer
// needed. We'll mark the channel as resolved and exit.
case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
log.Infof("ChannelArbitrator(%v) marking channel "+
"cooperatively closed at height %v", c.id(),
closeInfo.CloseHeight,
)

err := c.cfg.MarkChannelClosed(
closeInfo.ChannelCloseSummary,
channeldb.ChanStatusCoopBroadcasted,
)
if err != nil {
log.Errorf("Unable to mark channel closed: "+
"%v", err)
return
}

// We'll now advance our state machine until it reaches
// a terminal state, and the channel is marked resolved.
_, _, err = c.advanceState(
closeInfo.CloseHeight, coopCloseTrigger, nil,
)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
return
}

// We have broadcasted our commitment, and it is now confirmed
// on-chain.
case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
if c.state != StateCommitmentBroadcasted {
log.Errorf("ChannelArbitrator(%v): unexpected "+
"local on-chain channel close", c.id())
}

closeTx := closeInfo.CloseTx

log.Infof("ChannelArbitrator(%v): local force close "+
"tx=%v confirmed", c.id(), closeTx.TxHash())

contractRes := &ContractResolutions{
CommitHash: closeTx.TxHash(),
CommitResolution: closeInfo.CommitResolution,
HtlcResolutions: *closeInfo.HtlcResolutions,
AnchorResolution: closeInfo.AnchorResolution,
}

// When processing a unilateral close event, we'll
// transition to the ContractClosed state. We'll log
// out the set of resolutions such that they are
// available to fetch in that state, we'll also write
// the commit set so we can reconstruct our chain
// actions on restart.
err := c.log.LogContractResolutions(contractRes)
if err != nil {
log.Errorf("Unable to write resolutions: %v",
err)
return
}
err = c.log.InsertConfirmedCommitSet(
&closeInfo.CommitSet,
)
if err != nil {
log.Errorf("Unable to write commit set: %v",
err)
return
}

// After the set of resolutions are successfully
// logged, we can safely close the channel. After this
// succeeds we won't be getting chain events anymore,
// so we must make sure we can recover on restart after
// it is marked closed. If the next state transition
// fails, we'll start up in the prior state again, and
// we will no longer be getting chain events. In this
// case we must manually re-trigger the state
// transition into StateContractClosed based on the
// close status of the channel.
err = c.cfg.MarkChannelClosed(
closeInfo.ChannelCloseSummary,
channeldb.ChanStatusLocalCloseInitiator,
)
if err != nil {
log.Errorf("Unable to mark "+
"channel closed: %v", err)
return
}

// We'll now advance our state machine until it reaches
// a terminal state.
_, _, err = c.advanceState(
uint32(closeInfo.SpendingHeight),
localCloseTrigger, &closeInfo.CommitSet,
)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
}

// The remote party has broadcast the commitment on-chain.
// We'll examine our state to determine if we need to act at
// all.
case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
log.Infof("ChannelArbitrator(%v): remote party has "+
"force closed channel at height %v", c.id(),
uniClosure.SpendingHeight,
)

// If we don't have a self output, and there are no
// active HTLC's, then we can immediately mark the
// contract as fully resolved and exit.
contractRes := &ContractResolutions{
CommitHash: *uniClosure.SpenderTxHash,
CommitResolution: uniClosure.CommitResolution,
HtlcResolutions: *uniClosure.HtlcResolutions,
AnchorResolution: uniClosure.AnchorResolution,
}

// When processing a unilateral close event, we'll
// transition to the ContractClosed state. We'll log
// out the set of resolutions such that they are
// available to fetch in that state, we'll also write
// the commit set so we can reconstruct our chain
// actions on restart.
err := c.log.LogContractResolutions(contractRes)
if err != nil {
log.Errorf("Unable to write resolutions: %v",
err)
return
}
err = c.log.InsertConfirmedCommitSet(
&uniClosure.CommitSet,
)
if err != nil {
log.Errorf("Unable to write commit set: %v",
err)
return
}

// After the set of resolutions are successfully
// logged, we can safely close the channel. After this
// succeeds we won't be getting chain events anymore,
// so we must make sure we can recover on restart after
// it is marked closed. If the next state transition
// fails, we'll start up in the prior state again, and
// we will no longer be getting chain events. In this
// case we must manually re-trigger the state
// transition into StateContractClosed based on the
// close status of the channel.
closeSummary := &uniClosure.ChannelCloseSummary
err = c.cfg.MarkChannelClosed(
closeSummary,
channeldb.ChanStatusRemoteCloseInitiator,
)
if err != nil {
log.Errorf("Unable to mark channel closed: %v",
err)
return
}

// We'll now advance our state machine until it reaches
// a terminal state.
_, _, err = c.advanceState(
uint32(uniClosure.SpendingHeight),
remoteCloseTrigger, &uniClosure.CommitSet,
)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
}

// The remote has breached the channel. As this is handled by
// the ChainWatcher and BreachArbitrator, we don't have to do
// anything in particular, so just advance our state and
// gracefully exit.
case breachInfo := <-c.cfg.ChainEvents.ContractBreach:
closeSummary := &breachInfo.CloseSummary

log.Infof("ChannelArbitrator(%v): remote party has "+
"breached channel at height %v!", c.id(),
closeSummary.CloseHeight)

// In the breach case, we'll only have anchor and
// breach resolutions.
contractRes := &ContractResolutions{
CommitHash: breachInfo.CommitHash,
BreachResolution: breachInfo.BreachResolution,
AnchorResolution: breachInfo.AnchorResolution,
}

// We'll transition to the ContractClosed state and log
// the set of resolutions such that they can be turned
// into resolvers later on. We'll also insert the
// CommitSet of the latest set of commitments.
err := c.log.LogContractResolutions(contractRes)
if err != nil {
log.Errorf("Unable to write resolutions: %v",
err)
return
}
err = c.log.InsertConfirmedCommitSet(
&breachInfo.CommitSet,
)
if err != nil {
log.Errorf("Unable to write commit set: %v",
err)
return
}

// The channel is finally marked pending closed here as
// the BreachArbitrator and channel arbitrator have
// persisted the relevant states.
err = c.cfg.MarkChannelClosed(
closeSummary,
channeldb.ChanStatusRemoteCloseInitiator,
)
if err != nil {
log.Errorf("Unable to mark channel closed: %v",
err)
return
}

log.Infof("Breached channel=%v marked pending-closed",
breachInfo.BreachResolution.FundingOutPoint)

// We'll advance our state machine until it reaches a
// terminal state.
_, _, err = c.advanceState(
closeSummary.CloseHeight,
breachCloseTrigger, &breachInfo.CommitSet,
)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
}

default:
log.Infof("ChannelArbitrator(%v) no close event", c.id())
}

}

Check failure on line 3426 in contractcourt/channel_arbitrator.go (GitHub Actions / lint code): unnecessary trailing newline (whitespace)

// Name returns a human-readable string for this subsystem.
//
// NOTE: Part of chainio.Consumer interface.
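The new receiveCloseEvent is a non-blocking select over the chain-event channels, with a default case that simply notes no close event is pending for this block. Below is a minimal sketch of that polling pattern; closeEvent, coopClose, and forceClose are illustrative stand-ins, not the lnd channels or handlers.

package main

import "log"

// closeEvent is a stand-in for a confirmed close notification.
type closeEvent struct {
    Height uint32
}

// receiveCloseEvent polls the event channels without blocking: if nothing is
// pending, the default case fires and the caller moves on, mirroring how the
// arbitrator is now driven once per blockbeat.
func receiveCloseEvent(coopClose, forceClose <-chan closeEvent) {
    select {
    case ev := <-coopClose:
        // A cooperative close confirmed: mark the channel closed and
        // advance the state machine (elided in this sketch).
        log.Printf("cooperative close at height %d", ev.Height)

    case ev := <-forceClose:
        // A unilateral close confirmed: log resolutions, mark the
        // channel closed, then advance the state machine (elided).
        log.Printf("force close at height %d", ev.Height)

    default:
        // No close event is pending for this block.
        log.Println("no close event")
    }
}

func main() {
    coop := make(chan closeEvent, 1)
    force := make(chan closeEvent, 1)

    // Nothing queued yet, so the default branch fires.
    receiveCloseEvent(coop, force)

    // Queue a cooperative close and poll again.
    coop <- closeEvent{Height: 840000}
    receiveCloseEvent(coop, force)
}

In the actual change the select also covers remote unilateral closes and breaches, each logging resolutions, marking the channel closed, and advancing the state machine with its own trigger; the sketch keeps only the shape of the non-blocking poll.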
