diff --git a/coord/coordinator.go b/coord/coordinator.go index a7a3955..0736c83 100644 --- a/coord/coordinator.go +++ b/coord/coordinator.go @@ -218,7 +218,7 @@ func (c *Coordinator[K, A]) advanceBootstrap(ctx context.Context, ev routing.Boo bstate := c.bootstrap.Advance(ctx, ev) switch st := bstate.(type) { case *routing.StateBootstrapMessage[K, A]: - c.sendBootstrapFindNode(ctx, st.NodeID, st.QueryID, st.Stats) + c.sendBootstrapFindNode(ctx, st.Node, st.QueryID, st.Stats) case *routing.StateBootstrapWaiting: // bootstrap waiting for a message response, nothing to do @@ -277,7 +277,7 @@ func (c *Coordinator[K, A]) advancePool(ctx context.Context, ev query.PoolEvent) state := c.pool.Advance(ctx, ev) switch st := state.(type) { case *query.StatePoolQueryMessage[K, A]: - c.sendQueryMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats) + c.sendQueryMessage(ctx, st.ProtocolID, st.Node, st.Message, st.QueryID, st.Stats) case *query.StatePoolWaitingAtCapacity: // nothing to do except wait for message response or timeout case *query.StatePoolWaitingWithCapacity: @@ -296,7 +296,7 @@ func (c *Coordinator[K, A]) advancePool(ctx context.Context, ev query.PoolEvent) } } -func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) { +func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeInfo[K, A], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) { ctx, span := util.StartSpan(ctx, "Coordinator.sendQueryMessage") defer span.End() @@ -308,7 +308,7 @@ func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID addres } c.advancePool(ctx, &query.EventPoolMessageFailure[K]{ - NodeID: to, + NodeID: to.ID(), QueryID: queryID, Error: err, }) @@ -337,19 +337,19 @@ func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID addres } c.advancePool(ctx, &query.EventPoolMessageResponse[K, A]{ - NodeID: to, + Node: to, QueryID: queryID, Response: resp, }) } - err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse) + err := c.ep.SendRequestHandleResponse(ctx, protoID, to.ID(), msg, msg.EmptyResponse(), 0, onMessageResponse) if err != nil { onSendError(ctx, err) } } -func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.NodeID[K], queryID query.QueryID, stats query.QueryStats) { +func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.NodeInfo[K, A], queryID query.QueryID, stats query.QueryStats) { ctx, span := util.StartSpan(ctx, "Coordinator.sendBootstrapFindNode") defer span.End() @@ -361,7 +361,7 @@ func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.No } c.advanceBootstrap(ctx, &routing.EventBootstrapMessageFailure[K]{ - NodeID: to, + NodeID: to.ID(), Error: err, }) } @@ -389,13 +389,13 @@ func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.No } c.advanceBootstrap(ctx, &routing.EventBootstrapMessageResponse[K, A]{ - NodeID: to, + Node: to, Response: resp, }) } protoID, msg := c.findNodeFn(c.self) - err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse) + err := c.ep.SendRequestHandleResponse(ctx, protoID, to.ID(), msg, msg.EmptyResponse(), 0, onMessageResponse) if err != nil { onSendError(ctx, err) } @@ -443,14 +443,14 @@ func (c *Coordinator[K, A]) sendIncludeFindNode(ctx context.Context, to kad.Node func (c *Coordinator[K, A]) StartQuery(ctx context.Context, queryID query.QueryID, protocolID address.ProtocolID, msg kad.Request[K, A]) error { ctx, span := util.StartSpan(ctx, "Coordinator.StartQuery") defer span.End() - knownClosestPeers := c.rt.NearestNodes(msg.Target(), 20) + // knownClosestPeers := c.rt.NearestNodes(msg.Target(), 20) c.schedulePoolEvent(ctx, &query.EventPoolAddQuery[K, A]{ QueryID: queryID, Target: msg.Target(), ProtocolID: protocolID, Message: msg, - KnownClosestNodes: knownClosestPeers, + KnownClosestNodes: nil, }) return nil @@ -488,7 +488,7 @@ func (c *Coordinator[K, A]) AddNodes(ctx context.Context, infos []kad.NodeInfo[K // Bootstrap instructs the coordinator to begin bootstrapping the routing table. // While bootstrap is in progress, no other queries will make progress. -func (c *Coordinator[K, A]) Bootstrap(ctx context.Context, seeds []kad.NodeID[K]) error { +func (c *Coordinator[K, A]) Bootstrap(ctx context.Context, seeds []kad.NodeInfo[K, A]) error { protoID, msg := c.findNodeFn(c.self) c.scheduleBootstrapEvent(ctx, &routing.EventBootstrapStart[K, A]{ @@ -510,7 +510,7 @@ type KademliaEvent interface { // response from a node. type KademliaOutboundQueryProgressedEvent[K kad.Key[K], A kad.Address[A]] struct { QueryID query.QueryID - NodeID kad.NodeID[K] + NodeID kad.NodeInfo[K, A] Response kad.Response[K, A] Stats query.QueryStats } diff --git a/coord/coordinator_test.go b/coord/coordinator_test.go index 4781593..7519aaf 100644 --- a/coord/coordinator_test.go +++ b/coord/coordinator_test.go @@ -314,9 +314,7 @@ func TestBootstrap(t *testing.T) { queryID := query.QueryID("bootstrap") - seeds := []kad.NodeID[key.Key8]{ - nodes[1].ID(), - } + seeds := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{nodes[1]} err = c.Bootstrap(ctx, seeds) require.NoError(t, err) diff --git a/internal/kadtest/ids.go b/internal/kadtest/ids.go index 6ce09dd..d161806 100644 --- a/internal/kadtest/ids.go +++ b/internal/kadtest/ids.go @@ -73,6 +73,13 @@ type Info[K kad.Key[K], A kad.Address[A]] struct { var _ kad.NodeInfo[key.Key8, net.IP] = (*Info[key.Key8, net.IP])(nil) +func NewEmptyInfo[K kad.Key[K], A kad.Address[A]](id *ID[K]) *Info[K, A] { + return &Info[K, A]{ + id: id, + addrs: []A{}, + } +} + func NewInfo[K kad.Key[K], A kad.Address[A]](id *ID[K], addrs []A) *Info[K, A] { return &Info[K, A]{ id: id, diff --git a/query/iter.go b/query/iter.go index eaa11b9..4a59eb5 100644 --- a/query/iter.go +++ b/query/iter.go @@ -3,53 +3,55 @@ package query import ( "context" + "github.com/plprobelab/go-kademlia/internal/kadtest" + "github.com/plprobelab/go-kademlia/kad" "github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/key/trie" ) // A NodeIter iterates nodes according to some strategy. -type NodeIter[K kad.Key[K]] interface { +type NodeIter[K kad.Key[K], A kad.Address[A]] interface { // Add adds node information to the iterator - Add(*NodeStatus[K]) + Add(*NodeStatus[K, A]) // Find returns the node information corresponding to the given Kademlia key - Find(K) (*NodeStatus[K], bool) + Find(K) (*NodeStatus[K, A], bool) // Each applies fn to each entry in the iterator in order. Each stops and returns true if fn returns true. // Otherwise Each returns false when there are no further entries. - Each(ctx context.Context, fn func(context.Context, *NodeStatus[K]) bool) bool + Each(ctx context.Context, fn func(context.Context, *NodeStatus[K, A]) bool) bool } // A ClosestNodesIter iterates nodes in order of ascending distance from a key. -type ClosestNodesIter[K kad.Key[K]] struct { +type ClosestNodesIter[K kad.Key[K], A kad.Address[A]] struct { // target is the key whose distance to a node determines the position of that node in the iterator. target K // nodelist holds the nodes discovered so far, ordered by increasing distance from the target. - nodes *trie.Trie[K, *NodeStatus[K]] + nodes *trie.Trie[K, *NodeStatus[K, A]] } -var _ NodeIter[key.Key8] = (*ClosestNodesIter[key.Key8])(nil) +var _ NodeIter[key.Key8, kadtest.StrAddr] = (*ClosestNodesIter[key.Key8, kadtest.StrAddr])(nil) // NewClosestNodesIter creates a new ClosestNodesIter -func NewClosestNodesIter[K kad.Key[K]](target K) *ClosestNodesIter[K] { - return &ClosestNodesIter[K]{ +func NewClosestNodesIter[K kad.Key[K], A kad.Address[A]](target K) *ClosestNodesIter[K, A] { + return &ClosestNodesIter[K, A]{ target: target, - nodes: trie.New[K, *NodeStatus[K]](), + nodes: trie.New[K, *NodeStatus[K, A]](), } } -func (iter *ClosestNodesIter[K]) Add(ni *NodeStatus[K]) { - iter.nodes.Add(ni.NodeID.Key(), ni) +func (iter *ClosestNodesIter[K, A]) Add(ni *NodeStatus[K, A]) { + iter.nodes.Add(ni.Node.ID().Key(), ni) } -func (iter *ClosestNodesIter[K]) Find(k K) (*NodeStatus[K], bool) { +func (iter *ClosestNodesIter[K, A]) Find(k K) (*NodeStatus[K, A], bool) { found, ni := trie.Find(iter.nodes, k) return ni, found } -func (iter *ClosestNodesIter[K]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K]) bool) bool { +func (iter *ClosestNodesIter[K, A]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K, A]) bool) bool { // get all the nodes in order of distance from the target // TODO: turn this into a walk or iterator on trie.Trie entries := trie.Closest(iter.nodes, iter.target, iter.nodes.Size()) @@ -63,29 +65,29 @@ func (iter *ClosestNodesIter[K]) Each(ctx context.Context, fn func(context.Conte } // A SequentialIter iterates nodes in the order they were added to the iterator. -type SequentialIter[K kad.Key[K]] struct { +type SequentialIter[K kad.Key[K], A kad.Address[A]] struct { // nodelist holds the nodes discovered so far, ordered by increasing distance from the target. - nodes []*NodeStatus[K] + nodes []*NodeStatus[K, A] } -var _ NodeIter[key.Key8] = (*SequentialIter[key.Key8])(nil) +var _ NodeIter[key.Key8, kadtest.StrAddr] = (*SequentialIter[key.Key8, kadtest.StrAddr])(nil) // NewSequentialIter creates a new SequentialIter -func NewSequentialIter[K kad.Key[K]]() *SequentialIter[K] { - return &SequentialIter[K]{ - nodes: make([]*NodeStatus[K], 0), +func NewSequentialIter[K kad.Key[K], A kad.Address[A]]() *SequentialIter[K, A] { + return &SequentialIter[K, A]{ + nodes: make([]*NodeStatus[K, A], 0), } } -func (iter *SequentialIter[K]) Add(ni *NodeStatus[K]) { +func (iter *SequentialIter[K, A]) Add(ni *NodeStatus[K, A]) { iter.nodes = append(iter.nodes, ni) } // Find returns the node information corresponding to the given Kademlia key. It uses a linear // search which makes it unsuitable for large numbers of entries. -func (iter *SequentialIter[K]) Find(k K) (*NodeStatus[K], bool) { +func (iter *SequentialIter[K, A]) Find(k K) (*NodeStatus[K, A], bool) { for i := range iter.nodes { - if key.Equal(k, iter.nodes[i].NodeID.Key()) { + if key.Equal(k, iter.nodes[i].Node.ID().Key()) { return iter.nodes[i], true } } @@ -93,7 +95,7 @@ func (iter *SequentialIter[K]) Find(k K) (*NodeStatus[K], bool) { return nil, false } -func (iter *SequentialIter[K]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K]) bool) bool { +func (iter *SequentialIter[K, A]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K, A]) bool) bool { for _, ns := range iter.nodes { if fn(ctx, ns) { return true diff --git a/query/iter_test.go b/query/iter_test.go index 6cecb29..cdd71cc 100644 --- a/query/iter_test.go +++ b/query/iter_test.go @@ -22,20 +22,20 @@ func TestClosestNodesIter(t *testing.T) { require.True(t, target.Xor(b.Key()).Compare(target.Xor(c.Key())) == -1) require.True(t, target.Xor(c.Key()).Compare(target.Xor(d.Key())) == -1) - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) // add nodes in "random order" - iter.Add(&NodeStatus[key.Key8]{NodeID: b}) - iter.Add(&NodeStatus[key.Key8]{NodeID: d}) - iter.Add(&NodeStatus[key.Key8]{NodeID: a}) - iter.Add(&NodeStatus[key.Key8]{NodeID: c}) + iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b)}) + iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d)}) + iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a)}) + iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c)}) // Each should iterate in order of distance from target distances := make([]key.Key8, 0, 4) - iter.Each(context.Background(), func(ctx context.Context, ns *NodeStatus[key.Key8]) bool { - distances = append(distances, target.Xor(ns.NodeID.Key())) + iter.Each(context.Background(), func(ctx context.Context, ns *NodeStatus[key.Key8, kadtest.StrAddr]) bool { + distances = append(distances, target.Xor(ns.Node.ID().Key())) return false }) @@ -48,20 +48,20 @@ func TestSequentialIter(t *testing.T) { c := kadtest.NewID(key.Key8(0b00010000)) // 16 d := kadtest.NewID(key.Key8(0b00100000)) // 32 - iter := NewSequentialIter[key.Key8]() + iter := NewSequentialIter[key.Key8, kadtest.StrAddr]() // add nodes in "random order" - iter.Add(&NodeStatus[key.Key8]{NodeID: b}) - iter.Add(&NodeStatus[key.Key8]{NodeID: d}) - iter.Add(&NodeStatus[key.Key8]{NodeID: a}) - iter.Add(&NodeStatus[key.Key8]{NodeID: c}) + iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b)}) + iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d)}) + iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a)}) + iter.Add(&NodeStatus[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c)}) // Each should iterate in order the nodes were added to the iiterator order := make([]key.Key8, 0, 4) - iter.Each(context.Background(), func(ctx context.Context, ns *NodeStatus[key.Key8]) bool { - order = append(order, ns.NodeID.Key()) + iter.Each(context.Background(), func(ctx context.Context, ns *NodeStatus[key.Key8, kadtest.StrAddr]) bool { + order = append(order, ns.Node.ID().Key()) return false }) diff --git a/query/node.go b/query/node.go index 0fd5960..bf5a682 100644 --- a/query/node.go +++ b/query/node.go @@ -6,9 +6,9 @@ import ( "github.com/plprobelab/go-kademlia/kad" ) -type NodeStatus[K kad.Key[K]] struct { - NodeID kad.NodeID[K] - State NodeState +type NodeStatus[K kad.Key[K], A kad.Address[A]] struct { + Node kad.NodeInfo[K, A] + State NodeState } type NodeState interface { diff --git a/query/pool.go b/query/pool.go index 7e6a469..4f473e3 100644 --- a/query/pool.go +++ b/query/pool.go @@ -135,7 +135,7 @@ func (p *Pool[K, A]) Advance(ctx context.Context, ev PoolEvent) PoolState { case *EventPoolMessageResponse[K, A]: if qry, ok := p.queryIndex[tev.QueryID]; ok { state, terminal := p.advanceQuery(ctx, qry, &EventQueryMessageResponse[K, A]{ - NodeID: tev.NodeID, + Node: tev.Node, Response: tev.Response, }) if terminal { @@ -197,7 +197,7 @@ func (p *Pool[K, A]) advanceQuery(ctx context.Context, qry *Query[K, A], qev Que return &StatePoolQueryMessage[K, A]{ QueryID: st.QueryID, Stats: st.Stats, - NodeID: st.NodeID, + Node: st.Node, ProtocolID: st.ProtocolID, Message: st.Message, }, true @@ -247,18 +247,18 @@ func (p *Pool[K, A]) removeQuery(queryID QueryID) { // addQuery adds a query to the pool, returning the new query id // TODO: remove target argument and use msg.Target -func (p *Pool[K, A]) addQuery(ctx context.Context, queryID QueryID, target K, protocolID address.ProtocolID, msg kad.Request[K, A], knownClosestNodes []kad.NodeID[K]) error { +func (p *Pool[K, A]) addQuery(ctx context.Context, queryID QueryID, target K, protocolID address.ProtocolID, msg kad.Request[K, A], knownClosestNodes []kad.NodeInfo[K, A]) error { if _, exists := p.queryIndex[queryID]; exists { return fmt.Errorf("query id already in use") } - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[K, A](target) qryCfg := DefaultQueryConfig[K]() qryCfg.Clock = p.cfg.Clock qryCfg.Concurrency = p.cfg.QueryConcurrency qryCfg.RequestTimeout = p.cfg.RequestTimeout - qry, err := NewQuery[K](p.self, queryID, protocolID, msg, iter, knownClosestNodes, qryCfg) + qry, err := NewQuery[K, A](p.self, queryID, protocolID, msg, iter, knownClosestNodes, qryCfg) if err != nil { return fmt.Errorf("new query: %w", err) } @@ -281,7 +281,7 @@ type StatePoolIdle struct{} // StatePoolQueryMessage indicates that a pool query is waiting to message a node. type StatePoolQueryMessage[K kad.Key[K], A kad.Address[A]] struct { QueryID QueryID - NodeID kad.NodeID[K] + Node kad.NodeInfo[K, A] ProtocolID address.ProtocolID Message kad.Request[K, A] Stats QueryStats @@ -322,11 +322,11 @@ type PoolEvent interface { // EventPoolAddQuery is an event that attempts to add a new query type EventPoolAddQuery[K kad.Key[K], A kad.Address[A]] struct { - QueryID QueryID // the id to use for the new query - Target K // the target key for the query - ProtocolID address.ProtocolID // the protocol that defines how the message should be interpreted - Message kad.Request[K, A] // the message the query should send to each node it traverses - KnownClosestNodes []kad.NodeID[K] // an initial set of close nodes the query should use + QueryID QueryID // the id to use for the new query + Target K // the target key for the query + ProtocolID address.ProtocolID // the protocol that defines how the message should be interpreted + Message kad.Request[K, A] // the message the query should send to each node it traverses + KnownClosestNodes []kad.NodeInfo[K, A] // an initial set of close nodes the query should use } // EventPoolStopQuery notifies a pool to stop a query. @@ -337,7 +337,7 @@ type EventPoolStopQuery struct { // EventPoolMessageResponse notifies a pool that a query that a sent message has received a successful response. type EventPoolMessageResponse[K kad.Key[K], A kad.Address[A]] struct { QueryID QueryID // the id of the query that sent the message - NodeID kad.NodeID[K] // the node the message was sent to + Node kad.NodeInfo[K, A] // the node the message was sent to Response kad.Response[K, A] // the message response sent by the node } diff --git a/query/pool_test.go b/query/pool_test.go index 3e8da96..d326337 100644 --- a/query/pool_test.go +++ b/query/pool_test.go @@ -113,11 +113,13 @@ func TestPoolAddQueryStartsIfCapacity(t *testing.T) { // first thing the new pool should do is start the query state := p.Advance(ctx, &EventPoolAddQuery[key.Key8, kadtest.StrAddr]{ - QueryID: queryID, - Target: target, - ProtocolID: protocolID, - Message: msg, - KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + QueryID: queryID, + Target: target, + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + }, }) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) @@ -128,7 +130,7 @@ func TestPoolAddQueryStartsIfCapacity(t *testing.T) { require.Equal(t, queryID, st.QueryID) // the query should attempt to contact the node it was given - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // with the correct protocol ID require.Equal(t, protocolID, st.ProtocolID) @@ -160,23 +162,25 @@ func TestPoolMessageResponse(t *testing.T) { // first thing the new pool should do is start the query state := p.Advance(ctx, &EventPoolAddQuery[key.Key8, kadtest.StrAddr]{ - QueryID: queryID, - Target: target, - ProtocolID: protocolID, - Message: msg, - KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + QueryID: queryID, + Target: target, + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + }, }) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) // the query should attempt to contact the node it was given st := state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID, st.QueryID) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // notify query that node was contacted successfully, but no closer nodes state = p.Advance(ctx, &EventPoolMessageResponse[key.Key8, kadtest.StrAddr]{ QueryID: queryID, - NodeID: a, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), }) // pool should respond that query has finished @@ -212,74 +216,84 @@ func TestPoolPrefersRunningQueriesOverNewOnes(t *testing.T) { // Add the first query state := p.Advance(ctx, &EventPoolAddQuery[key.Key8, kadtest.StrAddr]{ - QueryID: queryID1, - Target: target, - ProtocolID: protocolID, - Message: msg1, - KnownClosestNodes: []kad.NodeID[key.Key8]{a, b, c, d}, + QueryID: queryID1, + Target: target, + ProtocolID: protocolID, + Message: msg1, + KnownClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), + }, }) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) // the first query should attempt to contact the node it was given st := state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID1, st.QueryID) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) msg2 := kadtest.NewRequest("2", target) queryID2 := QueryID("2") // Add the second query state = p.Advance(ctx, &EventPoolAddQuery[key.Key8, kadtest.StrAddr]{ - QueryID: queryID2, - Target: target, - ProtocolID: protocolID, - Message: msg2, - KnownClosestNodes: []kad.NodeID[key.Key8]{a, b, c, d}, + QueryID: queryID2, + Target: target, + ProtocolID: protocolID, + Message: msg2, + KnownClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), + }, }) // the first query should continue its operation in preference to starting the new query require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID1, st.QueryID) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) // advance the pool again, the first query should continue its operation in preference to starting the new query state = p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID1, st.QueryID) - require.Equal(t, c, st.NodeID) + require.Equal(t, c, st.Node.ID()) // advance the pool again, the first query is at capacity so the second query can start state = p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID2, st.QueryID) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // notify first query that node was contacted successfully, but no closer nodes state = p.Advance(ctx, &EventPoolMessageResponse[key.Key8, kadtest.StrAddr]{ QueryID: queryID1, - NodeID: a, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), }) // first query starts a new message request require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID1, st.QueryID) - require.Equal(t, d, st.NodeID) + require.Equal(t, d, st.Node.ID()) // notify first query that next node was contacted successfully, but no closer nodes state = p.Advance(ctx, &EventPoolMessageResponse[key.Key8, kadtest.StrAddr]{ QueryID: queryID1, - NodeID: b, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), }) // first query is out of nodes to try so second query can proceed require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID2, st.QueryID) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) } func TestPoolRespectsConcurrency(t *testing.T) { @@ -304,47 +318,53 @@ func TestPoolRespectsConcurrency(t *testing.T) { // Add the first query state := p.Advance(ctx, &EventPoolAddQuery[key.Key8, kadtest.StrAddr]{ - QueryID: queryID1, - Target: target, - ProtocolID: protocolID, - Message: msg1, - KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + QueryID: queryID1, + Target: target, + ProtocolID: protocolID, + Message: msg1, + KnownClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + }, }) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) // the first query should attempt to contact the node it was given st := state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID1, st.QueryID) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) msg2 := kadtest.NewRequest("2", target) queryID2 := QueryID("2") // Add the second query state = p.Advance(ctx, &EventPoolAddQuery[key.Key8, kadtest.StrAddr]{ - QueryID: queryID2, - Target: target, - ProtocolID: protocolID, - Message: msg2, - KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + QueryID: queryID2, + Target: target, + ProtocolID: protocolID, + Message: msg2, + KnownClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + }, }) // the second query should start since the first query has a request in flight require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID2, st.QueryID) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) msg3 := kadtest.NewRequest("3", target) queryID3 := QueryID("3") // Add a third query state = p.Advance(ctx, &EventPoolAddQuery[key.Key8, kadtest.StrAddr]{ - QueryID: queryID3, - Target: target, - ProtocolID: protocolID, - Message: msg3, - KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + QueryID: queryID3, + Target: target, + ProtocolID: protocolID, + Message: msg3, + KnownClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + }, }) // the third query should wait since the pool has reached maximum concurrency @@ -353,7 +373,7 @@ func TestPoolRespectsConcurrency(t *testing.T) { // notify first query that next node was contacted successfully, but no closer nodes state = p.Advance(ctx, &EventPoolMessageResponse[key.Key8, kadtest.StrAddr]{ QueryID: queryID1, - NodeID: a, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), }) // first query is out of nodes so it has finished @@ -366,5 +386,5 @@ func TestPoolRespectsConcurrency(t *testing.T) { require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID3, st.QueryID) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) } diff --git a/query/query.go b/query/query.go index f637f21..7b50dc0 100644 --- a/query/query.go +++ b/query/query.go @@ -40,7 +40,7 @@ type StateQueryFinished struct { type StateQueryWaitingMessage[K kad.Key[K], A kad.Address[A]] struct { QueryID QueryID Stats QueryStats - NodeID kad.NodeID[K] + Node kad.NodeInfo[K, A] ProtocolID address.ProtocolID Message kad.Request[K, A] } @@ -72,7 +72,7 @@ type EventQueryCancel struct{} // EventQueryMessageResponse notifies a query that an attempt to send a message has received a successful response. type EventQueryMessageResponse[K kad.Key[K], A kad.Address[A]] struct { - NodeID kad.NodeID[K] // the node the message was sent to + Node kad.NodeInfo[K, A] // the node the message was sent to Response kad.Response[K, A] // the message response sent by the node } @@ -142,19 +142,19 @@ type Query[K kad.Key[K], A kad.Address[A]] struct { // cfg is a copy of the optional configuration supplied to the query cfg QueryConfig[K] - iter NodeIter[K] + iter NodeIter[K, A] protocolID address.ProtocolID msg kad.Request[K, A] stats QueryStats - // finished indicates that that the query has completed its work or has been stopped. + // finished indicates that the query has completed its work or has been stopped. finished bool // inFlight is number of requests in flight, will be <= concurrency inFlight int } -func NewQuery[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], id QueryID, protocolID address.ProtocolID, msg kad.Request[K, A], iter NodeIter[K], knownClosestNodes []kad.NodeID[K], cfg *QueryConfig[K]) (*Query[K, A], error) { +func NewQuery[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], id QueryID, protocolID address.ProtocolID, msg kad.Request[K, A], iter NodeIter[K, A], knownClosestNodes []kad.NodeInfo[K, A], cfg *QueryConfig[K]) (*Query[K, A], error) { if cfg == nil { cfg = DefaultQueryConfig[K]() } else if err := cfg.Validate(); err != nil { @@ -163,12 +163,13 @@ func NewQuery[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], id QueryID, pr for _, node := range knownClosestNodes { // exclude self from closest nodes - if key.Equal(node.Key(), self.Key()) { + if key.Equal(node.ID().Key(), self.Key()) { continue } - iter.Add(&NodeStatus[K]{ - NodeID: node, - State: &StateNodeNotContacted{}, + + iter.Add(&NodeStatus[K, A]{ + Node: node, + State: &StateNodeNotContacted{}, }) } @@ -200,7 +201,7 @@ func (q *Query[K, A]) Advance(ctx context.Context, ev QueryEvent) QueryState { Stats: q.stats, } case *EventQueryMessageResponse[K, A]: - q.onMessageResponse(ctx, tev.NodeID, tev.Response) + q.onMessageResponse(ctx, tev.Node, tev.Response) case *EventQueryMessageFailure[K]: q.onMessageFailure(ctx, tev.NodeID) case nil: @@ -225,7 +226,7 @@ func (q *Query[K, A]) Advance(ctx context.Context, ev QueryEvent) QueryState { var returnState QueryState - q.iter.Each(ctx, func(ctx context.Context, ni *NodeStatus[K]) bool { + q.iter.Each(ctx, func(ctx context.Context, ni *NodeStatus[K, A]) bool { switch st := ni.State.(type) { case *StateNodeWaiting: if q.cfg.Clock.Now().After(st.Deadline) { @@ -267,7 +268,7 @@ func (q *Query[K, A]) Advance(ctx context.Context, ev QueryEvent) QueryState { q.stats.Start = q.cfg.Clock.Now() } returnState = &StateQueryWaitingMessage[K, A]{ - NodeID: ni.NodeID, + Node: ni.Node, QueryID: q.id, Stats: q.stats, ProtocolID: q.protocolID, @@ -321,8 +322,8 @@ func (q *Query[K, A]) markFinished() { } // onMessageResponse processes the result of a successful response received from a node. -func (q *Query[K, A]) onMessageResponse(ctx context.Context, node kad.NodeID[K], resp kad.Response[K, A]) { - ni, found := q.iter.Find(node.Key()) +func (q *Query[K, A]) onMessageResponse(ctx context.Context, node kad.NodeInfo[K, A], resp kad.Response[K, A]) { + ni, found := q.iter.Find(node.ID().Key()) if !found { // got a rogue message return @@ -354,9 +355,9 @@ func (q *Query[K, A]) onMessageResponse(ctx context.Context, node kad.NodeID[K], if key.Equal(info.ID().Key(), q.self.Key()) { continue } - q.iter.Add(&NodeStatus[K]{ - NodeID: info.ID(), - State: &StateNodeNotContacted{}, + q.iter.Add(&NodeStatus[K, A]{ + Node: info, + State: &StateNodeNotContacted{}, }) } } diff --git a/query/query_test.go b/query/query_test.go index 25760c1..1086c01 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -55,14 +55,14 @@ func TestQueryMessagesNode(t *testing.T) { ctx := context.Background() target := key.Key8(0b00000001) - a := kadtest.NewID(key.Key8(0b00000100)) // 4 + a := kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](kadtest.NewID(key.Key8(0b00000100))) // 4 // one known node to start with - knownNodes := []kad.NodeID[key.Key8]{a} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{a} clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -83,7 +83,7 @@ func TestQueryMessagesNode(t *testing.T) { // check that we are messaging the correct node with the right message st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID, st.QueryID) - require.Equal(t, a, st.NodeID) + require.Equal(t, a.ID(), st.Node.ID()) require.Equal(t, protocolID, st.ProtocolID) require.Equal(t, msg, st.Message) require.Equal(t, clk.Now(), st.Stats.Start) @@ -109,13 +109,14 @@ func TestQueryMessagesNearest(t *testing.T) { require.Less(t, target.Xor(near.Key()), target.Xor(far.Key())) // knownNodes are in "random" order with furthest before nearest - knownNodes := []kad.NodeID[key.Key8]{ - far, - near, + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](far), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](near), } + clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -135,7 +136,7 @@ func TestQueryMessagesNearest(t *testing.T) { // check that we are contacting the nearest node first st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, near, st.NodeID) + require.Equal(t, near, st.Node.ID()) } func TestQueryCancelFinishesQuery(t *testing.T) { @@ -145,11 +146,13 @@ func TestQueryCancelFinishesQuery(t *testing.T) { a := kadtest.NewID(key.Key8(0b00000100)) // 4 // one known node to start with - knownNodes := []kad.NodeID[key.Key8]{a} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -192,9 +195,9 @@ func TestQueryNoClosest(t *testing.T) { target := key.Key8(0b00000011) // no known nodes to start with - knownNodes := []kad.NodeID[key.Key8]{} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{} - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) clk := clock.NewMock() cfg := DefaultQueryConfig[key.Key8]() @@ -237,11 +240,15 @@ func TestQueryWaitsAtCapacity(t *testing.T) { c := kadtest.NewID(key.Key8(0b00010000)) // 16 // one known node to start with - knownNodes := []kad.NodeID[key.Key8]{a, b, c} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -259,14 +266,14 @@ func TestQueryWaitsAtCapacity(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) require.Equal(t, 1, st.Stats.Requests) // advancing sends the message to the next node state = qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) require.Equal(t, 2, st.Stats.Requests) // advancing now reports that the query is waiting at capacity since there are 2 messages in flight @@ -292,11 +299,16 @@ func TestQueryTimedOutNodeMakesCapacity(t *testing.T) { require.True(t, target.Xor(c.Key()).Compare(target.Xor(d.Key())) == -1) // knownNodes are in "random" order - knownNodes := []kad.NodeID[key.Key8]{b, c, a, d} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -315,7 +327,7 @@ func TestQueryTimedOutNodeMakesCapacity(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) stwm := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, 1, stwm.Stats.Requests) require.Equal(t, 0, stwm.Stats.Success) @@ -328,7 +340,7 @@ func TestQueryTimedOutNodeMakesCapacity(t *testing.T) { state = qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) stwm = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, 2, stwm.Stats.Requests) require.Equal(t, 0, stwm.Stats.Success) @@ -341,7 +353,7 @@ func TestQueryTimedOutNodeMakesCapacity(t *testing.T) { state = qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, c, st.NodeID) + require.Equal(t, c, st.Node.ID()) stwm = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, 3, stwm.Stats.Requests) require.Equal(t, 0, stwm.Stats.Success) @@ -365,7 +377,7 @@ func TestQueryTimedOutNodeMakesCapacity(t *testing.T) { state = qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, d, st.NodeID) + require.Equal(t, d, st.Node.ID()) stwm = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, 4, stwm.Stats.Requests) @@ -400,11 +412,16 @@ func TestQueryMessageResponseMakesCapacity(t *testing.T) { require.True(t, target.Xor(c.Key()).Compare(target.Xor(d.Key())) == -1) // knownNodes are in "random" order - knownNodes := []kad.NodeID[key.Key8]{b, c, a, d} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -422,7 +439,7 @@ func TestQueryMessageResponseMakesCapacity(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) stwm := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, 1, stwm.Stats.Requests) require.Equal(t, 0, stwm.Stats.Success) @@ -432,7 +449,7 @@ func TestQueryMessageResponseMakesCapacity(t *testing.T) { state = qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) stwm = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, 2, stwm.Stats.Requests) require.Equal(t, 0, stwm.Stats.Success) @@ -442,7 +459,7 @@ func TestQueryMessageResponseMakesCapacity(t *testing.T) { state = qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, c, st.NodeID) + require.Equal(t, c, st.Node.ID()) stwm = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, 3, stwm.Stats.Requests) require.Equal(t, 0, stwm.Stats.Success) @@ -453,10 +470,12 @@ func TestQueryMessageResponseMakesCapacity(t *testing.T) { require.IsType(t, &StateQueryWaitingAtCapacity{}, state) // notify query that first node was contacted successfully, now node d can be contacted - state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{NodeID: a}) + state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + }) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, d, st.NodeID) + require.Equal(t, d, st.Node.ID()) stwm = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, 4, stwm.Stats.Requests) require.Equal(t, 1, stwm.Stats.Success) @@ -486,11 +505,13 @@ func TestQueryCloserNodesAreAddedToIteration(t *testing.T) { require.True(t, target.Xor(c.Key()).Compare(target.Xor(d.Key())) == -1) // one known node to start with - knownNodes := []kad.NodeID[key.Key8]{d} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -508,7 +529,7 @@ func TestQueryCloserNodesAreAddedToIteration(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, d, st.NodeID) + require.Equal(t, d, st.Node.ID()) // advancing reports query has capacity state = qry.Advance(ctx, nil) @@ -516,7 +537,7 @@ func TestQueryCloserNodesAreAddedToIteration(t *testing.T) { // notify query that first node was contacted successfully, with closer nodes state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: d, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), Response: kadtest.NewResponse("resp_d", []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ kadtest.NewInfo(b, []kadtest.StrAddr{"addr_b"}), kadtest.NewInfo(a, []kadtest.StrAddr{"addr_a"}), @@ -526,7 +547,7 @@ func TestQueryCloserNodesAreAddedToIteration(t *testing.T) { // query should contact the next nearest uncontacted node st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) } func TestQueryCloserNodesIgnoresDuplicates(t *testing.T) { @@ -544,11 +565,14 @@ func TestQueryCloserNodesIgnoresDuplicates(t *testing.T) { require.True(t, target.Xor(c.Key()).Compare(target.Xor(d.Key())) == -1) // one known node to start with - knownNodes := []kad.NodeID[key.Key8]{d, a} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -566,13 +590,13 @@ func TestQueryCloserNodesIgnoresDuplicates(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // next the query attempts to contact second nearest node state = qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, d, st.NodeID) + require.Equal(t, d, st.Node.ID()) // advancing reports query has no capacity state = qry.Advance(ctx, nil) @@ -580,7 +604,7 @@ func TestQueryCloserNodesIgnoresDuplicates(t *testing.T) { // notify query that second node was contacted successfully, with closer nodes state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: d, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), Response: kadtest.NewResponse("resp_d", []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ kadtest.NewInfo(b, []kadtest.StrAddr{"addr_b"}), kadtest.NewInfo(a, []kadtest.StrAddr{"addr_a"}), @@ -590,7 +614,7 @@ func TestQueryCloserNodesIgnoresDuplicates(t *testing.T) { // query should contact the next nearest uncontacted node, which is b st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) } func TestQueryCancelFinishesIteration(t *testing.T) { @@ -600,11 +624,13 @@ func TestQueryCancelFinishesIteration(t *testing.T) { a := kadtest.NewID(key.Key8(0b00000100)) // 4 // one known node to start with - knownNodes := []kad.NodeID[key.Key8]{a} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -622,7 +648,7 @@ func TestQueryCancelFinishesIteration(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // cancel the query so it is now finished state = qry.Advance(ctx, &EventQueryCancel{}) @@ -640,11 +666,13 @@ func TestQueryFinishedIgnoresLaterEvents(t *testing.T) { b := kadtest.NewID(key.Key8(0b00001000)) // 8 // one known node to start with - knownNodes := []kad.NodeID[key.Key8]{b} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -662,7 +690,7 @@ func TestQueryFinishedIgnoresLaterEvents(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) // cancel the query so it is now finished state = qry.Advance(ctx, &EventQueryCancel{}) @@ -676,7 +704,7 @@ func TestQueryFinishedIgnoresLaterEvents(t *testing.T) { // notify query that second node was contacted successfully, with closer nodes state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: b, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), Response: kadtest.NewResponse("resp_b", []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ kadtest.NewInfo(a, []kadtest.StrAddr{"addr_a"}), }), @@ -701,11 +729,13 @@ func TestQueryWithCloserIterIgnoresMessagesFromUnknownNodes(t *testing.T) { c := kadtest.NewID(key.Key8(0b00010000)) // 16 // one known node to start with - knownNodes := []kad.NodeID[key.Key8]{c} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -723,7 +753,7 @@ func TestQueryWithCloserIterIgnoresMessagesFromUnknownNodes(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, c, st.NodeID) + require.Equal(t, c, st.Node.ID()) stwm := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, 1, stwm.Stats.Requests) require.Equal(t, 0, stwm.Stats.Success) @@ -731,7 +761,7 @@ func TestQueryWithCloserIterIgnoresMessagesFromUnknownNodes(t *testing.T) { // notify query that second node was contacted successfully, with closer nodes state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: b, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), Response: kadtest.NewResponse("resp_b", []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ kadtest.NewInfo(a, []kadtest.StrAddr{"addr_a"}), }), @@ -756,11 +786,16 @@ func TestQueryWithCloserIterFinishesWhenNumResultsReached(t *testing.T) { d := kadtest.NewID(key.Key8(0b00100000)) // 32 // one known node to start with - knownNodes := []kad.NodeID[key.Key8]{a, b, c, d} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -779,27 +814,27 @@ func TestQueryWithCloserIterFinishesWhenNumResultsReached(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // contact second node state = qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) // notify query that first node was contacted successfully state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: a, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), }) // query attempts to contact third node require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, c, st.NodeID) + require.Equal(t, c, st.Node.ID()) // notify query that second node was contacted successfully state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: b, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), }) // query has finished since it contacted the NumResults closest nodes @@ -814,12 +849,14 @@ func TestQueryWithCloserIterContinuesUntilNumResultsReached(t *testing.T) { b := kadtest.NewID(key.Key8(0b00001000)) // 8 c := kadtest.NewID(key.Key8(0b00010000)) // 16 - // one known node to start with, the furthesr - knownNodes := []kad.NodeID[key.Key8]{c} + // one known node to start with, the further + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -838,12 +875,12 @@ func TestQueryWithCloserIterContinuesUntilNumResultsReached(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, c, st.NodeID) + require.Equal(t, c, st.Node.ID()) // notify query that node was contacted successfully and tell it about // a closer one state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: c, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), Response: kadtest.NewResponse("resp_c", []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ kadtest.NewInfo(b, []kadtest.StrAddr{"addr_b"}), }), @@ -852,12 +889,12 @@ func TestQueryWithCloserIterContinuesUntilNumResultsReached(t *testing.T) { // query attempts to contact second node require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) // notify query that node was contacted successfully and tell it about // a closer one state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: b, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), Response: kadtest.NewResponse("resp_b", []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ kadtest.NewInfo(a, []kadtest.StrAddr{"addr_a"}), }), @@ -868,11 +905,11 @@ func TestQueryWithCloserIterContinuesUntilNumResultsReached(t *testing.T) { // to contact third node require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // notify query that second node was contacted successfully state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: a, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), }) // query has finished since it contacted the NumResults closest nodes @@ -896,8 +933,14 @@ func TestQueryNotContactedMakesCapacity(t *testing.T) { require.True(t, target.Xor(b.Key()).Compare(target.Xor(c.Key())) == -1) require.True(t, target.Xor(c.Key()).Compare(target.Xor(d.Key())) == -1) - knownNodes := []kad.NodeID[key.Key8]{a, b, c, d} - iter := NewSequentialIter[key.Key8]() + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), + } + + iter := NewSequentialIter[key.Key8, kadtest.StrAddr]() clk := clock.NewMock() cfg := DefaultQueryConfig[key.Key8]() @@ -916,19 +959,19 @@ func TestQueryNotContactedMakesCapacity(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // while the query has capacity the query should contact the next nearest node state = qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) // while the query has capacity the query should contact the second nearest node state = qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, c, st.NodeID) + require.Equal(t, c, st.Node.ID()) // the query should be at capacity state = qry.Advance(ctx, nil) @@ -938,7 +981,7 @@ func TestQueryNotContactedMakesCapacity(t *testing.T) { state = qry.Advance(ctx, &EventQueryMessageFailure[key.Key8]{NodeID: a}) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, d, st.NodeID) + require.Equal(t, d, st.Node.ID()) // the query should be at capacity again state = qry.Advance(ctx, nil) @@ -954,11 +997,15 @@ func TestQueryAllNotContactedFinishes(t *testing.T) { c := kadtest.NewID(key.Key8(0b00010000)) // 16 // knownNodes are in "random" order - knownNodes := []kad.NodeID[key.Key8]{a, b, c} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + } clk := clock.NewMock() - iter := NewSequentialIter[key.Key8]() + iter := NewSequentialIter[key.Key8, kadtest.StrAddr]() cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -1014,11 +1061,15 @@ func TestQueryAllContactedFinishes(t *testing.T) { b := kadtest.NewID(key.Key8(0b00001000)) // 8 c := kadtest.NewID(key.Key8(0b00010000)) // 16 - knownNodes := []kad.NodeID[key.Key8]{a, b, c} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + } clk := clock.NewMock() - iter := NewSequentialIter[key.Key8]() + iter := NewSequentialIter[key.Key8, kadtest.StrAddr]() cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -1050,15 +1101,15 @@ func TestQueryAllContactedFinishes(t *testing.T) { require.IsType(t, &StateQueryWaitingAtCapacity{}, state) // notify query that first node was contacted successfully, but no closer nodes - state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{NodeID: a}) + state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a)}) require.IsType(t, &StateQueryWaitingWithCapacity{}, state) // notify query that second node was contacted successfully, but no closer nodes - state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{NodeID: b}) + state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b)}) require.IsType(t, &StateQueryWaitingWithCapacity{}, state) // notify query that third node was contacted successfully, but no closer nodes - state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{NodeID: c}) + state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c)}) // query has finished since it contacted all possible nodes, even though it didn't // reach the desired NumResults @@ -1076,11 +1127,13 @@ func TestQueryNeverMessagesSelf(t *testing.T) { b := kadtest.NewID(key.Key8(0b00001000)) // 8 // one known node to start with - knownNodes := []kad.NodeID[key.Key8]{b} + knownNodes := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + } clk := clock.NewMock() - iter := NewClosestNodesIter(target) + iter := NewClosestNodesIter[key.Key8, kadtest.StrAddr](target) cfg := DefaultQueryConfig[key.Key8]() cfg.Clock = clk @@ -1098,11 +1151,11 @@ func TestQueryNeverMessagesSelf(t *testing.T) { state := qry.Advance(ctx, nil) require.IsType(t, &StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateQueryWaitingMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) // notify query that first node was contacted successfully, with closer nodes state = qry.Advance(ctx, &EventQueryMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: b, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), Response: kadtest.NewResponse("resp_b", []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ kadtest.NewInfo(a, []kadtest.StrAddr{"addr_a"}), }), diff --git a/routing/bootstrap.go b/routing/bootstrap.go index 96e0a4d..a9c8cb1 100644 --- a/routing/bootstrap.go +++ b/routing/bootstrap.go @@ -99,7 +99,7 @@ func (b *Bootstrap[K, A]) Advance(ctx context.Context, ev BootstrapEvent) Bootst case *EventBootstrapStart[K, A]: // TODO: ignore start event if query is already in progress - iter := query.NewClosestNodesIter(b.self.Key()) + iter := query.NewClosestNodesIter[K, A](b.self.Key()) qryCfg := query.DefaultQueryConfig[K]() qryCfg.Clock = b.cfg.Clock @@ -108,7 +108,7 @@ func (b *Bootstrap[K, A]) Advance(ctx context.Context, ev BootstrapEvent) Bootst queryID := query.QueryID("bootstrap") - qry, err := query.NewQuery[K](b.self, queryID, tev.ProtocolID, tev.Message, iter, tev.KnownClosestNodes, qryCfg) + qry, err := query.NewQuery[K, A](b.self, queryID, tev.ProtocolID, tev.Message, iter, tev.KnownClosestNodes, qryCfg) if err != nil { // TODO: don't panic panic(err) @@ -118,7 +118,7 @@ func (b *Bootstrap[K, A]) Advance(ctx context.Context, ev BootstrapEvent) Bootst case *EventBootstrapMessageResponse[K, A]: return b.advanceQuery(ctx, &query.EventQueryMessageResponse[K, A]{ - NodeID: tev.NodeID, + Node: tev.Node, Response: tev.Response, }) case *EventBootstrapMessageFailure[K]: @@ -147,7 +147,7 @@ func (b *Bootstrap[K, A]) advanceQuery(ctx context.Context, qev query.QueryEvent return &StateBootstrapMessage[K, A]{ QueryID: st.QueryID, Stats: st.Stats, - NodeID: st.NodeID, + Node: st.Node, ProtocolID: st.ProtocolID, Message: st.Message, } @@ -188,7 +188,7 @@ type BootstrapState interface { // StateBootstrapMessage indicates that the bootstrap query is waiting to message a node. type StateBootstrapMessage[K kad.Key[K], A kad.Address[A]] struct { QueryID query.QueryID - NodeID kad.NodeID[K] + Node kad.NodeInfo[K, A] ProtocolID address.ProtocolID Message kad.Request[K, A] Stats query.QueryStats @@ -231,12 +231,12 @@ type EventBootstrapPoll struct{} type EventBootstrapStart[K kad.Key[K], A kad.Address[A]] struct { ProtocolID address.ProtocolID Message kad.Request[K, A] - KnownClosestNodes []kad.NodeID[K] + KnownClosestNodes []kad.NodeInfo[K, A] } // EventBootstrapMessageResponse notifies a bootstrap that a sent message has received a successful response. type EventBootstrapMessageResponse[K kad.Key[K], A kad.Address[A]] struct { - NodeID kad.NodeID[K] // the node the message was sent to + Node kad.NodeInfo[K, A] // the node the message was sent to Response kad.Response[K, A] // the message response sent by the node } diff --git a/routing/bootstrap_test.go b/routing/bootstrap_test.go index 43dddf7..d587eb7 100644 --- a/routing/bootstrap_test.go +++ b/routing/bootstrap_test.go @@ -82,9 +82,11 @@ func TestBootstrapStart(t *testing.T) { // start the bootstrap state := bs.Advance(ctx, &EventBootstrapStart[key.Key8, kadtest.StrAddr]{ - ProtocolID: protocolID, - Message: msg, - KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + }, }) require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) @@ -95,7 +97,7 @@ func TestBootstrapStart(t *testing.T) { require.Equal(t, query.QueryID("bootstrap"), st.QueryID) // the query should attempt to contact the node it was given - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // with the correct protocol ID require.Equal(t, protocolID, st.ProtocolID) @@ -125,20 +127,22 @@ func TestBootstrapMessageResponse(t *testing.T) { // start the bootstrap state := bs.Advance(ctx, &EventBootstrapStart[key.Key8, kadtest.StrAddr]{ - ProtocolID: protocolID, - Message: msg, - KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + }, }) require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) // the bootstrap should attempt to contact the node it was given st := state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, query.QueryID("bootstrap"), st.QueryID) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // notify bootstrap that node was contacted successfully, but no closer nodes state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: a, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), }) // bootstrap should respond that its query has finished @@ -175,28 +179,33 @@ func TestBootstrapProgress(t *testing.T) { // start the bootstrap state := bs.Advance(ctx, &EventBootstrapStart[key.Key8, kadtest.StrAddr]{ - ProtocolID: protocolID, - Message: msg, - KnownClosestNodes: []kad.NodeID[key.Key8]{d, a, b, c}, + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](a), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), + kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), + }, }) // the bootstrap should attempt to contact the closest node it was given require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) st := state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, query.QueryID("bootstrap"), st.QueryID) - require.Equal(t, a, st.NodeID) + require.Equal(t, a, st.Node.ID()) // next the bootstrap attempts to contact second nearest node state = bs.Advance(ctx, &EventBootstrapPoll{}) require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, b, st.NodeID) + require.Equal(t, b, st.Node.ID()) // next the bootstrap attempts to contact third nearest node state = bs.Advance(ctx, &EventBootstrapPoll{}) require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, c, st.NodeID) + require.Equal(t, c, st.Node.ID()) // now the bootstrap should be waiting since it is at request capacity state = bs.Advance(ctx, &EventBootstrapPoll{}) @@ -204,17 +213,17 @@ func TestBootstrapProgress(t *testing.T) { // notify bootstrap that node was contacted successfully, but no closer nodes state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: a, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), }) // now the bootstrap has capacity to contact fourth nearest node require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) - require.Equal(t, d, st.NodeID) + require.Equal(t, d, st.Node.ID()) // notify bootstrap that a node was contacted successfully state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: b, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](b), }) // bootstrap should respond that it is waiting for messages @@ -222,7 +231,7 @@ func TestBootstrapProgress(t *testing.T) { // notify bootstrap that a node was contacted successfully state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: c, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](c), }) // bootstrap should respond that it is waiting for last message @@ -230,7 +239,7 @@ func TestBootstrapProgress(t *testing.T) { // notify bootstrap that the final node was contacted successfully state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ - NodeID: d, + Node: kadtest.NewEmptyInfo[key.Key8, kadtest.StrAddr](d), }) // bootstrap should respond that its query has finished