Skip to content

Commit

Permalink
chore: minor refactoring for protocols (#1762)
Browse files Browse the repository at this point in the history
* remove: unnecessary function

* remove: test

* update doc

* add: tests

* revert unintended change

* fix: ping return

* fix(tests): stub connections

* log warning instead of throwing an error
  • Loading branch information
danisharora099 authored Jan 11, 2024
1 parent dc96074 commit b99f828
Show file tree
Hide file tree
Showing 14 changed files with 425 additions and 298 deletions.
14 changes: 8 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 29 additions & 19 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { Libp2p } from "@libp2p/interface";
import type { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type {
IBaseProtocol,
Expand All @@ -9,14 +8,14 @@ import type {
PubsubTopic
} from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { Logger, shardInfoToPubsubTopics } from "@waku/utils";
import {
getConnectedPeersForProtocol,
getPeersForProtocol,
selectPeerForProtocol
sortPeersByLatency
} from "@waku/utils/libp2p";

import { filterPeers } from "./filterPeers.js";
import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";

/**
Expand All @@ -30,7 +29,8 @@ export class BaseProtocol implements IBaseProtocol {

constructor(
public multicodec: string,
private components: Libp2pComponents
private components: Libp2pComponents,
private log: Logger
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
Expand Down Expand Up @@ -64,22 +64,14 @@ export class BaseProtocol implements IBaseProtocol {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}

protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peer } = await selectPeerForProtocol(
this.peerStore,
[this.multicodec],
peerId
);
return peer;
}

/**
* Retrieves a list of connected peers based on the specified criteria.
* Retrieves a list of connected peers that support the protocol. The list is sorted by latency.
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/
* @returns A list of peers that support the protocol sorted by latency.
*/
protected async getPeers(
{
numPeers,
Expand All @@ -99,8 +91,26 @@ export class BaseProtocol implements IBaseProtocol {
[this.multicodec]
);

// Filter the peers based on the specified criteria
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
// Filter the peers based on discovery & number of peers requested
const filteredPeers = await filterPeersByDiscovery(
allPeersForProtocol,
numPeers,
maxBootstrapPeers
);

// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.peerStore,
filteredPeers
);

if (sortedFilteredPeers.length === 0) {
this.log.warn(
"No peers found. Ensure you have a connection to the network."
);
}

return sortedFilteredPeers;
}

initializePubsubTopic(options?: ProtocolCreateOptions): PubsubTopic[] {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class Filter extends BaseProtocol implements IReceiver {
}

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);
super(FilterCodecs.SUBSCRIBE, libp2p.components, log);

this.pubsubTopics = this.initializePubsubTopic(options);

Expand Down
12 changes: 6 additions & 6 deletions packages/core/src/lib/filterPeers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { Tags } from "@waku/interfaces";
import { expect } from "chai";

import { filterPeers } from "./filterPeers.js";
import { filterPeersByDiscovery } from "./filterPeers.js";

describe("filterPeers function", function () {
describe("filterPeersByDiscovery function", function () {
it("should return all peers when numPeers is 0", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
Expand All @@ -27,7 +27,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 10);
const result = await filterPeersByDiscovery(mockPeers, 0, 10);
expect(result.length).to.deep.equal(mockPeers.length);
});

Expand Down Expand Up @@ -56,7 +56,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 0);
const result = await filterPeersByDiscovery(mockPeers, 0, 0);

// result should have no bootstrap peers, and a total of 2 peers
expect(result.length).to.equal(2);
Expand Down Expand Up @@ -95,7 +95,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 1);
const result = await filterPeersByDiscovery(mockPeers, 0, 1);

// result should have 1 bootstrap peers, and a total of 4 peers
expect(result.length).to.equal(4);
Expand Down Expand Up @@ -134,7 +134,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 5, 2);
const result = await filterPeersByDiscovery(mockPeers, 5, 2);

// check that result has at least 2 bootstrap peers and no more than 5 peers
expect(result.length).to.be.at.least(2);
Expand Down
16 changes: 12 additions & 4 deletions packages/core/src/lib/filterPeers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,31 @@ import { Peer } from "@libp2p/interface/peer-store";
import { Tags } from "@waku/interfaces";

/**
* Retrieves a list of peers based on the specified criteria.
* Retrieves a list of peers based on the specified criteria:
* 1. If numPeers is 0, return all peers
* 2. Bootstrap peers are prioritized
* 3. Non-bootstrap peers are randomly selected to fill up to numPeers
*
* @param peers - The list of peers to filter from.
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned, irrespective of `maxBootstrapPeers`.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/
export async function filterPeers(
export async function filterPeersByDiscovery(
peers: Peer[],
numPeers: number,
maxBootstrapPeers: number
): Promise<Peer[]> {
// Collect the bootstrap peers up to the specified maximum
const bootstrapPeers = peers
let bootstrapPeers = peers
.filter((peer) => peer.tags.has(Tags.BOOTSTRAP))
.slice(0, maxBootstrapPeers);

// If numPeers is less than the number of bootstrap peers, adjust the bootstrapPeers array
if (numPeers > 0 && numPeers < bootstrapPeers.length) {
bootstrapPeers = bootstrapPeers.slice(0, numPeers);
}

// Collect non-bootstrap peers
const nonBootstrapPeers = peers.filter(
(peer) => !peer.tags.has(Tags.BOOTSTRAP)
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class LightPush extends BaseProtocol implements ILightPush {
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
super(LightPushCodec, libp2p.components, log);
this.pubsubTopics = this.initializePubsubTopic(options);
}

Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Metadata extends BaseProtocol {
private readonly shardInfo: ShardingParams;
private libp2pComponents: Libp2pComponents;
constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) {
super(MetadataCodec, libp2p.components);
super(MetadataCodec, libp2p.components, log);
this.libp2pComponents = libp2p;
this.shardInfo = shardInfo;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
Expand Down Expand Up @@ -70,7 +70,10 @@ class Metadata extends BaseProtocol {
async query(peerId: PeerId): Promise<ShardInfo> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);

const peer = await this.getPeer(peerId);
const peer = await this.peerStore.get(peerId);
if (!peer) {
throw new Error(`Peer ${peerId.toString()} not found`);
}

const stream = await this.getStream(peer);

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Store extends BaseProtocol implements IStore {
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
super(StoreCodec, libp2p.components, log);
this.pubsubTopics = this.initializePubsubTopic(options);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface IPeerExchange extends IBaseProtocol {

export interface PeerExchangeQueryParams {
numPeers: number;
peerId?: PeerId;
peerId: PeerId;
}

export interface PeerExchangeResponse {
Expand Down
7 changes: 5 additions & 2 deletions packages/peer-exchange/src/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
* @param components - libp2p components
*/
constructor(components: Libp2pComponents) {
super(PeerExchangeCodec, components);
super(PeerExchangeCodec, components, log);
}

/**
Expand All @@ -42,7 +42,10 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
numPeers: BigInt(numPeers)
});

const peer = await this.getPeer(params.peerId);
const peer = await this.peerStore.get(params.peerId);
if (!peer) {
throw new Error(`Peer ${params.peerId.toString()} not found`);
}

const stream = await this.getStream(peer);

Expand Down
1 change: 1 addition & 0 deletions packages/tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"chai-as-promised": "^7.1.1",
"debug": "^4.3.4",
"dockerode": "^3.3.5",
"fast-check": "^3.15.0",
"p-retry": "^6.1.0",
"p-timeout": "^6.1.0",
"portfinder": "^1.0.32",
Expand Down
Loading

0 comments on commit b99f828

Please sign in to comment.