Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: minor refactoring for protocols #1762

Merged
merged 11 commits into from
Jan 11, 2024
14 changes: 8 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 29 additions & 19 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { Libp2p } from "@libp2p/interface";
import type { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type {
IBaseProtocol,
Expand All @@ -9,14 +8,14 @@ import type {
PubsubTopic
} from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { Logger, shardInfoToPubsubTopics } from "@waku/utils";
import {
getConnectedPeersForProtocol,
getPeersForProtocol,
selectPeerForProtocol
sortPeersByLatency
} from "@waku/utils/libp2p";

import { filterPeers } from "./filterPeers.js";
import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";

/**
Expand All @@ -30,7 +29,8 @@ export class BaseProtocol implements IBaseProtocol {

constructor(
public multicodec: string,
private components: Libp2pComponents
private components: Libp2pComponents,
private log: Logger
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
Expand Down Expand Up @@ -64,22 +64,14 @@ export class BaseProtocol implements IBaseProtocol {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}

protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peer } = await selectPeerForProtocol(
this.peerStore,
[this.multicodec],
peerId
);
return peer;
}

/**
* Retrieves a list of connected peers based on the specified criteria.
* Retrieves a list of connected peers that support the protocol. The list is sorted by latency.
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/

* @returns A list of peers that support the protocol sorted by latency.
*/
protected async getPeers(
{
numPeers,
Expand All @@ -99,8 +91,26 @@ export class BaseProtocol implements IBaseProtocol {
[this.multicodec]
);

// Filter the peers based on the specified criteria
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
// Filter the peers based on discovery & number of peers requested
const filteredPeers = await filterPeersByDiscovery(
allPeersForProtocol,
numPeers,
maxBootstrapPeers
);

// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation of the function needs to be updated.

Copy link
Collaborator Author

@danisharora099 danisharora099 Dec 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate?

Doc for sortPeersByLatency was updated.
For the parent function getPeers, it has a generic definition that wasn't falsy. Updated it for more verbosity: 02aeab8

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fryorcraken merging this now
please let me know if you would prefer any follow up -- thanks!

this.peerStore,
filteredPeers
);

if (sortedFilteredPeers.length === 0) {
this.log.warn(
"No peers found. Ensure you have a connection to the network."
);
}

return sortedFilteredPeers;
}

initializePubsubTopic(options?: ProtocolCreateOptions): PubsubTopic[] {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class Filter extends BaseProtocol implements IReceiver {
}

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);
super(FilterCodecs.SUBSCRIBE, libp2p.components, log);

this.pubsubTopics = this.initializePubsubTopic(options);

Expand Down
12 changes: 6 additions & 6 deletions packages/core/src/lib/filterPeers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { Tags } from "@waku/interfaces";
import { expect } from "chai";

import { filterPeers } from "./filterPeers.js";
import { filterPeersByDiscovery } from "./filterPeers.js";

describe("filterPeers function", function () {
describe("filterPeersByDiscovery function", function () {
it("should return all peers when numPeers is 0", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
Expand All @@ -27,7 +27,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 10);
const result = await filterPeersByDiscovery(mockPeers, 0, 10);
expect(result.length).to.deep.equal(mockPeers.length);
});

Expand Down Expand Up @@ -56,7 +56,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 0);
const result = await filterPeersByDiscovery(mockPeers, 0, 0);

// result should have no bootstrap peers, and a total of 2 peers
expect(result.length).to.equal(2);
Expand Down Expand Up @@ -95,7 +95,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 1);
const result = await filterPeersByDiscovery(mockPeers, 0, 1);

// result should have 1 bootstrap peers, and a total of 4 peers
expect(result.length).to.equal(4);
Expand Down Expand Up @@ -134,7 +134,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 5, 2);
const result = await filterPeersByDiscovery(mockPeers, 5, 2);

// check that result has at least 2 bootstrap peers and no more than 5 peers
expect(result.length).to.be.at.least(2);
Expand Down
16 changes: 12 additions & 4 deletions packages/core/src/lib/filterPeers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,31 @@ import { Peer } from "@libp2p/interface/peer-store";
import { Tags } from "@waku/interfaces";

/**
* Retrieves a list of peers based on the specified criteria.
* Retrieves a list of peers based on the specified criteria:
* 1. If numPeers is 0, return all peers
* 2. Bootstrap peers are prioritized
* 3. Non-bootstrap peers are randomly selected to fill up to numPeers
*
* @param peers - The list of peers to filter from.
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned, irrespective of `maxBootstrapPeers`.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/
export async function filterPeers(
export async function filterPeersByDiscovery(
peers: Peer[],
numPeers: number,
maxBootstrapPeers: number
): Promise<Peer[]> {
// Collect the bootstrap peers up to the specified maximum
const bootstrapPeers = peers
let bootstrapPeers = peers
.filter((peer) => peer.tags.has(Tags.BOOTSTRAP))
.slice(0, maxBootstrapPeers);

// If numPeers is less than the number of bootstrap peers, adjust the bootstrapPeers array
if (numPeers > 0 && numPeers < bootstrapPeers.length) {
bootstrapPeers = bootstrapPeers.slice(0, numPeers);
}

// Collect non-bootstrap peers
const nonBootstrapPeers = peers.filter(
(peer) => !peer.tags.has(Tags.BOOTSTRAP)
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class LightPush extends BaseProtocol implements ILightPush {
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
super(LightPushCodec, libp2p.components, log);
this.pubsubTopics = this.initializePubsubTopic(options);
}

Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Metadata extends BaseProtocol {
private readonly shardInfo: ShardingParams;
private libp2pComponents: Libp2pComponents;
constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) {
super(MetadataCodec, libp2p.components);
super(MetadataCodec, libp2p.components, log);
this.libp2pComponents = libp2p;
this.shardInfo = shardInfo;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
Expand Down Expand Up @@ -70,7 +70,10 @@ class Metadata extends BaseProtocol {
async query(peerId: PeerId): Promise<ShardInfo> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);

const peer = await this.getPeer(peerId);
const peer = await this.peerStore.get(peerId);
if (!peer) {
throw new Error(`Peer ${peerId.toString()} not found`);
}

const stream = await this.getStream(peer);

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Store extends BaseProtocol implements IStore {
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
super(StoreCodec, libp2p.components, log);
this.pubsubTopics = this.initializePubsubTopic(options);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface IPeerExchange extends IBaseProtocol {

export interface PeerExchangeQueryParams {
numPeers: number;
peerId?: PeerId;
peerId: PeerId;
}

export interface PeerExchangeResponse {
Expand Down
7 changes: 5 additions & 2 deletions packages/peer-exchange/src/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
* @param components - libp2p components
*/
constructor(components: Libp2pComponents) {
super(PeerExchangeCodec, components);
super(PeerExchangeCodec, components, log);
}

/**
Expand All @@ -42,7 +42,10 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
numPeers: BigInt(numPeers)
});

const peer = await this.getPeer(params.peerId);
const peer = await this.peerStore.get(params.peerId);
if (!peer) {
throw new Error(`Peer ${params.peerId.toString()} not found`);
}

const stream = await this.getStream(peer);

Expand Down
1 change: 1 addition & 0 deletions packages/tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"chai-as-promised": "^7.1.1",
"debug": "^4.3.4",
"dockerode": "^3.3.5",
"fast-check": "^3.15.0",
"p-retry": "^6.1.0",
"p-timeout": "^6.1.0",
"portfinder": "^1.0.32",
Expand Down
Loading
Loading