Skip to content

Commit

Permalink
feat!: protocols filter peers as per configured shard (#1756)
Browse files Browse the repository at this point in the history
* merge: master

* fix: tests

* update: interfafces

* rm: comments

* metadata: store peerIdStr instead of peerId

* chore(utils): move fast-utils to dev deps

* fix: allow autosharding nodes to get peers (#1785)

* fix: merge

* fix: build

* fix: failing tests from master merge

---------

Co-authored-by: Arseniy Klempner <[email protected]>
  • Loading branch information
danisharora099 and adklempner authored Jan 19, 2024
1 parent bb680e4 commit 477c2a5
Show file tree
Hide file tree
Showing 34 changed files with 664 additions and 129 deletions.
2 changes: 2 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";

export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec } from "./lib/light_push/index.js";
export { wakuLightPush } from "./lib/light_push/index.js";
export { LightPushCodec, wakuLightPush } from "./lib/light_push/index.js";

export * as waku_store from "./lib/store/index.js";

Expand Down
41 changes: 29 additions & 12 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type {
import { DefaultPubsubTopic } from "@waku/interfaces";
import { Logger, shardInfoToPubsubTopics } from "@waku/utils";
import {
getConnectedPeersForProtocol,
getConnectedPeersForProtocolAndShard,
getPeersForProtocol,
sortPeersByLatency
} from "@waku/utils/libp2p";
Expand All @@ -25,12 +25,16 @@ export class BaseProtocol implements IBaseProtocol {
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
protected streamManager: StreamManager;
protected pubsubTopics: PubsubTopic[];

constructor(
public multicodec: string,
private components: Libp2pComponents,
private log: Logger
private log: Logger,
private options?: ProtocolCreateOptions
) {
this.pubsubTopics = this.initializePubsubTopic(options);

this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
Expand Down Expand Up @@ -59,10 +63,19 @@ export class BaseProtocol implements IBaseProtocol {
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
public async peers(): Promise<Peer[]> {
public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}

public async connectedPeers(): Promise<Peer[]> {
const peers = await this.allPeers();
return peers.filter((peer) => {
return (
this.components.connectionManager.getConnections(peer.id).length > 0
);
});
}

/**
* Retrieves a list of connected peers that support the protocol. The list is sorted by latency.
*
Expand All @@ -83,16 +96,18 @@ export class BaseProtocol implements IBaseProtocol {
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol
const allPeersForProtocol = await getConnectedPeersForProtocol(
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec]
);
// Retrieve all connected peers that support the protocol & shard (if configured)
const connectedPeersForProtocolAndShard =
await getConnectedPeersForProtocolAndShard(
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec],
this.options?.shardInfo
);

// Filter the peers based on discovery & number of peers requested
const filteredPeers = await filterPeersByDiscovery(
allPeersForProtocol,
const filteredPeers = filterPeersByDiscovery(
connectedPeersForProtocolAndShard,
numPeers,
maxBootstrapPeers
);
Expand All @@ -112,7 +127,9 @@ export class BaseProtocol implements IBaseProtocol {
return sortedFilteredPeers;
}

initializePubsubTopic(options?: ProtocolCreateOptions): PubsubTopic[] {
private initializePubsubTopic(
options?: ProtocolCreateOptions
): PubsubTopic[] {
return (
options?.pubsubTopics ??
(options?.shardInfo
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { Peer, PeerId, PeerInfo, PeerStore } from "@libp2p/interface";
import { CustomEvent, TypedEventEmitter } from "@libp2p/interface";
import { decodeRelayShard } from "@waku/enr";
import {
ConnectionManagerOptions,
EConnectionStateEvents,
Expand All @@ -15,7 +14,7 @@ import {
ShardInfo
} from "@waku/interfaces";
import { Libp2p, Tags } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { decodeRelayShard, shardInfoToPubsubTopics } from "@waku/utils";
import { Logger } from "@waku/utils";

import { KeepAliveManager } from "./keep_alive_manager.js";
Expand Down Expand Up @@ -377,6 +376,8 @@ export class ConnectionManager
},
"peer:connect": (evt: CustomEvent<PeerId>): void => {
void (async () => {
log.info(`Connected to peer ${evt.detail.toString()}`);

const peerId = evt.detail;

this.keepAliveManager.start(
Expand Down
7 changes: 1 addition & 6 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ class Subscription {
}

class Filter extends BaseProtocol implements IReceiver {
private readonly pubsubTopics: PubsubTopic[] = [];
private activeSubscriptions = new Map<string, Subscription>();
private readonly NUM_PEERS_PROTOCOL = 1;

Expand All @@ -279,9 +278,7 @@ class Filter extends BaseProtocol implements IReceiver {
}

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components, log);

this.pubsubTopics = this.initializePubsubTopic(options);
super(FilterCodecs.SUBSCRIBE, libp2p.components, log, options);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
Expand All @@ -300,8 +297,6 @@ class Filter extends BaseProtocol implements IReceiver {

ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);

//TODO: get a relevant peer for the topic/shard
// https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230
const peer = (
await this.getPeers({
maxBootstrapPeers: 1,
Expand Down
8 changes: 4 additions & 4 deletions packages/core/src/lib/filterPeers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe("filterPeersByDiscovery function", function () {
}
] as unknown as Peer[];

const result = await filterPeersByDiscovery(mockPeers, 0, 10);
const result = filterPeersByDiscovery(mockPeers, 0, 10);
expect(result.length).to.deep.equal(mockPeers.length);
});

Expand Down Expand Up @@ -56,7 +56,7 @@ describe("filterPeersByDiscovery function", function () {
}
] as unknown as Peer[];

const result = await filterPeersByDiscovery(mockPeers, 0, 0);
const result = filterPeersByDiscovery(mockPeers, 0, 0);

// result should have no bootstrap peers, and a total of 2 peers
expect(result.length).to.equal(2);
Expand Down Expand Up @@ -95,7 +95,7 @@ describe("filterPeersByDiscovery function", function () {
}
] as unknown as Peer[];

const result = await filterPeersByDiscovery(mockPeers, 0, 1);
const result = filterPeersByDiscovery(mockPeers, 0, 1);

// result should have 1 bootstrap peers, and a total of 4 peers
expect(result.length).to.equal(4);
Expand Down Expand Up @@ -134,7 +134,7 @@ describe("filterPeersByDiscovery function", function () {
}
] as unknown as Peer[];

const result = await filterPeersByDiscovery(mockPeers, 5, 2);
const result = filterPeersByDiscovery(mockPeers, 5, 2);

// check that result has at least 2 bootstrap peers and no more than 5 peers
expect(result.length).to.be.at.least(2);
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/lib/filterPeers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import { Tags } from "@waku/interfaces";
* @param peers - The list of peers to filter from.
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned, irrespective of `maxBootstrapPeers`.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
* @returns An array of peers based on the specified criteria.
*/
export async function filterPeersByDiscovery(
export function filterPeersByDiscovery(
peers: Peer[],
numPeers: number,
maxBootstrapPeers: number
): Promise<Peer[]> {
): Peer[] {
// Collect the bootstrap peers up to the specified maximum
let bootstrapPeers = peers
.filter((peer) => peer.tags.has(Tags.BOOTSTRAP))
Expand Down
6 changes: 1 addition & 5 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
IMessage,
Libp2p,
ProtocolCreateOptions,
PubsubTopic,
SendError,
SendResult
} from "@waku/interfaces";
Expand Down Expand Up @@ -43,12 +42,10 @@ type PreparePushMessageResult =
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
class LightPush extends BaseProtocol implements ILightPush {
private readonly pubsubTopics: PubsubTopic[];
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components, log);
this.pubsubTopics = this.initializePubsubTopic(options);
super(LightPushCodec, libp2p.components, log, options);
}

private async preparePushMessage(
Expand Down Expand Up @@ -107,7 +104,6 @@ class LightPush extends BaseProtocol implements ILightPush {
};
}

//TODO: get a relevant peer for the topic/shard
const peers = await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.NUM_PEERS_PROTOCOL
Expand Down
51 changes: 38 additions & 13 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import type { PeerId } from "@libp2p/interface";
import { IncomingStreamData } from "@libp2p/interface";
import { encodeRelayShard } from "@waku/enr";
import type {
IMetadata,
Libp2pComponents,
PeerIdStr,
ShardInfo,
ShardingParams
} from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { Logger } from "@waku/utils";
import { encodeRelayShard, Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
Expand All @@ -20,13 +20,16 @@ const log = new Logger("metadata");

export const MetadataCodec = "/vac/waku/metadata/1.0.0";

class Metadata extends BaseProtocol {
private readonly shardInfo: ShardingParams;
class Metadata extends BaseProtocol implements IMetadata {
private libp2pComponents: Libp2pComponents;
constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) {
super(MetadataCodec, libp2p.components, log);
handshakesConfirmed: Set<PeerIdStr> = new Set();

constructor(
public shardInfo: ShardingParams,
libp2p: Libp2pComponents
) {
super(MetadataCodec, libp2p.components, log, shardInfo && { shardInfo });
this.libp2pComponents = libp2p;
this.shardInfo = shardInfo;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);
});
Expand All @@ -53,12 +56,10 @@ class Metadata extends BaseProtocol {
const remoteShardInfoResponse =
this.decodeMetadataResponse(encodedResponse);

// add or update the shardInfo to peer store
await this.libp2pComponents.peerStore.merge(connection.remotePeer, {
metadata: {
shardInfo: encodeRelayShard(remoteShardInfoResponse)
}
});
await this.savePeerShardInfo(
connection.remotePeer,
remoteShardInfoResponse
);
} catch (error) {
log.error("Error handling metadata request", error);
}
Expand Down Expand Up @@ -87,9 +88,19 @@ class Metadata extends BaseProtocol {

const decodedResponse = this.decodeMetadataResponse(encodedResponse);

await this.savePeerShardInfo(peerId, decodedResponse);

return decodedResponse;
}

public async confirmOrAttemptHandshake(peerId: PeerId): Promise<void> {
if (this.handshakesConfirmed.has(peerId.toString())) return;

await this.query(peerId);

return;
}

private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo {
const bytes = new Uint8ArrayList();

Expand All @@ -104,6 +115,20 @@ class Metadata extends BaseProtocol {

return response;
}

private async savePeerShardInfo(
peerId: PeerId,
shardInfo: ShardInfo
): Promise<void> {
// add or update the shardInfo to peer store
await this.libp2pComponents.peerStore.merge(peerId, {
metadata: {
shardInfo: encodeRelayShard(shardInfo)
}
});

this.handshakesConfirmed.add(peerId.toString());
}
}

export function wakuMetadata(
Expand Down
7 changes: 2 additions & 5 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import {
IDecoder,
IStore,
Libp2p,
ProtocolCreateOptions,
PubsubTopic
ProtocolCreateOptions
} from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto";
import { ensurePubsubTopicIsConfigured, isDefined } from "@waku/utils";
Expand Down Expand Up @@ -74,12 +73,10 @@ export interface QueryOptions {
* The Waku Store protocol can be used to retrieved historical messages.
*/
class Store extends BaseProtocol implements IStore {
private readonly pubsubTopics: PubsubTopic[];
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components, log);
this.pubsubTopics = this.initializePubsubTopic(options);
super(StoreCodec, libp2p.components, log, options);
}

/**
Expand Down
Loading

0 comments on commit 477c2a5

Please sign in to comment.