Skip to content

Commit

Permalink
Merge pull request lightningnetwork#4804 from wpaulino/proper-gossip-…
Browse files Browse the repository at this point in the history
…query-reply-chunk-splitting

discovery: adhere to proper channel chunk splitting for ReplyChannelRange
  • Loading branch information
cfromknecht authored Dec 11, 2020
2 parents 62a5cdb + 871a6f1 commit d870bb5
Show file tree
Hide file tree
Showing 9 changed files with 397 additions and 246 deletions.
61 changes: 47 additions & 14 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"math"
"net"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -1704,12 +1705,25 @@ func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {
return newChanIDs, nil
}

// BlockChannelRange represents a range of channels for a given block height.
type BlockChannelRange struct {
// Height is the height of the block all of the channels below were
// included in.
Height uint32

// Channels is the list of channels identified by their short ID
// representation known to us that were included in the block height
// above.
Channels []lnwire.ShortChannelID
}

// FilterChannelRange returns the channel ID's of all known channels which were
// mined in a block height within the passed range. This method can be used to
// quickly share with a peer the set of channels we know of within a particular
// range to catch them up after a period of time offline.
func (c *ChannelGraph) FilterChannelRange(startHeight, endHeight uint32) ([]uint64, error) {
var chanIDs []uint64
// mined in a block height within the passed range. The channel IDs are grouped
// by their common block height. This method can be used to quickly share with a
// peer the set of channels we know of within a particular range to catch them
// up after a period of time offline.
func (c *ChannelGraph) FilterChannelRange(startHeight,
endHeight uint32) ([]BlockChannelRange, error) {

startChanID := &lnwire.ShortChannelID{
BlockHeight: startHeight,
Expand All @@ -1728,6 +1742,7 @@ func (c *ChannelGraph) FilterChannelRange(startHeight, endHeight uint32) ([]uint
byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())

var channelsPerBlock map[uint32][]lnwire.ShortChannelID
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
Expand All @@ -1742,33 +1757,51 @@ func (c *ChannelGraph) FilterChannelRange(startHeight, endHeight uint32) ([]uint

// We'll now iterate through the database, and find each
// channel ID that resides within the specified range.
var cid uint64
for k, _ := cursor.Seek(chanIDStart[:]); k != nil &&
bytes.Compare(k, chanIDEnd[:]) <= 0; k, _ = cursor.Next() {

// This channel ID rests within the target range, so
// we'll convert it into an integer and add it to our
// returned set.
cid = byteOrder.Uint64(k)
chanIDs = append(chanIDs, cid)
// we'll add it to our returned set.
rawCid := byteOrder.Uint64(k)
cid := lnwire.NewShortChanIDFromInt(rawCid)
channelsPerBlock[cid.BlockHeight] = append(
channelsPerBlock[cid.BlockHeight], cid,
)
}

return nil
}, func() {
chanIDs = nil
channelsPerBlock = make(map[uint32][]lnwire.ShortChannelID)
})

switch {
// If we don't know of any channels yet, then there's nothing to
// filter, so we'll return an empty slice.
case err == ErrGraphNoEdgesFound:
return chanIDs, nil
case err == ErrGraphNoEdgesFound || len(channelsPerBlock) == 0:
return nil, nil

case err != nil:
return nil, err
}

return chanIDs, nil
// Return the channel ranges in ascending block height order.
blocks := make([]uint32, 0, len(channelsPerBlock))
for block := range channelsPerBlock {
blocks = append(blocks, block)
}
sort.Slice(blocks, func(i, j int) bool {
return blocks[i] < blocks[j]
})

channelRanges := make([]BlockChannelRange, 0, len(channelsPerBlock))
for _, block := range blocks {
channelRanges = append(channelRanges, BlockChannelRange{
Height: block,
Channels: channelsPerBlock[block],
})
}

return channelRanges, nil
}

// FetchChanInfos returns the set of channel edges that correspond to the passed
Expand Down
39 changes: 27 additions & 12 deletions channeldb/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1848,24 +1848,32 @@ func TestFilterChannelRange(t *testing.T) {
t.Fatalf("expected zero chans, instead got %v", len(resp))
}

// To start, we'll create a set of channels, each mined in a block 10
// To start, we'll create a set of channels, two mined in a block 10
// blocks after the prior one.
startHeight := uint32(100)
endHeight := startHeight
const numChans = 10
chanIDs := make([]uint64, 0, numChans)
for i := 0; i < numChans; i++ {
channelRanges := make([]BlockChannelRange, 0, numChans/2)
for i := 0; i < numChans/2; i++ {
chanHeight := endHeight
channel, chanID := createEdge(
uint32(chanHeight), uint32(i+1), 0, 0, node1, node2,
channel1, chanID1 := createEdge(
chanHeight, uint32(i+1), 0, 0, node1, node2,
)

if err := graph.AddChannelEdge(&channel); err != nil {
if err := graph.AddChannelEdge(&channel1); err != nil {
t.Fatalf("unable to create channel edge: %v", err)
}

chanIDs = append(chanIDs, chanID.ToUint64())
channel2, chanID2 := createEdge(
chanHeight, uint32(i+2), 0, 0, node1, node2,
)
if err := graph.AddChannelEdge(&channel2); err != nil {
t.Fatalf("unable to create channel edge: %v", err)
}

channelRanges = append(channelRanges, BlockChannelRange{
Height: chanHeight,
Channels: []lnwire.ShortChannelID{chanID1, chanID2},
})
endHeight += 10
}

Expand All @@ -1876,15 +1884,15 @@ func TestFilterChannelRange(t *testing.T) {
startHeight uint32
endHeight uint32

resp []uint64
resp []BlockChannelRange
}{
// If we query for the entire range, then we should get the same
// set of short channel IDs back.
{
startHeight: startHeight,
endHeight: endHeight,

resp: chanIDs,
resp: channelRanges,
},

// If we query for a range of channels right before our range, we
Expand All @@ -1900,7 +1908,7 @@ func TestFilterChannelRange(t *testing.T) {
startHeight: endHeight - 10,
endHeight: endHeight - 10,

resp: chanIDs[9:],
resp: channelRanges[4:],
},

// If we query for just the first height, we should only get a
Expand All @@ -1909,7 +1917,14 @@ func TestFilterChannelRange(t *testing.T) {
startHeight: startHeight,
endHeight: startHeight,

resp: chanIDs[:1],
resp: channelRanges[:1],
},

{
startHeight: startHeight + 10,
endHeight: endHeight - 10,

resp: channelRanges[1:5],
},
}
for i, queryCase := range queryCases {
Expand Down
28 changes: 9 additions & 19 deletions discovery/chan_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type ChannelGraphTimeSeries interface {
superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error)

// FilterChannelRange returns the set of channels that we created
// between the start height and the end height. We'll use this to to a
// remote peer's QueryChannelRange message.
// between the start height and the end height. The channel IDs are
// grouped by their common block height. We'll use this to to a remote
// peer's QueryChannelRange message.
FilterChannelRange(chain chainhash.Hash,
startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error)
startHeight, endHeight uint32) ([]channeldb.BlockChannelRange, error)

// FetchChanAnns returns a full set of channel announcements as well as
// their updates that match the set of specified short channel ID's.
Expand Down Expand Up @@ -203,26 +204,15 @@ func (c *ChanSeries) FilterKnownChanIDs(chain chainhash.Hash,
}

// FilterChannelRange returns the set of channels that we created between the
// start height and the end height. We'll use this respond to a remote peer's
// QueryChannelRange message.
// start height and the end height. The channel IDs are grouped by their common
// block height. We'll use this respond to a remote peer's QueryChannelRange
// message.
//
// NOTE: This is part of the ChannelGraphTimeSeries interface.
func (c *ChanSeries) FilterChannelRange(chain chainhash.Hash,
startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error) {
startHeight, endHeight uint32) ([]channeldb.BlockChannelRange, error) {

chansInRange, err := c.graph.FilterChannelRange(startHeight, endHeight)
if err != nil {
return nil, err
}

chanResp := make([]lnwire.ShortChannelID, 0, len(chansInRange))
for _, chanID := range chansInRange {
chanResp = append(
chanResp, lnwire.NewShortChanIDFromInt(chanID),
)
}

return chanResp, nil
return c.graph.FilterChannelRange(startHeight, endHeight)
}

// FetchChanAnns returns a full set of channel announcements as well as their
Expand Down
76 changes: 18 additions & 58 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,6 @@ type AuthenticatedGossiper struct {
// every new block height.
blockEpochs *chainntnfs.BlockEpochEvent

// prematureAnnouncements maps a block height to a set of network
// messages which are "premature" from our PoV. A message is premature
// if it claims to be anchored in a block which is beyond the current
// main chain tip as we know it. Premature network messages will be
// processed once the chain tip as we know it extends to/past the
// premature height.
//
// TODO(roasbeef): limit premature networkMsgs to N
prematureAnnouncements map[uint32][]*networkMsg

// prematureChannelUpdates is a map of ChannelUpdates we have received
// that wasn't associated with any channel we know about. We store
// them temporarily, such that we can reprocess them when a
Expand Down Expand Up @@ -338,21 +328,22 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}),
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg),
prematureChannelUpdates: make(map[uint64][]*networkMsg),
channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
heightForLastChanUpdate: make(map[uint64][2]uint32),
syncMgr: newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
RotateTicker: cfg.RotateTicker,
HistoricalSyncTicker: cfg.HistoricalSyncTicker,
NumActiveSyncers: cfg.NumActiveSyncers,
IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
}),
}

gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
RotateTicker: cfg.RotateTicker,
HistoricalSyncTicker: cfg.HistoricalSyncTicker,
NumActiveSyncers: cfg.NumActiveSyncers,
IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
BestHeight: gossiper.latestHeight,
})

gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
NotifyWhenOnline: cfg.NotifyWhenOnline,
NotifyWhenOffline: cfg.NotifyWhenOffline,
Expand Down Expand Up @@ -1045,33 +1036,11 @@ func (d *AuthenticatedGossiper) networkHandler() {
d.Lock()
blockHeight := uint32(newBlock.Height)
d.bestHeight = blockHeight
d.Unlock()

log.Debugf("New block: height=%d, hash=%s", blockHeight,
newBlock.Hash)

// Next we check if we have any premature announcements
// for this height, if so, then we process them once
// more as normal announcements.
premature := d.prematureAnnouncements[blockHeight]
if len(premature) == 0 {
d.Unlock()
continue
}
delete(d.prematureAnnouncements, blockHeight)
d.Unlock()

log.Infof("Re-processing %v premature announcements "+
"for height %v", len(premature), blockHeight)

for _, ann := range premature {
emittedAnnouncements := d.processNetworkAnnouncement(ann)
if emittedAnnouncements != nil {
announcements.AddMsgs(
emittedAnnouncements...,
)
}
}

// The trickle timer has ticked, which indicates we should
// flush to the network the pending batch of new announcements
// we've received since the last trickle tick.
Expand Down Expand Up @@ -1501,7 +1470,6 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error {
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
nMsg *networkMsg) []networkMsg {

// isPremature *MUST* be called with the gossiper's lock held.
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
// TODO(roasbeef) make height delta 6
// * or configurable
Expand Down Expand Up @@ -1593,18 +1561,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// to be fully verified once we advance forward in the chain.
d.Lock()
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
blockHeight := msg.ShortChannelID.BlockHeight
log.Infof("Announcement for chan_id=(%v), is "+
"premature: advertises height %v, only "+
"height %v is known",
msg.ShortChannelID.ToUint64(),
msg.ShortChannelID.BlockHeight,
d.bestHeight)

d.prematureAnnouncements[blockHeight] = append(
d.prematureAnnouncements[blockHeight],
nMsg,
)
d.Unlock()
return nil
}
Expand Down Expand Up @@ -1824,11 +1786,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
"height %v, only height %v is known",
shortChanID, blockHeight,
d.bestHeight)

d.prematureAnnouncements[blockHeight] = append(
d.prematureAnnouncements[blockHeight],
nMsg,
)
d.Unlock()
return nil
}
Expand Down Expand Up @@ -2124,10 +2081,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// to other clients if this constraint was changed.
d.Lock()
if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) {
d.prematureAnnouncements[needBlockHeight] = append(
d.prematureAnnouncements[needBlockHeight],
nMsg,
)
log.Infof("Premature proof announcement, "+
"current block height lower than needed: %v <"+
" %v, add announcement to reprocessing batch",
Expand Down Expand Up @@ -2644,3 +2597,10 @@ func IsKeepAliveUpdate(update *lnwire.ChannelUpdate,
}
return true
}

// latestHeight returns the gossiper's latest height known of the chain.
func (d *AuthenticatedGossiper) latestHeight() uint32 {
d.Lock()
defer d.Unlock()
return d.bestHeight
}
Loading

0 comments on commit d870bb5

Please sign in to comment.