Skip to content

Commit

Permalink
fix: relayer to do only single connection attempt at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
ganchoradkov committed Nov 22, 2024
1 parent 56080bb commit c6d40c3
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 47 deletions.
117 changes: 71 additions & 46 deletions packages/core/src/controllers/relayer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-console */
import { EventEmitter } from "events";
import { JsonRpcProvider } from "@walletconnect/jsonrpc-provider";
import {
Expand Down Expand Up @@ -101,6 +102,7 @@ export class Relayer extends IRelayer {
*/
private heartBeatTimeout = toMiliseconds(THIRTY_SECONDS + ONE_SECOND);
private reconnectTimeout: NodeJS.Timeout | undefined;
private connectPromise: Promise<void> | undefined;

constructor(opts: RelayerOptions) {
super(opts);
Expand Down Expand Up @@ -301,59 +303,29 @@ export class Relayer extends IRelayer {
await this.transportDisconnect();
}

public async transportOpen(relayUrl?: string) {
await this.confirmOnlineStateOrThrow();
if (relayUrl && relayUrl !== this.relayUrl) {
this.relayUrl = relayUrl;
await this.transportDisconnect();
}

// Always create new socket instance when trying to connect because if the socket was dropped due to `socket hang up` exception
// It wont be able to reconnect
await this.createProvider();

this.connectionAttemptInProgress = true;
this.transportExplicitlyClosed = false;
try {
await new Promise<void>(async (resolve, reject) => {
const onDisconnect = () => {
this.provider.off(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect);
reject(new Error(`Connection interrupted while trying to subscribe`));
};
this.provider.on(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect);

await createExpiringPromise(
this.provider.connect(),
toMiliseconds(ONE_MINUTE),
`Socket stalled when trying to connect to ${this.relayUrl}`,
)
.catch((e) => {
reject(e);
})
async transportOpen(relayUrl?: string) {
const random = Math.floor(Math.random() * 1e3);
if (this.connectPromise) {
console.log(`Waiting for existing connection attempt to resolve... :${random}`);
await this.connectPromise;
} else {
this.connectPromise = new Promise(async (resolve, reject) => {
await this.connect(relayUrl)
.then(resolve)
.catch(reject)
.finally(() => {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = undefined;
this.connectPromise = undefined;
});
this.subscriber.start().catch((error) => {
this.logger.error(error);
this.onDisconnectHandler();
});
this.hasExperiencedNetworkDisruption = false;
resolve();
});
} catch (e) {
this.logger.error(e);
const error = e as Error;
this.hasExperiencedNetworkDisruption = true;
if (!this.isConnectionStalled(error.message)) {
throw e;
}
} finally {
this.connectionAttemptInProgress = false;
await this.connectPromise;
}
if (!this.connected) {
throw new Error(`Couldn't establish socket connection to the relay server: ${this.relayUrl}`);
}
}

public async restartTransport(relayUrl?: string) {
console.log("Restarting transport...");
if (this.connectionAttemptInProgress) return;
this.relayUrl = relayUrl || this.relayUrl;
await this.confirmOnlineStateOrThrow();
Expand Down Expand Up @@ -400,6 +372,59 @@ export class Relayer extends IRelayer {
}

// ---------- Private ----------------------------------------------- //

private async connect(relayUrl?: string) {
await this.confirmOnlineStateOrThrow();
if (relayUrl && relayUrl !== this.relayUrl) {
this.relayUrl = relayUrl;
await this.transportDisconnect();
}

// Always create new socket instance when trying to connect because if the socket was dropped due to `socket hang up` exception
// It wont be able to reconnect
await this.createProvider();

this.connectionAttemptInProgress = true;
this.transportExplicitlyClosed = false;
try {
await new Promise<void>(async (resolve, reject) => {
const onDisconnect = () => {
this.provider.off(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect);
reject(new Error(`Connection interrupted while trying to subscribe`));
};
this.provider.on(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect);

await createExpiringPromise(
this.provider.connect(),
toMiliseconds(ONE_MINUTE),
`Socket stalled when trying to connect to ${this.relayUrl}`,
)
.catch((e) => {
reject(e);
})
.finally(() => {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = undefined;
});
this.subscriber.start().catch((error) => {
this.logger.error(error);
this.onDisconnectHandler();
});
this.hasExperiencedNetworkDisruption = false;
resolve();
});
} catch (e) {
this.logger.error(e);
const error = e as Error;
this.hasExperiencedNetworkDisruption = true;
if (!this.isConnectionStalled(error.message)) {
throw e;
}
} finally {
this.connectionAttemptInProgress = false;
}
}

/*
* In Node, we must detect when the connection is stalled and terminate it.
* The logic is, if we don't receive ping from the relay within a certain time, we terminate the connection.
Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/controllers/subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-console */
import { EventEmitter } from "events";
import { HEARTBEAT_EVENTS } from "@walletconnect/heartbeat";
import { ErrorResponse, RequestArguments } from "@walletconnect/jsonrpc-types";
Expand Down Expand Up @@ -249,7 +250,10 @@ export class Subscriber extends ISubscriber {
return subId;
}
const subscribe = createExpiringPromise(
this.relayer.request(request).catch((e) => this.logger.warn(e)),
this.relayer.request(request).catch((e) => {
console.log("error", e);
this.logger.warn(e);
}),
this.subscribeTimeout,
`Subscribing to ${topic} failed, please try again`,
);
Expand Down Expand Up @@ -433,6 +437,7 @@ export class Subscriber extends ISubscriber {
if (typeof persisted === "undefined") return;
if (!persisted.length) return;
if (this.subscriptions.size) {
console.log("RESTORE_WILL_OVERRIDE", persisted, this.subscriptions.values());
const { message } = getInternalError("RESTORE_WILL_OVERRIDE", this.name);
this.logger.error(message);
this.logger.error(`${this.name}: ${JSON.stringify(this.values)}`);
Expand Down

0 comments on commit c6d40c3

Please sign in to comment.