Skip to content

Commit

Permalink
merge: master
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Jan 10, 2024
1 parent 69406bf commit fb1ce98
Show file tree
Hide file tree
Showing 33 changed files with 477 additions and 107 deletions.
2 changes: 2 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";

export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec } from "./lib/light_push/index.js";
export { wakuLightPush } from "./lib/light_push/index.js";
export { LightPushCodec, wakuLightPush } from "./lib/light_push/index.js";

export * as waku_store from "./lib/store/index.js";

Expand Down
42 changes: 31 additions & 11 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type {
import { DefaultPubsubTopic } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import {
getConnectedPeersForProtocol,
getConnectedPeersForProtocolAndShard,
getPeersForProtocol,
selectPeerForProtocol
} from "@waku/utils/libp2p";
Expand All @@ -27,11 +27,15 @@ export class BaseProtocol implements IBaseProtocol {
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
protected streamManager: StreamManager;
protected pubsubTopics: PubsubTopic[];

constructor(
public multicodec: string,
private components: Libp2pComponents
private components: Libp2pComponents,
public options?: ProtocolCreateOptions
) {
this.pubsubTopics = this.initializePubsubTopic(options);

this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
Expand Down Expand Up @@ -60,10 +64,19 @@ export class BaseProtocol implements IBaseProtocol {
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
public async peers(): Promise<Peer[]> {
public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}

public async connectedPeers(): Promise<Peer[]> {
const peers = await this.allPeers();
return peers.filter((peer) => {
return (
this.components.connectionManager.getConnections(peer.id).length > 0
);
});
}

protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peer } = await selectPeerForProtocol(
this.peerStore,
Expand Down Expand Up @@ -92,18 +105,25 @@ export class BaseProtocol implements IBaseProtocol {
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol
const allPeersForProtocol = await getConnectedPeersForProtocol(
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec]
);
// Retrieve all connected peers that support the protocol & shard (if configured)
const connectedPeersForProtocolAndShard =
await getConnectedPeersForProtocolAndShard(
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec]
);

// Filter the peers based on the specified criteria
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
return filterPeers(
connectedPeersForProtocolAndShard,
numPeers,
maxBootstrapPeers
);
}

initializePubsubTopic(options?: ProtocolCreateOptions): PubsubTopic[] {
private initializePubsubTopic(
options?: ProtocolCreateOptions
): PubsubTopic[] {
return (
options?.pubsubTopics ??
(options?.shardInfo
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import type { PeerInfo } from "@libp2p/interface/peer-info";
import type { Peer } from "@libp2p/interface/peer-store";
import type { PeerStore } from "@libp2p/interface/peer-store";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import { decodeRelayShard } from "@waku/enr";
import {
ConnectionManagerOptions,
EConnectionStateEvents,
Expand All @@ -18,7 +17,7 @@ import {
ShardInfo
} from "@waku/interfaces";
import { Libp2p, Tags } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { decodeRelayShard, shardInfoToPubsubTopics } from "@waku/utils";
import { Logger } from "@waku/utils";

import { KeepAliveManager } from "./keep_alive_manager.js";
Expand Down Expand Up @@ -380,6 +379,8 @@ export class ConnectionManager
},
"peer:connect": (evt: CustomEvent<PeerId>): void => {
void (async () => {
log.info(`Connected to peer ${evt.detail.toString()}`);

const peerId = evt.detail;

this.keepAliveManager.start(
Expand Down
7 changes: 1 addition & 6 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ class Subscription {
}

class Filter extends BaseProtocol implements IReceiver {
private readonly pubsubTopics: PubsubTopic[] = [];
private activeSubscriptions = new Map<string, Subscription>();
private readonly NUM_PEERS_PROTOCOL = 1;

Expand All @@ -279,9 +278,7 @@ class Filter extends BaseProtocol implements IReceiver {
}

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);

this.pubsubTopics = this.initializePubsubTopic(options);
super(FilterCodecs.SUBSCRIBE, libp2p.components, options);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
Expand All @@ -300,8 +297,6 @@ class Filter extends BaseProtocol implements IReceiver {

ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);

//TODO: get a relevant peer for the topic/shard
// https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230
const peer = (
await this.getPeers({
maxBootstrapPeers: 1,
Expand Down
8 changes: 4 additions & 4 deletions packages/core/src/lib/filterPeers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 10);
const result = filterPeers(mockPeers, 0, 10);
expect(result.length).to.deep.equal(mockPeers.length);
});

Expand Down Expand Up @@ -56,7 +56,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 0);
const result = filterPeers(mockPeers, 0, 0);

// result should have no bootstrap peers, and a total of 2 peers
expect(result.length).to.equal(2);
Expand Down Expand Up @@ -95,7 +95,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 0, 1);
const result = filterPeers(mockPeers, 0, 1);

// result should have 1 bootstrap peers, and a total of 4 peers
expect(result.length).to.equal(4);
Expand Down Expand Up @@ -134,7 +134,7 @@ describe("filterPeers function", function () {
}
] as unknown as Peer[];

const result = await filterPeers(mockPeers, 5, 2);
const result = filterPeers(mockPeers, 5, 2);

// check that result has at least 2 bootstrap peers and no more than 5 peers
expect(result.length).to.be.at.least(2);
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/filterPeers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import { Tags } from "@waku/interfaces";
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/
export async function filterPeers(
export function filterPeers(
peers: Peer[],
numPeers: number,
maxBootstrapPeers: number
): Promise<Peer[]> {
): Peer[] {
// Collect the bootstrap peers up to the specified maximum
const bootstrapPeers = peers
.filter((peer) => peer.tags.has(Tags.BOOTSTRAP))
Expand Down
6 changes: 1 addition & 5 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
IMessage,
Libp2p,
ProtocolCreateOptions,
PubsubTopic,
SendError,
SendResult
} from "@waku/interfaces";
Expand Down Expand Up @@ -44,12 +43,10 @@ type PreparePushMessageResult =
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
class LightPush extends BaseProtocol implements ILightPush {
private readonly pubsubTopics: PubsubTopic[];
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options);
super(LightPushCodec, libp2p.components, options);
}

private async preparePushMessage(
Expand Down Expand Up @@ -108,7 +105,6 @@ class LightPush extends BaseProtocol implements ILightPush {
};
}

//TODO: get a relevant peer for the topic/shard
const peers = await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.NUM_PEERS_PROTOCOL
Expand Down
55 changes: 42 additions & 13 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import { IncomingStreamData } from "@libp2p/interface/stream-handler";
import { encodeRelayShard } from "@waku/enr";
import type {
IMetadata,
Libp2pComponents,
ShardInfo,
ShardingParams
} from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { Logger } from "@waku/utils";
import { encodeRelayShard, Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
Expand All @@ -20,13 +19,21 @@ const log = new Logger("metadata");

export const MetadataCodec = "/vac/waku/metadata/1.0.0";

class Metadata extends BaseProtocol {
private readonly shardInfo: ShardingParams;
class Metadata extends BaseProtocol implements IMetadata {
private libp2pComponents: Libp2pComponents;
constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) {
super(MetadataCodec, libp2p.components);
handshakesConfirmed: Set<PeerId> = new Set();

checkHandshake(peerId: PeerId): boolean {
const handshakesArr = [...this.handshakesConfirmed];
return handshakesArr.some((id) => id.equals(peerId));
}

constructor(
public shardInfo: ShardingParams,
libp2p: Libp2pComponents
) {
super(MetadataCodec, libp2p.components, shardInfo && { shardInfo });
this.libp2pComponents = libp2p;
this.shardInfo = shardInfo;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);
});
Expand All @@ -53,12 +60,10 @@ class Metadata extends BaseProtocol {
const remoteShardInfoResponse =
this.decodeMetadataResponse(encodedResponse);

// add or update the shardInfo to peer store
await this.libp2pComponents.peerStore.merge(connection.remotePeer, {
metadata: {
shardInfo: encodeRelayShard(remoteShardInfoResponse)
}
});
await this.savePeerShardInfo(
connection.remotePeer,
remoteShardInfoResponse
);
} catch (error) {
log.error("Error handling metadata request", error);
}
Expand All @@ -84,9 +89,19 @@ class Metadata extends BaseProtocol {

const decodedResponse = this.decodeMetadataResponse(encodedResponse);

await this.savePeerShardInfo(peerId, decodedResponse);

return decodedResponse;
}

public async confirmOrAttemptHandshake(peerId: PeerId): Promise<void> {
if (this.checkHandshake(peerId)) return;

await this.query(peerId);

return;
}

private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo {
const bytes = new Uint8ArrayList();

Expand All @@ -101,6 +116,20 @@ class Metadata extends BaseProtocol {

return response;
}

private async savePeerShardInfo(
peerId: PeerId,
shardInfo: ShardInfo
): Promise<void> {
// add or update the shardInfo to peer store
await this.libp2pComponents.peerStore.merge(peerId, {
metadata: {
shardInfo: encodeRelayShard(shardInfo)
}
});

this.handshakesConfirmed.add(peerId);
}
}

export function wakuMetadata(
Expand Down
7 changes: 2 additions & 5 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import {
IDecoder,
IStore,
Libp2p,
ProtocolCreateOptions,
PubsubTopic
ProtocolCreateOptions
} from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto";
import { ensurePubsubTopicIsConfigured, isDefined } from "@waku/utils";
Expand Down Expand Up @@ -74,12 +73,10 @@ export interface QueryOptions {
* The Waku Store protocol can be used to retrieved historical messages.
*/
class Store extends BaseProtocol implements IStore {
private readonly pubsubTopics: PubsubTopic[];
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options);
super(StoreCodec, libp2p.components, options);
}

/**
Expand Down
Loading

0 comments on commit fb1ce98

Please sign in to comment.