diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 145f208fe8..bbe6bb351b 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -20,7 +20,7 @@ export type SubscriptionCallback = { export type SubscribeOptions = { keepAlive?: number; pingsBeforePeerRenewed?: number; - maxMissedMessagesThreshold?: number; + enableLightPushFilterCheck?: boolean; }; export interface ISubscription { diff --git a/packages/message-hash/src/index.ts b/packages/message-hash/src/index.ts index 825143ca95..b22f17215a 100644 --- a/packages/message-hash/src/index.ts +++ b/packages/message-hash/src/index.ts @@ -2,7 +2,7 @@ import { sha256 } from "@noble/hashes/sha256"; import type { IDecodedMessage, IProtoMessage } from "@waku/interfaces"; import { isDefined } from "@waku/utils"; import { - bytesToUtf8, + bytesToHex, concat, numberToBytes, utf8ToBytes @@ -56,6 +56,6 @@ export function messageHashStr( message: IProtoMessage | IDecodedMessage ): string { const hash = messageHash(pubsubTopic, message); - const hashStr = bytesToUtf8(hash); + const hashStr = bytesToHex(hash); return hashStr; } diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts index 01cea6859f..7a4af02a30 100644 --- a/packages/sdk/src/protocols/filter/constants.ts +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -1,5 +1,8 @@ export const DEFAULT_KEEP_ALIVE = 10_000; +export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false; +export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000; export const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: DEFAULT_KEEP_ALIVE + keepAlive: DEFAULT_KEEP_ALIVE, + enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK }; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index 6f5320812b..2d8f356150 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -6,6 +6,7 @@ import { type IDecodedMessage, type IDecoder, type IFilter, + type ILightPush, type Libp2p, NetworkConfig, type ProtocolCreateOptions, @@ -38,7 +39,8 @@ class Filter extends BaseProtocolSDK implements IFilter { public constructor( connectionManager: ConnectionManager, - libp2p: Libp2p, + private libp2p: Libp2p, + private lightPush?: ILightPush, options?: ProtocolCreateOptions ) { super( @@ -195,7 +197,9 @@ class Filter extends BaseProtocolSDK implements IFilter { this.protocol, this.connectionManager, () => this.connectedPeers, - this.renewPeer.bind(this) + this.renewPeer.bind(this), + this.libp2p, + this.lightPush ) ); @@ -300,7 +304,9 @@ class Filter extends BaseProtocolSDK implements IFilter { export function wakuFilter( connectionManager: ConnectionManager, + lightPush?: ILightPush, init?: ProtocolCreateOptions ): (libp2p: Libp2p) => IFilter { - return (libp2p: Libp2p) => new Filter(connectionManager, libp2p, init); + return (libp2p: Libp2p) => + new Filter(connectionManager, libp2p, lightPush, init); } diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 7087ee8e84..1dc2724ab5 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -1,6 +1,12 @@ import type { Peer } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; -import { ConnectionManager, FilterCore } from "@waku/core"; +import { + ConnectionManager, + createDecoder, + createEncoder, + FilterCore, + LightPushCore +} from "@waku/core"; import { type Callback, type ContentTopic, @@ -8,8 +14,10 @@ import { EConnectionStateEvents, type IDecodedMessage, type IDecoder, + type ILightPush, type IProtoMessage, type ISubscription, + type Libp2p, type PeerIdStr, ProtocolError, type PubsubTopic, @@ -23,7 +31,12 @@ import { groupByContentTopic, Logger } from "@waku/utils"; import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js"; -import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; +import { + DEFAULT_KEEP_ALIVE, + DEFAULT_LIGHT_PUSH_FILTER_CHECK, + DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL, + DEFAULT_SUBSCRIBE_OPTIONS +} from "./constants.js"; const log = new Logger("sdk:filter:subscription_manager"); @@ -33,6 +46,8 @@ export class SubscriptionManager implements ISubscription { private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE; private keepAliveInterval: ReturnType | null = null; + private enableLightPushFilterCheck = DEFAULT_LIGHT_PUSH_FILTER_CHECK; + private subscriptionCallbacks: Map< ContentTopic, SubscriptionCallback @@ -45,7 +60,9 @@ export class SubscriptionManager implements ISubscription { private readonly getPeers: () => Peer[], private readonly renewPeer: ( peerToDisconnect: PeerId - ) => Promise + ) => Promise, + private readonly libp2p: Libp2p, + private readonly lightPush?: ILightPush ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); @@ -56,7 +73,8 @@ export class SubscriptionManager implements ISubscription { this.renewPeer.bind(this), () => Array.from(this.subscriptionCallbacks.keys()), this.protocol.subscribe.bind(this.protocol), - this.protocol.addLibp2pEventListener.bind(this.protocol) + this.protocol.addLibp2pEventListener.bind(this.protocol), + this.sendLightPushCheckMessage.bind(this) ); } @@ -65,11 +83,10 @@ export class SubscriptionManager implements ISubscription { callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { - this.reliabilityMonitor.setMaxMissedMessagesThreshold( - options.maxMissedMessagesThreshold - ); this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed); this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE; + this.enableLightPushFilterCheck = + options?.enableLightPushFilterCheck || DEFAULT_LIGHT_PUSH_FILTER_CHECK; const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -87,11 +104,20 @@ export class SubscriptionManager implements ISubscription { } } + if (this.enableLightPushFilterCheck) { + decodersArray.push( + createDecoder( + this.buildLightPushContentTopic(), + this.pubsubTopic + ) as IDecoder + ); + } + const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); const promises = this.getPeers().map(async (peer) => - this.protocol.subscribe(this.pubsubTopic, peer, contentTopics) + this.subscribeWithPeerVerification(peer, contentTopics) ); const results = await Promise.allSettled(promises); @@ -109,6 +135,11 @@ export class SubscriptionManager implements ISubscription { callback } as unknown as SubscriptionCallback; + // don't handle case of internal content topic + if (contentTopic === this.buildLightPushContentTopic()) { + return; + } + // The callback and decoder may override previous values, this is on // purpose as the user may call `subscribe` to refresh the subscription this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); @@ -176,10 +207,9 @@ export class SubscriptionManager implements ISubscription { message: WakuMessage, peerIdStr: PeerIdStr ): Promise { - const alreadyReceived = this.reliabilityMonitor.processIncomingMessage( - message, - this.pubsubTopic, - peerIdStr + const alreadyReceived = this.reliabilityMonitor.notifyMessageReceived( + peerIdStr, + message as IProtoMessage ); if (alreadyReceived) { @@ -202,6 +232,19 @@ export class SubscriptionManager implements ISubscription { await pushMessage(subscriptionCallback, this.pubsubTopic, message); } + private async subscribeWithPeerVerification( + peer: Peer, + contentTopics: string[] + ): Promise { + const result = await this.protocol.subscribe( + this.pubsubTopic, + peer, + contentTopics + ); + await this.sendLightPushCheckMessage(peer); + return result; + } + private handleResult( results: PromiseSettledResult[], type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" @@ -324,6 +367,46 @@ export class SubscriptionManager implements ISubscription { clearInterval(this.keepAliveInterval); this.keepAliveInterval = null; } + + private async sendLightPushCheckMessage(peer: Peer): Promise { + if ( + this.lightPush && + this.libp2p && + this.reliabilityMonitor.shouldVerifyPeer(peer.id) + ) { + const encoder = createEncoder({ + contentTopic: this.buildLightPushContentTopic(), + pubsubTopic: this.pubsubTopic, + ephemeral: true + }); + + const message = { payload: new Uint8Array(1) }; + const protoMessage = await encoder.toProtoObj(message); + + // make a delay to be sure message is send when subscription is in place + setTimeout( + (async () => { + const result = await (this.lightPush!.protocol as LightPushCore).send( + encoder, + message, + peer + ); + this.reliabilityMonitor.notifyMessageSent(peer.id, protoMessage); + if (result.failure) { + log.error( + `failed to send lightPush ping message to peer:${peer.id.toString()}\t${result.failure.error}` + ); + return; + } + }) as () => void, + DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL + ); + } + } + + private buildLightPushContentTopic(): string { + return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`; + } } async function pushMessage( diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts index f75c5fdc92..f92d1fa866 100644 --- a/packages/sdk/src/reliability_monitor/index.ts +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -24,7 +24,8 @@ export class ReliabilityMonitorManager { peer: Peer, contentTopics: ContentTopic[] ) => Promise, - addLibp2pEventListener: Libp2p["addEventListener"] + addLibp2pEventListener: Libp2p["addEventListener"], + sendLightPushMessage: (peer: Peer) => Promise ): ReceiverReliabilityMonitor { if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; @@ -36,7 +37,8 @@ export class ReliabilityMonitorManager { renewPeer, getContentTopics, protocolSubscribe, - addLibp2pEventListener + addLibp2pEventListener, + sendLightPushMessage ); ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); return monitor; @@ -50,7 +52,6 @@ export class ReliabilityMonitorManager { public static stopAll(): void { for (const [pubsubTopic, monitor] of this.receiverMonitors) { - monitor.setMaxMissedMessagesThreshold(undefined); monitor.setMaxPingFailures(undefined); this.receiverMonitors.delete(pubsubTopic); } diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts index 61d700817f..ea58c1b17c 100644 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -8,24 +8,20 @@ import { PubsubTopic } from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; -import { WakuMessage } from "@waku/proto"; import { Logger } from "@waku/utils"; - -type ReceivedMessageHashes = { - all: Set; - nodes: Record>; -}; - -const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; +import { bytesToUtf8 } from "@waku/utils/bytes"; const log = new Logger("sdk:receiver:reliability_monitor"); const DEFAULT_MAX_PINGS = 3; +const MESSAGE_VERIFICATION_DELAY = 5_000; export class ReceiverReliabilityMonitor { - private receivedMessagesHashes: ReceivedMessageHashes; - private missedMessagesByPeer: Map = new Map(); - private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + private receivedMessagesFormPeer = new Set(); + private receivedMessages = new Set(); + private scheduledVerification = new Map(); + private verifiedPeers = new Set(); + private peerFailures: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; private peerRenewalLocks: Set = new Set(); @@ -40,18 +36,9 @@ export class ReceiverReliabilityMonitor { peer: Peer, contentTopics: ContentTopic[] ) => Promise, - private addLibp2pEventListener: Libp2p["addEventListener"] + private addLibp2pEventListener: Libp2p["addEventListener"], + private sendLightPushMessage: (peer: Peer) => Promise ) { - const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); - - this.receivedMessagesHashes = { - all: new Set(), - nodes: { - ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) - } - }; - allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); - this.addLibp2pEventListener("peer:disconnect", (evt) => { const peerId = evt.detail; if (this.getPeers().some((p) => p.id.equals(peerId))) { @@ -60,13 +47,6 @@ export class ReceiverReliabilityMonitor { }); } - public setMaxMissedMessagesThreshold(value: number | undefined): void { - if (value === undefined) { - return; - } - this.maxMissedMessagesThreshold = value; - } - public setMaxPingFailures(value: number | undefined): void { if (value === undefined) { return; @@ -99,77 +79,79 @@ export class ReceiverReliabilityMonitor { } } - public processIncomingMessage( - message: WakuMessage, - pubsubTopic: PubsubTopic, - peerIdStr?: string + public notifyMessageReceived( + peerIdStr: string, + message: IProtoMessage ): boolean { - const alreadyReceived = this.addMessageToCache( - message, - pubsubTopic, - peerIdStr + const hash = this.buildMessageHash(message); + + this.verifiedPeers.add(peerIdStr); + this.receivedMessagesFormPeer.add(`${peerIdStr}-${hash}`); + + log.info( + `notifyMessage received debug: ephemeral:${message.ephemeral}\t${bytesToUtf8(message.payload)}` ); - return alreadyReceived; + log.info(`notifyMessage received: peer:${peerIdStr}\tmessage:${hash}`); + + if (this.receivedMessages.has(hash)) { + return true; + } + + this.receivedMessages.add(hash); + + return false; } - private addMessageToCache( - message: WakuMessage, - pubsubTopic: PubsubTopic, - peerIdStr?: string - ): boolean { - const hashedMessageStr = messageHashStr( - pubsubTopic, - message as IProtoMessage - ); + public notifyMessageSent(peerId: PeerId, message: IProtoMessage): void { + const peerIdStr = peerId.toString(); + const hash = this.buildMessageHash(message); + + log.info(`notifyMessage sent debug: ${bytesToUtf8(message.payload)}`); + + if (this.scheduledVerification.has(peerIdStr)) { + log.warn( + `notifyMessage sent: attempting to schedule verification for pending peer:${peerIdStr}\tmessage:${hash}` + ); + return; + } - const alreadyReceived = - this.receivedMessagesHashes.all.has(hashedMessageStr); - this.receivedMessagesHashes.all.add(hashedMessageStr); + const timeout = window.setTimeout( + (async () => { + const receivedAnyMessage = this.verifiedPeers.has(peerIdStr); + const receivedTestMessage = this.receivedMessagesFormPeer.has( + `${peerIdStr}-${hash}` + ); + + if (receivedAnyMessage || receivedTestMessage) { + log.info( + `notifyMessage sent setTimeout: verified that peer pushes filter messages, peer:${peerIdStr}\tmessage:${hash}` + ); + return; + } - if (peerIdStr) { - const hashesForPeer = this.receivedMessagesHashes.nodes[peerIdStr]; - if (!hashesForPeer) { log.warn( - `Peer ${peerIdStr} not initialized in receivedMessagesHashes.nodes, adding it.` + `notifyMessage sent setTimeout: peer didn't return probe message, attempting renewAndSubscribe, peer:${peerIdStr}\tmessage:${hash}` ); - this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); - } - this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr); - } + this.scheduledVerification.delete(peerIdStr); + await this.renewAndSubscribePeer(peerId); + }) as () => void, + MESSAGE_VERIFICATION_DELAY + ); - return alreadyReceived; + this.scheduledVerification.set(peerIdStr, timeout); } - // @ts-expect-error Turned off until properly investigated: https://github.com/waku-org/js-waku/issues/2075 - private async checkAndRenewPeers(): Promise { - for (const hash of this.receivedMessagesHashes.all) { - for (const [peerIdStr, hashes] of Object.entries( - this.receivedMessagesHashes.nodes - )) { - if (!hashes.has(hash)) { - this.incrementMissedMessageCount(peerIdStr); - if (this.shouldRenewPeer(peerIdStr)) { - log.info( - `Peer ${peerIdStr} has missed too many messages, renewing.` - ); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` - ); - continue; - } - try { - await this.renewAndSubscribePeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); - } - } - } - } - } + public shouldVerifyPeer(peerId: PeerId): boolean { + const peerIdStr = peerId.toString(); + + const isPeerVerified = this.verifiedPeers.has(peerIdStr); + const isVerificationPending = this.scheduledVerification.has(peerIdStr); + + return !(isPeerVerified || isVerificationPending); + } + + private buildMessageHash(message: IProtoMessage): string { + return messageHashStr(this.pubsubTopic, message); } private async renewAndSubscribePeer( @@ -196,12 +178,9 @@ export class ReceiverReliabilityMonitor { this.getContentTopics() ); - this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); - this.missedMessagesByPeer.set(newPeer.id.toString(), 0); + await this.sendLightPushMessage(newPeer); this.peerFailures.delete(peerIdStr); - this.missedMessagesByPeer.delete(peerIdStr); - delete this.receivedMessagesHashes.nodes[peerIdStr]; return newPeer; } catch (error) { @@ -211,14 +190,4 @@ export class ReceiverReliabilityMonitor { this.peerRenewalLocks.delete(peerIdStr); } } - - private incrementMissedMessageCount(peerIdStr: string): void { - const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; - this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); - } - - private shouldRenewPeer(peerIdStr: string): boolean { - const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; - return missedMessages > this.maxMissedMessagesThreshold; - } } diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 1c450420cf..eea596d7a9 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -116,7 +116,11 @@ export class WakuNode implements IWaku { } if (protocolsEnabled.filter) { - const filter = wakuFilter(this.connectionManager, options); + const filter = wakuFilter( + this.connectionManager, + this.lightPush, + options + ); this.filter = filter(libp2p); }