Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve peer manager and re-integrate to light push #2191

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Draft
1 change: 1 addition & 0 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
"upgrader",
"vacp",
"varint",
"weboko",
"waku",
"wakuconnect",
"wakunode",
Expand Down
1,655 changes: 714 additions & 941 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
},
"devDependencies": {
"@libp2p/peer-id": "^5.0.1",
"@libp2p/interface": "^2.1.3",
"@multiformats/multiaddr": "^12.0.0",
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-json": "^6.0.0",
Expand Down
3 changes: 1 addition & 2 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js";
export { StoreCore, StoreCodec } from "./lib/store/index.js";

export { ConnectionManager } from "./lib/connection_manager.js";
export { ConnectionManager } from "./lib/connection_manager/index.js";

export { getHealthManager } from "./lib/health_manager.js";

export { KeepAliveManager } from "./lib/keep_alive_manager.js";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we deleting the export for KeepAliveManager?

export { StreamManager } from "./lib/stream_manager/index.js";

export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";
73 changes: 0 additions & 73 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ import type {
Libp2pComponents,
PubsubTopic
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";

import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager/index.js";

/**
Expand All @@ -23,7 +20,6 @@ export class BaseProtocol implements IBaseProtocolCore {
protected constructor(
public multicodec: string,
protected components: Libp2pComponents,
private log: Logger,
public readonly pubsubTopics: PubsubTopic[]
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
Expand All @@ -45,73 +41,4 @@ export class BaseProtocol implements IBaseProtocolCore {
protected async getStream(peer: Peer): Promise<Stream> {
return this.streamManager.getStream(peer);
}

/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.components.peerStore, [this.multicodec]);
}

public async connectedPeers(): Promise<Peer[]> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still have access to this utility function from elsewhere?
Basically getting the connected peers for a particular protocol

const peers = await this.allPeers();
return peers.filter((peer) => {
const connections = this.components.connectionManager.getConnections(
peer.id
);
return connections.length > 0;
});
}

/**
* Retrieves a list of connected peers that support the protocol. The list is sorted by latency.
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap.
*/
public async getPeers(
{
numPeers,
maxBootstrapPeers
}: {
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 0,
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol & shard (if configured)
const allAvailableConnectedPeers = await this.connectedPeers();

// Filter the peers based on discovery & number of peers requested
const filteredPeers = filterPeersByDiscovery(
allAvailableConnectedPeers,
numPeers,
maxBootstrapPeers
);

// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.components.peerStore,
filteredPeers
);

if (sortedFilteredPeers.length === 0) {
this.log.warn(
"No peers found. Ensure you have a connection to the network."
);
}

if (sortedFilteredPeers.length < numPeers) {
this.log.warn(
`Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.`
);
}

return sortedFilteredPeers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
IConnectionStateEvents,
IPeersByDiscoveryEvents,
IRelay,
KeepAliveOptions,
PeersByDiscoveryResult,
PubsubTopic,
ShardInfo
Expand All @@ -20,23 +19,36 @@
import { Logger } from "@waku/utils";

import { KeepAliveManager } from "./keep_alive_manager.js";
import { getPeerPing } from "./utils.js";

const log = new Logger("connection-manager");

export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
export const DEFAULT_MAX_PARALLEL_DIALS = 3;
const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
const DEFAULT_MAX_PARALLEL_DIALS = 3;

const DEFAULT_PING_KEEP_ALIVE_SEC = 5 * 60;
const DEFAULT_RELAY_KEEP_ALIVE_SEC = 5 * 60;

type ConnectionManagerConstructorOptions = {
libp2p: Libp2p;
pubsubTopics: PubsubTopic[];
relay?: IRelay;
config?: Partial<ConnectionManagerOptions>;
};

export class ConnectionManager
extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents>
implements IConnectionManager
{
private static instances = new Map<string, ConnectionManager>();
// TODO(weboko): make it private
public readonly pubsubTopics: PubsubTopic[];

private keepAliveManager: KeepAliveManager;
private options: ConnectionManagerOptions;
private libp2p: Libp2p;
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();

Check warning on line 51 in packages/core/src/lib/connection_manager/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 51 in packages/core/src/lib/connection_manager/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];
Expand All @@ -51,29 +63,6 @@
return this.isP2PNetworkConnected;
}

public static create(
peerId: string,
libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions,
pubsubTopics: PubsubTopic[],
relay?: IRelay,
options?: ConnectionManagerOptions
): ConnectionManager {
let instance = ConnectionManager.instances.get(peerId);
if (!instance) {
instance = new ConnectionManager(
libp2p,
keepAliveOptions,
pubsubTopics,
relay,
options
);
ConnectionManager.instances.set(peerId, instance);
}

return instance;
}

public stop(): void {
this.keepAliveManager.stopAll();
this.libp2p.removeEventListener(
Expand Down Expand Up @@ -156,27 +145,26 @@
};
}

private constructor(
libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions,
public readonly configuredPubsubTopics: PubsubTopic[],
relay?: IRelay,
options?: Partial<ConnectionManagerOptions>
) {
public constructor(options: ConnectionManagerConstructorOptions) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

confirming that we are shifting out of the singleton pattern?

super();
this.libp2p = libp2p;
this.configuredPubsubTopics = configuredPubsubTopics;
this.libp2p = options.libp2p;
this.pubsubTopics = options.pubsubTopics;
this.options = {
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
maxParallelDials: DEFAULT_MAX_PARALLEL_DIALS,
...options
pingKeepAlive: DEFAULT_PING_KEEP_ALIVE_SEC,
relayKeepAlive: DEFAULT_RELAY_KEEP_ALIVE_SEC,
...options.config
};

this.keepAliveManager = new KeepAliveManager({
relay,
libp2p,
options: keepAliveOptions
relay: options.relay,
libp2p: options.libp2p,
options: {
pingKeepAlive: this.options.pingKeepAlive,
relayKeepAlive: this.options.relayKeepAlive
}
});

this.startEventListeners()
Expand All @@ -193,6 +181,29 @@
);
}

public async getConnectedPeers(codec?: string): Promise<Peer[]> {
const peerIDs = this.libp2p.getPeers();

if (peerIDs.length === 0) {
return [];
}

const peers = await Promise.all(
peerIDs.map(async (id) => {
try {
return await this.libp2p.peerStore.get(id);
} catch (e) {
return null;
}
})
);

return peers
.filter((p) => !!p)
.filter((p) => (codec ? (p as Peer).protocols.includes(codec) : true))
.sort((left, right) => getPeerPing(left) - getPeerPing(right)) as Peer[];
Comment on lines +201 to +204
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably abstract this logic into a private helper to differentiate functionalities, something like:

return filterPeersByProtocols(peers).sortPeersByLatency

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reference:

public async getPeers(
    {
      numPeers,
      maxBootstrapPeers
    }: {
      numPeers: number;
      maxBootstrapPeers: number;
    } = {
      maxBootstrapPeers: 0,
      numPeers: 0
    }
  ): Promise<Peer[]> {
    // Retrieve all connected peers that support the protocol & shard (if configured)
    const allAvailableConnectedPeers = await this.connectedPeers();

    // Filter the peers based on discovery & number of peers requested
    const filteredPeers = filterPeersByDiscovery(
      allAvailableConnectedPeers,
      numPeers,
      maxBootstrapPeers
    );

    // Sort the peers by latency
    const sortedFilteredPeers = await sortPeersByLatency(
      this.components.peerStore,
      filteredPeers
    );

    if (sortedFilteredPeers.length === 0) {
      this.log.warn(
        "No peers found. Ensure you have a connection to the network."
      );
    }

    if (sortedFilteredPeers.length < numPeers) {
      this.log.warn(
        `Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.`
      );
    }

    return sortedFilteredPeers;
  }

}

private async dialPeerStorePeers(): Promise<void> {
const peerInfos = await this.libp2p.peerStore.all();
const dialPromises = [];
Expand Down Expand Up @@ -253,7 +264,7 @@
// Handle generic error
log.error(
`Error dialing peer ${peerId.toString()} - ${
(error as any).message

Check warning on line 267 in packages/core/src/lib/connection_manager/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 267 in packages/core/src/lib/connection_manager/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
}`
);
}
Expand Down Expand Up @@ -478,7 +489,7 @@

log.warn(
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
this.configuredPubsubTopics
this.pubsubTopics
}).
Not dialing.`
);
Expand Down Expand Up @@ -573,7 +584,7 @@
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);

const isTopicConfigured = pubsubTopics.some((topic) =>
this.configuredPubsubTopics.includes(topic)
this.pubsubTopics.includes(topic)
);
return isTopicConfigured;
}
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/lib/connection_manager/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { ConnectionManager } from "./connection_manager.js";
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import type { PeerId } from "@libp2p/interface";
import type { IRelay, Libp2p, PeerIdStr } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import { Logger, pubsubTopicToSingleShardInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";

import { createEncoder } from "./message/version_0.js";
import { createEncoder } from "../message/version_0.js";

export const RelayPingContentTopic = "/relay-ping/1/ping/null";
const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = new Logger("keep-alive");

type KeepAliveOptions = {
pingKeepAlive: number;
relayKeepAlive: number;
};

type CreateKeepAliveManagerOptions = {
options: KeepAliveOptions;
libp2p: Libp2p;
Expand Down
25 changes: 25 additions & 0 deletions packages/core/src/lib/connection_manager/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type { Peer } from "@libp2p/interface";
import { bytesToUtf8 } from "@waku/utils/bytes";

/**
* Reads peer's metadata and retrieves ping value.
* @param peer Peer or null
* @returns -1 if no ping attached, otherwise returns ping value
*/
export const getPeerPing = (peer: Peer | null): number => {
if (!peer) {
return -1;
}

try {
const bytes = peer.metadata.get("ping");

if (!bytes) {
return -1;
}

return Number(bytesToUtf8(bytes));
} catch (e) {
return -1;
}
};
2 changes: 1 addition & 1 deletion packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(FilterCodecs.SUBSCRIBE, libp2p.components, log, pubsubTopics);
super(FilterCodecs.SUBSCRIBE, libp2p.components, pubsubTopics);

libp2p
.handle(FilterCodecs.PUSH, this.onRequest.bind(this), {
Expand Down
Loading
Loading