diff --git a/package-lock.json b/package-lock.json index cb8799416c..204126a6e3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11822,6 +11822,15 @@ "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==", "license": "MIT" }, + "node_modules/async-mutex": { + "version": "0.5.0", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz", + "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==", + "license": "MIT", + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -39397,7 +39406,7 @@ "license": "MIT OR Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "^15.1.0", - "@libp2p/bootstrap": "^10.1.2", + "@libp2p/bootstrap": "^10", "@libp2p/identify": "^2.1.2", "@libp2p/mplex": "^10.1.2", "@libp2p/ping": "^1.1.2", @@ -39409,6 +39418,7 @@ "@waku/message-hash": "0.1.16", "@waku/proto": "^0.0.8", "@waku/utils": "0.0.20", + "async-mutex": "^0.5.0", "libp2p": "^1.8.1" }, "devDependencies": { diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 8ce6162a50..bcc6ff4ec4 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -1,16 +1,12 @@ import type { Libp2p } from "@libp2p/interface"; -import type { Peer, PeerStore, Stream } from "@libp2p/interface"; +import type { Peer, Stream } from "@libp2p/interface"; import type { IBaseProtocolCore, Libp2pComponents, PubsubTopic } from "@waku/interfaces"; -import { Logger, pubsubTopicsToShardInfo } from "@waku/utils"; -import { - getConnectedPeersForProtocolAndShard, - getPeersForProtocol, - sortPeersByLatency -} from "@waku/utils/libp2p"; +import { Logger } from "@waku/utils"; +import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p"; import { filterPeersByDiscovery } from "./filterPeers.js"; import { StreamManager } from "./stream_manager/index.js"; @@ -26,7 +22,7 @@ export class BaseProtocol implements IBaseProtocolCore { protected constructor( public multicodec: string, - private components: Libp2pComponents, + protected components: Libp2pComponents, private log: Logger, public readonly pubsubTopics: PubsubTopic[] ) { @@ -50,24 +46,24 @@ export class BaseProtocol implements IBaseProtocolCore { return this.streamManager.getStream(peer); } - public get peerStore(): PeerStore { - return this.components.peerStore; - } - + //TODO: move to SDK /** * Returns known peers from the address book (`libp2p.peerStore`) that support * the class protocol. Waku may or may not be currently connected to these * peers. */ public async allPeers(): Promise { - return getPeersForProtocol(this.peerStore, [this.multicodec]); + return getPeersForProtocol(this.components.peerStore, [this.multicodec]); } - public async connectedPeers(): Promise { + public async connectedPeers(withOpenStreams = false): Promise { const peers = await this.allPeers(); return peers.filter((peer) => { - return ( - this.components.connectionManager.getConnections(peer.id).length > 0 + const connections = this.components.connectionManager.getConnections( + peer.id + ); + return connections.some((c) => + c.streams.some((s) => s.protocol === this.multicodec) ); }); } @@ -77,9 +73,8 @@ export class BaseProtocol implements IBaseProtocolCore { * * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. - - * @returns A list of peers that support the protocol sorted by latency. - */ + * @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap. + */ public async getPeers( { numPeers, @@ -88,7 +83,7 @@ export class BaseProtocol implements IBaseProtocolCore { numPeers: number; maxBootstrapPeers: number; } = { - maxBootstrapPeers: 1, + maxBootstrapPeers: 0, numPeers: 0 } ): Promise { @@ -96,21 +91,21 @@ export class BaseProtocol implements IBaseProtocolCore { const connectedPeersForProtocolAndShard = await getConnectedPeersForProtocolAndShard( this.components.connectionManager.getConnections(), - this.peerStore, + this.components.peerStore, [this.multicodec], pubsubTopicsToShardInfo(this.pubsubTopics) ); // Filter the peers based on discovery & number of peers requested const filteredPeers = filterPeersByDiscovery( - connectedPeersForProtocolAndShard, + allAvailableConnectedPeers, numPeers, maxBootstrapPeers ); // Sort the peers by latency const sortedFilteredPeers = await sortPeersByLatency( - this.peerStore, + this.components.peerStore, filteredPeers ); diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 5300718b2b..fdc19698a2 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -37,6 +37,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { wakuMessage: WakuMessage, peerIdStr: string ) => Promise, + private handleError: (error: Error) => Promise, public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p ) { @@ -301,8 +302,18 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { () => { log.info("Receiving pipe closed."); }, - (e) => { - log.error("Error with receiving pipe", e); + async (e) => { + log.error( + "Error with receiving pipe", + e, + " -- ", + "on peer ", + connection.remotePeer.toString(), + " -- ", + "stream ", + stream + ); + await this.handleError(e); } ); } catch (e) { diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index 3dbf7ed8a2..1744450c7b 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -45,7 +45,7 @@ class Metadata extends BaseProtocol implements IMetadata { pubsubTopicsToShardInfo(this.pubsubTopics) ); - const peer = await this.peerStore.get(peerId); + const peer = await this.libp2pComponents.peerStore.get(peerId); if (!peer) { return { shardInfo: null, diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts index 43d337728d..6030b86f76 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts @@ -47,7 +47,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { numPeers: BigInt(numPeers) }); - const peer = await this.peerStore.get(peerId); + const peer = await this.components.peerStore.get(peerId); if (!peer) { return { peerInfos: null, diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 5b5e9ea919..4f324287fc 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -1,6 +1,6 @@ import type { Libp2p } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; -import type { Peer, PeerStore } from "@libp2p/interface"; +import type { Peer } from "@libp2p/interface"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { IDecodedMessage } from "./message.js"; @@ -18,14 +18,14 @@ export type IBaseProtocolCore = { multicodec: string; peerStore: PeerStore; allPeers: () => Promise; - connectedPeers: () => Promise; + connectedPeers: (withOpenStreams?: boolean) => Promise; addLibp2pEventListener: Libp2p["addEventListener"]; removeLibp2pEventListener: Libp2p["removeEventListener"]; }; export type IBaseProtocolSDK = { readonly connectedPeers: Peer[]; - renewPeer: (peerToDisconnect: PeerId) => Promise; + renewPeer: (peerToDisconnect: PeerId) => Promise; readonly numPeersToUse: number; }; @@ -36,10 +36,6 @@ export type NetworkConfig = StaticSharding | AutoSharding; * Options for using LightPush and Filter */ export type ProtocolUseOptions = { - /** - * Optional flag to enable auto-retry with exponential backoff - */ - autoRetry?: boolean; /** * Optional flag to force using all available peers */ @@ -48,14 +44,6 @@ export type ProtocolUseOptions = { * Optional maximum number of attempts for exponential backoff */ maxAttempts?: number; - /** - * Optional initial delay in milliseconds for exponential backoff - */ - initialDelay?: number; - /** - * Optional maximum delay in milliseconds for exponential backoff - */ - maxDelay?: number; }; export type ProtocolCreateOptions = { diff --git a/packages/sdk/package.json b/packages/sdk/package.json index e2a6c6dc0b..befad4a17f 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -58,7 +58,7 @@ }, "dependencies": { "@chainsafe/libp2p-noise": "^15.1.0", - "@libp2p/bootstrap": "^10.1.2", + "@libp2p/bootstrap": "^10", "@libp2p/identify": "^2.1.2", "@libp2p/mplex": "^10.1.2", "@libp2p/ping": "^1.1.2", @@ -67,9 +67,10 @@ "@waku/core": "0.0.32", "@waku/discovery": "0.0.5", "@waku/interfaces": "0.0.27", + "@waku/message-hash": "0.1.16", "@waku/proto": "^0.0.8", "@waku/utils": "0.0.20", - "@waku/message-hash": "0.1.16", + "async-mutex": "^0.5.0", "libp2p": "^1.8.1" }, "devDependencies": { diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 52009aa05c..d091bd0724 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -1,36 +1,38 @@ import type { Peer, PeerId } from "@libp2p/interface"; -import { ConnectionManager, getHealthManager } from "@waku/core"; +import { ConnectionManager } from "@waku/core"; import { BaseProtocol } from "@waku/core/lib/base_protocol"; import { IBaseProtocolSDK, IHealthManager, + PeerIdStr, ProtocolUseOptions } from "@waku/interfaces"; import { delay, Logger } from "@waku/utils"; +import { Mutex } from "async-mutex"; interface Options { numPeersToUse?: number; maintainPeersInterval?: number; } -const RENEW_TIME_LOCK_DURATION = 30 * 1000; const DEFAULT_NUM_PEERS_TO_USE = 2; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; export class BaseProtocolSDK implements IBaseProtocolSDK { - private healthManager: IHealthManager; + private peerManager: PeerManager; public readonly numPeersToUse: number; - private peers: Peer[] = []; + private peers: Map = new Map(); private maintainPeersIntervalId: ReturnType< typeof window.setInterval > | null = null; private log: Logger; - private maintainPeersLock = false; private readonly renewPeersLocker = new RenewPeerLocker( RENEW_TIME_LOCK_DURATION ); + private peersMutex = new Mutex(); + public constructor( protected core: BaseProtocol, protected connectionManager: ConnectionManager, @@ -38,17 +40,21 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { ) { this.log = new Logger(`sdk:${core.multicodec}`); - this.healthManager = getHealthManager(); + this.peerManager = new PeerManager(connectionManager, core, this.log); this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; const maintainPeersInterval = options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; + this.log.info( + `Initializing BaseProtocolSDK with numPeersToUse: ${this.numPeersToUse}, maintainPeersInterval: ${maintainPeersInterval}ms` + ); + // void this.setupEventListeners(); void this.startMaintainPeersInterval(maintainPeersInterval); } public get connectedPeers(): Peer[] { - return this.peers; + return Array.from(this.peers.values()); } /** @@ -56,17 +62,10 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @param peerToDisconnect The peer to disconnect from. * @returns The new peer that was found and connected to. */ - public async renewPeer(peerToDisconnect: PeerId): Promise { - this.log.info(`Renewing peer ${peerToDisconnect}`); - - await this.connectionManager.dropConnection(peerToDisconnect); + public async renewPeer(peerToDisconnect: PeerId): Promise { + this.log.info(`Attempting to renew peer ${peerToDisconnect}`); - const peer = (await this.findAndAddPeers(1))[0]; - if (!peer) { - this.log.error( - "Failed to find a new peer to replace the disconnected one." - ); - } + await this.connectionManager.dropConnection(peerToDisconnect); const updatedPeers = this.peers.filter( (peer) => !peer.id.equals(peerToDisconnect) @@ -77,9 +76,17 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { `Peer ${peerToDisconnect} disconnected and removed from the peer list` ); + const newPeer = await this.findAndAddPeers(1); + if (newPeer.length === 0) { + this.log.error( + "Failed to find a new peer to replace the disconnected one" + ); + return undefined; + } + this.renewPeersLocker.lock(peerToDisconnect); - return peer; + return newPeer[0]; } /** @@ -90,75 +97,113 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { clearInterval(this.maintainPeersIntervalId); this.maintainPeersIntervalId = null; this.log.info("Maintain peers interval stopped"); + } else { + this.log.debug("Maintain peers interval was not running"); } } + //TODO: validate if adding event listeners for peer connect and disconnect is needed + // private setupEventListeners(): void { + // this.core.addLibp2pEventListener( + // "peer:connect", + // () => void this.confirmPeers() + // ); + // this.core.addLibp2pEventListener( + // "peer:disconnect", + // () => void this.confirmPeers() + // ); + // } + /** - * Checks if there are peers to send a message to. - * If `forceUseAllPeers` is `false` (default) and there are connected peers, returns `true`. - * If `forceUseAllPeers` is `true` or there are no connected peers, tries to find new peers from the ConnectionManager. - * If `autoRetry` is `false`, returns `false` if no peers are found. - * If `autoRetry` is `true`, tries to find new peers from the ConnectionManager with exponential backoff. - * Returns `true` if peers are found, `false` otherwise. + * Checks if there are sufficient peers to send a message to. + * If `forceUseAllPeers` is `false` (default), returns `true` if there are any connected peers. + * If `forceUseAllPeers` is `true`, attempts to connect to `numPeersToUse` peers. * @param options Optional options object - * @param options.autoRetry Optional flag to enable auto-retry with exponential backoff (default: false) - * @param options.forceUseAllPeers Optional flag to force using all available peers (default: false) - * @param options.initialDelay Optional initial delay in milliseconds for exponential backoff (default: 10) - * @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3) - * @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100) + * @param options.forceUseAllPeers Optional flag to force connecting to `numPeersToUse` peers (default: false) + * @param options.maxAttempts Optional maximum number of attempts to reach the required number of peers (default: 3) + * @returns `true` if the required number of peers are connected, `false` otherwise */ - protected hasPeers = async ( + protected async hasPeers( options: Partial = {} - ): Promise => { - const { - autoRetry = false, - forceUseAllPeers = false, - initialDelay = 10, - maxAttempts = 3, - maxDelay = 100 - } = options; + ): Promise { + const { forceUseAllPeers = false, maxAttempts = 3 } = options; - if (!forceUseAllPeers && this.connectedPeers.length > 0) return true; + let needsMaintenance: boolean; + let currentPeerCount: number; + + const release = await this.peersMutex.acquire(); + try { + currentPeerCount = this.connectedPeers.length; + needsMaintenance = forceUseAllPeers || currentPeerCount === 0; + } finally { + release(); + } + + if (!needsMaintenance) return true; let attempts = 0; while (attempts < maxAttempts) { attempts++; if (await this.maintainPeers()) { - if (this.peers.length < this.numPeersToUse) { - this.log.warn( - `Found only ${this.peers.length} peers, expected ${this.numPeersToUse}` - ); + const finalRelease = await this.peersMutex.acquire(); + try { + if (this.peers.size < this.numPeersToUse) { + this.log.warn( + `Found only ${this.peers.size} peers, expected ${this.numPeersToUse}` + ); + } + return true; + } finally { + finalRelease(); } + } + if (!autoRetry) { + return false; + } + //TODO: handle autoRetry + } + + for (let attempts = 0; attempts < maxAttempts; attempts++) { + this.log.debug( + `Attempt ${attempts + 1}/${maxAttempts} to reach required number of peers` + ); + await this.maintainPeers(); + + if (this.connectedPeers.length >= this.numPeersToUse) { + this.log.info( + `Required number of peers (${this.numPeersToUse}) reached` + ); return true; } - if (!autoRetry) return false; - const delayMs = Math.min( - initialDelay * Math.pow(2, attempts - 1), - maxDelay + + this.log.warn( + `Found only ${this.connectedPeers.length}/${this.numPeersToUse} required peers. Retrying...` ); - await delay(delayMs); } - this.log.error("Failed to find peers to send message to"); + this.log.error( + `Failed to find required number of peers (${this.numPeersToUse}) after ${maxAttempts} attempts` + ); return false; - }; + } /** * Starts an interval to maintain the peers list to `numPeersToUse`. * @param interval The interval in milliseconds to maintain the peers. */ private async startMaintainPeersInterval(interval: number): Promise { - this.log.info("Starting maintain peers interval"); + this.log.info( + `Starting maintain peers interval with ${interval}ms interval` + ); try { - await this.maintainPeers(); + // await this.maintainPeers(); this.maintainPeersIntervalId = setInterval(() => { + this.log.debug("Running scheduled peer maintenance"); this.maintainPeers().catch((error) => { - this.log.error("Error during maintain peers interval:", error); + this.log.error("Error during scheduled peer maintenance:", error); }); }, interval); - this.log.info( - `Maintain peers interval started with interval ${interval}ms` - ); + this.log.info("Maintain peers interval started successfully"); } catch (error) { this.log.error("Error starting maintain peers interval:", error); throw error; @@ -169,25 +214,51 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * Maintains the peers list to `numPeersToUse`. */ private async maintainPeers(): Promise { - if (this.maintainPeersLock) { - return false; - } + try { + await this.confirmPeers(); + + const numPeersToAdd = await this.peersMutex.runExclusive(() => { + this.log.info(`Maintaining peers, current count: ${this.peers.size}`); + return this.numPeersToUse - this.peers.size; + }); this.maintainPeersLock = true; + await this.confirmPeers(); this.log.info(`Maintaining peers, current count: ${this.peers.length}`); try { - const numPeersToAdd = this.numPeersToUse - this.peers.length; + await this.confirmPeers(); + this.log.info(`Maintaining peers, current count: ${this.peers.size}`); + + const numPeersToAdd = this.numPeersToUse - this.peers.size; if (numPeersToAdd > 0) { - await this.findAndAddPeers(numPeersToAdd); + this.log.info(`Attempting to add ${numPeersToAdd} peer(s)`); + await this.peerManager.findAndAddPeers(numPeersToAdd); + } else { + this.log.info( + `Attempting to remove ${Math.abs(numPeersToAdd)} excess peer(s)` + ); + await this.peerManager.removeExcessPeers(Math.abs(numPeersToAdd)); } - this.log.info( - `Peer maintenance completed, current count: ${this.peers.length}` - ); - this.renewPeersLocker.cleanUnlocked(); - } finally { - this.maintainPeersLock = false; - } - return true; + }); + } + + private async confirmPeers(): Promise { + const connectedPeers = await this.core.connectedPeers(); + const currentPeers = this.peers; + const peersToAdd = connectedPeers.filter( + (p) => !currentPeers.some((cp) => cp.id.equals(p.id)) + ); + const peersToRemove = currentPeers.filter( + (p) => !connectedPeers.some((cp) => cp.id.equals(p.id)) + ); + + peersToAdd.forEach((p) => this.peers.push(p)); + peersToRemove.forEach((p) => { + const index = this.peers.findIndex((cp) => cp.id.equals(p.id)); + if (index !== -1) this.peers.splice(index, 1); + }); + + this.updatePeers(this.peers); } /** @@ -195,25 +266,32 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @param numPeers The number of peers to find and add. */ private async findAndAddPeers(numPeers: number): Promise { - this.log.info(`Finding and adding ${numPeers} new peers`); + let newPeers: Peer[]; + const release = await this.peersMutex.acquire(); try { - const additionalPeers = await this.findAdditionalPeers(numPeers); - const dials = additionalPeers.map((peer) => - this.connectionManager.attemptDial(peer.id) - ); - - await Promise.all(dials); + this.log.info(`Finding and adding ${numPeers} new peers`); + newPeers = await this.findAdditionalPeers(numPeers); + } finally { + release(); + } - const updatedPeers = [...this.peers, ...additionalPeers]; - this.updatePeers(updatedPeers); + const dials = await Promise.all( + newPeers.map((peer) => this.connectionManager.attemptDial(peer.id)) + ); + const finalRelease = await this.peersMutex.acquire(); + try { + const successfulPeers = newPeers.filter((_, index) => dials[index]); + successfulPeers.forEach((peer) => + this.peers.set(peer.id.toString(), peer) + ); + this.updatePeers(new Map(this.peers)); this.log.info( - `Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}` + `Added ${successfulPeers.length} new peers, total peers: ${this.peers.size}` ); - return additionalPeers; - } catch (error) { - this.log.error("Error finding and adding new peers:", error); - throw error; + return successfulPeers; + } finally { + finalRelease(); } } @@ -234,9 +312,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } newPeers = newPeers - .filter( - (peer) => this.peers.some((p) => p.id.equals(peer.id)) === false - ) + .filter((peer) => !this.peers.has(peer.id.toString())) .filter((peer) => !this.renewPeersLocker.isLocked(peer.id)) .slice(0, numPeers); @@ -247,11 +323,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } } - private updatePeers(peers: Peer[]): void { + private updatePeers(peers: Map): void { this.peers = peers; this.healthManager.updateProtocolHealth( this.core.multicodec, - this.peers.length + this.peers.size ); } } @@ -276,7 +352,7 @@ class RenewPeerLocker { } public cleanUnlocked(): void { - Object.entries(this.peers).forEach(([id, lock]) => { + Array.from(this.peers.entries()).forEach(([id, lock]) => { if (this.isTimeUnlocked(lock)) { this.peers.delete(id.toString()); } diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts index 3889e7638b..01cea6859f 100644 --- a/packages/sdk/src/protocols/filter/constants.ts +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -1,4 +1,4 @@ -export const DEFAULT_KEEP_ALIVE = 30 * 1000; +export const DEFAULT_KEEP_ALIVE = 10_000; export const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: DEFAULT_KEEP_ALIVE diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index c8840ea0e4..c9db3cd80e 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -54,6 +54,9 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, + async (error: Error) => { + log.error("Error with receiving pipe", error); + }, connectionManager.configuredPubsubTopics, libp2p ), diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 51a9e16e8d..4300246624 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -41,7 +41,9 @@ export class SubscriptionManager implements ISubscriptionSDK { private readonly protocol: FilterCore, private readonly connectionManager: ConnectionManager, private readonly getPeers: () => Peer[], - private readonly renewPeer: (peerToDisconnect: PeerId) => Promise + private readonly renewPeer: ( + peerToDisconnect: PeerId + ) => Promise ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); @@ -251,11 +253,13 @@ export class SubscriptionManager implements ISubscriptionSDK { } private startSubscriptionsMaintenance(interval: number): void { + log.info("Starting subscriptions maintenance"); this.startKeepAlivePings(interval); this.startConnectionListener(); } private stopSubscriptionsMaintenance(): void { + log.info("Stopping subscriptions maintenance"); this.stopKeepAlivePings(); this.stopConnectionListener(); } @@ -299,9 +303,10 @@ export class SubscriptionManager implements ISubscriptionSDK { } this.keepAliveTimer = setInterval(() => { - void this.ping().catch((error) => { - log.error("Error in keep-alive ping cycle:", error); - }); + log.info("Sending keep-alive ping"); + void this.ping() + .then(() => log.info("Keep-alive ping successful")) + .catch((error) => log.error("Error in keep-alive ping cycle:", error)); }, interval) as unknown as number; } diff --git a/packages/sdk/src/protocols/peer_manager.ts b/packages/sdk/src/protocols/peer_manager.ts new file mode 100644 index 0000000000..dcdc024f1b --- /dev/null +++ b/packages/sdk/src/protocols/peer_manager.ts @@ -0,0 +1,113 @@ +import { Peer, PeerId } from "@libp2p/interface"; +import { ConnectionManager, getHealthManager } from "@waku/core"; +import { BaseProtocol } from "@waku/core/lib/base_protocol"; +import { IHealthManager } from "@waku/interfaces"; +import { Logger } from "@waku/utils"; +import { Mutex } from "async-mutex"; + +export class PeerManager { + private peers: Map = new Map(); + private healthManager: IHealthManager; + + private readMutex = new Mutex(); + private writeMutex = new Mutex(); + private writeLockHolder: string | null = null; + + public constructor( + private readonly connectionManager: ConnectionManager, + private readonly core: BaseProtocol, + private readonly log: Logger + ) { + this.healthManager = getHealthManager(); + this.healthManager.updateProtocolHealth(this.core.multicodec, 0); + } + + public getWriteLockHolder(): string | null { + return this.writeLockHolder; + } + + public getPeers(): Peer[] { + return Array.from(this.peers.values()); + } + + public async addPeer(peer: Peer): Promise { + return this.writeMutex.runExclusive(async () => { + this.writeLockHolder = `addPeer: ${peer.id.toString()}`; + await this.connectionManager.attemptDial(peer.id); + this.peers.set(peer.id.toString(), peer); + this.log.info(`Added and dialed peer: ${peer.id.toString()}`); + this.healthManager.updateProtocolHealth( + this.core.multicodec, + this.peers.size + ); + this.writeLockHolder = null; + }); + } + + public async removePeer(peerId: PeerId): Promise { + return this.writeMutex.runExclusive(() => { + this.writeLockHolder = `removePeer: ${peerId.toString()}`; + this.peers.delete(peerId.toString()); + this.log.info(`Removed peer: ${peerId.toString()}`); + this.healthManager.updateProtocolHealth( + this.core.multicodec, + this.peers.size + ); + this.writeLockHolder = null; + }); + } + + public async getPeerCount(): Promise { + return this.readMutex.runExclusive(() => this.peers.size); + } + + public async hasPeers(): Promise { + return this.readMutex.runExclusive(() => this.peers.size > 0); + } + + public async removeExcessPeers(excessPeers: number): Promise { + this.log.info(`Removing ${excessPeers} excess peer(s)`); + const peersToRemove = Array.from(this.peers.values()).slice(0, excessPeers); + for (const peer of peersToRemove) { + await this.removePeer(peer.id); + } + } + + /** + * Finds and adds new peers to the peers list. + * @param numPeers The number of peers to find and add. + */ + public async findAndAddPeers(numPeers: number): Promise { + const additionalPeers = await this.findPeers(numPeers); + if (additionalPeers.length === 0) { + this.log.warn("No additional peers found"); + return []; + } + return this.addMultiplePeers(additionalPeers); + } + + /** + * Finds additional peers. + * @param numPeers The number of peers to find. + */ + public async findPeers(numPeers: number): Promise { + const connectedPeers = await this.core.getPeers(); + + return this.readMutex.runExclusive(async () => { + const newPeers = connectedPeers + .filter((peer) => !this.peers.has(peer.id.toString())) + .slice(0, numPeers); + + return newPeers; + }); + } + + public async addMultiplePeers(peers: Peer[]): Promise { + const addedPeers: Peer[] = []; + for (const peer of peers) { + await this.addPeer(peer); + addedPeers.push(peer); + } + return addedPeers; + } +} diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts index 1e420dd921..b0d8dc1c3a 100644 --- a/packages/sdk/src/reliability_monitor/index.ts +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -18,7 +18,7 @@ export class ReliabilityMonitorManager { public static createReceiverMonitor( pubsubTopic: PubsubTopic, getPeers: () => Peer[], - renewPeer: (peerId: PeerId) => Promise, + renewPeer: (peerId: PeerId) => Promise, getContentTopics: () => ContentTopic[], protocolSubscribe: ( pubsubTopic: PubsubTopic, @@ -42,7 +42,7 @@ export class ReliabilityMonitorManager { } public static createSenderMonitor( - renewPeer: (peerId: PeerId) => Promise + renewPeer: (peerId: PeerId) => Promise ): SenderReliabilityMonitor { if (!ReliabilityMonitorManager.senderMonitor) { ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor( diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts index 440e35829c..985c52a59d 100644 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -32,7 +32,7 @@ export class ReceiverReliabilityMonitor { public constructor( private readonly pubsubTopic: PubsubTopic, private getPeers: () => Peer[], - private renewPeer: (peerId: PeerId) => Promise, + private renewPeer: (peerId: PeerId) => Promise, private getContentTopics: () => ContentTopic[], private protocolSubscribe: ( pubsubTopic: PubsubTopic, @@ -163,15 +163,21 @@ export class ReceiverReliabilityMonitor { private async renewAndSubscribePeer( peerId: PeerId ): Promise { + const peerIdStr = peerId.toString(); try { - if (this.peerRenewalLocks.has(peerId.toString())) { - log.info(`Peer ${peerId.toString()} is already being renewed.`); + if (this.peerRenewalLocks.has(peerIdStr)) { + log.info(`Peer ${peerIdStr} is already being renewed.`); return; } - this.peerRenewalLocks.add(peerId.toString()); + this.peerRenewalLocks.add(peerIdStr); const newPeer = await this.renewPeer(peerId); + if (!newPeer) { + log.warn(`Failed to renew peer ${peerIdStr}: No new peer found.`); + return; + } + await this.protocolSubscribe( this.pubsubTopic, newPeer, @@ -181,16 +187,16 @@ export class ReceiverReliabilityMonitor { this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); this.missedMessagesByPeer.set(newPeer.id.toString(), 0); - this.peerFailures.delete(peerId.toString()); - this.missedMessagesByPeer.delete(peerId.toString()); - delete this.receivedMessagesHashes.nodes[peerId.toString()]; + this.peerFailures.delete(peerIdStr); + this.missedMessagesByPeer.delete(peerIdStr); + delete this.receivedMessagesHashes.nodes[peerIdStr]; return newPeer; } catch (error) { - log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); + log.error(`Failed to renew peer ${peerIdStr}: ${error}.`); return; } finally { - this.peerRenewalLocks.delete(peerId.toString()); + this.peerRenewalLocks.delete(peerIdStr); } } diff --git a/packages/sdk/src/reliability_monitor/sender.ts b/packages/sdk/src/reliability_monitor/sender.ts index 0ffe9a1659..914c321da8 100644 --- a/packages/sdk/src/reliability_monitor/sender.ts +++ b/packages/sdk/src/reliability_monitor/sender.ts @@ -11,7 +11,9 @@ export class SenderReliabilityMonitor { private readonly maxAttemptsBeforeRenewal = DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL; - public constructor(private renewPeer: (peerId: PeerId) => Promise) {} + public constructor( + private renewPeer: (peerId: PeerId) => Promise + ) {} public async attemptRetriesOrRenew( peerId: PeerId, @@ -42,13 +44,19 @@ export class SenderReliabilityMonitor { } else { try { const newPeer = await this.renewPeer(peerId); - log.info( - `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` - ); + if (newPeer) { + log.info( + `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` + ); - this.attempts.delete(peerIdStr); - this.attempts.set(newPeer.id.toString(), 0); - await protocolSend(); + this.attempts.delete(peerIdStr); + this.attempts.set(newPeer.id.toString(), 0); + await protocolSend(); + } else { + log.error( + `Failed to renew peer ${peerId.toString()}: New peer is undefined` + ); + } } catch (error) { log.error(`Failed to renew peer ${peerId.toString()}: ${error}`); } diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 252340a5bf..dd0ba3ad5c 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -1,22 +1,13 @@ import type { Connection, Peer, PeerStore } from "@libp2p/interface"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; -import { LightPushCodec, waitForRemotePeer } from "@waku/core"; import { - ContentTopicInfo, createLightNode, Libp2pComponents, type LightNode, - Protocols, - ShardInfo, Tags, utf8ToBytes } from "@waku/sdk"; -import { - encodeRelayShard, - ensureShardingConfigured, - shardInfoToPubsubTopics -} from "@waku/utils"; -import { getConnectedPeersForProtocolAndShard } from "@waku/utils/libp2p"; +import { encodeRelayShard } from "@waku/utils"; import { expect } from "chai"; import fc from "fast-check"; import Sinon from "sinon"; @@ -24,414 +15,9 @@ import Sinon from "sinon"; import { afterEachCustom, beforeEachCustom, - DefaultTestShardInfo, - delay, - makeLogFileName, - ServiceNode, - tearDownNodes + DefaultTestShardInfo } from "../src/index.js"; -describe("getConnectedPeersForProtocolAndShard", function () { - let waku: LightNode; - let serviceNode1: ServiceNode; - let serviceNode2: ServiceNode; - const contentTopic = "/test/2/waku-light-push/utf8"; - const autoshardingClusterId = 6; - - beforeEachCustom(this, async () => { - serviceNode1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); - serviceNode2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); - }); - - afterEachCustom(this, async () => { - await tearDownNodes([serviceNode1, serviceNode2], waku); - }); - - it("same cluster, same shard: nodes connect", async function () { - this.timeout(15000); - - const shardInfo: ShardInfo = { - clusterId: 2, - shards: [2] - }; - - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo), - lightpush: true, - relay: true - }); - - const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo }); - await waku.start(); - await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); - await waitForRemotePeer(waku, [Protocols.LightPush]); - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo).shardInfo - ); - expect(peers.length).to.be.greaterThan(0); - }); - - it("same cluster, different shard: nodes don't connect", async function () { - this.timeout(15000); - - const shardInfo1: ShardInfo = { - clusterId: 2, - shards: [1] - }; - - const shardInfo2: ShardInfo = { - clusterId: 2, - shards: [2] - }; - - // Separate shard - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true, - relay: true - }); - - // Same shard - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true, - relay: true - }); - - const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); - const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); - await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo2).shardInfo - ); - expect(peers.length).to.be.equal(1); - }); - - it("different cluster, same shard: nodes don't connect", async function () { - this.timeout(15000); - - const shardInfo1: ShardInfo = { - clusterId: 2, - shards: [1] - }; - - const shardInfo2: ShardInfo = { - clusterId: 3, - shards: [1] - }; - - // we start one node in a separate cluster - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true, - relay: true - }); - - // and another node in the same cluster cluster as our node - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true, - relay: true - }); - - const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); - const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); - await delay(500); - await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); - - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - shardInfo2 - ); - expect(peers.length).to.be.equal(1); - }); - - it("different cluster, different shard: nodes don't connect", async function () { - this.timeout(15000); - - const shardInfo1: ShardInfo = { - clusterId: 2, - shards: [1] - }; - - const shardInfo2: ShardInfo = { - clusterId: 3, - shards: [2] - }; - - // we start one node in a separate cluster - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true, - relay: true - }); - - // and another node in the same cluster cluster as our node - const serviceNode2 = new ServiceNode(makeLogFileName(this) + "2"); - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true, - relay: true - }); - - const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); - const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec); - await delay(500); - await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec); - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - shardInfo2 - ); - expect(peers.length).to.be.equal(1); - }); - - it("same cluster, same shard: nodes connect (autosharding)", async function () { - this.timeout(15000); - - const shardInfo: ContentTopicInfo = { - clusterId: autoshardingClusterId, - contentTopics: [contentTopic] - }; - - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo), - contentTopic: [contentTopic], - lightpush: true, - relay: true - }); - - const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo }); - await waku.start(); - await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); - await waitForRemotePeer(waku, [Protocols.LightPush]); - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo).shardInfo - ); - expect(peers.length).to.be.greaterThan(0); - }); - - it("same cluster, different shard: nodes connect (autosharding)", async function () { - this.timeout(15000); - - const shardInfo1: ContentTopicInfo = { - clusterId: autoshardingClusterId, - contentTopics: [contentTopic] - }; - - const shardInfo2: ContentTopicInfo = { - clusterId: autoshardingClusterId, - contentTopics: ["/test/5/waku-light-push/utf8"] - }; - - // Separate shard - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - contentTopic: [contentTopic], - lightpush: true, - relay: true - }); - - // Same shard - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - contentTopic: [contentTopic], - lightpush: true, - relay: true - }); - - const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); - const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); - await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); - - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo2).shardInfo - ); - expect(peers.length).to.be.equal(1); - }); - - it("different cluster, same shard: nodes don't connect (autosharding)", async function () { - this.timeout(15000); - - const shardInfo1: ContentTopicInfo = { - clusterId: autoshardingClusterId, - contentTopics: [contentTopic] - }; - - const shardInfo2: ContentTopicInfo = { - clusterId: 2, - contentTopics: [contentTopic] - }; - - // we start one node in a separate cluster - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - contentTopic: [contentTopic], - lightpush: true, - relay: true - }); - - // and another node in the same cluster cluster as our node - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true, - relay: true - }); - - const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); - const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); - await delay(500); - await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); - - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo2).shardInfo - ); - expect(peers.length).to.be.equal(1); - }); - - it("different cluster, different shard: nodes don't connect (autosharding)", async function () { - this.timeout(15000); - - const shardInfo1: ContentTopicInfo = { - clusterId: autoshardingClusterId, - contentTopics: [contentTopic] - }; - - const shardInfo2: ContentTopicInfo = { - clusterId: 2, - contentTopics: ["/test/5/waku-light-push/utf8"] - }; - - // we start one node in a separate cluster - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - contentTopic: [contentTopic], - lightpush: true, - relay: true - }); - - // and another node in the same cluster cluster as our node - const serviceNode2 = new ServiceNode(makeLogFileName(this) + "2"); - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true, - relay: true - }); - - const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); - const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec); - await delay(500); - await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec); - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo2).shardInfo - ); - expect(peers.length).to.be.equal(1); - }); -}); - describe("getPeers", function () { let peerStore: PeerStore; let connectionManager: Libp2pComponents["connectionManager"]; @@ -566,7 +152,8 @@ describe("getPeers", function () { for (const peer of allPeers) { connections.push({ status: "open", - remotePeer: peer.id + remotePeer: peer.id, + streams: [{ protocol: waku.lightPush.protocol.multicodec }] } as unknown as Connection); } return connections; diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index 44c6dbee7c..fdc0915a92 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -72,6 +72,7 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(generateMessageText(i)) }); + expect(pushResponse.successes.length).to.eq(numServiceNodes); } diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts index a275f4e970..9cd66d1812 100644 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -52,11 +52,15 @@ describe("Waku Light Push: Peer Management: E2E", function () { } }); - it("Failed peers are renewed", async function () { + it.only("Failed peers are renewed", async function () { // send a lightpush request -- should have all successes - const response1 = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello_World") - }); + const response1 = await waku.lightPush.send( + encoder, + { + payload: utf8ToBytes("Hello_World") + }, + { forceUseAllPeers: true } + ); expect(response1.successes.length).to.be.equal( waku.lightPush.numPeersToUse diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index fa9b97c85d..80feaec6b3 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -1,8 +1,6 @@ -import type { Connection, Peer, PeerStore } from "@libp2p/interface"; -import { ShardInfo } from "@waku/interfaces"; +import type { Peer, PeerStore } from "@libp2p/interface"; import { bytesToUtf8 } from "../bytes/index.js"; -import { decodeRelayShard } from "../common/relay_shard_codec.js"; /** * Returns a pseudo-random peer that supports the given protocol. @@ -69,39 +67,3 @@ export async function getPeersForProtocol( }); return peers; } - -export async function getConnectedPeersForProtocolAndShard( - connections: Connection[], - peerStore: PeerStore, - protocols: string[], - shardInfo?: ShardInfo -): Promise { - const openConnections = connections.filter( - (connection) => connection.status === "open" - ); - - const peerPromises = openConnections.map(async (connection) => { - const peer = await peerStore.get(connection.remotePeer); - const supportsProtocol = protocols.some((protocol) => - peer.protocols.includes(protocol) - ); - - if (supportsProtocol) { - if (shardInfo) { - const encodedPeerShardInfo = peer.metadata.get("shardInfo"); - const peerShardInfo = - encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo); - - if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) { - return peer; - } - } else { - return peer; - } - } - return null; - }); - - const peersWithNulls = await Promise.all(peerPromises); - return peersWithNulls.filter((peer): peer is Peer => peer !== null); -} diff --git a/packages/utils/src/logger/index.ts b/packages/utils/src/logger/index.ts index 725d55f665..1ece843d06 100644 --- a/packages/utils/src/logger/index.ts +++ b/packages/utils/src/logger/index.ts @@ -3,6 +3,7 @@ import debug, { Debugger } from "debug"; const APP_NAME = "waku"; export class Logger { + private _debug: Debugger; private _info: Debugger; private _warn: Debugger; private _error: Debugger; @@ -12,11 +13,16 @@ export class Logger { } public constructor(prefix?: string) { + this._debug = debug(Logger.createDebugNamespace("debug", prefix)); this._info = debug(Logger.createDebugNamespace("info", prefix)); this._warn = debug(Logger.createDebugNamespace("warn", prefix)); this._error = debug(Logger.createDebugNamespace("error", prefix)); } + public get debug(): Debugger { + return this._debug; + } + public get info(): Debugger { return this._info; } @@ -29,7 +35,10 @@ export class Logger { return this._error; } - public log(level: "info" | "warn" | "error", ...args: unknown[]): void { + public log( + level: "debug" | "info" | "warn" | "error", + ...args: unknown[] + ): void { const logger = this[level] as (...args: unknown[]) => void; logger(...args); }