Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accompanying Pull Request for Bootstrapping Musa #116

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (c *Coordinator[K, A]) advanceBootstrap(ctx context.Context, ev routing.Boo
bstate := c.bootstrap.Advance(ctx, ev)
switch st := bstate.(type) {
case *routing.StateBootstrapMessage[K, A]:
c.sendBootstrapFindNode(ctx, st.NodeID, st.QueryID, st.Stats)
c.sendBootstrapFindNode(ctx, st.Node, st.QueryID, st.Stats)

case *routing.StateBootstrapWaiting:
// bootstrap waiting for a message response, nothing to do
Expand Down Expand Up @@ -277,7 +277,7 @@ func (c *Coordinator[K, A]) advancePool(ctx context.Context, ev query.PoolEvent)
state := c.pool.Advance(ctx, ev)
switch st := state.(type) {
case *query.StatePoolQueryMessage[K, A]:
c.sendQueryMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats)
c.sendQueryMessage(ctx, st.ProtocolID, st.Node, st.Message, st.QueryID, st.Stats)
case *query.StatePoolWaitingAtCapacity:
// nothing to do except wait for message response or timeout
case *query.StatePoolWaitingWithCapacity:
Expand All @@ -296,7 +296,7 @@ func (c *Coordinator[K, A]) advancePool(ctx context.Context, ev query.PoolEvent)
}
}

func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) {
func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeInfo[K, A], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) {
ctx, span := util.StartSpan(ctx, "Coordinator.sendQueryMessage")
defer span.End()

Expand All @@ -308,7 +308,7 @@ func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID addres
}

c.advancePool(ctx, &query.EventPoolMessageFailure[K]{
NodeID: to,
NodeID: to.ID(),
QueryID: queryID,
Error: err,
})
Expand Down Expand Up @@ -337,19 +337,19 @@ func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID addres
}

c.advancePool(ctx, &query.EventPoolMessageResponse[K, A]{
NodeID: to,
Node: to,
QueryID: queryID,
Response: resp,
})
}

err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse)
err := c.ep.SendRequestHandleResponse(ctx, protoID, to.ID(), msg, msg.EmptyResponse(), 0, onMessageResponse)
if err != nil {
onSendError(ctx, err)
}
}

func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.NodeID[K], queryID query.QueryID, stats query.QueryStats) {
func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.NodeInfo[K, A], queryID query.QueryID, stats query.QueryStats) {
ctx, span := util.StartSpan(ctx, "Coordinator.sendBootstrapFindNode")
defer span.End()

Expand All @@ -361,7 +361,7 @@ func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.No
}

c.advanceBootstrap(ctx, &routing.EventBootstrapMessageFailure[K]{
NodeID: to,
NodeID: to.ID(),
Error: err,
})
}
Expand Down Expand Up @@ -389,13 +389,13 @@ func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.No
}

c.advanceBootstrap(ctx, &routing.EventBootstrapMessageResponse[K, A]{
NodeID: to,
Node: to,
Response: resp,
})
}

protoID, msg := c.findNodeFn(c.self)
err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse)
err := c.ep.SendRequestHandleResponse(ctx, protoID, to.ID(), msg, msg.EmptyResponse(), 0, onMessageResponse)
if err != nil {
onSendError(ctx, err)
}
Expand Down Expand Up @@ -443,14 +443,14 @@ func (c *Coordinator[K, A]) sendIncludeFindNode(ctx context.Context, to kad.Node
func (c *Coordinator[K, A]) StartQuery(ctx context.Context, queryID query.QueryID, protocolID address.ProtocolID, msg kad.Request[K, A]) error {
ctx, span := util.StartSpan(ctx, "Coordinator.StartQuery")
defer span.End()
knownClosestPeers := c.rt.NearestNodes(msg.Target(), 20)
// knownClosestPeers := c.rt.NearestNodes(msg.Target(), 20)

c.schedulePoolEvent(ctx, &query.EventPoolAddQuery[K, A]{
QueryID: queryID,
Target: msg.Target(),
ProtocolID: protocolID,
Message: msg,
KnownClosestNodes: knownClosestPeers,
KnownClosestNodes: nil,
})

return nil
Expand Down Expand Up @@ -488,7 +488,7 @@ func (c *Coordinator[K, A]) AddNodes(ctx context.Context, infos []kad.NodeInfo[K

// Bootstrap instructs the coordinator to begin bootstrapping the routing table.
// While bootstrap is in progress, no other queries will make progress.
func (c *Coordinator[K, A]) Bootstrap(ctx context.Context, seeds []kad.NodeID[K]) error {
func (c *Coordinator[K, A]) Bootstrap(ctx context.Context, seeds []kad.NodeInfo[K, A]) error {
protoID, msg := c.findNodeFn(c.self)

c.scheduleBootstrapEvent(ctx, &routing.EventBootstrapStart[K, A]{
Expand All @@ -510,7 +510,7 @@ type KademliaEvent interface {
// response from a node.
type KademliaOutboundQueryProgressedEvent[K kad.Key[K], A kad.Address[A]] struct {
QueryID query.QueryID
NodeID kad.NodeID[K]
NodeID kad.NodeInfo[K, A]
Response kad.Response[K, A]
Stats query.QueryStats
}
Expand Down
4 changes: 1 addition & 3 deletions coord/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,7 @@ func TestBootstrap(t *testing.T) {

queryID := query.QueryID("bootstrap")

seeds := []kad.NodeID[key.Key8]{
nodes[1].ID(),
}
seeds := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{nodes[1]}
err = c.Bootstrap(ctx, seeds)
require.NoError(t, err)

Expand Down
7 changes: 7 additions & 0 deletions internal/kadtest/ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ type Info[K kad.Key[K], A kad.Address[A]] struct {

var _ kad.NodeInfo[key.Key8, net.IP] = (*Info[key.Key8, net.IP])(nil)

func NewEmptyInfo[K kad.Key[K], A kad.Address[A]](id *ID[K]) *Info[K, A] {
return &Info[K, A]{
id: id,
addrs: []A{},
}
}

func NewInfo[K kad.Key[K], A kad.Address[A]](id *ID[K], addrs []A) *Info[K, A] {
return &Info[K, A]{
id: id,
Expand Down
50 changes: 26 additions & 24 deletions query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,55 @@ package query
import (
"context"

"github.com/plprobelab/go-kademlia/internal/kadtest"

"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/key"
"github.com/plprobelab/go-kademlia/key/trie"
)

// A NodeIter iterates nodes according to some strategy.
type NodeIter[K kad.Key[K]] interface {
type NodeIter[K kad.Key[K], A kad.Address[A]] interface {
// Add adds node information to the iterator
Add(*NodeStatus[K])
Add(*NodeStatus[K, A])

// Find returns the node information corresponding to the given Kademlia key
Find(K) (*NodeStatus[K], bool)
Find(K) (*NodeStatus[K, A], bool)

// Each applies fn to each entry in the iterator in order. Each stops and returns true if fn returns true.
// Otherwise Each returns false when there are no further entries.
Each(ctx context.Context, fn func(context.Context, *NodeStatus[K]) bool) bool
Each(ctx context.Context, fn func(context.Context, *NodeStatus[K, A]) bool) bool
}

// A ClosestNodesIter iterates nodes in order of ascending distance from a key.
type ClosestNodesIter[K kad.Key[K]] struct {
type ClosestNodesIter[K kad.Key[K], A kad.Address[A]] struct {
// target is the key whose distance to a node determines the position of that node in the iterator.
target K

// nodelist holds the nodes discovered so far, ordered by increasing distance from the target.
nodes *trie.Trie[K, *NodeStatus[K]]
nodes *trie.Trie[K, *NodeStatus[K, A]]
}

var _ NodeIter[key.Key8] = (*ClosestNodesIter[key.Key8])(nil)
var _ NodeIter[key.Key8, kadtest.StrAddr] = (*ClosestNodesIter[key.Key8, kadtest.StrAddr])(nil)

// NewClosestNodesIter creates a new ClosestNodesIter
func NewClosestNodesIter[K kad.Key[K]](target K) *ClosestNodesIter[K] {
return &ClosestNodesIter[K]{
func NewClosestNodesIter[K kad.Key[K], A kad.Address[A]](target K) *ClosestNodesIter[K, A] {
return &ClosestNodesIter[K, A]{
target: target,
nodes: trie.New[K, *NodeStatus[K]](),
nodes: trie.New[K, *NodeStatus[K, A]](),
}
}

func (iter *ClosestNodesIter[K]) Add(ni *NodeStatus[K]) {
iter.nodes.Add(ni.NodeID.Key(), ni)
func (iter *ClosestNodesIter[K, A]) Add(ni *NodeStatus[K, A]) {
iter.nodes.Add(ni.Node.ID().Key(), ni)
}

func (iter *ClosestNodesIter[K]) Find(k K) (*NodeStatus[K], bool) {
func (iter *ClosestNodesIter[K, A]) Find(k K) (*NodeStatus[K, A], bool) {
found, ni := trie.Find(iter.nodes, k)
return ni, found
}

func (iter *ClosestNodesIter[K]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K]) bool) bool {
func (iter *ClosestNodesIter[K, A]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K, A]) bool) bool {
// get all the nodes in order of distance from the target
// TODO: turn this into a walk or iterator on trie.Trie
entries := trie.Closest(iter.nodes, iter.target, iter.nodes.Size())
Expand All @@ -63,37 +65,37 @@ func (iter *ClosestNodesIter[K]) Each(ctx context.Context, fn func(context.Conte
}

// A SequentialIter iterates nodes in the order they were added to the iterator.
type SequentialIter[K kad.Key[K]] struct {
type SequentialIter[K kad.Key[K], A kad.Address[A]] struct {
// nodelist holds the nodes discovered so far, ordered by increasing distance from the target.
nodes []*NodeStatus[K]
nodes []*NodeStatus[K, A]
}

var _ NodeIter[key.Key8] = (*SequentialIter[key.Key8])(nil)
var _ NodeIter[key.Key8, kadtest.StrAddr] = (*SequentialIter[key.Key8, kadtest.StrAddr])(nil)

// NewSequentialIter creates a new SequentialIter
func NewSequentialIter[K kad.Key[K]]() *SequentialIter[K] {
return &SequentialIter[K]{
nodes: make([]*NodeStatus[K], 0),
func NewSequentialIter[K kad.Key[K], A kad.Address[A]]() *SequentialIter[K, A] {
return &SequentialIter[K, A]{
nodes: make([]*NodeStatus[K, A], 0),
}
}

func (iter *SequentialIter[K]) Add(ni *NodeStatus[K]) {
func (iter *SequentialIter[K, A]) Add(ni *NodeStatus[K, A]) {
iter.nodes = append(iter.nodes, ni)
}

// Find returns the node information corresponding to the given Kademlia key. It uses a linear
// search which makes it unsuitable for large numbers of entries.
func (iter *SequentialIter[K]) Find(k K) (*NodeStatus[K], bool) {
func (iter *SequentialIter[K, A]) Find(k K) (*NodeStatus[K, A], bool) {
for i := range iter.nodes {
if key.Equal(k, iter.nodes[i].NodeID.Key()) {
if key.Equal(k, iter.nodes[i].Node.ID().Key()) {
return iter.nodes[i], true
}
}

return nil, false
}

func (iter *SequentialIter[K]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K]) bool) bool {
func (iter *SequentialIter[K, A]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K, A]) bool) bool {
for _, ns := range iter.nodes {
if fn(ctx, ns) {
return true
Expand Down
28 changes: 14 additions & 14 deletions query/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ func TestClosestNodesIter(t *testing.T) {
require.True(t, target.Xor(b.Key()).Compare(target.Xor(c.Key())) == -1)
require.True(t, target.Xor(c.Key()).Compare(target.Xor(d.Key())) == -1)

iter := NewClosestNodesIter(target)
iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target)

// add nodes in "random order"

iter.Add(&NodeStatus[key.Key8]{NodeID: b})
iter.Add(&NodeStatus[key.Key8]{NodeID: d})
iter.Add(&NodeStatus[key.Key8]{NodeID: a})
iter.Add(&NodeStatus[key.Key8]{NodeID: c})
iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b)})
iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d)})
iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a)})
iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c)})

// Each should iterate in order of distance from target

distances := make([]key.Key8, 0, 4)
iter.Each(context.Background(), func(ctx context.Context, ns *NodeStatus[key.Key8]) bool {
distances = append(distances, target.Xor(ns.NodeID.Key()))
iter.Each(context.Background(), func(ctx context.Context, ns *NodeStatus[key.Key8, kadtest.StrAddr]) bool {
distances = append(distances, target.Xor(ns.Node.ID().Key()))
return false
})

Expand All @@ -48,20 +48,20 @@ func TestSequentialIter(t *testing.T) {
c := kadtest.NewID(key.Key8(0b00010000)) // 16
d := kadtest.NewID(key.Key8(0b00100000)) // 32

iter := NewSequentialIter[key.Key8]()
iter := NewSequentialIter[key.Key8, kadtest.StrAddr]()

// add nodes in "random order"

iter.Add(&NodeStatus[key.Key8]{NodeID: b})
iter.Add(&NodeStatus[key.Key8]{NodeID: d})
iter.Add(&NodeStatus[key.Key8]{NodeID: a})
iter.Add(&NodeStatus[key.Key8]{NodeID: c})
iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b)})
iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d)})
iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a)})
iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c)})

// Each should iterate in order the nodes were added to the iiterator

order := make([]key.Key8, 0, 4)
iter.Each(context.Background(), func(ctx context.Context, ns *NodeStatus[key.Key8]) bool {
order = append(order, ns.NodeID.Key())
iter.Each(context.Background(), func(ctx context.Context, ns *NodeStatus[key.Key8, kadtest.StrAddr]) bool {
order = append(order, ns.Node.ID().Key())
return false
})

Expand Down
6 changes: 3 additions & 3 deletions query/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"github.com/plprobelab/go-kademlia/kad"
)

type NodeStatus[K kad.Key[K]] struct {
NodeID kad.NodeID[K]
State NodeState
type NodeStatus[K kad.Key[K], A kad.Address[A]] struct {
Node kad.NodeInfo[K, A]
State NodeState
}

type NodeState interface {
Expand Down
Loading
Loading