diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 9899044df9..71a4545707 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -587,137 +587,17 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { // First, we'll fetch all the channels that are still open, in order to // collect them within our set of active contracts. - openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels() - if err != nil { + if err := c.loadOpenChannels(); err != nil { return err } - if len(openChannels) > 0 { - log.Infof("Creating ChannelArbitrators for %v active channels", - len(openChannels)) - } - - // For each open channel, we'll configure then launch a corresponding - // ChannelArbitrator. - for _, channel := range openChannels { - chanPoint := channel.FundingOutpoint - channel := channel - - // First, we'll create an active chainWatcher for this channel - // to ensure that we detect any relevant on chain events. - breachClosure := func(ret *lnwallet.BreachRetribution) error { - return c.cfg.ContractBreach(chanPoint, ret) - } - - chainWatcher, err := newChainWatcher( - chainWatcherConfig{ - chanState: channel, - notifier: c.cfg.Notifier, - signer: c.cfg.Signer, - isOurAddr: c.cfg.IsOurAddress, - contractBreach: breachClosure, - extractStateNumHint: lnwallet.GetStateNumHint, - auxLeafStore: c.cfg.AuxLeafStore, - auxResolver: c.cfg.AuxResolver, - }, - ) - if err != nil { - return err - } - - c.activeWatchers[chanPoint] = chainWatcher - channelArb, err := newActiveChannelArbitrator( - channel, c, chainWatcher.SubscribeChannelEvents(), - ) - if err != nil { - return err - } - - c.activeChannels[chanPoint] = channelArb - - // Republish any closing transactions for this channel. - err = c.republishClosingTxs(channel) - if err != nil { - log.Errorf("Failed to republish closing txs for "+ - "channel %v", chanPoint) - } - } - // In addition to the channels that we know to be open, we'll also // launch arbitrators to finishing resolving any channels that are in // the pending close state. - closingChannels, err := c.chanSource.ChannelStateDB().FetchClosedChannels( - true, - ) - if err != nil { + if err := c.loadPendingCloseChannels(); err != nil { return err } - if len(closingChannels) > 0 { - log.Infof("Creating ChannelArbitrators for %v closing channels", - len(closingChannels)) - } - - // Next, for each channel is the closing state, we'll launch a - // corresponding more restricted resolver, as we don't have to watch - // the chain any longer, only resolve the contracts on the confirmed - // commitment. - //nolint:lll - for _, closeChanInfo := range closingChannels { - // We can leave off the CloseContract and ForceCloseChan - // methods as the channel is already closed at this point. - chanPoint := closeChanInfo.ChanPoint - arbCfg := ChannelArbitratorConfig{ - ChanPoint: chanPoint, - ShortChanID: closeChanInfo.ShortChanID, - ChainArbitratorConfig: c.cfg, - ChainEvents: &ChainEventSubscription{}, - IsPendingClose: true, - ClosingHeight: closeChanInfo.CloseHeight, - CloseType: closeChanInfo.CloseType, - PutResolverReport: func(tx kvdb.RwTx, - report *channeldb.ResolverReport) error { - - return c.chanSource.PutResolverReport( - tx, c.cfg.ChainHash, &chanPoint, report, - ) - }, - FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) { - chanStateDB := c.chanSource.ChannelStateDB() - return chanStateDB.FetchHistoricalChannel(&chanPoint) - }, - FindOutgoingHTLCDeadline: func( - htlc channeldb.HTLC) fn.Option[int32] { - - return c.FindOutgoingHTLCDeadline( - closeChanInfo.ShortChanID, htlc, - ) - }, - } - chanLog, err := newBoltArbitratorLog( - c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint, - ) - if err != nil { - return err - } - arbCfg.MarkChannelResolved = func() error { - if c.cfg.NotifyFullyResolvedChannel != nil { - c.cfg.NotifyFullyResolvedChannel(chanPoint) - } - - return c.ResolveContract(chanPoint) - } - - // We create an empty map of HTLC's here since it's possible - // that the channel is in StateDefault and updateActiveHTLCs is - // called. We want to avoid writing to an empty map. Since the - // channel is already in the process of being resolved, no new - // HTLCs will be added. - c.activeChannels[chanPoint] = NewChannelArbitrator( - arbCfg, make(map[HtlcSetKey]htlcSet), chanLog, - ) - } - // Now, we'll start all chain watchers in parallel to shorten start up // duration. In neutrino mode, this allows spend registrations to take // advantage of batch spend reporting, instead of doing a single rescan @@ -769,7 +649,7 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { // transaction. var startStates map[wire.OutPoint]*chanArbStartState - err = kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error { + err := kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error { for _, arbitrator := range c.activeChannels { startState, err := arbitrator.getStartState(tx) if err != nil { @@ -1336,3 +1216,147 @@ func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID, func (c *ChainArbitrator) Name() string { return "ChainArbitrator" } + +// loadOpenChannels loads all channels that are currently open in the database +// and registers them with the chainWatcher for future notification. +func (c *ChainArbitrator) loadOpenChannels() error { + openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels() + if err != nil { + return err + } + + if len(openChannels) == 0 { + return nil + } + + log.Infof("Creating ChannelArbitrators for %v active channels", + len(openChannels)) + + // For each open channel, we'll configure then launch a corresponding + // ChannelArbitrator. + for _, channel := range openChannels { + chanPoint := channel.FundingOutpoint + channel := channel + + // First, we'll create an active chainWatcher for this channel + // to ensure that we detect any relevant on chain events. + breachClosure := func(ret *lnwallet.BreachRetribution) error { + return c.cfg.ContractBreach(chanPoint, ret) + } + + chainWatcher, err := newChainWatcher( + chainWatcherConfig{ + chanState: channel, + notifier: c.cfg.Notifier, + signer: c.cfg.Signer, + isOurAddr: c.cfg.IsOurAddress, + contractBreach: breachClosure, + extractStateNumHint: lnwallet.GetStateNumHint, + auxLeafStore: c.cfg.AuxLeafStore, + auxResolver: c.cfg.AuxResolver, + }, + ) + if err != nil { + return err + } + + c.activeWatchers[chanPoint] = chainWatcher + channelArb, err := newActiveChannelArbitrator( + channel, c, chainWatcher.SubscribeChannelEvents(), + ) + if err != nil { + return err + } + + c.activeChannels[chanPoint] = channelArb + + // Republish any closing transactions for this channel. + err = c.republishClosingTxs(channel) + if err != nil { + log.Errorf("Failed to republish closing txs for "+ + "channel %v", chanPoint) + } + } + + return nil +} + +// loadPendingCloseChannels loads all channels that are currently pending +// closure in the database and registers them with the ChannelArbitrator to +// continue the resolution process. +func (c *ChainArbitrator) loadPendingCloseChannels() error { + chanStateDB := c.chanSource.ChannelStateDB() + + closingChannels, err := chanStateDB.FetchClosedChannels(true) + if err != nil { + return err + } + + if len(closingChannels) == 0 { + return nil + } + + log.Infof("Creating ChannelArbitrators for %v closing channels", + len(closingChannels)) + + // Next, for each channel is the closing state, we'll launch a + // corresponding more restricted resolver, as we don't have to watch + // the chain any longer, only resolve the contracts on the confirmed + // commitment. + //nolint:lll + for _, closeChanInfo := range closingChannels { + // We can leave off the CloseContract and ForceCloseChan + // methods as the channel is already closed at this point. + chanPoint := closeChanInfo.ChanPoint + arbCfg := ChannelArbitratorConfig{ + ChanPoint: chanPoint, + ShortChanID: closeChanInfo.ShortChanID, + ChainArbitratorConfig: c.cfg, + ChainEvents: &ChainEventSubscription{}, + IsPendingClose: true, + ClosingHeight: closeChanInfo.CloseHeight, + CloseType: closeChanInfo.CloseType, + PutResolverReport: func(tx kvdb.RwTx, + report *channeldb.ResolverReport) error { + + return c.chanSource.PutResolverReport( + tx, c.cfg.ChainHash, &chanPoint, report, + ) + }, + FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) { + return chanStateDB.FetchHistoricalChannel(&chanPoint) + }, + FindOutgoingHTLCDeadline: func( + htlc channeldb.HTLC) fn.Option[int32] { + + return c.FindOutgoingHTLCDeadline( + closeChanInfo.ShortChanID, htlc, + ) + }, + } + chanLog, err := newBoltArbitratorLog( + c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint, + ) + if err != nil { + return err + } + arbCfg.MarkChannelResolved = func() error { + if c.cfg.NotifyFullyResolvedChannel != nil { + c.cfg.NotifyFullyResolvedChannel(chanPoint) + } + + return c.ResolveContract(chanPoint) + } + + // We create an empty map of HTLC's here since it's possible + // that the channel is in StateDefault and updateActiveHTLCs is + // called. We want to avoid writing to an empty map. Since the + // channel is already in the process of being resolved, no new + // HTLCs will be added. + c.activeChannels[chanPoint] = NewChannelArbitrator( + arbCfg, make(map[HtlcSetKey]htlcSet), chanLog, + ) + } + + return nil +} diff --git a/contractcourt/commit_sweep_resolver.go b/contractcourt/commit_sweep_resolver.go index 90daae1c3b..47ad4b8105 100644 --- a/contractcourt/commit_sweep_resolver.go +++ b/contractcourt/commit_sweep_resolver.go @@ -165,6 +165,10 @@ func (c *commitSweepResolver) getCommitTxConfHeight() (uint32, error) { // returned. // // NOTE: This function MUST be run as a goroutine. + +// TODO(yy): fix the funlen in the next PR. +// +//nolint:funlen func (c *commitSweepResolver) Resolve() (ContractResolver, error) { // If we're already resolved, then we can exit early. if c.resolved {