From c6d40c30c81bd03c322c22d853464c3db0ba3d64 Mon Sep 17 00:00:00 2001 From: Gancho Radkov Date: Fri, 22 Nov 2024 13:04:42 +0200 Subject: [PATCH 1/5] fix: relayer to do only single connection attempt at a time --- packages/core/src/controllers/relayer.ts | 117 ++++++++++++-------- packages/core/src/controllers/subscriber.ts | 7 +- 2 files changed, 77 insertions(+), 47 deletions(-) diff --git a/packages/core/src/controllers/relayer.ts b/packages/core/src/controllers/relayer.ts index 09d0437c2..4237c24f6 100644 --- a/packages/core/src/controllers/relayer.ts +++ b/packages/core/src/controllers/relayer.ts @@ -1,3 +1,4 @@ +/* eslint-disable no-console */ import { EventEmitter } from "events"; import { JsonRpcProvider } from "@walletconnect/jsonrpc-provider"; import { @@ -101,6 +102,7 @@ export class Relayer extends IRelayer { */ private heartBeatTimeout = toMiliseconds(THIRTY_SECONDS + ONE_SECOND); private reconnectTimeout: NodeJS.Timeout | undefined; + private connectPromise: Promise | undefined; constructor(opts: RelayerOptions) { super(opts); @@ -301,59 +303,29 @@ export class Relayer extends IRelayer { await this.transportDisconnect(); } - public async transportOpen(relayUrl?: string) { - await this.confirmOnlineStateOrThrow(); - if (relayUrl && relayUrl !== this.relayUrl) { - this.relayUrl = relayUrl; - await this.transportDisconnect(); - } - - // Always create new socket instance when trying to connect because if the socket was dropped due to `socket hang up` exception - // It wont be able to reconnect - await this.createProvider(); - - this.connectionAttemptInProgress = true; - this.transportExplicitlyClosed = false; - try { - await new Promise(async (resolve, reject) => { - const onDisconnect = () => { - this.provider.off(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect); - reject(new Error(`Connection interrupted while trying to subscribe`)); - }; - this.provider.on(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect); - - await createExpiringPromise( - this.provider.connect(), - toMiliseconds(ONE_MINUTE), - `Socket stalled when trying to connect to ${this.relayUrl}`, - ) - .catch((e) => { - reject(e); - }) + async transportOpen(relayUrl?: string) { + const random = Math.floor(Math.random() * 1e3); + if (this.connectPromise) { + console.log(`Waiting for existing connection attempt to resolve... :${random}`); + await this.connectPromise; + } else { + this.connectPromise = new Promise(async (resolve, reject) => { + await this.connect(relayUrl) + .then(resolve) + .catch(reject) .finally(() => { - clearTimeout(this.reconnectTimeout); - this.reconnectTimeout = undefined; + this.connectPromise = undefined; }); - this.subscriber.start().catch((error) => { - this.logger.error(error); - this.onDisconnectHandler(); - }); - this.hasExperiencedNetworkDisruption = false; - resolve(); }); - } catch (e) { - this.logger.error(e); - const error = e as Error; - this.hasExperiencedNetworkDisruption = true; - if (!this.isConnectionStalled(error.message)) { - throw e; - } - } finally { - this.connectionAttemptInProgress = false; + await this.connectPromise; + } + if (!this.connected) { + throw new Error(`Couldn't establish socket connection to the relay server: ${this.relayUrl}`); } } public async restartTransport(relayUrl?: string) { + console.log("Restarting transport..."); if (this.connectionAttemptInProgress) return; this.relayUrl = relayUrl || this.relayUrl; await this.confirmOnlineStateOrThrow(); @@ -400,6 +372,59 @@ export class Relayer extends IRelayer { } // ---------- Private ----------------------------------------------- // + + private async connect(relayUrl?: string) { + await this.confirmOnlineStateOrThrow(); + if (relayUrl && relayUrl !== this.relayUrl) { + this.relayUrl = relayUrl; + await this.transportDisconnect(); + } + + // Always create new socket instance when trying to connect because if the socket was dropped due to `socket hang up` exception + // It wont be able to reconnect + await this.createProvider(); + + this.connectionAttemptInProgress = true; + this.transportExplicitlyClosed = false; + try { + await new Promise(async (resolve, reject) => { + const onDisconnect = () => { + this.provider.off(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect); + reject(new Error(`Connection interrupted while trying to subscribe`)); + }; + this.provider.on(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect); + + await createExpiringPromise( + this.provider.connect(), + toMiliseconds(ONE_MINUTE), + `Socket stalled when trying to connect to ${this.relayUrl}`, + ) + .catch((e) => { + reject(e); + }) + .finally(() => { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = undefined; + }); + this.subscriber.start().catch((error) => { + this.logger.error(error); + this.onDisconnectHandler(); + }); + this.hasExperiencedNetworkDisruption = false; + resolve(); + }); + } catch (e) { + this.logger.error(e); + const error = e as Error; + this.hasExperiencedNetworkDisruption = true; + if (!this.isConnectionStalled(error.message)) { + throw e; + } + } finally { + this.connectionAttemptInProgress = false; + } + } + /* * In Node, we must detect when the connection is stalled and terminate it. * The logic is, if we don't receive ping from the relay within a certain time, we terminate the connection. diff --git a/packages/core/src/controllers/subscriber.ts b/packages/core/src/controllers/subscriber.ts index 9d002b915..d75026cf5 100644 --- a/packages/core/src/controllers/subscriber.ts +++ b/packages/core/src/controllers/subscriber.ts @@ -1,3 +1,4 @@ +/* eslint-disable no-console */ import { EventEmitter } from "events"; import { HEARTBEAT_EVENTS } from "@walletconnect/heartbeat"; import { ErrorResponse, RequestArguments } from "@walletconnect/jsonrpc-types"; @@ -249,7 +250,10 @@ export class Subscriber extends ISubscriber { return subId; } const subscribe = createExpiringPromise( - this.relayer.request(request).catch((e) => this.logger.warn(e)), + this.relayer.request(request).catch((e) => { + console.log("error", e); + this.logger.warn(e); + }), this.subscribeTimeout, `Subscribing to ${topic} failed, please try again`, ); @@ -433,6 +437,7 @@ export class Subscriber extends ISubscriber { if (typeof persisted === "undefined") return; if (!persisted.length) return; if (this.subscriptions.size) { + console.log("RESTORE_WILL_OVERRIDE", persisted, this.subscriptions.values()); const { message } = getInternalError("RESTORE_WILL_OVERRIDE", this.name); this.logger.error(message); this.logger.error(`${this.name}: ${JSON.stringify(this.values)}`); From d919e6e249b68cd9cfbbaf54c18bfbe83540b858 Mon Sep 17 00:00:00 2001 From: Gancho Radkov Date: Fri, 22 Nov 2024 13:16:12 +0200 Subject: [PATCH 2/5] refactor: network drop test --- packages/core/test/relayer.spec.ts | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/packages/core/test/relayer.spec.ts b/packages/core/test/relayer.spec.ts index fe56659c2..a4cb5ab47 100644 --- a/packages/core/test/relayer.spec.ts +++ b/packages/core/test/relayer.spec.ts @@ -17,7 +17,7 @@ import { disconnectSocket, TEST_CORE_OPTIONS, throttle } from "./shared"; import { ICore, IRelayer, ISubscriber } from "@walletconnect/types"; import Sinon from "sinon"; import { JsonRpcRequest } from "@walletconnect/jsonrpc-utils"; -import { generateRandomBytes32, hashMessage } from "@walletconnect/utils"; +import { createExpiringPromise, generateRandomBytes32, hashMessage } from "@walletconnect/utils"; describe("Relayer", () => { const logger = pino(getDefaultLoggerOptions({ level: CORE_DEFAULT.logger })); @@ -330,10 +330,26 @@ describe("Relayer", () => { it("should restart transport after connection drop", async () => { const randomSessionIdentifier = relayer.core.crypto.randomSessionIdentifier; - await relayer.provider.connection.close(); - expect(relayer.connected).to.be.false; - await throttle(1000); - expect(relayer.connected).to.be.true; + + const timeout = setTimeout(() => { + throw new Error("Connection did not restart after disconnect"); + }, 5_001); + await Promise.all([ + new Promise((resolve) => { + relayer.once(RELAYER_EVENTS.connect, () => { + expect(relayer.connected).to.be.true; + resolve(); + }); + }), + new Promise((resolve) => { + relayer.once(RELAYER_EVENTS.disconnect, () => { + expect(relayer.connected).to.be.false; + resolve(); + }); + }), + relayer.provider.connection.close(), + ]); + clearTimeout(timeout); // the identifier should be the same expect(relayer.core.crypto.randomSessionIdentifier).to.eq(randomSessionIdentifier); }); From 16e9b2692048287b0d8f697ae36e1fc22a16090f Mon Sep 17 00:00:00 2001 From: Gancho Radkov Date: Fri, 22 Nov 2024 13:39:59 +0200 Subject: [PATCH 3/5] feat: adds 5 time retry to connect --- packages/core/src/controllers/relayer.ts | 75 ++++++++++++++---------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/packages/core/src/controllers/relayer.ts b/packages/core/src/controllers/relayer.ts index 4237c24f6..e4545a23d 100644 --- a/packages/core/src/controllers/relayer.ts +++ b/packages/core/src/controllers/relayer.ts @@ -386,42 +386,53 @@ export class Relayer extends IRelayer { this.connectionAttemptInProgress = true; this.transportExplicitlyClosed = false; - try { - await new Promise(async (resolve, reject) => { - const onDisconnect = () => { - this.provider.off(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect); - reject(new Error(`Connection interrupted while trying to subscribe`)); - }; - this.provider.on(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect); + let attempt = 1; - await createExpiringPromise( - this.provider.connect(), - toMiliseconds(ONE_MINUTE), - `Socket stalled when trying to connect to ${this.relayUrl}`, - ) - .catch((e) => { - reject(e); - }) - .finally(() => { - clearTimeout(this.reconnectTimeout); - this.reconnectTimeout = undefined; + while (attempt < 6) { + try { + await new Promise(async (resolve, reject) => { + const onDisconnect = () => { + this.provider.off(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect); + reject(new Error(`Connection interrupted while trying to subscribe`)); + }; + this.provider.on(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect); + + await createExpiringPromise( + this.provider.connect(), + toMiliseconds(ONE_MINUTE), + `Socket stalled when trying to connect to ${this.relayUrl}`, + ) + .catch((e) => { + reject(e); + }) + .finally(() => { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = undefined; + }); + this.subscriber.start().catch((error) => { + this.logger.error(error); + this.onDisconnectHandler(); }); - this.subscriber.start().catch((error) => { - this.logger.error(error); - this.onDisconnectHandler(); + this.hasExperiencedNetworkDisruption = false; + resolve(); }); - this.hasExperiencedNetworkDisruption = false; - resolve(); - }); - } catch (e) { - this.logger.error(e); - const error = e as Error; - this.hasExperiencedNetworkDisruption = true; - if (!this.isConnectionStalled(error.message)) { - throw e; + } catch (e) { + this.logger.error(e); + const error = e as Error; + this.hasExperiencedNetworkDisruption = true; + if (!this.isConnectionStalled(error.message)) { + throw e; + } + } finally { + this.connectionAttemptInProgress = false; } - } finally { - this.connectionAttemptInProgress = false; + + if (this.connected) { + break; + } + + await new Promise((resolve) => setTimeout(resolve, toMiliseconds(attempt * 1))); + attempt++; } } From 9d50cfc074ffcc8c96d912cae7562be66ef9a4b4 Mon Sep 17 00:00:00 2001 From: Gancho Radkov Date: Fri, 22 Nov 2024 13:54:43 +0200 Subject: [PATCH 4/5] refactor: compare arrays before rejecting --- packages/core/src/controllers/relayer.ts | 24 ++++++++++++--------- packages/core/src/controllers/subscriber.ts | 6 +++++- packages/utils/src/misc.ts | 9 ++++++++ 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/packages/core/src/controllers/relayer.ts b/packages/core/src/controllers/relayer.ts index e4545a23d..a39d03fad 100644 --- a/packages/core/src/controllers/relayer.ts +++ b/packages/core/src/controllers/relayer.ts @@ -133,7 +133,7 @@ export class Relayer extends IRelayer { try { await this.transportOpen(); } catch (e) { - this.logger.warn(e); + this.logger.warn(e, (e as Error)?.message); } } } @@ -285,7 +285,7 @@ export class Relayer extends IRelayer { Array.from(this.requestsInFlight.values()).map((request) => request.promise), ); } catch (e) { - this.logger.warn(e); + this.logger.warn(e, (e as Error)?.message); } } @@ -349,7 +349,7 @@ export class Relayer extends IRelayer { try { await this.onMessageEvent(message); } catch (e) { - this.logger.warn(e); + this.logger.warn(e, (e as Error)?.message); } } this.logger.trace(`Batch of ${sortedMessages.length} message events processed`); @@ -410,14 +410,14 @@ export class Relayer extends IRelayer { this.reconnectTimeout = undefined; }); this.subscriber.start().catch((error) => { - this.logger.error(error); + this.logger.error(error, (error as Error)?.message); this.onDisconnectHandler(); }); this.hasExperiencedNetworkDisruption = false; resolve(); }); } catch (e) { - this.logger.error(e); + this.logger.error(e, (e as Error)?.message); const error = e as Error; this.hasExperiencedNetworkDisruption = true; if (!this.isConnectionStalled(error.message)) { @@ -455,7 +455,7 @@ export class Relayer extends IRelayer { } this.resetPingTimeout(); } catch (e) { - this.logger.warn(e); + this.logger.warn(e, (e as Error)?.message); } } @@ -468,7 +468,7 @@ export class Relayer extends IRelayer { this.provider?.connection?.socket?.terminate(); }, this.heartBeatTimeout); } catch (e) { - this.logger.warn(e); + this.logger.warn(e, (e as Error)?.message); } }; @@ -582,7 +582,7 @@ export class Relayer extends IRelayer { }; private onProviderErrorHandler = (error: Error) => { - this.logger.error(error); + this.logger.error(error, (error as Error)?.message); this.events.emit(RELAYER_EVENTS.error, error); // close the transport when a fatal error is received as there's no way to recover from it // usual cases are missing/invalid projectId, expired jwt token, invalid origin etc @@ -618,7 +618,9 @@ export class Relayer extends IRelayer { await this.transportDisconnect(); this.transportExplicitlyClosed = false; } else { - await this.restartTransport().catch((error) => this.logger.error(error)); + await this.restartTransport().catch((error) => + this.logger.error(error, (error as Error)?.message), + ); } }); } @@ -632,7 +634,9 @@ export class Relayer extends IRelayer { if (this.transportExplicitlyClosed) return; if (this.reconnectTimeout) return; this.reconnectTimeout = setTimeout(async () => { - await this.transportOpen().catch((error) => this.logger.error(error)); + await this.transportOpen().catch((error) => + this.logger.error(error, (error as Error)?.message), + ); }, toMiliseconds(RELAYER_RECONNECT_TIMEOUT)); } diff --git a/packages/core/src/controllers/subscriber.ts b/packages/core/src/controllers/subscriber.ts index d75026cf5..637b7be89 100644 --- a/packages/core/src/controllers/subscriber.ts +++ b/packages/core/src/controllers/subscriber.ts @@ -20,6 +20,7 @@ import { createExpiringPromise, hashMessage, isValidArray, + areArraysEqual, } from "@walletconnect/utils"; import { CORE_STORAGE_PREFIX, @@ -436,7 +437,10 @@ export class Subscriber extends ISubscriber { const persisted = await this.getRelayerSubscriptions(); if (typeof persisted === "undefined") return; if (!persisted.length) return; - if (this.subscriptions.size) { + if ( + this.subscriptions.size && + !areArraysEqual(persisted, Array.from(this.subscriptions.values())) + ) { console.log("RESTORE_WILL_OVERRIDE", persisted, this.subscriptions.values()); const { message } = getInternalError("RESTORE_WILL_OVERRIDE", this.name); this.logger.error(message); diff --git a/packages/utils/src/misc.ts b/packages/utils/src/misc.ts index 11940c7f3..201720c3d 100644 --- a/packages/utils/src/misc.ts +++ b/packages/utils/src/misc.ts @@ -355,6 +355,15 @@ export function mergeArrays(a: T[] = [], b: T[] = []): T[] { return [...new Set([...a, ...b])]; } +export function areArraysEqual(arr1: T[], arr2: T[]): boolean { + try { + if (arr1.length !== arr2.length) return false; + return arr1.every((value, index) => value === arr2[index]); + } catch (err) { + return false; + } +} + export async function handleDeeplinkRedirect({ id, topic, From f1849ae5fa1a8feb6b33bc5201df5d8ec74aba66 Mon Sep 17 00:00:00 2001 From: Gancho Radkov Date: Fri, 22 Nov 2024 14:00:28 +0200 Subject: [PATCH 5/5] chore: prettier --- packages/utils/src/misc.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/utils/src/misc.ts b/packages/utils/src/misc.ts index 201720c3d..797645d88 100644 --- a/packages/utils/src/misc.ts +++ b/packages/utils/src/misc.ts @@ -357,8 +357,8 @@ export function mergeArrays(a: T[] = [], b: T[] = []): T[] { export function areArraysEqual(arr1: T[], arr2: T[]): boolean { try { - if (arr1.length !== arr2.length) return false; - return arr1.every((value, index) => value === arr2[index]); + if (arr1.length !== arr2.length) return false; + return arr1.every((value, index) => value === arr2[index]); } catch (err) { return false; }