Skip to content

Commit

Permalink
feat: store peer usage for protocols in metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Oct 9, 2024
1 parent b7bdb60 commit 919aeac
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 21 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class ConnectionManager
private static instances = new Map<string, ConnectionManager>();
private keepAliveManager: KeepAliveManager;
private options: ConnectionManagerOptions;
private libp2p: Libp2p;
public libp2p: Libp2p;
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();

Expand Down
8 changes: 7 additions & 1 deletion packages/interfaces/src/connection_manager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import type { Peer, PeerId, TypedEventEmitter } from "@libp2p/interface";
import type {
Libp2p,
Peer,
PeerId,
TypedEventEmitter
} from "@libp2p/interface";

import { PubsubTopic } from "./misc.js";

Expand Down Expand Up @@ -67,4 +72,5 @@ export interface IConnectionManager
dropConnection(peerId: PeerId): Promise<void>;
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
stop(): void;
libp2p: Libp2p;
}
90 changes: 71 additions & 19 deletions packages/sdk/src/protocols/peer_manager.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { Peer, PeerId } from "@libp2p/interface";
import { utf8ToBytes } from "@noble/hashes/utils";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IHealthManager } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { Mutex } from "async-mutex";

const METADATA_KEY = "usedByProtocol";

export class PeerManager {
private peers: Map<string, Peer> = new Map();
private healthManager: IHealthManager;

private readMutex = new Mutex();
Expand All @@ -26,48 +28,58 @@ export class PeerManager {
return this.writeLockHolder;
}

public getPeers(): Peer[] {
return Array.from(this.peers.values());
public async getPeers(): Promise<Peer[]> {
const allPeers = await this.connectionManager.libp2p.peerStore.all();
return allPeers.filter((peer) => this.isPeerUsedByProtocol(peer));
}

public async addPeer(peer: Peer): Promise<void> {
return this.writeMutex.runExclusive(async () => {
this.writeLockHolder = `addPeer: ${peer.id.toString()}`;
await this.connectionManager.attemptDial(peer.id);
this.peers.set(peer.id.toString(), peer);
await this._addPeer(peer);
this.log.info(`Added and dialed peer: ${peer.id.toString()}`);
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.size
);
const peerCount = await this.getPeerCount();
this.healthManager.updateProtocolHealth(this.core.multicodec, peerCount);
this.writeLockHolder = null;
});
}

public async removePeer(peerId: PeerId): Promise<void> {
return this.writeMutex.runExclusive(() => {
return this.writeMutex.runExclusive(async () => {
this.writeLockHolder = `removePeer: ${peerId.toString()}`;
this.peers.delete(peerId.toString());
this.log.info(`Removed peer: ${peerId.toString()}`);
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.size
);
const peer = await this.connectionManager.libp2p.peerStore.get(peerId);
if (peer) {
await this._removePeer(peer);
this.log.info(`Removed peer: ${peerId.toString()}`);
const peerCount = await this.getPeerCount();
this.healthManager.updateProtocolHealth(
this.core.multicodec,
peerCount
);
}
this.writeLockHolder = null;
});
}

public async getPeerCount(): Promise<number> {
return this.readMutex.runExclusive(() => this.peers.size);
return this.readMutex.runExclusive(async () => {
const peers = await this.getPeers();
return peers.length;
});
}

public async hasPeers(): Promise<boolean> {
return this.readMutex.runExclusive(() => this.peers.size > 0);
return this.readMutex.runExclusive(async () => {
const peerCount = await this.getPeerCount();
return peerCount > 0;
});
}

public async removeExcessPeers(excessPeers: number): Promise<void> {
this.log.info(`Removing ${excessPeers} excess peer(s)`);
const peersToRemove = Array.from(this.peers.values()).slice(0, excessPeers);
const peers = await this.getPeers();
const peersToRemove = peers.slice(0, excessPeers);
for (const peer of peersToRemove) {
await this.removePeer(peer.id);
}
Expand All @@ -94,8 +106,9 @@ export class PeerManager {
const connectedPeers = await this.core.getPeers();

return this.readMutex.runExclusive(async () => {
const currentPeers = await this.getPeers();
const newPeers = connectedPeers
.filter((peer) => !this.peers.has(peer.id.toString()))
.filter((peer) => !currentPeers.some((p) => p.id.equals(peer.id)))
.slice(0, numPeers);

return newPeers;
Expand All @@ -110,4 +123,43 @@ export class PeerManager {
}
return addedPeers;
}

private async _addPeer(peer: Peer): Promise<void> {
const connectedPeers = this.connectionManager.libp2p.getPeers();
if (connectedPeers.some((p) => p.equals(peer.id))) {
return;
}
const _peer = await this.connectionManager.libp2p.peerStore.get(peer.id);
if (!_peer) {
return;
}
this.updatePeerMetadataWithUsageStatus(peer, true);
}

private async _removePeer(peer: Peer): Promise<void> {
this.updatePeerMetadataWithUsageStatus(peer, false);
}

private updatePeerMetadataWithUsageStatus(
peer: Peer,
isCurrentlyUsed: boolean
): void {
if (isCurrentlyUsed) {
peer.metadata.set(
METADATA_KEY,
utf8ToBytes(this.core.multicodec.toString())
);
} else {
peer.metadata.delete(METADATA_KEY);
}
}

private isPeerUsedByProtocol(peer: Peer): boolean {
const usedByProtocol = peer.metadata.get(METADATA_KEY);
return usedByProtocol
? utf8ToBytes(this.core.multicodec.toString()).every(
(byte, index) => byte === usedByProtocol[index]
)
: false;
}
}

0 comments on commit 919aeac

Please sign in to comment.