Skip to content

Commit

Permalink
x - test new contractcourt
Browse files Browse the repository at this point in the history
  • Loading branch information
yyforyongyu committed Nov 7, 2024
1 parent 4217059 commit c6ffaaf
Showing 1 changed file with 80 additions and 101 deletions.
181 changes: 80 additions & 101 deletions contractcourt/chain_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ type chainWatcher struct {
// clientSubscriptions is a map that keeps track of all the active
// client subscriptions for events related to this channel.
clientSubscriptions map[uint64]*ChainEventSubscription

fundingSpendNtfn *chainntnfs.SpendEvent
}

// newChainWatcher returns a new instance of a chainWatcher for a channel given
Expand Down Expand Up @@ -273,34 +275,33 @@ func newChainWatcher(cfg chainWatcherConfig) (*chainWatcher, error) {
// Mount the block consumer.
c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())

return c, nil
}

// Compile-time check for the chainio.Consumer interface.
var _ chainio.Consumer = (*chainWatcher)(nil)

// Name returns the name of the watcher.
//
// NOTE: part of the `chainio.Consumer` interface.
func (c *chainWatcher) Name() string {
return fmt.Sprintf("ChainWatcher(%v)", c.cfg.chanState.FundingOutpoint)
}
localKey := chanState.LocalChanCfg.MultiSigKey.PubKey
remoteKey := chanState.RemoteChanCfg.MultiSigKey.PubKey

// Start starts all goroutines that the chainWatcher needs to perform its
// duties.
func (c *chainWatcher) Start() error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return nil
var (
err error
)
if chanState.ChanType.IsTaproot() {
c.fundingPkScript, _, err = input.GenTaprootFundingScript(
localKey, remoteKey, 0, chanState.TapscriptRoot,
)
if err != nil {
return nil, err
}
} else {
multiSigScript, err := input.GenMultiSigScript(
localKey.SerializeCompressed(),
remoteKey.SerializeCompressed(),
)
if err != nil {
return nil, err
}
c.fundingPkScript, err = input.WitnessScriptHash(multiSigScript)
if err != nil {
return nil, err
}
}

chanState := c.cfg.chanState
log.Debugf("Starting chain watcher for ChannelPoint(%v)",
chanState.FundingOutpoint)

// First, we'll register for a notification to be dispatched if the
// funding output is spent.
fundingOut := &chanState.FundingOutpoint

// As a height hint, we'll try to use the opening height, but if the
// channel isn't yet open, then we'll use the height it was broadcast
// at. This may be an unconfirmed zero-conf channel.
Expand All @@ -324,44 +325,46 @@ func (c *chainWatcher) Start() error {
}
}

localKey := chanState.LocalChanCfg.MultiSigKey.PubKey
remoteKey := chanState.RemoteChanCfg.MultiSigKey.PubKey

var (
err error
)
if chanState.ChanType.IsTaproot() {
c.fundingPkScript, _, err = input.GenTaprootFundingScript(
localKey, remoteKey, 0, chanState.TapscriptRoot,
)
if err != nil {
return err
}
} else {
multiSigScript, err := input.GenMultiSigScript(
localKey.SerializeCompressed(),
remoteKey.SerializeCompressed(),
)
if err != nil {
return err
}
c.fundingPkScript, err = input.WitnessScriptHash(multiSigScript)
if err != nil {
return err
}
}

// We'll register for a notification to be dispatched if the funding
// output is spent.
spendNtfn, err := c.cfg.notifier.RegisterSpendNtfn(
fundingOut, c.fundingPkScript, c.heightHint,
&chanState.FundingOutpoint, c.fundingPkScript, c.heightHint,
)
if err != nil {
return err
return nil, err
}

// Mount the spend notification.
c.fundingSpendNtfn = spendNtfn

return c, nil
}

// Compile-time check for the chainio.Consumer interface.
var _ chainio.Consumer = (*chainWatcher)(nil)

// Name returns the name of the watcher.
//
// NOTE: part of the `chainio.Consumer` interface.
func (c *chainWatcher) Name() string {
return fmt.Sprintf("ChainWatcher(%v)", c.cfg.chanState.FundingOutpoint)
}

// Start starts all goroutines that the chainWatcher needs to perform its
// duties.
func (c *chainWatcher) Start() error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return nil
}

chanState := c.cfg.chanState
log.Debugf("Starting chain watcher for ChannelPoint(%v)",
chanState.FundingOutpoint)

// With the spend notification obtained, we'll now dispatch the
// closeObserver which will properly react to any changes.
c.wg.Add(1)
go c.closeObserver(spendNtfn)
go c.closeObserver()

return nil
}
Expand Down Expand Up @@ -639,9 +642,9 @@ func newChainSet(chanState *channeldb.OpenChannel) (*chainSet, error) {
// close observer will assembled the proper materials required to claim the
// funds of the channel on-chain (if required), then dispatch these as
// notifications to all subscribers.
func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) {
func (c *chainWatcher) closeObserver() {
defer c.wg.Done()
defer spendNtfn.Cancel()
defer c.fundingSpendNtfn.Cancel()

fundingOutpoint := c.cfg.chanState.FundingOutpoint
pkScript, err := txscript.ParsePkScript(c.fundingPkScript)
Expand All @@ -661,9 +664,7 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) {
c.cfg.chanState.FundingOutpoint, beat.Height())

// Process the block.
c.handleBlockbeat(
beat, fundingOutpoint, pkScript, spendNtfn,
)
c.handleBlockbeat(beat, fundingOutpoint, pkScript)

// The chainWatcher has been signalled to exit, so we'll do so
// now.
Expand All @@ -677,8 +678,7 @@ func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) {
// funding output. If the spending tx is found, it will be handled based on the
// closure type.
func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat,
fundingOutpoint wire.OutPoint, pkScript txscript.PkScript,
spendNtfn *chainntnfs.SpendEvent) {
fundingOutpoint wire.OutPoint, pkScript txscript.PkScript) {

// Notify the chain arbitrator has processed the block.
defer beat.NotifyBlockProcessed(nil, c.quit)
Expand All @@ -693,35 +693,18 @@ func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat,
}
}

// Check if the block contains a spending tx for the funding output.
spend, err := beat.HasOutpointSpentByScript(fundingOutpoint, pkScript)
if err != nil {
log.Errorf("Query spend failed for %v", fundingOutpoint, err)
spend := c.checkFundingSpend()
if spend == nil {
log.Tracef("No spend found for ChannelPoint(%v) in block %v",
c.cfg.chanState.FundingOutpoint, beat.Height())

return
}

// Found a spending tx of the funding outpoint and handle it now.
if spend != nil {
log.Debugf("Found spend details for funding output: %v",
spend.SpenderTxHash)

err = c.handleCommitSpend(spend)
if err != nil {
log.Errorf("Failed to handle commit spend: %v", err)
}

return
err := c.handleCommitSpend(spend)
if err != nil {
log.Errorf("Failed to handle commit spend: %v", err)
}

log.Tracef("No spend found for ChannelPoint(%v) in block %v",
c.cfg.chanState.FundingOutpoint, beat.Height())

// If this block doesn't contain a spending tx, we still need to check
// past blocks to see if the channel is already closed. Atm the
// `blockbeat` is not able to do rescan, so we perform a non-blocking
// read on the spending notification to see if it's already spent.
c.handleRestart(spendNtfn)
}

// chanPointConfirmed checks whether the given channel point has confirmed.
Expand Down Expand Up @@ -1471,12 +1454,10 @@ func (c *chainWatcher) handleCommitSpend(
return nil
}

// handleRestart performs a non-blocking read on the spendNtfn channel to check
// whether there's a commit spend already. Returns a bool to indicate whether
// the spend was found and handled.
//
// TODO(yy): remove this step once `blockbeat` is able to handle rescanning.
func (c *chainWatcher) handleRestart(spendNtfn *chainntnfs.SpendEvent) {
// checkFundingSpend performs a non-blocking read on the spendNtfn channel to
// check whether there's a commit spend already. Returns the spend details if
// found.
func (c *chainWatcher) checkFundingSpend() *chainntnfs.SpendDetail {
select {
// We've detected a spend of the channel onchain! Depending on the type
// of spend, we'll act accordingly, so we'll examine the spending
Expand All @@ -1485,22 +1466,20 @@ func (c *chainWatcher) handleRestart(spendNtfn *chainntnfs.SpendEvent) {
// TODO(Roasbeef): need to be able to ensure this only triggers
// on confirmation, to ensure if multiple txns are broadcast, we
// act on the one that's timestamped
case commitSpend, ok := <-spendNtfn.Spend:
case spend, ok := <-c.fundingSpendNtfn.Spend:
// If the channel was closed, then this means that the notifier
// exited, so we will as well.
if !ok {
return
return nil
}

log.Debugf("Found spending tx %v during restart for "+
"ChannelPoint(%v)", commitSpend.SpenderTxHash,
c.cfg.chanState.FundingOutpoint)
log.Debugf("Found spend details for funding output: %v",
spend.SpenderTxHash)

err := c.handleCommitSpend(commitSpend)
if err != nil {
log.Errorf("Failed to handle commit spend: %v", err)
}
return spend

default:
}

return nil
}

0 comments on commit c6ffaaf

Please sign in to comment.