-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: improve peer manager and re-integrate to light push #2191
base: master
Are you sure you want to change the base?
Changes from all commits
c93b58e
7efbd26
a6dd499
e897d5c
e1813bc
9bdc2af
2edd856
7ae3b91
d0c6905
1eaedb3
984326b
8a3337d
d4210c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,6 +116,7 @@ | |
"upgrader", | ||
"vacp", | ||
"varint", | ||
"weboko", | ||
"waku", | ||
"wakuconnect", | ||
"wakunode", | ||
|
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,10 +5,7 @@ import type { | |
Libp2pComponents, | ||
PubsubTopic | ||
} from "@waku/interfaces"; | ||
import { Logger } from "@waku/utils"; | ||
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p"; | ||
|
||
import { filterPeersByDiscovery } from "./filterPeers.js"; | ||
import { StreamManager } from "./stream_manager/index.js"; | ||
|
||
/** | ||
|
@@ -23,7 +20,6 @@ export class BaseProtocol implements IBaseProtocolCore { | |
protected constructor( | ||
public multicodec: string, | ||
protected components: Libp2pComponents, | ||
private log: Logger, | ||
public readonly pubsubTopics: PubsubTopic[] | ||
) { | ||
this.addLibp2pEventListener = components.events.addEventListener.bind( | ||
|
@@ -45,73 +41,4 @@ export class BaseProtocol implements IBaseProtocolCore { | |
protected async getStream(peer: Peer): Promise<Stream> { | ||
return this.streamManager.getStream(peer); | ||
} | ||
|
||
/** | ||
* Returns known peers from the address book (`libp2p.peerStore`) that support | ||
* the class protocol. Waku may or may not be currently connected to these | ||
* peers. | ||
*/ | ||
public async allPeers(): Promise<Peer[]> { | ||
return getPeersForProtocol(this.components.peerStore, [this.multicodec]); | ||
} | ||
|
||
public async connectedPeers(): Promise<Peer[]> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we still have access to this utility function from elsewhere? |
||
const peers = await this.allPeers(); | ||
return peers.filter((peer) => { | ||
const connections = this.components.connectionManager.getConnections( | ||
peer.id | ||
); | ||
return connections.length > 0; | ||
}); | ||
} | ||
|
||
/** | ||
* Retrieves a list of connected peers that support the protocol. The list is sorted by latency. | ||
* | ||
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. | ||
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. | ||
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap. | ||
*/ | ||
public async getPeers( | ||
{ | ||
numPeers, | ||
maxBootstrapPeers | ||
}: { | ||
numPeers: number; | ||
maxBootstrapPeers: number; | ||
} = { | ||
maxBootstrapPeers: 0, | ||
numPeers: 0 | ||
} | ||
): Promise<Peer[]> { | ||
// Retrieve all connected peers that support the protocol & shard (if configured) | ||
const allAvailableConnectedPeers = await this.connectedPeers(); | ||
|
||
// Filter the peers based on discovery & number of peers requested | ||
const filteredPeers = filterPeersByDiscovery( | ||
allAvailableConnectedPeers, | ||
numPeers, | ||
maxBootstrapPeers | ||
); | ||
|
||
// Sort the peers by latency | ||
const sortedFilteredPeers = await sortPeersByLatency( | ||
this.components.peerStore, | ||
filteredPeers | ||
); | ||
|
||
if (sortedFilteredPeers.length === 0) { | ||
this.log.warn( | ||
"No peers found. Ensure you have a connection to the network." | ||
); | ||
} | ||
|
||
if (sortedFilteredPeers.length < numPeers) { | ||
this.log.warn( | ||
`Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.` | ||
); | ||
} | ||
|
||
return sortedFilteredPeers; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,6 @@ | |
IConnectionStateEvents, | ||
IPeersByDiscoveryEvents, | ||
IRelay, | ||
KeepAliveOptions, | ||
PeersByDiscoveryResult, | ||
PubsubTopic, | ||
ShardInfo | ||
|
@@ -20,23 +19,36 @@ | |
import { Logger } from "@waku/utils"; | ||
|
||
import { KeepAliveManager } from "./keep_alive_manager.js"; | ||
import { getPeerPing } from "./utils.js"; | ||
|
||
const log = new Logger("connection-manager"); | ||
|
||
export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1; | ||
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3; | ||
export const DEFAULT_MAX_PARALLEL_DIALS = 3; | ||
const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1; | ||
const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3; | ||
const DEFAULT_MAX_PARALLEL_DIALS = 3; | ||
|
||
const DEFAULT_PING_KEEP_ALIVE_SEC = 5 * 60; | ||
const DEFAULT_RELAY_KEEP_ALIVE_SEC = 5 * 60; | ||
|
||
type ConnectionManagerConstructorOptions = { | ||
libp2p: Libp2p; | ||
pubsubTopics: PubsubTopic[]; | ||
relay?: IRelay; | ||
config?: Partial<ConnectionManagerOptions>; | ||
}; | ||
|
||
export class ConnectionManager | ||
extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> | ||
implements IConnectionManager | ||
{ | ||
private static instances = new Map<string, ConnectionManager>(); | ||
// TODO(weboko): make it private | ||
public readonly pubsubTopics: PubsubTopic[]; | ||
|
||
private keepAliveManager: KeepAliveManager; | ||
private options: ConnectionManagerOptions; | ||
private libp2p: Libp2p; | ||
private dialAttemptsForPeer: Map<string, number> = new Map(); | ||
private dialErrorsForPeer: Map<string, any> = new Map(); | ||
Check warning on line 51 in packages/core/src/lib/connection_manager/connection_manager.ts GitHub Actions / check
|
||
|
||
private currentActiveParallelDialCount = 0; | ||
private pendingPeerDialQueue: Array<PeerId> = []; | ||
|
@@ -51,29 +63,6 @@ | |
return this.isP2PNetworkConnected; | ||
} | ||
|
||
public static create( | ||
peerId: string, | ||
libp2p: Libp2p, | ||
keepAliveOptions: KeepAliveOptions, | ||
pubsubTopics: PubsubTopic[], | ||
relay?: IRelay, | ||
options?: ConnectionManagerOptions | ||
): ConnectionManager { | ||
let instance = ConnectionManager.instances.get(peerId); | ||
if (!instance) { | ||
instance = new ConnectionManager( | ||
libp2p, | ||
keepAliveOptions, | ||
pubsubTopics, | ||
relay, | ||
options | ||
); | ||
ConnectionManager.instances.set(peerId, instance); | ||
} | ||
|
||
return instance; | ||
} | ||
|
||
public stop(): void { | ||
this.keepAliveManager.stopAll(); | ||
this.libp2p.removeEventListener( | ||
|
@@ -156,27 +145,26 @@ | |
}; | ||
} | ||
|
||
private constructor( | ||
libp2p: Libp2p, | ||
keepAliveOptions: KeepAliveOptions, | ||
public readonly configuredPubsubTopics: PubsubTopic[], | ||
relay?: IRelay, | ||
options?: Partial<ConnectionManagerOptions> | ||
) { | ||
public constructor(options: ConnectionManagerConstructorOptions) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. confirming that we are shifting out of the singleton pattern? |
||
super(); | ||
this.libp2p = libp2p; | ||
this.configuredPubsubTopics = configuredPubsubTopics; | ||
this.libp2p = options.libp2p; | ||
this.pubsubTopics = options.pubsubTopics; | ||
this.options = { | ||
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER, | ||
maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED, | ||
maxParallelDials: DEFAULT_MAX_PARALLEL_DIALS, | ||
...options | ||
pingKeepAlive: DEFAULT_PING_KEEP_ALIVE_SEC, | ||
relayKeepAlive: DEFAULT_RELAY_KEEP_ALIVE_SEC, | ||
...options.config | ||
}; | ||
|
||
this.keepAliveManager = new KeepAliveManager({ | ||
relay, | ||
libp2p, | ||
options: keepAliveOptions | ||
relay: options.relay, | ||
libp2p: options.libp2p, | ||
options: { | ||
pingKeepAlive: this.options.pingKeepAlive, | ||
relayKeepAlive: this.options.relayKeepAlive | ||
} | ||
}); | ||
|
||
this.startEventListeners() | ||
|
@@ -193,6 +181,29 @@ | |
); | ||
} | ||
|
||
public async getConnectedPeers(codec?: string): Promise<Peer[]> { | ||
const peerIDs = this.libp2p.getPeers(); | ||
|
||
if (peerIDs.length === 0) { | ||
return []; | ||
} | ||
|
||
const peers = await Promise.all( | ||
peerIDs.map(async (id) => { | ||
try { | ||
return await this.libp2p.peerStore.get(id); | ||
} catch (e) { | ||
return null; | ||
} | ||
}) | ||
); | ||
|
||
return peers | ||
.filter((p) => !!p) | ||
.filter((p) => (codec ? (p as Peer).protocols.includes(codec) : true)) | ||
.sort((left, right) => getPeerPing(left) - getPeerPing(right)) as Peer[]; | ||
Comment on lines
+201
to
+204
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should probably abstract this logic into a private helper to differentiate functionalities, something like:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reference: public async getPeers(
{
numPeers,
maxBootstrapPeers
}: {
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 0,
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol & shard (if configured)
const allAvailableConnectedPeers = await this.connectedPeers();
// Filter the peers based on discovery & number of peers requested
const filteredPeers = filterPeersByDiscovery(
allAvailableConnectedPeers,
numPeers,
maxBootstrapPeers
);
// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.components.peerStore,
filteredPeers
);
if (sortedFilteredPeers.length === 0) {
this.log.warn(
"No peers found. Ensure you have a connection to the network."
);
}
if (sortedFilteredPeers.length < numPeers) {
this.log.warn(
`Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.`
);
}
return sortedFilteredPeers;
} |
||
} | ||
|
||
private async dialPeerStorePeers(): Promise<void> { | ||
const peerInfos = await this.libp2p.peerStore.all(); | ||
const dialPromises = []; | ||
|
@@ -253,7 +264,7 @@ | |
// Handle generic error | ||
log.error( | ||
`Error dialing peer ${peerId.toString()} - ${ | ||
(error as any).message | ||
Check warning on line 267 in packages/core/src/lib/connection_manager/connection_manager.ts GitHub Actions / check
|
||
}` | ||
); | ||
} | ||
|
@@ -478,7 +489,7 @@ | |
|
||
log.warn( | ||
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${ | ||
this.configuredPubsubTopics | ||
this.pubsubTopics | ||
}). | ||
Not dialing.` | ||
); | ||
|
@@ -573,7 +584,7 @@ | |
const pubsubTopics = shardInfoToPubsubTopics(shardInfo); | ||
|
||
const isTopicConfigured = pubsubTopics.some((topic) => | ||
this.configuredPubsubTopics.includes(topic) | ||
this.pubsubTopics.includes(topic) | ||
); | ||
return isTopicConfigured; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
export { ConnectionManager } from "./connection_manager.js"; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import type { Peer } from "@libp2p/interface"; | ||
import { bytesToUtf8 } from "@waku/utils/bytes"; | ||
|
||
/** | ||
* Reads peer's metadata and retrieves ping value. | ||
* @param peer Peer or null | ||
* @returns -1 if no ping attached, otherwise returns ping value | ||
*/ | ||
export const getPeerPing = (peer: Peer | null): number => { | ||
if (!peer) { | ||
return -1; | ||
} | ||
|
||
try { | ||
const bytes = peer.metadata.get("ping"); | ||
|
||
if (!bytes) { | ||
return -1; | ||
} | ||
|
||
return Number(bytesToUtf8(bytes)); | ||
} catch (e) { | ||
return -1; | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we deleting the export for
KeepAliveManager
?