Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: protocols filter peers per shard #1756

Merged
merged 17 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";

export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec } from "./lib/light_push/index.js";
export { wakuLightPush } from "./lib/light_push/index.js";
export { LightPushCodec, wakuLightPush } from "./lib/light_push/index.js";

export * as waku_store from "./lib/store/index.js";

Expand Down
42 changes: 31 additions & 11 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type {
import { DefaultPubsubTopic } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import {
getConnectedPeersForProtocol,
getConnectedPeersForProtocolAndShard,
getPeersForProtocol,
selectPeerForProtocol
} from "@waku/utils/libp2p";
Expand All @@ -27,11 +27,15 @@ export class BaseProtocol implements IBaseProtocol {
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
protected streamManager: StreamManager;
protected pubsubTopics: PubsubTopic[];

constructor(
public multicodec: string,
private components: Libp2pComponents
private components: Libp2pComponents,
public options?: ProtocolCreateOptions
) {
this.pubsubTopics = this.initializePubsubTopic(options);

this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
Expand Down Expand Up @@ -60,10 +64,19 @@ export class BaseProtocol implements IBaseProtocol {
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
public async peers(): Promise<Peer[]> {
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}

public async connectedPeers(): Promise<Peer[]> {
const peers = await this.allPeers();
return peers.filter((peer) => {
return (
this.components.connectionManager.getConnections(peer.id).length > 0
);
});
}

protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peer } = await selectPeerForProtocol(
this.peerStore,
Expand Down Expand Up @@ -92,18 +105,25 @@ export class BaseProtocol implements IBaseProtocol {
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol
const allPeersForProtocol = await getConnectedPeersForProtocol(
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec]
);
// Retrieve all connected peers that support the protocol & shard (if configured)
const connectedPeersForProtocolAndShard =
await getConnectedPeersForProtocolAndShard(
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec]
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
);

// Filter the peers based on the specified criteria
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
return filterPeers(
connectedPeersForProtocolAndShard,
numPeers,
maxBootstrapPeers
);
}

initializePubsubTopic(options?: ProtocolCreateOptions): PubsubTopic[] {
private initializePubsubTopic(
options?: ProtocolCreateOptions
): PubsubTopic[] {
return (
options?.pubsubTopics ??
(options?.shardInfo
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import type { Peer } from "@libp2p/interface/peer-store";
import type { PeerStore } from "@libp2p/interface/peer-store";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import { decodeRelayShard } from "@waku/enr";
import {
ConnectionManagerOptions,
EConnectionStateEvents,
Expand All @@ -18,7 +17,7 @@
ShardInfo
} from "@waku/interfaces";
import { Libp2p, Tags } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { decodeRelayShard, shardInfoToPubsubTopics } from "@waku/utils";
import { Logger } from "@waku/utils";

import { KeepAliveManager } from "./keep_alive_manager.js";
Expand All @@ -38,7 +37,7 @@
private options: ConnectionManagerOptions;
private libp2p: Libp2p;
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();

Check warning on line 40 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 40 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];
Expand Down Expand Up @@ -244,7 +243,7 @@
// Handle generic error
log.error(
`Error dialing peer ${peerId.toString()} - ${
(error as any).message

Check warning on line 246 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 246 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
}`
);
}
Expand Down Expand Up @@ -380,6 +379,8 @@
},
"peer:connect": (evt: CustomEvent<PeerId>): void => {
void (async () => {
log.info(`Connected to peer ${evt.detail.toString()}`);

const peerId = evt.detail;

this.keepAliveManager.start(
Expand Down
7 changes: 1 addition & 6 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ class Subscription {
}

class Filter extends BaseProtocol implements IReceiver {
private readonly pubsubTopics: PubsubTopic[] = [];
private activeSubscriptions = new Map<string, Subscription>();
private readonly NUM_PEERS_PROTOCOL = 1;

Expand All @@ -279,9 +278,7 @@ class Filter extends BaseProtocol implements IReceiver {
}

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);

this.pubsubTopics = this.initializePubsubTopic(options);
super(FilterCodecs.SUBSCRIBE, libp2p.components, options);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
Expand All @@ -300,8 +297,6 @@ class Filter extends BaseProtocol implements IReceiver {

ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);

//TODO: get a relevant peer for the topic/shard
// https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230
const peer = (
await this.getPeers({
maxBootstrapPeers: 1,
Expand Down
8 changes: 4 additions & 4 deletions packages/core/src/lib/filterPeers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 10);
const result = filterPeers(mockPeers, 0, 10);
expect(result.length).to.deep.equal(mockPeers.length);
});

Expand Down Expand Up @@ -56,7 +56,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 0);
const result = filterPeers(mockPeers, 0, 0);

// result should have no bootstrap peers, and a total of 2 peers
expect(result.length).to.equal(2);
Expand Down Expand Up @@ -95,7 +95,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 1);
const result = filterPeers(mockPeers, 0, 1);

// result should have 1 bootstrap peers, and a total of 4 peers
expect(result.length).to.equal(4);
Expand Down Expand Up @@ -134,7 +134,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 5, 2);
const result = filterPeers(mockPeers, 5, 2);

// check that result has at least 2 bootstrap peers and no more than 5 peers
expect(result.length).to.be.at.least(2);
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/filterPeers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import { Tags } from "@waku/interfaces";
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
*/
export async function filterPeers(
export function filterPeers(
peers: Peer[],
numPeers: number,
maxBootstrapPeers: number
): Promise<Peer[]> {
): Peer[] {
// Collect the bootstrap peers up to the specified maximum
const bootstrapPeers = peers
.filter((peer) => peer.tags.has(Tags.BOOTSTRAP))
Expand Down
6 changes: 1 addition & 5 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
IMessage,
Libp2p,
ProtocolCreateOptions,
PubsubTopic,
SendError,
SendResult
} from "@waku/interfaces";
Expand Down Expand Up @@ -44,12 +43,10 @@ type PreparePushMessageResult =
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
class LightPush extends BaseProtocol implements ILightPush {
private readonly pubsubTopics: PubsubTopic[];
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options);
super(LightPushCodec, libp2p.components, options);
}

private async preparePushMessage(
Expand Down Expand Up @@ -108,7 +105,6 @@ class LightPush extends BaseProtocol implements ILightPush {
};
}

//TODO: get a relevant peer for the topic/shard
const peers = await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.NUM_PEERS_PROTOCOL
Expand Down
55 changes: 42 additions & 13 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import { IncomingStreamData } from "@libp2p/interface/stream-handler";
import { encodeRelayShard } from "@waku/enr";
import type {
IMetadata,
Libp2pComponents,
ShardInfo,
ShardingParams
} from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { Logger } from "@waku/utils";
import { encodeRelayShard, Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
Expand All @@ -20,13 +19,21 @@ const log = new Logger("metadata");

export const MetadataCodec = "/vac/waku/metadata/1.0.0";

class Metadata extends BaseProtocol {
private readonly shardInfo: ShardingParams;
class Metadata extends BaseProtocol implements IMetadata {
private libp2pComponents: Libp2pComponents;
constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) {
super(MetadataCodec, libp2p.components);
handshakesConfirmed: Set<PeerId> = new Set();

checkHandshake(peerId: PeerId): boolean {
const handshakesArr = [...this.handshakesConfirmed];
return handshakesArr.some((id) => id.equals(peerId));
}

constructor(
public shardInfo: ShardingParams,
libp2p: Libp2pComponents
) {
super(MetadataCodec, libp2p.components, shardInfo && { shardInfo });
this.libp2pComponents = libp2p;
this.shardInfo = shardInfo;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);
});
Expand All @@ -53,12 +60,10 @@ class Metadata extends BaseProtocol {
const remoteShardInfoResponse =
this.decodeMetadataResponse(encodedResponse);

// add or update the shardInfo to peer store
await this.libp2pComponents.peerStore.merge(connection.remotePeer, {
metadata: {
shardInfo: encodeRelayShard(remoteShardInfoResponse)
}
});
await this.savePeerShardInfo(
connection.remotePeer,
remoteShardInfoResponse
);
} catch (error) {
log.error("Error handling metadata request", error);
}
Expand All @@ -84,9 +89,19 @@ class Metadata extends BaseProtocol {

const decodedResponse = this.decodeMetadataResponse(encodedResponse);

await this.savePeerShardInfo(peerId, decodedResponse);

return decodedResponse;
}

public async confirmOrAttemptHandshake(peerId: PeerId): Promise<void> {
if (this.checkHandshake(peerId)) return;
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved

await this.query(peerId);

return;
}

private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo {
const bytes = new Uint8ArrayList();

Expand All @@ -101,6 +116,20 @@ class Metadata extends BaseProtocol {

return response;
}

private async savePeerShardInfo(
peerId: PeerId,
shardInfo: ShardInfo
): Promise<void> {
// add or update the shardInfo to peer store
await this.libp2pComponents.peerStore.merge(peerId, {
metadata: {
shardInfo: encodeRelayShard(shardInfo)
}
});

this.handshakesConfirmed.add(peerId);
}
}

export function wakuMetadata(
Expand Down
7 changes: 2 additions & 5 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import {
IDecoder,
IStore,
Libp2p,
ProtocolCreateOptions,
PubsubTopic
ProtocolCreateOptions
} from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto";
import { ensurePubsubTopicIsConfigured, isDefined } from "@waku/utils";
Expand Down Expand Up @@ -74,12 +73,10 @@ export interface QueryOptions {
* The Waku Store protocol can be used to retrieved historical messages.
*/
class Store extends BaseProtocol implements IStore {
private readonly pubsubTopics: PubsubTopic[];
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options);
super(StoreCodec, libp2p.components, options);
}

/**
Expand Down
Loading
Loading