Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: minor refactoring for protocols #1762

Merged
merged 11 commits into from
Jan 11, 2024
29 changes: 15 additions & 14 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { Libp2p } from "@libp2p/interface";
import type { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type {
IBaseProtocol,
Expand All @@ -9,10 +8,10 @@ import type {
ShardInfo
} from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";

import { DefaultPubsubTopic } from "./constants.js";
import { filterPeers } from "./filterPeers.js";
import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";

/**
Expand Down Expand Up @@ -60,15 +59,6 @@ export class BaseProtocol implements IBaseProtocol {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}

protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peer } = await selectPeerForProtocol(
this.peerStore,
[this.multicodec],
peerId
);
return peer;
}

/**
* Retrieves a list of peers based on the specified criteria.
*
Expand All @@ -93,8 +83,19 @@ export class BaseProtocol implements IBaseProtocol {
this.multicodec
]);

// Filter the peers based on the specified criteria
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
// Filter the peers based on discovery & number of peers requested
const filteredPeers = await filterPeersByDiscovery(
allPeersForProtocol,
numPeers,
maxBootstrapPeers
);

const sortedFilteredPeers = await sortPeersByLatency(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation of the function needs to be updated.

Copy link
Collaborator Author

@danisharora099 danisharora099 Dec 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate?

Doc for sortPeersByLatency was updated.
For the parent function getPeers, it has a generic definition that wasn't falsy. Updated it for more verbosity: 02aeab8

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fryorcraken merging this now
please let me know if you would prefer any follow up -- thanks!

this.peerStore,
filteredPeers
);

return sortedFilteredPeers;
}

initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] {
Expand Down
12 changes: 6 additions & 6 deletions packages/core/src/lib/filterPeers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { Tags } from "@waku/interfaces";
import { expect } from "chai";

import { filterPeers } from "./filterPeers.js";
import { filterPeersByDiscovery } from "./filterPeers.js";

describe("filterPeers function", function () {
describe("filterPeersByDiscovery function", function () {
it("should return all peers when numPeers is 0", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
Expand All @@ -27,7 +27,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 10);
const result = await filterPeersByDiscovery(mockPeers, 0, 10);
expect(result.length).to.deep.equal(mockPeers.length);
});

Expand Down Expand Up @@ -56,7 +56,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 0);
const result = await filterPeersByDiscovery(mockPeers, 0, 0);

// result should have no bootstrap peers, and a total of 2 peers
expect(result.length).to.equal(2);
Expand Down Expand Up @@ -95,7 +95,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 1);
const result = await filterPeersByDiscovery(mockPeers, 0, 1);

// result should have 1 bootstrap peers, and a total of 4 peers
expect(result.length).to.equal(4);
Expand Down Expand Up @@ -134,7 +134,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 5, 2);
const result = await filterPeersByDiscovery(mockPeers, 5, 2);

// check that result has at least 2 bootstrap peers and no more than 5 peers
expect(result.length).to.be.at.least(2);
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/lib/filterPeers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ import { Peer } from "@libp2p/interface/peer-store";
import { Tags } from "@waku/interfaces";

/**
* Retrieves a list of peers based on the specified criteria.
* Retrieves a list of peers based on the specified criteria:
* 1. If numPeers is 0, return all peers
* 2. Bootstrap peers are prioritized
* 3. Non-bootstrap peers are randomly selected to fill up to numPeers
*
* @param peers - The list of peers to filter from.
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/
export async function filterPeers(
export async function filterPeersByDiscovery(
peers: Peer[],
numPeers: number,
maxBootstrapPeers: number
Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ class Metadata extends BaseProtocol {
async query(peerId: PeerId): Promise<ShardInfo> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);

const peer = await this.getPeer(peerId);
const peer = await this.peerStore.get(peerId);
if (!peer) {
throw new Error(`Peer ${peerId.toString()} not found`);
}

const stream = await this.getStream(peer);

Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface IPeerExchange extends IBaseProtocol {

export interface PeerExchangeQueryParams {
numPeers: number;
peerId?: PeerId;
peerId: PeerId;
}

export interface PeerExchangeResponse {
Expand Down
5 changes: 4 additions & 1 deletion packages/peer-exchange/src/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
numPeers: BigInt(numPeers)
});

const peer = await this.getPeer(params.peerId);
const peer = await this.peerStore.get(params.peerId);
if (!peer) {
throw new Error(`Peer ${params.peerId.toString()} not found`);
}

const stream = await this.getStream(peer);

Expand Down
1 change: 1 addition & 0 deletions packages/tests/tests/peer_exchange.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ describe("Peer Exchange", () => {
let peerInfos: PeerInfo[] = [];
while (peerInfos.length <= 0) {
peerInfos = (await peerExchange.query({
peerId: nwaku1PeerId,
numPeers: numPeersToRequest
})) as PeerInfo[];
await delay(3000);
Expand Down
150 changes: 0 additions & 150 deletions packages/tests/tests/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { Peer } from "@libp2p/interface/peer-store";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import {
createDecoder,
createEncoder,
Expand All @@ -12,10 +9,8 @@ import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { toAsyncIterator } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { selectPeerForProtocol } from "@waku/utils/libp2p";
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";
import sinon from "sinon";

import {
delay,
Expand Down Expand Up @@ -121,148 +116,3 @@ describe("Util: toAsyncIterator: Filter", () => {
expect(result.done).to.eq(true);
});
});

const TestCodec = "test/1";

describe("selectPeerForProtocol", () => {
let peerStore: PeerStore;
adklempner marked this conversation as resolved.
Show resolved Hide resolved
const protocols = [TestCodec];

let lowPingPeer: Peer,
midPingPeer: Peer,
highPingPeer: Peer,
differentCodecPeer: Peer,
anotherDifferentCodecPeer: Peer;

beforeEach(async function () {
this.timeout(10000);
const waku = await createLightNode();
await waku.start();
await delay(3000);
peerStore = waku.libp2p.peerStore;

const [
lowPingPeerId,
midPingPeerId,
highPingPeerId,
differentCodecPeerId,
anotherDifferentCodecPeerId
] = await Promise.all([
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId()
]);

lowPingPeer = {
id: lowPingPeerId,
protocols: [TestCodec],
metadata: new Map().set("ping", utf8ToBytes("50"))
} as Peer;

midPingPeer = {
id: midPingPeerId,
protocols: [TestCodec],
metadata: new Map().set("ping", utf8ToBytes("100"))
} as Peer;

highPingPeer = {
id: highPingPeerId,
protocols: [TestCodec],
metadata: new Map().set("ping", utf8ToBytes("500"))
} as Peer;

differentCodecPeer = {
id: differentCodecPeerId,
protocols: ["DifferentCodec"]
} as Peer;

anotherDifferentCodecPeer = {
id: anotherDifferentCodecPeerId,
protocols: ["AnotherDifferentCodec"]
} as Peer;
});

afterEach(() => {
sinon.restore();
});

it("should return the peer with the lowest ping", async function () {
const mockPeers = [highPingPeer, lowPingPeer, midPingPeer];

sinon.stub(peerStore, "get").callsFake(async (peerId) => {
return mockPeers.find((peer) => peer.id.equals(peerId))!;
});

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

const result = await selectPeerForProtocol(peerStore, protocols);

expect(result.peer).to.deep.equal(lowPingPeer);
expect(result.protocol).to.equal(TestCodec);
});

it("should return the peer with the provided peerId", async function () {
const targetPeer = await createSecp256k1PeerId();
const mockPeer = { id: targetPeer, protocols: [TestCodec] } as Peer;
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);

const result = await selectPeerForProtocol(
peerStore,
protocols,
targetPeer
);
expect(result.peer).to.deep.equal(mockPeer);
});

it("should return a random peer when all peers have the same latency", async function () {
const mockPeers = [highPingPeer, highPingPeer, highPingPeer];

sinon.stub(peerStore, "get").callsFake(async (peerId) => {
return mockPeers.find((peer) => peer.id.equals(peerId))!;
});

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

const result = await selectPeerForProtocol(peerStore, protocols);

expect(mockPeers).to.deep.include(result.peer);
});

it("should throw an error when no peer matches the given protocols", async function () {
const mockPeers = [differentCodecPeer, anotherDifferentCodecPeer];

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

await expect(
selectPeerForProtocol(peerStore, protocols)
).to.be.rejectedWith(
`Failed to find known peer that registers protocols: ${protocols}`
);
});

it("should throw an error when the selected peer does not register the required protocols", async function () {
const targetPeer = await createSecp256k1PeerId();
const mockPeer = { id: targetPeer, protocols: ["DifferentCodec"] } as Peer;
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);

await expect(
selectPeerForProtocol(peerStore, protocols, targetPeer)
).to.be.rejectedWith(
`Peer does not register required protocols (${targetPeer.toString()}): ${protocols}`
);
});
});
Loading
Loading