Skip to content

Commit

Permalink
configure to also suport non-sharded configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Dec 14, 2023
1 parent 2a82cba commit 6f94d6b
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 15 deletions.
11 changes: 5 additions & 6 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type {
PubsubTopic,
ShardInfo
} from "@waku/interfaces";
import { pubsubTopicsToShardInfo, shardInfoToPubsubTopics } from "@waku/utils";
import { shardInfoToPubsubTopics } from "@waku/utils";
import {
getPeersForProtocol,
getPeersForProtocolAndShard,
Expand All @@ -28,12 +28,15 @@ export class BaseProtocol implements IBaseProtocol {
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
protected streamManager: StreamManager;
protected pubsubTopics: PubsubTopic[];
public shardInfo?: ShardInfo;

constructor(
public multicodec: string,
private components: Libp2pComponents,
shardInfo?: ShardInfo
) {
this.shardInfo = shardInfo;

this.pubsubTopics = this.initializePubsubTopic(shardInfo);

this.addLibp2pEventListener = components.events.addEventListener.bind(
Expand All @@ -59,10 +62,6 @@ export class BaseProtocol implements IBaseProtocol {
return this.components.peerStore;
}

public get shardInfo(): ShardInfo {
return pubsubTopicsToShardInfo(this.pubsubTopics);
}

/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these
Expand Down Expand Up @@ -100,7 +99,7 @@ export class BaseProtocol implements IBaseProtocol {
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all peers that support the protocol & shard
// Retrieve all peers that support the protocol & shard (if configured)
const peersForProtocolAndShard = await getPeersForProtocolAndShard(
this.peerStore,
[this.multicodec],
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ const log = new Logger("metadata");

export const MetadataCodec = "/vac/waku/metadata/1.0.0";

class Metadata extends BaseProtocol {
class Metadata extends BaseProtocol implements IMetadata {
private libp2pComponents: Libp2pComponents;
constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) {
constructor(
public shardInfo: ShardInfo,
libp2p: Libp2pComponents
) {
super(MetadataCodec, libp2p.components, shardInfo);
this.libp2pComponents = libp2p;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
Expand Down
4 changes: 3 additions & 1 deletion packages/interfaces/src/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import type { PeerId } from "@libp2p/interface/peer-id";
import type { ShardInfo } from "./enr.js";
import type { IBaseProtocol } from "./protocols.js";

export interface IMetadata extends IBaseProtocol {
// IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol
export interface IMetadata extends Omit<IBaseProtocol, "shardInfo"> {
shardInfo: ShardInfo;
query(peerId: PeerId): Promise<ShardInfo | undefined>;
}
1 change: 1 addition & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export enum Protocols {
}

export interface IBaseProtocol {
shardInfo?: ShardInfo;
multicodec: string;
peerStore: PeerStore;
peers: () => Promise<Peer[]>;
Expand Down
19 changes: 13 additions & 6 deletions packages/utils/src/libp2p/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,27 @@ export async function selectLowestLatencyPeer(

/**
* Returns the list of peers that supports the given protocol and shard.
* If shard is not configured, all peers that support the protocol are returned.
*/

export async function getPeersForProtocolAndShard(
peerStore: PeerStore,
protocols: string[],
shardInfo: ShardInfo
shardInfo?: ShardInfo
): Promise<Peer[]> {
const peers: Peer[] = [];
await peerStore.forEach((peer) => {
const encodedPeerShardInfo = peer.metadata.get("shardInfo");
const peerShardInfo =
encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo);

if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) {
if (shardInfo) {
const encodedPeerShardInfo = peer.metadata.get("shardInfo");
const peerShardInfo =
encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo);

if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) {
if (protocols.some((protocol) => peer.protocols.includes(protocol))) {
peers.push(peer);
}
}
} else {
if (protocols.some((protocol) => peer.protocols.includes(protocol))) {
peers.push(peer);
}
Expand Down

0 comments on commit 6f94d6b

Please sign in to comment.