Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: single connection attempt at a time #5515

Draft
wants to merge 5 commits into
base: v2.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 94 additions & 54 deletions packages/core/src/controllers/relayer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-console */
import { EventEmitter } from "events";
import { JsonRpcProvider } from "@walletconnect/jsonrpc-provider";
import {
Expand Down Expand Up @@ -101,6 +102,7 @@ export class Relayer extends IRelayer {
*/
private heartBeatTimeout = toMiliseconds(THIRTY_SECONDS + ONE_SECOND);
private reconnectTimeout: NodeJS.Timeout | undefined;
private connectPromise: Promise<void> | undefined;

constructor(opts: RelayerOptions) {
super(opts);
Expand Down Expand Up @@ -131,7 +133,7 @@ export class Relayer extends IRelayer {
try {
await this.transportOpen();
} catch (e) {
this.logger.warn(e);
this.logger.warn(e, (e as Error)?.message);
}
}
}
Expand Down Expand Up @@ -283,7 +285,7 @@ export class Relayer extends IRelayer {
Array.from(this.requestsInFlight.values()).map((request) => request.promise),
);
} catch (e) {
this.logger.warn(e);
this.logger.warn(e, (e as Error)?.message);
}
}

Expand All @@ -301,59 +303,29 @@ export class Relayer extends IRelayer {
await this.transportDisconnect();
}

public async transportOpen(relayUrl?: string) {
await this.confirmOnlineStateOrThrow();
if (relayUrl && relayUrl !== this.relayUrl) {
this.relayUrl = relayUrl;
await this.transportDisconnect();
}

// Always create new socket instance when trying to connect because if the socket was dropped due to `socket hang up` exception
// It wont be able to reconnect
await this.createProvider();

this.connectionAttemptInProgress = true;
this.transportExplicitlyClosed = false;
try {
await new Promise<void>(async (resolve, reject) => {
const onDisconnect = () => {
this.provider.off(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect);
reject(new Error(`Connection interrupted while trying to subscribe`));
};
this.provider.on(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect);

await createExpiringPromise(
this.provider.connect(),
toMiliseconds(ONE_MINUTE),
`Socket stalled when trying to connect to ${this.relayUrl}`,
)
.catch((e) => {
reject(e);
})
async transportOpen(relayUrl?: string) {
const random = Math.floor(Math.random() * 1e3);
if (this.connectPromise) {
console.log(`Waiting for existing connection attempt to resolve... :${random}`);
await this.connectPromise;
} else {
this.connectPromise = new Promise(async (resolve, reject) => {
await this.connect(relayUrl)
.then(resolve)
.catch(reject)
.finally(() => {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = undefined;
this.connectPromise = undefined;
});
this.subscriber.start().catch((error) => {
this.logger.error(error);
this.onDisconnectHandler();
});
this.hasExperiencedNetworkDisruption = false;
resolve();
});
} catch (e) {
this.logger.error(e);
const error = e as Error;
this.hasExperiencedNetworkDisruption = true;
if (!this.isConnectionStalled(error.message)) {
throw e;
}
} finally {
this.connectionAttemptInProgress = false;
await this.connectPromise;
}
if (!this.connected) {
throw new Error(`Couldn't establish socket connection to the relay server: ${this.relayUrl}`);
}
}

public async restartTransport(relayUrl?: string) {
console.log("Restarting transport...");
if (this.connectionAttemptInProgress) return;
this.relayUrl = relayUrl || this.relayUrl;
await this.confirmOnlineStateOrThrow();
Expand All @@ -377,7 +349,7 @@ export class Relayer extends IRelayer {
try {
await this.onMessageEvent(message);
} catch (e) {
this.logger.warn(e);
this.logger.warn(e, (e as Error)?.message);
}
}
this.logger.trace(`Batch of ${sortedMessages.length} message events processed`);
Expand All @@ -400,6 +372,70 @@ export class Relayer extends IRelayer {
}

// ---------- Private ----------------------------------------------- //

private async connect(relayUrl?: string) {
await this.confirmOnlineStateOrThrow();
if (relayUrl && relayUrl !== this.relayUrl) {
this.relayUrl = relayUrl;
await this.transportDisconnect();
}

// Always create new socket instance when trying to connect because if the socket was dropped due to `socket hang up` exception
// It wont be able to reconnect
await this.createProvider();

this.connectionAttemptInProgress = true;
this.transportExplicitlyClosed = false;
let attempt = 1;

while (attempt < 6) {
try {
await new Promise<void>(async (resolve, reject) => {
const onDisconnect = () => {
this.provider.off(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect);
reject(new Error(`Connection interrupted while trying to subscribe`));
};
this.provider.on(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect);

await createExpiringPromise(
this.provider.connect(),
toMiliseconds(ONE_MINUTE),
`Socket stalled when trying to connect to ${this.relayUrl}`,
)
.catch((e) => {
reject(e);
})
.finally(() => {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = undefined;
});
this.subscriber.start().catch((error) => {
this.logger.error(error, (error as Error)?.message);
this.onDisconnectHandler();
});
this.hasExperiencedNetworkDisruption = false;
resolve();
});
} catch (e) {
this.logger.error(e, (e as Error)?.message);
const error = e as Error;
this.hasExperiencedNetworkDisruption = true;
if (!this.isConnectionStalled(error.message)) {
throw e;
}
} finally {
this.connectionAttemptInProgress = false;
}

if (this.connected) {
break;
}

await new Promise((resolve) => setTimeout(resolve, toMiliseconds(attempt * 1)));
attempt++;
}
}

/*
* In Node, we must detect when the connection is stalled and terminate it.
* The logic is, if we don't receive ping from the relay within a certain time, we terminate the connection.
Expand All @@ -419,7 +455,7 @@ export class Relayer extends IRelayer {
}
this.resetPingTimeout();
} catch (e) {
this.logger.warn(e);
this.logger.warn(e, (e as Error)?.message);
}
}

Expand All @@ -432,7 +468,7 @@ export class Relayer extends IRelayer {
this.provider?.connection?.socket?.terminate();
}, this.heartBeatTimeout);
} catch (e) {
this.logger.warn(e);
this.logger.warn(e, (e as Error)?.message);
}
};

Expand Down Expand Up @@ -546,7 +582,7 @@ export class Relayer extends IRelayer {
};

private onProviderErrorHandler = (error: Error) => {
this.logger.error(error);
this.logger.error(error, (error as Error)?.message);
this.events.emit(RELAYER_EVENTS.error, error);
// close the transport when a fatal error is received as there's no way to recover from it
// usual cases are missing/invalid projectId, expired jwt token, invalid origin etc
Expand Down Expand Up @@ -582,7 +618,9 @@ export class Relayer extends IRelayer {
await this.transportDisconnect();
this.transportExplicitlyClosed = false;
} else {
await this.restartTransport().catch((error) => this.logger.error(error));
await this.restartTransport().catch((error) =>
this.logger.error(error, (error as Error)?.message),
);
}
});
}
Expand All @@ -596,7 +634,9 @@ export class Relayer extends IRelayer {
if (this.transportExplicitlyClosed) return;
if (this.reconnectTimeout) return;
this.reconnectTimeout = setTimeout(async () => {
await this.transportOpen().catch((error) => this.logger.error(error));
await this.transportOpen().catch((error) =>
this.logger.error(error, (error as Error)?.message),
);
}, toMiliseconds(RELAYER_RECONNECT_TIMEOUT));
}

Expand Down
13 changes: 11 additions & 2 deletions packages/core/src/controllers/subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-console */
import { EventEmitter } from "events";
import { HEARTBEAT_EVENTS } from "@walletconnect/heartbeat";
import { ErrorResponse, RequestArguments } from "@walletconnect/jsonrpc-types";
Expand All @@ -19,6 +20,7 @@
createExpiringPromise,
hashMessage,
isValidArray,
areArraysEqual,
} from "@walletconnect/utils";
import {
CORE_STORAGE_PREFIX,
Expand Down Expand Up @@ -249,7 +251,10 @@
return subId;
}
const subscribe = createExpiringPromise(
this.relayer.request(request).catch((e) => this.logger.warn(e)),
this.relayer.request(request).catch((e) => {
console.log("error", e);
this.logger.warn(e);
}),
this.subscribeTimeout,
`Subscribing to ${topic} failed, please try again`,
);
Expand All @@ -272,7 +277,7 @@
private async rpcBatchSubscribe(subscriptions: SubscriberTypes.Params[]) {
if (!subscriptions.length) return;
const relay = subscriptions[0].relay;
const api = getRelayProtocolApi(relay!.protocol);

Check warning on line 280 in packages/core/src/controllers/subscriber.ts

View workflow job for this annotation

GitHub Actions / code_style (lint)

Forbidden non-null assertion
const request: RequestArguments<RelayJsonRpc.BatchSubscribeParams> = {
method: api.batchSubscribe,
params: {
Expand All @@ -295,7 +300,7 @@
private async rpcBatchFetchMessages(subscriptions: SubscriberTypes.Params[]) {
if (!subscriptions.length) return;
const relay = subscriptions[0].relay;
const api = getRelayProtocolApi(relay!.protocol);

Check warning on line 303 in packages/core/src/controllers/subscriber.ts

View workflow job for this annotation

GitHub Actions / code_style (lint)

Forbidden non-null assertion
const request: RequestArguments<RelayJsonRpc.BatchFetchMessagesParams> = {
method: api.batchFetchMessages,
params: {
Expand Down Expand Up @@ -432,7 +437,11 @@
const persisted = await this.getRelayerSubscriptions();
if (typeof persisted === "undefined") return;
if (!persisted.length) return;
if (this.subscriptions.size) {
if (
this.subscriptions.size &&
!areArraysEqual(persisted, Array.from(this.subscriptions.values()))
) {
console.log("RESTORE_WILL_OVERRIDE", persisted, this.subscriptions.values());
const { message } = getInternalError("RESTORE_WILL_OVERRIDE", this.name);
this.logger.error(message);
this.logger.error(`${this.name}: ${JSON.stringify(this.values)}`);
Expand Down
26 changes: 21 additions & 5 deletions packages/core/test/relayer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { disconnectSocket, TEST_CORE_OPTIONS, throttle } from "./shared";
import { ICore, IRelayer, ISubscriber } from "@walletconnect/types";
import Sinon from "sinon";
import { JsonRpcRequest } from "@walletconnect/jsonrpc-utils";
import { generateRandomBytes32, hashMessage } from "@walletconnect/utils";
import { createExpiringPromise, generateRandomBytes32, hashMessage } from "@walletconnect/utils";

describe("Relayer", () => {
const logger = pino(getDefaultLoggerOptions({ level: CORE_DEFAULT.logger }));
Expand Down Expand Up @@ -330,10 +330,26 @@ describe("Relayer", () => {

it("should restart transport after connection drop", async () => {
const randomSessionIdentifier = relayer.core.crypto.randomSessionIdentifier;
await relayer.provider.connection.close();
expect(relayer.connected).to.be.false;
await throttle(1000);
expect(relayer.connected).to.be.true;

const timeout = setTimeout(() => {
throw new Error("Connection did not restart after disconnect");
}, 5_001);
await Promise.all([
new Promise<void>((resolve) => {
relayer.once(RELAYER_EVENTS.connect, () => {
expect(relayer.connected).to.be.true;
resolve();
});
}),
new Promise<void>((resolve) => {
relayer.once(RELAYER_EVENTS.disconnect, () => {
expect(relayer.connected).to.be.false;
resolve();
});
}),
relayer.provider.connection.close(),
]);
clearTimeout(timeout);
// the identifier should be the same
expect(relayer.core.crypto.randomSessionIdentifier).to.eq(randomSessionIdentifier);
});
Expand Down
9 changes: 9 additions & 0 deletions packages/utils/src/misc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,15 @@ export function mergeArrays<T>(a: T[] = [], b: T[] = []): T[] {
return [...new Set([...a, ...b])];
}

export function areArraysEqual<T>(arr1: T[], arr2: T[]): boolean {
try {
if (arr1.length !== arr2.length) return false;
return arr1.every((value, index) => value === arr2[index]);
} catch (err) {
return false;
}
}

export async function handleDeeplinkRedirect({
id,
topic,
Expand Down
Loading