diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 858ede922d..18681843f1 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1758,1220 +1758,1209 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( schedulerOp = append(schedulerOp, batch.LazyAdd()) } - var announcements []networkMsg - switch msg := nMsg.msg.(type) { - // A new node announcement has arrived which either presents new // information about a node in one of the channels we know about, or a // updating previously advertised information. case *lnwire.NodeAnnouncement: - timestamp := time.Unix(int64(msg.Timestamp), 0) - - // We'll quickly ask the router if it already has a - // newer update for this node so we can skip validating - // signatures if not required. - if d.cfg.Router.IsStaleNode(msg.NodeID, timestamp) { - log.Debugf("Skipped processing stale node: %x", - msg.NodeID) - nMsg.err <- nil - return nil, true - } - - if err := d.addNode(msg, schedulerOp...); err != nil { - log.Debugf("Adding node: %x got error: %v", - msg.NodeID, err) - - if !routing.IsError( - err, - routing.ErrOutdated, - routing.ErrIgnored, - routing.ErrVBarrierShuttingDown, - ) { - - log.Error(err) - } - - nMsg.err <- err - return nil, false - } - - // In order to ensure we don't leak unadvertised nodes, we'll - // make a quick check to ensure this node intends to publicly - // advertise itself to the network. - isPublic, err := d.cfg.Router.IsPublicNode(msg.NodeID) - if err != nil { - log.Errorf("Unable to determine if node %x is "+ - "advertised: %v", msg.NodeID, err) - nMsg.err <- err - return nil, false - } - - // If it does, we'll add their announcement to our batch so that - // it can be broadcast to the rest of our peers. - if isPublic { - announcements = append(announcements, networkMsg{ - peer: nMsg.peer, - source: nMsg.source, - msg: msg, - }) - } else { - log.Tracef("Skipping broadcasting node announcement "+ - "for %x due to being unadvertised", msg.NodeID) - } - - nMsg.err <- nil - // TODO(roasbeef): get rid of the above - return announcements, true + return d.handleNodeAnnouncement(nMsg, msg, schedulerOp) // A new channel announcement has arrived, this indicates the // *creation* of a new channel within the network. This only advertises // the existence of a channel and not yet the routing policies in // either direction of the channel. case *lnwire.ChannelAnnouncement: - // We'll ignore any channel announcements that target any chain - // other than the set of chains we know of. - if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) { - err := fmt.Errorf("ignoring ChannelAnnouncement from "+ - "chain=%v, gossiper on chain=%v", msg.ChainHash, - d.cfg.ChainHash) - log.Errorf(err.Error()) - - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) - - nMsg.err <- err - return nil, false - } + return d.handleChanAnnouncement(nMsg, msg, schedulerOp) - // If the advertised inclusionary block is beyond our knowledge - // of the chain tip, then we'll ignore for it now. - d.Lock() - if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) { - log.Warnf("Announcement for chan_id=(%v), is "+ - "premature: advertises height %v, only "+ - "height %v is known", - msg.ShortChannelID.ToUint64(), - msg.ShortChannelID.BlockHeight, - d.bestHeight) - d.Unlock() - nMsg.err <- nil - return nil, false - } - d.Unlock() + // A new authenticated channel edge update has arrived. This indicates + // that the directional information for an already known channel has + // been updated. + case *lnwire.ChannelUpdate: + return d.handleChanUpdate(nMsg, msg, schedulerOp) - // At this point, we'll now ask the router if this is a - // zombie/known edge. If so we can skip all the processing - // below. - if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) { - nMsg.err <- nil - return nil, true - } + // A new signature announcement has been received. This indicates + // willingness of nodes involved in the funding of a channel to + // announce this new channel to the rest of the world. + case *lnwire.AnnounceSignatures: + return d.handleAnnSig(nMsg, msg) - // If this is a remote channel announcement, then we'll validate - // all the signatures within the proof as it should be well - // formed. - var proof *channeldb.ChannelAuthProof - if nMsg.isRemote { - if err := routing.ValidateChannelAnn(msg); err != nil { - err := fmt.Errorf("unable to validate "+ - "announcement: %v", err) + default: + err := errors.New("wrong type of the announcement") + nMsg.err <- err + return nil, false + } +} - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) +// processZombieUpdate determines whether the provided channel update should +// resurrect a given zombie edge. +func (d *AuthenticatedGossiper) processZombieUpdate( + chanInfo *channeldb.ChannelEdgeInfo, msg *lnwire.ChannelUpdate) error { - log.Error(err) - nMsg.err <- err - return nil, false - } + // The least-significant bit in the flag on the channel update tells us + // which edge is being updated. + isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 - // If the proof checks out, then we'll save the proof - // itself to the database so we can fetch it later when - // gossiping with other nodes. - proof = &channeldb.ChannelAuthProof{ - NodeSig1Bytes: msg.NodeSig1.ToSignatureBytes(), - NodeSig2Bytes: msg.NodeSig2.ToSignatureBytes(), - BitcoinSig1Bytes: msg.BitcoinSig1.ToSignatureBytes(), - BitcoinSig2Bytes: msg.BitcoinSig2.ToSignatureBytes(), - } - } + // Since we've deemed the update as not stale above, before marking it + // live, we'll make sure it has been signed by the correct party. If we + // have both pubkeys, either party can resurect the channel. If we've + // already marked this with the stricter, single-sided resurrection we + // will only have the pubkey of the node with the oldest timestamp. + var pubKey *btcec.PublicKey + switch { + case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey: + pubKey, _ = chanInfo.NodeKey1() + case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey: + pubKey, _ = chanInfo.NodeKey2() + } + if pubKey == nil { + return fmt.Errorf("incorrect pubkey to resurrect zombie "+ + "with chan_id=%v", msg.ShortChannelID) + } - // With the proof validate (if necessary), we can now store it - // within the database for our path finding and syncing needs. - var featureBuf bytes.Buffer - if err := msg.Features.Encode(&featureBuf); err != nil { - log.Errorf("unable to encode features: %v", err) - nMsg.err <- err - return nil, false - } + err := routing.VerifyChannelUpdateSignature(msg, pubKey) + if err != nil { + return fmt.Errorf("unable to verify channel "+ + "update signature: %v", err) + } - edge := &channeldb.ChannelEdgeInfo{ - ChannelID: msg.ShortChannelID.ToUint64(), - ChainHash: msg.ChainHash, - NodeKey1Bytes: msg.NodeID1, - NodeKey2Bytes: msg.NodeID2, - BitcoinKey1Bytes: msg.BitcoinKey1, - BitcoinKey2Bytes: msg.BitcoinKey2, - AuthProof: proof, - Features: featureBuf.Bytes(), - ExtraOpaqueData: msg.ExtraOpaqueData, - } + // With the signature valid, we'll proceed to mark the + // edge as live and wait for the channel announcement to + // come through again. + err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID) + if err != nil { + return fmt.Errorf("unable to remove edge with "+ + "chan_id=%v from zombie index: %v", + msg.ShortChannelID, err) + } - // If there were any optional message fields provided, we'll - // include them in its serialized disk representation now. - if nMsg.optionalMsgFields != nil { - if nMsg.optionalMsgFields.capacity != nil { - edge.Capacity = *nMsg.optionalMsgFields.capacity - } - if nMsg.optionalMsgFields.channelPoint != nil { - edge.ChannelPoint = *nMsg.optionalMsgFields.channelPoint - } - } + log.Debugf("Removed edge with chan_id=%v from zombie "+ + "index", msg.ShortChannelID) - // We will add the edge to the channel router. If the nodes - // present in this channel are not present in the database, a - // partial node will be added to represent each node while we - // wait for a node announcement. - // - // Before we add the edge to the database, we obtain - // the mutex for this channel ID. We do this to ensure - // no other goroutine has read the database and is now - // making decisions based on this DB state, before it - // writes to the DB. - d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) - err := d.cfg.Router.AddEdge(edge, schedulerOp...) - if err != nil { - defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - - // If the edge was rejected due to already being known, - // then it may be that case that this new message has a - // fresh channel proof, so we'll check. - if routing.IsError(err, routing.ErrIgnored) { - // Attempt to process the rejected message to - // see if we get any new announcements. - anns, rErr := d.processRejectedEdge(msg, proof) - if rErr != nil { - - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) + return nil +} - nMsg.err <- rErr - return nil, false - } +// fetchNodeAnn fetches the latest signed node announcement from our point of +// view for the node with the given public key. +func (d *AuthenticatedGossiper) fetchNodeAnn( + pubKey [33]byte) (*lnwire.NodeAnnouncement, error) { - // If while processing this rejected edge, we - // realized there's a set of announcements we - // could extract, then we'll return those - // directly. - if len(anns) != 0 { - nMsg.err <- nil - return anns, true - } + node, err := d.cfg.Router.FetchLightningNode(pubKey) + if err != nil { + return nil, err + } - // Otherwise, this is just a regular rejected - // edge. - log.Debugf("Router rejected channel "+ - "edge: %v", err) - } else { - log.Debugf("Router rejected channel "+ - "edge: %v", err) + return node.NodeAnnouncement(true) +} - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) - } +// isMsgStale determines whether a message retrieved from the backing +// MessageStore is seen as stale by the current graph. +func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool { + switch msg := msg.(type) { + case *lnwire.AnnounceSignatures: + chanInfo, _, _, err := d.cfg.Router.GetChannelByID( + msg.ShortChannelID, + ) - nMsg.err <- err - return nil, false + // If the channel cannot be found, it is most likely a leftover + // message for a channel that was closed, so we can consider it + // stale. + if err == channeldb.ErrEdgeNotFound { + return true } - - // If err is nil, release the lock immediately. - d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - - // If we earlier received any ChannelUpdates for this channel, - // we can now process them, as the channel is added to the - // graph. - shortChanID := msg.ShortChannelID.ToUint64() - var channelUpdates []*networkMsg - - earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID) - if err == nil { - // There was actually an entry in the map, so we'll - // accumulate it. We don't worry about deletion, since - // it'll eventually fall out anyway. - chanMsgs := earlyChanUpdates.(*cachedNetworkMsg) - channelUpdates = append(channelUpdates, chanMsgs.msgs...) + if err != nil { + log.Debugf("Unable to retrieve channel=%v from graph: "+ + "%v", err) + return false } - // Launch a new goroutine to handle each ChannelUpdate, this to - // ensure we don't block here, as we can handle only one - // announcement at a time. - for _, cu := range channelUpdates { - d.wg.Add(1) - go func(nMsg *networkMsg) { - defer d.wg.Done() - - switch msg := nMsg.msg.(type) { - - // Reprocess the message, making sure we return - // an error to the original caller in case the - // gossiper shuts down. - case *lnwire.ChannelUpdate: - log.Debugf("Reprocessing"+ - " ChannelUpdate for "+ - "shortChanID=%v", - msg.ShortChannelID.ToUint64()) + // If the proof exists in the graph, then we have successfully + // received the remote proof and assembled the full proof, so we + // can safely delete the local proof from the database. + return chanInfo.AuthProof != nil - select { - case d.networkMsgs <- nMsg: - case <-d.quit: - nMsg.err <- ErrGossiperShuttingDown - } + case *lnwire.ChannelUpdate: + _, p1, p2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) - // We don't expect any other message type than - // ChannelUpdate to be in this map. - default: - log.Errorf("Unsupported message type "+ - "found among ChannelUpdates: "+ - "%T", msg) - } - }(cu) + // If the channel cannot be found, it is most likely a leftover + // message for a channel that was closed, so we can consider it + // stale. + if err == channeldb.ErrEdgeNotFound { + return true } - - // Channel announcement was successfully proceeded and know it - // might be broadcast to other connected nodes if it was - // announcement with proof (remote). - if proof != nil { - announcements = append(announcements, networkMsg{ - peer: nMsg.peer, - source: nMsg.source, - msg: msg, - }) + if err != nil { + log.Debugf("Unable to retrieve channel=%v from graph: "+ + "%v", msg.ShortChannelID, err) + return false } - nMsg.err <- nil - return announcements, true - - // A new authenticated channel edge update has arrived. This indicates - // that the directional information for an already known channel has - // been updated. - case *lnwire.ChannelUpdate: - // We'll ignore any channel announcements that target any chain - // other than the set of chains we know of. - if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) { - err := fmt.Errorf("ignoring ChannelUpdate from "+ - "chain=%v, gossiper on chain=%v", msg.ChainHash, - d.cfg.ChainHash) - log.Errorf(err.Error()) - - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) - - nMsg.err <- err - return nil, false + // Otherwise, we'll retrieve the correct policy that we + // currently have stored within our graph to check if this + // message is stale by comparing its timestamp. + var p *channeldb.ChannelEdgePolicy + if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 { + p = p1 + } else { + p = p2 } - blockHeight := msg.ShortChannelID.BlockHeight - shortChanID := msg.ShortChannelID.ToUint64() - - // If the advertised inclusionary block is beyond our knowledge - // of the chain tip, then we'll put the announcement in limbo - // to be fully verified once we advance forward in the chain. - d.Lock() - if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) { - log.Warnf("Update announcement for "+ - "short_chan_id(%v), is premature: advertises "+ - "height %v, only height %v is known", - shortChanID, blockHeight, - d.bestHeight) - d.Unlock() - nMsg.err <- nil - return nil, false + // If the policy is still unknown, then we can consider this + // policy fresh. + if p == nil { + return false } - d.Unlock() - // Before we perform any of the expensive checks below, we'll - // check whether this update is stale or is for a zombie - // channel in order to quickly reject it. timestamp := time.Unix(int64(msg.Timestamp), 0) - if d.cfg.Router.IsStaleEdgePolicy( - msg.ShortChannelID, timestamp, msg.ChannelFlags, - ) { - - log.Debugf("Ignored stale edge policy: peer=%v, "+ - "source=%x, msg=%s, is_remote=%v", nMsg.peer, - nMsg.source.SerializeCompressed(), - nMsg.msg.MsgType(), nMsg.isRemote) + return p.LastUpdate.After(timestamp) - nMsg.err <- nil - return nil, true - } + default: + // We'll make sure to not mark any unsupported messages as stale + // to ensure they are not removed. + return false + } +} - // Get the node pub key as far as we don't have it in channel - // update announcement message. We'll need this to properly - // verify message signature. - // - // We make sure to obtain the mutex for this channel ID - // before we access the database. This ensures the state - // we read from the database has not changed between this - // point and when we call UpdateEdge() later. - d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) - defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - chanInfo, edge1, edge2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) - switch err { - // No error, break. - case nil: - break - - case channeldb.ErrZombieEdge: - err = d.processZombieUpdate(chanInfo, msg) - if err != nil { - log.Debug(err) - nMsg.err <- err - return nil, false - } +// updateChannel creates a new fully signed update for the channel, and updates +// the underlying graph with the new state. +func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, + edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement, + *lnwire.ChannelUpdate, error) { - // We'll fallthrough to ensure we stash the update until - // we receive its corresponding ChannelAnnouncement. - // This is needed to ensure the edge exists in the graph - // before applying the update. - fallthrough - case channeldb.ErrGraphNotFound: - fallthrough - case channeldb.ErrGraphNoEdgesFound: - fallthrough - case channeldb.ErrEdgeNotFound: - // If the edge corresponding to this ChannelUpdate was - // not found in the graph, this might be a channel in - // the process of being opened, and we haven't processed - // our own ChannelAnnouncement yet, hence it is not - // found in the graph. This usually gets resolved after - // the channel proofs are exchanged and the channel is - // broadcasted to the rest of the network, but in case - // this is a private channel this won't ever happen. - // This can also happen in the case of a zombie channel - // with a fresh update for which we don't have a - // ChannelAnnouncement for since we reject them. Because - // of this, we temporarily add it to a map, and - // reprocess it after our own ChannelAnnouncement has - // been processed. - earlyMsgs, err := d.prematureChannelUpdates.Get( - shortChanID, - ) - switch { - // Nothing in the cache yet, we can just directly - // insert this element. - case err == cache.ErrElementNotFound: - _, _ = d.prematureChannelUpdates.Put( - shortChanID, &cachedNetworkMsg{ - msgs: []*networkMsg{nMsg}, - }) - - // There's already something in the cache, so we'll - // combine the set of messages into a single value. - default: - msgs := earlyMsgs.(*cachedNetworkMsg).msgs - msgs = append(msgs, nMsg) - _, _ = d.prematureChannelUpdates.Put( - shortChanID, &cachedNetworkMsg{ - msgs: msgs, - }) - } + // Parse the unsigned edge into a channel update. + chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge) - log.Debugf("Got ChannelUpdate for edge not found in "+ - "graph(shortChanID=%v), saving for "+ - "reprocessing later", shortChanID) + // We'll generate a new signature over a digest of the channel + // announcement itself and update the timestamp to ensure it propagate. + err := netann.SignChannelUpdate( + d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate, + netann.ChanUpdSetTimestamp, + ) + if err != nil { + return nil, nil, err + } - // NOTE: We don't return anything on the error channel - // for this message, as we expect that will be done when - // this ChannelUpdate is later reprocessed. - return nil, false + // Next, we'll set the new signature in place, and update the reference + // in the backing slice. + edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0) + edge.SigBytes = chanUpdate.Signature.ToSignatureBytes() - default: - err := fmt.Errorf("unable to validate channel update "+ - "short_chan_id=%v: %v", shortChanID, err) - log.Error(err) - nMsg.err <- err + // To ensure that our signature is valid, we'll verify it ourself + // before committing it to the slice returned. + err = routing.ValidateChannelUpdateAnn(d.selfKey, info.Capacity, chanUpdate) + if err != nil { + return nil, nil, fmt.Errorf("generated invalid channel "+ + "update sig: %v", err) + } - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) + // Finally, we'll write the new edge policy to disk. + if err := d.cfg.Router.UpdateEdge(edge); err != nil { + return nil, nil, err + } - return nil, false + // We'll also create the original channel announcement so the two can + // be broadcast along side each other (if necessary), but only if we + // have a full channel announcement for this channel. + var chanAnn *lnwire.ChannelAnnouncement + if info.AuthProof != nil { + chanID := lnwire.NewShortChanIDFromInt(info.ChannelID) + chanAnn = &lnwire.ChannelAnnouncement{ + ShortChannelID: chanID, + NodeID1: info.NodeKey1Bytes, + NodeID2: info.NodeKey2Bytes, + ChainHash: info.ChainHash, + BitcoinKey1: info.BitcoinKey1Bytes, + Features: lnwire.NewRawFeatureVector(), + BitcoinKey2: info.BitcoinKey2Bytes, + ExtraOpaqueData: edge.ExtraOpaqueData, } - - // The least-significant bit in the flag on the channel update - // announcement tells us "which" side of the channels directed - // edge is being updated. - var ( - pubKey *btcec.PublicKey - edgeToUpdate *channeldb.ChannelEdgePolicy + chanAnn.NodeSig1, err = lnwire.NewSigFromRawSignature( + info.AuthProof.NodeSig1Bytes, ) - direction := msg.ChannelFlags & lnwire.ChanUpdateDirection - switch direction { - case 0: - pubKey, _ = chanInfo.NodeKey1() - edgeToUpdate = edge1 - case 1: - pubKey, _ = chanInfo.NodeKey2() - edgeToUpdate = edge2 + if err != nil { + return nil, nil, err } - - // If we have a previous version of the edge being updated, - // we'll want to rate limit its updates to prevent spam - // throughout the network. - if nMsg.isRemote && edgeToUpdate != nil { - // If it's a keep-alive update, we'll only propagate one - // if it's been a day since the previous. This follows - // our own heuristic of sending keep-alive updates after - // the same duration (see retransmitStaleAnns). - timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate) - if IsKeepAliveUpdate(msg, edgeToUpdate) { - if timeSinceLastUpdate < d.cfg.RebroadcastInterval { - log.Debugf("Ignoring keep alive update "+ - "not within %v period for "+ - "channel %v", - d.cfg.RebroadcastInterval, - shortChanID) - nMsg.err <- nil - return nil, false - } - } else { - // If it's not, we'll allow an update per minute - // with a maximum burst of 10. If we haven't - // seen an update for this channel before, we'll - // need to initialize a rate limiter for each - // direction. - d.Lock() - rateLimiters, ok := d.chanUpdateRateLimiter[shortChanID] - if !ok { - r := rate.Every(d.cfg.ChannelUpdateInterval) - b := d.cfg.MaxChannelUpdateBurst - rateLimiters = [2]*rate.Limiter{ - rate.NewLimiter(r, b), - rate.NewLimiter(r, b), - } - d.chanUpdateRateLimiter[shortChanID] = rateLimiters - } - d.Unlock() - - if !rateLimiters[direction].Allow() { - log.Debugf("Rate limiting update for "+ - "channel %v from direction %x", - shortChanID, - pubKey.SerializeCompressed()) - nMsg.err <- nil - return nil, false - } - } + chanAnn.NodeSig2, err = lnwire.NewSigFromRawSignature( + info.AuthProof.NodeSig2Bytes, + ) + if err != nil { + return nil, nil, err } - - // Validate the channel announcement with the expected public key and - // channel capacity. In the case of an invalid channel update, we'll - // return an error to the caller and exit early. - err = routing.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, msg) + chanAnn.BitcoinSig1, err = lnwire.NewSigFromRawSignature( + info.AuthProof.BitcoinSig1Bytes, + ) if err != nil { - rErr := fmt.Errorf("unable to validate channel "+ - "update announcement for short_chan_id=%v: %v", - spew.Sdump(msg.ShortChannelID), err) - - log.Error(rErr) - nMsg.err <- rErr - return nil, false + return nil, nil, err } - - update := &channeldb.ChannelEdgePolicy{ - SigBytes: msg.Signature.ToSignatureBytes(), - ChannelID: shortChanID, - LastUpdate: timestamp, - MessageFlags: msg.MessageFlags, - ChannelFlags: msg.ChannelFlags, - TimeLockDelta: msg.TimeLockDelta, - MinHTLC: msg.HtlcMinimumMsat, - MaxHTLC: msg.HtlcMaximumMsat, - FeeBaseMSat: lnwire.MilliSatoshi(msg.BaseFee), - FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate), - ExtraOpaqueData: msg.ExtraOpaqueData, + chanAnn.BitcoinSig2, err = lnwire.NewSigFromRawSignature( + info.AuthProof.BitcoinSig2Bytes, + ) + if err != nil { + return nil, nil, err } + } - if err := d.cfg.Router.UpdateEdge(update, schedulerOp...); err != nil { - if routing.IsError( - err, routing.ErrOutdated, - routing.ErrIgnored, - routing.ErrVBarrierShuttingDown, - ) { + return chanAnn, chanUpdate, err +} - log.Debug(err) - } else { - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) +// SyncManager returns the gossiper's SyncManager instance. +func (d *AuthenticatedGossiper) SyncManager() *SyncManager { + return d.syncMgr +} - log.Error(err) - } +// IsKeepAliveUpdate determines whether this channel update is considered a +// keep-alive update based on the previous channel update processed for the same +// direction. +func IsKeepAliveUpdate(update *lnwire.ChannelUpdate, + prev *channeldb.ChannelEdgePolicy) bool { - nMsg.err <- err - return nil, false - } + // Both updates should be from the same direction. + if update.ChannelFlags&lnwire.ChanUpdateDirection != + prev.ChannelFlags&lnwire.ChanUpdateDirection { - // If this is a local ChannelUpdate without an AuthProof, it - // means it is an update to a channel that is not (yet) - // supposed to be announced to the greater network. However, - // our channel counter party will need to be given the update, - // so we'll try sending the update directly to the remote peer. - if !nMsg.isRemote && chanInfo.AuthProof == nil { - // Get our peer's public key. - remotePubKey := remotePubFromChanInfo( - chanInfo, msg.ChannelFlags, - ) + return false + } - log.Debugf("The message %v has no AuthProof, sending "+ - "the update to remote peer %x", - msg.MsgType(), remotePubKey) + // The timestamp should always increase for a keep-alive update. + timestamp := time.Unix(int64(update.Timestamp), 0) + if !timestamp.After(prev.LastUpdate) { + return false + } - // Now, we'll attempt to send the channel update message - // reliably to the remote peer in the background, so - // that we don't block if the peer happens to be offline - // at the moment. - err := d.reliableSender.sendMessage(msg, remotePubKey) - if err != nil { - err := fmt.Errorf("unable to reliably send %v "+ - "for channel=%v to peer=%x: %v", - msg.MsgType(), msg.ShortChannelID, - remotePubKey, err) - nMsg.err <- err - return nil, false - } - } + // None of the remaining fields should change for a keep-alive update. + if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() { + return false + } + if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat { + return false + } + if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths { + return false + } + if update.TimeLockDelta != prev.TimeLockDelta { + return false + } + if update.HtlcMinimumMsat != prev.MinHTLC { + return false + } + if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() { + return false + } + if update.HtlcMaximumMsat != prev.MaxHTLC { + return false + } + if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) { + return false + } + return true +} - // Channel update announcement was successfully processed and - // now it can be broadcast to the rest of the network. However, - // we'll only broadcast the channel update announcement if it - // has an attached authentication proof. - if chanInfo.AuthProof != nil { - announcements = append(announcements, networkMsg{ - peer: nMsg.peer, - source: nMsg.source, - msg: msg, - }) - } +// latestHeight returns the gossiper's latest height known of the chain. +func (d *AuthenticatedGossiper) latestHeight() uint32 { + d.Lock() + defer d.Unlock() + return d.bestHeight +} + +// handleNodeAnnouncement processes a new node announcement. +func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, + nodeAnn *lnwire.NodeAnnouncement, + ops []batch.SchedulerOption) ([]networkMsg, bool) { + timestamp := time.Unix(int64(nodeAnn.Timestamp), 0) + + // We'll quickly ask the router if it already has a newer update for + // this node so we can skip validating signatures if not required. + if d.cfg.Router.IsStaleNode(nodeAnn.NodeID, timestamp) { + log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID) nMsg.err <- nil - return announcements, true + return nil, true + } - // A new signature announcement has been received. This indicates - // willingness of nodes involved in the funding of a channel to - // announce this new channel to the rest of the world. - case *lnwire.AnnounceSignatures: - needBlockHeight := msg.ShortChannelID.BlockHeight + - d.cfg.ProofMatureDelta - shortChanID := msg.ShortChannelID.ToUint64() + if err := d.addNode(nodeAnn, ops...); err != nil { + log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID, + err) - prefix := "local" - if nMsg.isRemote { - prefix = "remote" + if !routing.IsError( + err, + routing.ErrOutdated, + routing.ErrIgnored, + routing.ErrVBarrierShuttingDown, + ) { + + log.Error(err) } - log.Infof("Received new %v channel announcement for %v", prefix, - msg.ShortChannelID) + nMsg.err <- err + return nil, false + } + + // In order to ensure we don't leak unadvertised nodes, we'll make a + // quick check to ensure this node intends to publicly advertise itself + // to the network. + isPublic, err := d.cfg.Router.IsPublicNode(nodeAnn.NodeID) + if err != nil { + log.Errorf("Unable to determine if node %x is advertised: %v", + nodeAnn.NodeID, err) + nMsg.err <- err + return nil, false + } + + var announcements []networkMsg + + // If it does, we'll add their announcement to our batch so that it can + // be broadcast to the rest of our peers. + if isPublic { + announcements = append(announcements, networkMsg{ + peer: nMsg.peer, + source: nMsg.source, + msg: nodeAnn, + }) + } else { + log.Tracef("Skipping broadcasting node announcement for %x "+ + "due to being unadvertised", nodeAnn.NodeID) + } + + nMsg.err <- nil + // TODO(roasbeef): get rid of the above + return announcements, true +} - // By the specification, channel announcement proofs should be - // sent after some number of confirmations after channel was - // registered in bitcoin blockchain. Therefore, we check if the - // proof is premature. - d.Lock() - premature := d.isPremature( - msg.ShortChannelID, d.cfg.ProofMatureDelta, nMsg, +// handleChanAnnouncement processes a new channel announcement. +func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, + ann *lnwire.ChannelAnnouncement, + ops []batch.SchedulerOption) ([]networkMsg, bool) { + + // We'll ignore any channel announcements that target any chain other + // than the set of chains we know of. + if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) { + err := fmt.Errorf("ignoring ChannelAnnouncement from chain=%v"+ + ", gossiper on chain=%v", ann.ChainHash, + d.cfg.ChainHash) + log.Errorf(err.Error()) + + key := newRejectCacheKey( + ann.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), ) - if premature { - log.Warnf("Premature proof announcement, current "+ - "block height lower than needed: %v < %v", - d.bestHeight, needBlockHeight) - d.Unlock() - nMsg.err <- nil - return nil, false - } + _, _ = d.recentRejects.Put(key, &cachedReject{}) + + nMsg.err <- err + return nil, false + } + + // If the advertised inclusionary block is beyond our knowledge of the + // chain tip, then we'll ignore it for now. + d.Lock() + if nMsg.isRemote && d.isPremature(ann.ShortChannelID, 0, nMsg) { + log.Warnf("Announcement for chan_id=(%v), is premature: "+ + "advertises height %v, only height %v is known", + ann.ShortChannelID.ToUint64(), + ann.ShortChannelID.BlockHeight, d.bestHeight) d.Unlock() + nMsg.err <- nil + return nil, false + } + d.Unlock() - // Ensure that we know of a channel with the target channel ID - // before proceeding further. - // - // We must acquire the mutex for this channel ID before getting - // the channel from the database, to ensure what we read does - // not change before we call AddProof() later. - d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) - defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - - chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( - msg.ShortChannelID) - if err != nil { - // TODO(andrew.shvv) this is dangerous because remote - // node might rewrite the waiting proof. - proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) - err := d.cfg.WaitingProofStore.Add(proof) - if err != nil { - err := fmt.Errorf("unable to store "+ - "the proof for short_chan_id=%v: %v", - shortChanID, err) - log.Error(err) - nMsg.err <- err - return nil, false - } + // At this point, we'll now ask the router if this is a zombie/known + // edge. If so we can skip all the processing below. + if d.cfg.Router.IsKnownEdge(ann.ShortChannelID) { + nMsg.err <- nil + return nil, true + } - log.Infof("Orphan %v proof announcement with "+ - "short_chan_id=%v, adding "+ - "to waiting batch", prefix, shortChanID) - nMsg.err <- nil - return nil, false - } + // If this is a remote channel announcement, then we'll validate all + // the signatures within the proof as it should be well formed. + var proof *channeldb.ChannelAuthProof + if nMsg.isRemote { + if err := routing.ValidateChannelAnn(ann); err != nil { + err := fmt.Errorf("unable to validate announcement: "+ + "%v", err) - nodeID := nMsg.source.SerializeCompressed() - isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:]) - isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:]) + key := newRejectCacheKey( + ann.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) - // Ensure that channel that was retrieved belongs to the peer - // which sent the proof announcement. - if !(isFirstNode || isSecondNode) { - err := fmt.Errorf("channel that was received not "+ - "belongs to the peer which sent the proof, "+ - "short_chan_id=%v", shortChanID) log.Error(err) nMsg.err <- err return nil, false } - // If proof was sent by a local sub-system, then we'll - // send the announcement signature to the remote node - // so they can also reconstruct the full channel - // announcement. - if !nMsg.isRemote { - var remotePubKey [33]byte - if isFirstNode { - remotePubKey = chanInfo.NodeKey2Bytes - } else { - remotePubKey = chanInfo.NodeKey1Bytes - } - // Since the remote peer might not be online - // we'll call a method that will attempt to - // deliver the proof when it comes online. - err := d.reliableSender.sendMessage(msg, remotePubKey) - if err != nil { - err := fmt.Errorf("unable to reliably send %v "+ - "for channel=%v to peer=%x: %v", - msg.MsgType(), msg.ShortChannelID, - remotePubKey, err) - nMsg.err <- err - return nil, false - } + // If the proof checks out, then we'll save the proof itself to + // the database so we can fetch it later when gossiping with + // other nodes. + proof = &channeldb.ChannelAuthProof{ + NodeSig1Bytes: ann.NodeSig1.ToSignatureBytes(), + NodeSig2Bytes: ann.NodeSig2.ToSignatureBytes(), + BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(), + BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(), } + } - // Check if we already have the full proof for this channel. - if chanInfo.AuthProof != nil { - // If we already have the fully assembled proof, then - // the peer sending us their proof has probably not - // received our local proof yet. So be kind and send - // them the full proof. - if nMsg.isRemote { - peerID := nMsg.source.SerializeCompressed() - log.Debugf("Got AnnounceSignatures for " + - "channel with full proof.") - - d.wg.Add(1) - go func() { - defer d.wg.Done() - log.Debugf("Received half proof for "+ - "channel %v with existing "+ - "full proof. Sending full "+ - "proof to peer=%x", - msg.ChannelID, - peerID) - - chanAnn, _, _, err := netann.CreateChanAnnouncement( - chanInfo.AuthProof, chanInfo, - e1, e2, - ) - if err != nil { - log.Errorf("unable to gen "+ - "ann: %v", err) - return - } - err = nMsg.peer.SendMessage( - false, chanAnn, - ) - if err != nil { - log.Errorf("Failed sending "+ - "full proof to "+ - "peer=%x: %v", - peerID, err) - return - } - log.Debugf("Full proof sent to peer=%x"+ - " for chanID=%v", peerID, - msg.ChannelID) - }() - } + // With the proof validated (if necessary), we can now store it within + // the database for our path finding and syncing needs. + var featureBuf bytes.Buffer + if err := ann.Features.Encode(&featureBuf); err != nil { + log.Errorf("unable to encode features: %v", err) + nMsg.err <- err + return nil, false + } - log.Debugf("Already have proof for channel "+ - "with chanID=%v", msg.ChannelID) - nMsg.err <- nil - return nil, true - } + edge := &channeldb.ChannelEdgeInfo{ + ChannelID: ann.ShortChannelID.ToUint64(), + ChainHash: ann.ChainHash, + NodeKey1Bytes: ann.NodeID1, + NodeKey2Bytes: ann.NodeID2, + BitcoinKey1Bytes: ann.BitcoinKey1, + BitcoinKey2Bytes: ann.BitcoinKey2, + AuthProof: proof, + Features: featureBuf.Bytes(), + ExtraOpaqueData: ann.ExtraOpaqueData, + } - // Check that we received the opposite proof. If so, then we're - // now able to construct the full proof, and create the channel - // announcement. If we didn't receive the opposite half of the - // proof than we should store it this one, and wait for - // opposite to be received. - proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) - oppositeProof, err := d.cfg.WaitingProofStore.Get( - proof.OppositeKey(), - ) - if err != nil && err != channeldb.ErrWaitingProofNotFound { - err := fmt.Errorf("unable to get "+ - "the opposite proof for short_chan_id=%v: %v", - shortChanID, err) - log.Error(err) - nMsg.err <- err - return nil, false + // If there were any optional message fields provided, we'll include + // them in its serialized disk representation now. + if nMsg.optionalMsgFields != nil { + if nMsg.optionalMsgFields.capacity != nil { + edge.Capacity = *nMsg.optionalMsgFields.capacity + } + if nMsg.optionalMsgFields.channelPoint != nil { + cp := *nMsg.optionalMsgFields.channelPoint + edge.ChannelPoint = cp } + } - if err == channeldb.ErrWaitingProofNotFound { - err := d.cfg.WaitingProofStore.Add(proof) - if err != nil { - err := fmt.Errorf("unable to store "+ - "the proof for short_chan_id=%v: %v", - shortChanID, err) - log.Error(err) - nMsg.err <- err + // We will add the edge to the channel router. If the nodes present in + // this channel are not present in the database, a partial node will be + // added to represent each node while we wait for a node announcement. + // + // Before we add the edge to the database, we obtain the mutex for this + // channel ID. We do this to ensure no other goroutine has read the + // database and is now making decisions based on this DB state, before + // it writes to the DB. + d.channelMtx.Lock(ann.ShortChannelID.ToUint64()) + err := d.cfg.Router.AddEdge(edge, ops...) + if err != nil { + defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) + + // If the edge was rejected due to already being known, then it + // may be the case that this new message has a fresh channel + // proof, so we'll check. + if routing.IsError(err, routing.ErrIgnored) { + // Attempt to process the rejected message to see if we + // get any new announcements. + anns, rErr := d.processRejectedEdge(ann, proof) + if rErr != nil { + key := newRejectCacheKey( + ann.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + cr := &cachedReject{} + _, _ = d.recentRejects.Put(key, cr) + + nMsg.err <- rErr return nil, false } - log.Infof("1/2 of channel ann proof received for "+ - "short_chan_id=%v, waiting for other half", - shortChanID) - - nMsg.err <- nil - return nil, false - } + // If while processing this rejected edge, we realized + // there's a set of announcements we could extract, + // then we'll return those directly. + if len(anns) != 0 { + nMsg.err <- nil + return anns, true + } - // We now have both halves of the channel announcement proof, - // then we'll reconstruct the initial announcement so we can - // validate it shortly below. - var dbProof channeldb.ChannelAuthProof - if isFirstNode { - dbProof.NodeSig1Bytes = msg.NodeSignature.ToSignatureBytes() - dbProof.NodeSig2Bytes = oppositeProof.NodeSignature.ToSignatureBytes() - dbProof.BitcoinSig1Bytes = msg.BitcoinSignature.ToSignatureBytes() - dbProof.BitcoinSig2Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes() + // Otherwise, this is just a regular rejected edge. + log.Debugf("Router rejected channel edge: %v", err) } else { - dbProof.NodeSig1Bytes = oppositeProof.NodeSignature.ToSignatureBytes() - dbProof.NodeSig2Bytes = msg.NodeSignature.ToSignatureBytes() - dbProof.BitcoinSig1Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes() - dbProof.BitcoinSig2Bytes = msg.BitcoinSignature.ToSignatureBytes() - } - chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement( - &dbProof, chanInfo, e1, e2, - ) - if err != nil { - log.Error(err) - nMsg.err <- err - return nil, false + log.Debugf("Router rejected channel edge: %v", err) + + key := newRejectCacheKey( + ann.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) } - // With all the necessary components assembled validate the - // full channel announcement proof. - if err := routing.ValidateChannelAnn(chanAnn); err != nil { - err := fmt.Errorf("channel announcement proof "+ - "for short_chan_id=%v isn't valid: %v", - shortChanID, err) + nMsg.err <- err + return nil, false + } - log.Error(err) - nMsg.err <- err - return nil, false - } + // If err is nil, release the lock immediately. + d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) - // If the channel was returned by the router it means that - // existence of funding point and inclusion of nodes bitcoin - // keys in it already checked by the router. In this stage we - // should check that node keys are attest to the bitcoin keys - // by validating the signatures of announcement. If proof is - // valid then we'll populate the channel edge with it, so we - // can announce it on peer connect. - err = d.cfg.Router.AddProof(msg.ShortChannelID, &dbProof) - if err != nil { - err := fmt.Errorf("unable add proof to the "+ - "channel chanID=%v: %v", msg.ChannelID, err) - log.Error(err) - nMsg.err <- err - return nil, false - } + // If we earlier received any ChannelUpdates for this channel, we can + // now process them, as the channel is added to the graph. + shortChanID := ann.ShortChannelID.ToUint64() + var channelUpdates []*networkMsg - err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey()) - if err != nil { - err := fmt.Errorf("unable remove opposite proof "+ - "for the channel with chanID=%v: %v", - msg.ChannelID, err) - log.Error(err) - nMsg.err <- err - return nil, false - } + earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID) + if err == nil { + // There was actually an entry in the map, so we'll accumulate + // it. We don't worry about deletion, since it'll eventually + // fall out anyway. + chanMsgs := earlyChanUpdates.(*cachedNetworkMsg) + channelUpdates = append(channelUpdates, chanMsgs.msgs...) + } + + // Launch a new goroutine to handle each ChannelUpdate, this is to + // ensure we don't block here, as we can handle only one announcement + // at a time. + for _, cu := range channelUpdates { + d.wg.Add(1) + go func(updMsg *networkMsg) { + defer d.wg.Done() + + switch msg := updMsg.msg.(type) { + // Reprocess the message, making sure we return an + // error to the original caller in case the gossiper + // shuts down. + case *lnwire.ChannelUpdate: + log.Debugf("Reprocessing ChannelUpdate for "+ + "shortChanID=%v", + msg.ShortChannelID.ToUint64()) + + select { + case d.networkMsgs <- updMsg: + case <-d.quit: + updMsg.err <- ErrGossiperShuttingDown + } - // Proof was successfully created and now can announce the - // channel to the remain network. - log.Infof("Fully valid channel proof for short_chan_id=%v "+ - "constructed, adding to next ann batch", - shortChanID) + // We don't expect any other message type than + // ChannelUpdate to be in this cache. + default: + log.Errorf("Unsupported message type found "+ + "among ChannelUpdates: %T", msg) + } + }(cu) + } + + // Channel announcement was successfully processed and now it might be + // broadcast to other connected nodes if it was an announcement with + // proof (remote). + var announcements []networkMsg - // Assemble the necessary announcements to add to the next - // broadcasting batch. + if proof != nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, source: nMsg.source, - msg: chanAnn, + msg: ann, }) - if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil { - announcements = append(announcements, networkMsg{ - peer: nMsg.peer, - source: src, - msg: e1Ann, - }) - } - if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil { - announcements = append(announcements, networkMsg{ - peer: nMsg.peer, - source: src, - msg: e2Ann, - }) - } + } + + nMsg.err <- nil + return announcements, true +} + +// handleChanUpdate processes a new channel update. +func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, + upd *lnwire.ChannelUpdate, + ops []batch.SchedulerOption) ([]networkMsg, bool) { + + // We'll ignore any channel updates that target any chain other than + // the set of chains we know of. + if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) { + err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+ + "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash) + log.Errorf(err.Error()) + + key := newRejectCacheKey( + upd.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + + nMsg.err <- err + return nil, false + } + + blockHeight := upd.ShortChannelID.BlockHeight + shortChanID := upd.ShortChannelID.ToUint64() - // We'll also send along the node announcements for each channel - // participant if we know of them. To ensure our node - // announcement propagates to our channel counterparty, we'll - // set the source for each announcement to the node it belongs - // to, otherwise we won't send it since the source gets skipped. - // This isn't necessary for channel updates and announcement - // signatures since we send those directly to our channel - // counterparty through the gossiper's reliable sender. - node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes) + // If the advertised inclusionary block is beyond our knowledge of the + // chain tip, then we'll put the announcement in limbo to be fully + // verified once we advance forward in the chain. + d.Lock() + if nMsg.isRemote && d.isPremature(upd.ShortChannelID, 0, nMsg) { + log.Warnf("Update announcement for short_chan_id(%v), is "+ + "premature: advertises height %v, only height %v is "+ + "known", shortChanID, blockHeight, d.bestHeight) + d.Unlock() + nMsg.err <- nil + return nil, false + } + d.Unlock() + + // Before we perform any of the expensive checks below, we'll check + // whether this update is stale or is for a zombie channel in order to + // quickly reject it. + timestamp := time.Unix(int64(upd.Timestamp), 0) + if d.cfg.Router.IsStaleEdgePolicy( + upd.ShortChannelID, timestamp, upd.ChannelFlags, + ) { + + log.Debugf("Ignored stale edge policy: peer=%v, source=%x, "+ + "msg=%s, is_remote=%v", nMsg.peer, + nMsg.source.SerializeCompressed(), nMsg.msg.MsgType(), + nMsg.isRemote, + ) + + nMsg.err <- nil + return nil, true + } + + // Get the node pub key as far since we don't have it in the channel + // update announcement message. We'll need this to properly verify the + // message's signature. + // + // We make sure to obtain the mutex for this channel ID before we + // access the database. This ensures the state we read from the + // database has not changed between this point and when we call + // UpdateEdge() later. + d.channelMtx.Lock(upd.ShortChannelID.ToUint64()) + defer d.channelMtx.Unlock(upd.ShortChannelID.ToUint64()) + chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( + upd.ShortChannelID, + ) + switch err { + // No error, break. + case nil: + break + + case channeldb.ErrZombieEdge: + err = d.processZombieUpdate(chanInfo, upd) if err != nil { - log.Debugf("Unable to fetch node announcement for "+ - "%x: %v", chanInfo.NodeKey1Bytes, err) - } else { - if nodeKey1, err := chanInfo.NodeKey1(); err == nil { - announcements = append(announcements, networkMsg{ - peer: nMsg.peer, - source: nodeKey1, - msg: node1Ann, - }) - } + log.Debug(err) + nMsg.err <- err + return nil, false } - node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes) - if err != nil { - log.Debugf("Unable to fetch node announcement for "+ - "%x: %v", chanInfo.NodeKey2Bytes, err) - } else { - if nodeKey2, err := chanInfo.NodeKey2(); err == nil { - announcements = append(announcements, networkMsg{ - peer: nMsg.peer, - source: nodeKey2, - msg: node2Ann, + + // We'll fallthrough to ensure we stash the update until we + // receive its corresponding ChannelAnnouncement. This is + // needed to ensure the edge exists in the graph before + // applying the update. + fallthrough + case channeldb.ErrGraphNotFound: + fallthrough + case channeldb.ErrGraphNoEdgesFound: + fallthrough + case channeldb.ErrEdgeNotFound: + // If the edge corresponding to this ChannelUpdate was not + // found in the graph, this might be a channel in the process + // of being opened, and we haven't processed our own + // ChannelAnnouncement yet, hence it is not not found in the + // graph. This usually gets resolved after the channel proofs + // are exchanged and the channel is broadcasted to the rest of + // the network, but in case this is a private channel this + // won't ever happen. This can also happen in the case of a + // zombie channel with a fresh update for which we don't have a + // ChannelAnnouncement for since we reject them. Because of + // this, we temporarily add it to a map, and reprocess it after + // our own ChannelAnnouncement has been processed. + earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID) + switch { + // Nothing in the cache yet, we can just directly insert this + // element. + case err == cache.ErrElementNotFound: + _, _ = d.prematureChannelUpdates.Put( + shortChanID, &cachedNetworkMsg{ + msgs: []*networkMsg{nMsg}, + }) + + // There's already something in the cache, so we'll combine the + // set of messages into a single value. + default: + msgs := earlyMsgs.(*cachedNetworkMsg).msgs + msgs = append(msgs, nMsg) + _, _ = d.prematureChannelUpdates.Put( + shortChanID, &cachedNetworkMsg{ + msgs: msgs, }) - } } - nMsg.err <- nil - return announcements, true + log.Debugf("Got ChannelUpdate for edge not found in graph"+ + "(shortChanID=%v), saving for reprocessing later", + shortChanID) + + // NOTE: We don't return anything on the error channel for this + // message, as we expect that will be done when this + // ChannelUpdate is later reprocessed. + return nil, false default: - err := errors.New("wrong type of the announcement") + err := fmt.Errorf("unable to validate channel update "+ + "short_chan_id=%v: %v", shortChanID, err) + log.Error(err) nMsg.err <- err - return nil, false - } -} -// processZombieUpdate determines whether the provided channel update should -// resurrect a given zombie edge. -func (d *AuthenticatedGossiper) processZombieUpdate( - chanInfo *channeldb.ChannelEdgeInfo, msg *lnwire.ChannelUpdate) error { + key := newRejectCacheKey( + upd.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) - // The least-significant bit in the flag on the channel update tells us - // which edge is being updated. - isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 + return nil, false + } - // Since we've deemed the update as not stale above, before marking it - // live, we'll make sure it has been signed by the correct party. If we - // have both pubkeys, either party can resurect the channel. If we've - // already marked this with the stricter, single-sided resurrection we - // will only have the pubkey of the node with the oldest timestamp. - var pubKey *btcec.PublicKey - switch { - case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey: + // The least-significant bit in the flag on the channel update + // announcement tells us "which" side of the channels directed edge is + // being updated. + var ( + pubKey *btcec.PublicKey + edgeToUpdate *channeldb.ChannelEdgePolicy + ) + direction := upd.ChannelFlags & lnwire.ChanUpdateDirection + switch direction { + case 0: pubKey, _ = chanInfo.NodeKey1() - case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey: + edgeToUpdate = e1 + case 1: pubKey, _ = chanInfo.NodeKey2() - } - if pubKey == nil { - return fmt.Errorf("incorrect pubkey to resurrect zombie "+ - "with chan_id=%v", msg.ShortChannelID) + edgeToUpdate = e2 } - err := routing.VerifyChannelUpdateSignature(msg, pubKey) + // Validate the channel announcement with the expected public key and + // channel capacity. In the case of an invalid channel update, we'll + // return an error to the caller and exit early. + err = routing.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd) if err != nil { - return fmt.Errorf("unable to verify channel "+ - "update signature: %v", err) - } + rErr := fmt.Errorf("unable to validate channel update "+ + "announcement for short_chan_id=%v: %v", + spew.Sdump(upd.ShortChannelID), err) - // With the signature valid, we'll proceed to mark the - // edge as live and wait for the channel announcement to - // come through again. - err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID) - if err != nil { - return fmt.Errorf("unable to remove edge with "+ - "chan_id=%v from zombie index: %v", - msg.ShortChannelID, err) + log.Error(rErr) + nMsg.err <- rErr + return nil, false } - log.Debugf("Removed edge with chan_id=%v from zombie "+ - "index", msg.ShortChannelID) - - return nil -} + // If we have a previous version of the edge being updated, we'll want + // to rate limit its updates to prevent spam throughout the network. + if nMsg.isRemote && edgeToUpdate != nil { + // If it's a keep-alive update, we'll only propagate one if + // it's been a day since the previous. This follows our own + // heuristic of sending keep-alive updates after the same + // duration (see retransmitStaleAnns). + timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate) + if IsKeepAliveUpdate(upd, edgeToUpdate) { + if timeSinceLastUpdate < d.cfg.RebroadcastInterval { + log.Debugf("Ignoring keep alive update not "+ + "within %v period for channel %v", + d.cfg.RebroadcastInterval, shortChanID) + nMsg.err <- nil + return nil, false + } + } else { + // If it's not, we'll allow an update per minute with a + // maximum burst of 10. If we haven't seen an update + // for this channel before, we'll need to initialize a + // rate limiter for each direction. + d.Lock() + rls, ok := d.chanUpdateRateLimiter[shortChanID] + if !ok { + r := rate.Every(d.cfg.ChannelUpdateInterval) + b := d.cfg.MaxChannelUpdateBurst + rls = [2]*rate.Limiter{ + rate.NewLimiter(r, b), + rate.NewLimiter(r, b), + } + d.chanUpdateRateLimiter[shortChanID] = rls + } + d.Unlock() -// fetchNodeAnn fetches the latest signed node announcement from our point of -// view for the node with the given public key. -func (d *AuthenticatedGossiper) fetchNodeAnn( - pubKey [33]byte) (*lnwire.NodeAnnouncement, error) { + if !rls[direction].Allow() { + log.Debugf("Rate limiting update for channel "+ + "%v from direction %x", shortChanID, + pubKey.SerializeCompressed()) + nMsg.err <- nil + return nil, false + } + } + } - node, err := d.cfg.Router.FetchLightningNode(pubKey) - if err != nil { - return nil, err + update := &channeldb.ChannelEdgePolicy{ + SigBytes: upd.Signature.ToSignatureBytes(), + ChannelID: shortChanID, + LastUpdate: timestamp, + MessageFlags: upd.MessageFlags, + ChannelFlags: upd.ChannelFlags, + TimeLockDelta: upd.TimeLockDelta, + MinHTLC: upd.HtlcMinimumMsat, + MaxHTLC: upd.HtlcMaximumMsat, + FeeBaseMSat: lnwire.MilliSatoshi(upd.BaseFee), + FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate), + ExtraOpaqueData: upd.ExtraOpaqueData, } - return node.NodeAnnouncement(true) -} + if err := d.cfg.Router.UpdateEdge(update, ops...); err != nil { + if routing.IsError( + err, routing.ErrOutdated, + routing.ErrIgnored, + routing.ErrVBarrierShuttingDown, + ) { -// isMsgStale determines whether a message retrieved from the backing -// MessageStore is seen as stale by the current graph. -func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool { - switch msg := msg.(type) { - case *lnwire.AnnounceSignatures: - chanInfo, _, _, err := d.cfg.Router.GetChannelByID( - msg.ShortChannelID, - ) + log.Debug(err) + } else { + key := newRejectCacheKey( + upd.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) - // If the channel cannot be found, it is most likely a leftover - // message for a channel that was closed, so we can consider it - // stale. - if err == channeldb.ErrEdgeNotFound { - return true - } - if err != nil { - log.Debugf("Unable to retrieve channel=%v from graph: "+ - "%v", err) - return false + log.Error(err) } - // If the proof exists in the graph, then we have successfully - // received the remote proof and assembled the full proof, so we - // can safely delete the local proof from the database. - return chanInfo.AuthProof != nil + nMsg.err <- err + return nil, false + } - case *lnwire.ChannelUpdate: - _, p1, p2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + // If this is a local ChannelUpdate without an AuthProof, it means it + // is an update to a channel that is not (yet) supposed to be announced + // to the greater network. However, our channel counter party will need + // to be given the update, so we'll try sending the update directly to + // the remote peer. + if !nMsg.isRemote && chanInfo.AuthProof == nil { + // Get our peer's public key. + remotePubKey := remotePubFromChanInfo( + chanInfo, upd.ChannelFlags, + ) - // If the channel cannot be found, it is most likely a leftover - // message for a channel that was closed, so we can consider it - // stale. - if err == channeldb.ErrEdgeNotFound { - return true - } + log.Debugf("The message %v has no AuthProof, sending the "+ + "update to remote peer %x", upd.MsgType(), + remotePubKey) + + // Now we'll attempt to send the channel update message + // reliably to the remote peer in the background, so that we + // don't block if the peer happens to be offline at the moment. + err := d.reliableSender.sendMessage(upd, remotePubKey) if err != nil { - log.Debugf("Unable to retrieve channel=%v from graph: "+ - "%v", msg.ShortChannelID, err) - return false + err := fmt.Errorf("unable to reliably send %v for "+ + "channel=%v to peer=%x: %v", upd.MsgType(), + upd.ShortChannelID, remotePubKey, err) + nMsg.err <- err + return nil, false } + } - // Otherwise, we'll retrieve the correct policy that we - // currently have stored within our graph to check if this - // message is stale by comparing its timestamp. - var p *channeldb.ChannelEdgePolicy - if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 { - p = p1 - } else { - p = p2 - } + // Channel update announcement was successfully processed and now it + // can be broadcast to the rest of the network. However, we'll only + // broadcast the channel update announcement if it has an attached + // authentication proof. + var announcements []networkMsg + if chanInfo.AuthProof != nil { + announcements = append(announcements, networkMsg{ + peer: nMsg.peer, + source: nMsg.source, + msg: upd, + }) + } - // If the policy is still unknown, then we can consider this - // policy fresh. - if p == nil { - return false - } + nMsg.err <- nil + return announcements, true +} - timestamp := time.Unix(int64(msg.Timestamp), 0) - return p.LastUpdate.After(timestamp) +// handleAnnSig processes a new announcement signatures message. +func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg, + ann *lnwire.AnnounceSignatures) ([]networkMsg, bool) { - default: - // We'll make sure to not mark any unsupported messages as stale - // to ensure they are not removed. - return false - } -} + needBlockHeight := ann.ShortChannelID.BlockHeight + + d.cfg.ProofMatureDelta + shortChanID := ann.ShortChannelID.ToUint64() -// updateChannel creates a new fully signed update for the channel, and updates -// the underlying graph with the new state. -func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, - edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement, - *lnwire.ChannelUpdate, error) { + prefix := "local" + if nMsg.isRemote { + prefix = "remote" + } - // Parse the unsigned edge into a channel update. - chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge) + log.Infof("Received new %v channel announcement for %v", prefix, + ann.ShortChannelID) - // We'll generate a new signature over a digest of the channel - // announcement itself and update the timestamp to ensure it propagate. - err := netann.SignChannelUpdate( - d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate, - netann.ChanUpdSetTimestamp, + // By the specification, channel announcement proofs should be sent + // after some number of confirmations after channel was registered in + // bitcoin blockchain. Therefore, we check if the proof is mature. + d.Lock() + premature := d.isPremature( + ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg, ) - if err != nil { - return nil, nil, err + if premature { + log.Warnf("Premature proof announcement, current block height"+ + "lower than needed: %v < %v", d.bestHeight, + needBlockHeight) + d.Unlock() + nMsg.err <- nil + return nil, false } + d.Unlock() - // Next, we'll set the new signature in place, and update the reference - // in the backing slice. - edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0) - edge.SigBytes = chanUpdate.Signature.ToSignatureBytes() + // Ensure that we know of a channel with the target channel ID before + // proceeding further. + // + // We must acquire the mutex for this channel ID before getting the + // channel from the database, to ensure what we read does not change + // before we call AddProof() later. + d.channelMtx.Lock(ann.ShortChannelID.ToUint64()) + defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) + + chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( + ann.ShortChannelID, + ) - // To ensure that our signature is valid, we'll verify it ourself - // before committing it to the slice returned. - err = routing.ValidateChannelUpdateAnn(d.selfKey, info.Capacity, chanUpdate) if err != nil { - return nil, nil, fmt.Errorf("generated invalid channel "+ - "update sig: %v", err) + proof := channeldb.NewWaitingProof(nMsg.isRemote, ann) + err := d.cfg.WaitingProofStore.Add(proof) + if err != nil { + err := fmt.Errorf("unable to store the proof for "+ + "short_chan_id=%v: %v", shortChanID, err) + log.Error(err) + nMsg.err <- err + return nil, false + } + + log.Infof("Orphan %v proof announcement with short_chan_id=%v"+ + ", adding to waiting batch", prefix, shortChanID) + nMsg.err <- nil + return nil, false } - // Finally, we'll write the new edge policy to disk. - if err := d.cfg.Router.UpdateEdge(edge); err != nil { - return nil, nil, err + nodeID := nMsg.source.SerializeCompressed() + isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:]) + isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:]) + + // Ensure that channel that was retrieved belongs to the peer which + // sent the proof announcement. + if !(isFirstNode || isSecondNode) { + err := fmt.Errorf("channel that was received doesn't belong "+ + "to the peer which sent the proof, short_chan_id=%v", + shortChanID) + log.Error(err) + nMsg.err <- err + return nil, false } - // We'll also create the original channel announcement so the two can - // be broadcast along side each other (if necessary), but only if we - // have a full channel announcement for this channel. - var chanAnn *lnwire.ChannelAnnouncement - if info.AuthProof != nil { - chanID := lnwire.NewShortChanIDFromInt(info.ChannelID) - chanAnn = &lnwire.ChannelAnnouncement{ - ShortChannelID: chanID, - NodeID1: info.NodeKey1Bytes, - NodeID2: info.NodeKey2Bytes, - ChainHash: info.ChainHash, - BitcoinKey1: info.BitcoinKey1Bytes, - Features: lnwire.NewRawFeatureVector(), - BitcoinKey2: info.BitcoinKey2Bytes, - ExtraOpaqueData: edge.ExtraOpaqueData, - } - chanAnn.NodeSig1, err = lnwire.NewSigFromRawSignature( - info.AuthProof.NodeSig1Bytes, - ) - if err != nil { - return nil, nil, err - } - chanAnn.NodeSig2, err = lnwire.NewSigFromRawSignature( - info.AuthProof.NodeSig2Bytes, - ) - if err != nil { - return nil, nil, err + // If proof was sent by a local sub-system, then we'll send the + // announcement signature to the remote node so they can also + // reconstruct the full channel announcement. + if !nMsg.isRemote { + var remotePubKey [33]byte + if isFirstNode { + remotePubKey = chanInfo.NodeKey2Bytes + } else { + remotePubKey = chanInfo.NodeKey1Bytes } - chanAnn.BitcoinSig1, err = lnwire.NewSigFromRawSignature( - info.AuthProof.BitcoinSig1Bytes, - ) + // Since the remote peer might not be online we'll call a + // method that will attempt to deliver the proof when it comes + // online. + err := d.reliableSender.sendMessage(ann, remotePubKey) if err != nil { - return nil, nil, err - } - chanAnn.BitcoinSig2, err = lnwire.NewSigFromRawSignature( - info.AuthProof.BitcoinSig2Bytes, - ) - if err != nil { - return nil, nil, err + err := fmt.Errorf("unable to reliably send %v for "+ + "channel=%v to peer=%x: %v", ann.MsgType(), + ann.ShortChannelID, remotePubKey, err) + nMsg.err <- err + return nil, false } } - return chanAnn, chanUpdate, err -} + // Check if we already have the full proof for this channel. + if chanInfo.AuthProof != nil { + // If we already have the fully assembled proof, then the peer + // sending us their proof has probably not received our local + // proof yet. So be kind and send them the full proof. + if nMsg.isRemote { + peerID := nMsg.source.SerializeCompressed() + log.Debugf("Got AnnounceSignatures for channel with " + + "full proof.") -// SyncManager returns the gossiper's SyncManager instance. -func (d *AuthenticatedGossiper) SyncManager() *SyncManager { - return d.syncMgr -} + d.wg.Add(1) + go func() { + defer d.wg.Done() + log.Debugf("Received half proof for channel "+ + "%v with existing full proof. Sending"+ + " full proof to peer=%x", + ann.ChannelID, peerID) -// IsKeepAliveUpdate determines whether this channel update is considered a -// keep-alive update based on the previous channel update processed for the same -// direction. -func IsKeepAliveUpdate(update *lnwire.ChannelUpdate, - prev *channeldb.ChannelEdgePolicy) bool { + ca, _, _, err := netann.CreateChanAnnouncement( + chanInfo.AuthProof, chanInfo, e1, e2, + ) + if err != nil { + log.Errorf("unable to gen ann: %v", + err) + return + } - // Both updates should be from the same direction. - if update.ChannelFlags&lnwire.ChanUpdateDirection != - prev.ChannelFlags&lnwire.ChanUpdateDirection { + err = nMsg.peer.SendMessage(false, ca) + if err != nil { + log.Errorf("Failed sending full proof"+ + " to peer=%x: %v", peerID, err) + return + } - return false + log.Debugf("Full proof sent to peer=%x for "+ + "chanID=%v", peerID, ann.ChannelID) + }() + } + + log.Debugf("Already have proof for channel with chanID=%v", + ann.ChannelID) + nMsg.err <- nil + return nil, true + } + + // Check that we received the opposite proof. If so, then we're now + // able to construct the full proof, and create the channel + // announcement. If we didn't receive the opposite half of the proof + // then we should store this one, and wait for the opposite to be + // received. + proof := channeldb.NewWaitingProof(nMsg.isRemote, ann) + oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey()) + if err != nil && err != channeldb.ErrWaitingProofNotFound { + err := fmt.Errorf("unable to get the opposite proof for "+ + "short_chan_id=%v: %v", shortChanID, err) + log.Error(err) + nMsg.err <- err + return nil, false } - // The timestamp should always increase for a keep-alive update. - timestamp := time.Unix(int64(update.Timestamp), 0) - if !timestamp.After(prev.LastUpdate) { - return false + if err == channeldb.ErrWaitingProofNotFound { + err := d.cfg.WaitingProofStore.Add(proof) + if err != nil { + err := fmt.Errorf("unable to store the proof for "+ + "short_chan_id=%v: %v", shortChanID, err) + log.Error(err) + nMsg.err <- err + return nil, false + } + + log.Infof("1/2 of channel ann proof received for "+ + "short_chan_id=%v, waiting for other half", + shortChanID) + + nMsg.err <- nil + return nil, false } - // None of the remaining fields should change for a keep-alive update. - if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() { - return false + // We now have both halves of the channel announcement proof, then + // we'll reconstruct the initial announcement so we can validate it + // shortly below. + var dbProof channeldb.ChannelAuthProof + if isFirstNode { + dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes() + dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes() + dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes() + dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes() + } else { + dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes() + dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes() + dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes() + dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes() } - if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat { - return false + + chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement( + &dbProof, chanInfo, e1, e2, + ) + if err != nil { + log.Error(err) + nMsg.err <- err + return nil, false } - if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths { - return false + + // With all the necessary components assembled validate the full + // channel announcement proof. + if err := routing.ValidateChannelAnn(chanAnn); err != nil { + err := fmt.Errorf("channel announcement proof for "+ + "short_chan_id=%v isn't valid: %v", shortChanID, err) + + log.Error(err) + nMsg.err <- err + return nil, false } - if update.TimeLockDelta != prev.TimeLockDelta { - return false + + // If the channel was returned by the router it means that existence of + // funding point and inclusion of nodes bitcoin keys in it already + // checked by the router. In this stage we should check that node keys + // attest to the bitcoin keys by validating the signatures of + // announcement. If proof is valid then we'll populate the channel edge + // with it, so we can announce it on peer connect. + err = d.cfg.Router.AddProof(ann.ShortChannelID, &dbProof) + if err != nil { + err := fmt.Errorf("unable add proof to the channel chanID=%v:"+ + " %v", ann.ChannelID, err) + log.Error(err) + nMsg.err <- err + return nil, false } - if update.HtlcMinimumMsat != prev.MinHTLC { - return false + + err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey()) + if err != nil { + err := fmt.Errorf("unable to remove opposite proof for the "+ + "channel with chanID=%v: %v", ann.ChannelID, err) + log.Error(err) + nMsg.err <- err + return nil, false } - if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() { - return false + + // Proof was successfully created and now can announce the channel to + // the remain network. + log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+ + ", adding to next ann batch", shortChanID) + + // Assemble the necessary announcements to add to the next broadcasting + // batch. + var announcements []networkMsg + announcements = append(announcements, networkMsg{ + peer: nMsg.peer, + source: nMsg.source, + msg: chanAnn, + }) + if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil { + announcements = append(announcements, networkMsg{ + peer: nMsg.peer, + source: src, + msg: e1Ann, + }) } - if update.HtlcMaximumMsat != prev.MaxHTLC { - return false + if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil { + announcements = append(announcements, networkMsg{ + peer: nMsg.peer, + source: src, + msg: e2Ann, + }) } - if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) { - return false + + // We'll also send along the node announcements for each channel + // participant if we know of them. To ensure our node announcement + // propagates to our channel counterparty, we'll set the source for + // each announcement to the node it belongs to, otherwise we won't send + // it since the source gets skipped. This isn't necessary for channel + // updates and announcement signatures since we send those directly to + // our channel counterparty through the gossiper's reliable sender. + node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes) + if err != nil { + log.Debugf("Unable to fetch node announcement for %x: %v", + chanInfo.NodeKey1Bytes, err) + } else { + if nodeKey1, err := chanInfo.NodeKey1(); err == nil { + announcements = append(announcements, networkMsg{ + peer: nMsg.peer, + source: nodeKey1, + msg: node1Ann, + }) + } } - return true -} -// latestHeight returns the gossiper's latest height known of the chain. -func (d *AuthenticatedGossiper) latestHeight() uint32 { - d.Lock() - defer d.Unlock() - return d.bestHeight + node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes) + if err != nil { + log.Debugf("Unable to fetch node announcement for %x: %v", + chanInfo.NodeKey2Bytes, err) + } else { + if nodeKey2, err := chanInfo.NodeKey2(); err == nil { + announcements = append(announcements, networkMsg{ + peer: nMsg.peer, + source: nodeKey2, + msg: node2Ann, + }) + } + } + + nMsg.err <- nil + return announcements, true } diff --git a/docs/release-notes/release-notes-0.15.0.md b/docs/release-notes/release-notes-0.15.0.md index 140908d69f..c93e8d6b00 100644 --- a/docs/release-notes/release-notes-0.15.0.md +++ b/docs/release-notes/release-notes-0.15.0.md @@ -152,6 +152,8 @@ gRPC performance metrics (latency to process `GetInfo`, etc)](https://github.com * [The channel-commit-interval is now clamped to a reasonable timeframe of 1h.](https://github.com/lightningnetwork/lnd/pull/6220) +* [A function in the gossiper `processNetworkAnnouncements` has been refactored for readability and for future deduplication efforts.](https://github.com/lightningnetwork/lnd/pull/6278) + # Contributors (Alphabetical Order) * 3nprob