Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use metadata protocol for awaiting connection to remote peer #1759

Merged
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"cSpell.enabled": true,
"editor.defaultFormatter": "dbaeumer.vscode-eslint",
"editor.codeActionsOnSave": {
"source.fixAll.eslint": true
"source.fixAll.eslint": "explicit"
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
},
"editor.formatOnSave": false, // Disable general format on save
"typescript.tsdk": "node_modules/typescript/lib",
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";

export * as waku_light_push from "./lib/light_push/index.js";
export { wakuLightPush } from "./lib/light_push/index.js";
export { LightPushCodec, wakuLightPush } from "./lib/light_push/index.js";

export * as waku_store from "./lib/store/index.js";

Expand Down
11 changes: 10 additions & 1 deletion packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,19 @@ export class BaseProtocol implements IBaseProtocol {
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
public async peers(): Promise<Peer[]> {
public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}

public async connectedPeers(): Promise<Peer[]> {
const peers = await this.allPeers();
return peers.filter((peer) => {
return (
this.components.connectionManager.getConnections(peer.id).length > 0
);
});
}

protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peer } = await selectPeerForProtocol(
this.peerStore,
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
private options: ConnectionManagerOptions;
private libp2p: Libp2p;
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();

Check warning on line 40 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 40 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];
Expand Down Expand Up @@ -243,7 +243,7 @@
// Handle generic error
log.error(
`Error dialing peer ${peerId.toString()} - ${
(error as any).message

Check warning on line 246 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 246 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type
}`
);
}
Expand Down Expand Up @@ -379,6 +379,8 @@
},
"peer:connect": (evt: CustomEvent<PeerId>): void => {
void (async () => {
log.info(`Connected to peer ${evt.detail.toString()}`);

const peerId = evt.detail;

this.keepAliveManager.start(
Expand Down
41 changes: 35 additions & 6 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ export const MetadataCodec = "/vac/waku/metadata/1.0.0";

class Metadata extends BaseProtocol implements IMetadata {
private libp2pComponents: Libp2pComponents;
handshakesConfirmed: Set<PeerId> = new Set();

checkHandshake(peerId: PeerId): boolean {
const handshakesArr = [...this.handshakesConfirmed];
return handshakesArr.some((id) => id.equals(peerId));
}

constructor(
public shardInfo: ShardInfo,
libp2p: Libp2pComponents
Expand Down Expand Up @@ -48,12 +55,10 @@ class Metadata extends BaseProtocol implements IMetadata {
const remoteShardInfoResponse =
this.decodeMetadataResponse(encodedResponse);

// add or update the shardInfo to peer store
await this.libp2pComponents.peerStore.merge(connection.remotePeer, {
metadata: {
shardInfo: encodeRelayShard(remoteShardInfoResponse)
}
});
await this.savePeerShardInfo(
connection.remotePeer,
remoteShardInfoResponse
);
} catch (error) {
log.error("Error handling metadata request", error);
}
Expand All @@ -79,9 +84,19 @@ class Metadata extends BaseProtocol implements IMetadata {

const decodedResponse = this.decodeMetadataResponse(encodedResponse);

await this.savePeerShardInfo(peerId, decodedResponse);

return decodedResponse;
}

public async confirmOrAttemptHandshake(peerId: PeerId): Promise<void> {
if (this.checkHandshake(peerId)) return;

await this.query(peerId);

return;
}

private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo {
const bytes = new Uint8ArrayList();

Expand All @@ -96,6 +111,20 @@ class Metadata extends BaseProtocol implements IMetadata {

return response;
}

private async savePeerShardInfo(
peerId: PeerId,
shardInfo: ShardInfo
): Promise<void> {
// add or update the shardInfo to peer store
await this.libp2pComponents.peerStore.merge(peerId, {
metadata: {
shardInfo: encodeRelayShard(shardInfo)
}
});

this.handshakesConfirmed.add(peerId);
}
}

export function wakuMetadata(
Expand Down
67 changes: 56 additions & 11 deletions packages/core/src/lib/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import type { IdentifyResult } from "@libp2p/interface";
import type { IBaseProtocol, IRelay, Waku } from "@waku/interfaces";
import type { IBaseProtocol, IMetadata, IRelay, Waku } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { pEvent } from "p-event";

const log = new Logger("wait-for-remote-peer");

/**
Expand Down Expand Up @@ -32,6 +31,11 @@
): Promise<void> {
protocols = protocols ?? getEnabledProtocols(waku);

const isShardingEnabled = waku.shardInfo !== undefined;
const metadataService = isShardingEnabled
? waku.libp2p.services.metadata
: undefined;

if (!waku.isStarted()) return Promise.reject("Waku node is not started");

const promises = [];
Expand All @@ -45,19 +49,19 @@
if (protocols.includes(Protocols.Store)) {
if (!waku.store)
throw new Error("Cannot wait for Store peer: protocol not mounted");
promises.push(waitForConnectedPeer(waku.store));
promises.push(waitForConnectedPeer(waku.store, metadataService));
}

if (protocols.includes(Protocols.LightPush)) {
if (!waku.lightPush)
throw new Error("Cannot wait for LightPush peer: protocol not mounted");
promises.push(waitForConnectedPeer(waku.lightPush));
promises.push(waitForConnectedPeer(waku.lightPush, metadataService));
}

if (protocols.includes(Protocols.Filter)) {
if (!waku.filter)
throw new Error("Cannot wait for Filter peer: protocol not mounted");
promises.push(waitForConnectedPeer(waku.filter));
promises.push(waitForConnectedPeer(waku.filter, metadataService));
}

if (timeoutMs) {
Expand All @@ -73,21 +77,62 @@

/**
* Wait for a peer with the given protocol to be connected.
* If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service.
*/
async function waitForConnectedPeer(protocol: IBaseProtocol): Promise<void> {
async function waitForConnectedPeer(
protocol: IBaseProtocol,
metadataService?: IMetadata
): Promise<void> {
const codec = protocol.multicodec;
const peers = await protocol.peers();
const peers = await protocol.connectedPeers();

if (peers.length) {
log.info(`${codec} peer found: `, peers[0].id.toString());
return;
if (!metadataService) {
log.info(`${codec} peer found: `, peers[0].id.toString());
return;
}

// once a peer is connected, we need to confirm the metadata handshake with at least one of those peers if sharding is enabled
try {
await Promise.any(
peers.map((peer) => metadataService.confirmOrAttemptHandshake(peer.id))
);
return;
} catch (e) {
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED")

Check warning on line 102 in packages/core/src/lib/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 102 in packages/core/src/lib/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type
log.error(
`Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}`
);

log.error(`Error waiting for handshake confirmation: ${e}`);
}
}

log.info(`Waiting for ${codec} peer`);

// else we'll just wait for the next peer to connect
await new Promise<void>((resolve) => {
const cb = (evt: CustomEvent<IdentifyResult>): void => {
if (evt.detail?.protocols?.includes(codec)) {
protocol.removeLibp2pEventListener("peer:identify", cb);
resolve();
if (metadataService) {
metadataService
.confirmOrAttemptHandshake(evt.detail.peerId)
.then(() => {
protocol.removeLibp2pEventListener("peer:identify", cb);
resolve();
})
.catch((e) => {
if (e.code === "ERR_CONNECTION_BEING_CLOSED")
log.error(
`Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}`
);

log.error(`Error waiting for handshake confirmation: ${e}`);
});
} else {
protocol.removeLibp2pEventListener("peer:identify", cb);
resolve();
}
}
};
protocol.addLibp2pEventListener("peer:identify", cb);
Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class WakuNode implements Waku {
constructor(
options: WakuOptions,
libp2p: Libp2p,
pubsubShardInfo?: ShardInfo,
private pubsubShardInfo?: ShardInfo,
store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter,
Expand Down Expand Up @@ -109,6 +109,10 @@ export class WakuNode implements Waku {
);
}

get shardInfo(): ShardInfo | undefined {
return this.pubsubShardInfo;
}

/**
* Dials to the provided peer.
*
Expand Down
1 change: 1 addition & 0 deletions packages/interfaces/src/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ import type { IBaseProtocol } from "./protocols.js";
// IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol
export interface IMetadata extends Omit<IBaseProtocol, "shardInfo"> {
shardInfo: ShardInfo;
confirmOrAttemptHandshake(peerId: PeerId): Promise<void>;
query(peerId: PeerId): Promise<ShardInfo | undefined>;
}
3 changes: 2 additions & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ export interface IBaseProtocol {
shardInfo?: ShardInfo;
multicodec: string;
peerStore: PeerStore;
peers: () => Promise<Peer[]>;
allPeers: () => Promise<Peer[]>;
connectedPeers: () => Promise<Peer[]>;
addLibp2pEventListener: Libp2p["addEventListener"];
removeLibp2pEventListener: Libp2p["removeEventListener"];
}
Expand Down
3 changes: 3 additions & 0 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { PeerId } from "@libp2p/interface/peer-id";
import type { Multiaddr } from "@multiformats/multiaddr";

import { IConnectionManager } from "./connection_manager.js";
import type { ShardInfo } from "./enr.js";
import type { IFilter } from "./filter.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
Expand All @@ -17,6 +18,8 @@ export interface Waku {
filter?: IFilter;
lightPush?: ILightPush;

shardInfo?: ShardInfo;

connectionManager: IConnectionManager;

dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;
Expand Down
3 changes: 2 additions & 1 deletion packages/tests/tests/filter/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ export async function runNodes(
filter: true,
lightpush: true,
relay: true,
pubsubTopic: pubsubTopics
pubsubTopic: pubsubTopics,
...(shardInfo && { clusterId: shardInfo.clusterId })
},
{ retries: 3 }
);
Expand Down
Loading
Loading