Skip to content

Commit

Permalink
make client auto reconnect to websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
lostbean committed Jan 5, 2024
1 parent 75667b0 commit 6ae00c0
Showing 1 changed file with 71 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export type MessageResponse<T> =
export function createWSClient<Paths extends {}>(
clientOptions?: ClientOptions,
): {
WS: <P extends PathsWithMethod<Paths, "get">, T extends keyof Paths[P]>(
WS: <P extends PathsWithMethod<Paths, "get">>(
url: P,
...init: ParamsType<Paths, P>[]
) => AsyncGenerator<MessageResponse<ReturnType<Paths, P>>>;
Expand All @@ -62,7 +62,7 @@ export function createWSClient<Paths extends {}>(
return {
/** Call a WS endpoint */
WS: async function* (url, fetchOptions) {
const { params = {}, querySerializer = defaultQuerySerializer, abortSignal, ...init } = fetchOptions || {};
const { params = {}, querySerializer = defaultQuerySerializer, abortSignal } = fetchOptions || {};

// build full URL
const finalURL = createFinalURL(url.toString(), {
Expand All @@ -72,17 +72,25 @@ export function createWSClient<Paths extends {}>(
});

var socket: WebSocket;
try {
socket = new WebSocket(finalURL);
} catch (error) {
return { error: {}, data: null };
var isWSPaused = false;
var controller = new AbortController();

// Create and wait for the WebSocket connection to be open
async function wsConnect(): Promise<WebSocket> {
return new Promise((resolve, reject) => {
try {
let newSocket = new WebSocket(finalURL);
newSocket.addEventListener("open", () => resolve(newSocket));
} catch (error) {
reject(error);
}
});
}

// Wait for the WebSocket connection to be open
await new Promise((resolve) => {
socket.addEventListener("open", resolve);
});
// Start the connection for the first time
socket = await wsConnect();

// Handle client side request to abort (via abort signal)
if (abortSignal) {
// already aborted, fail immediately
if (abortSignal.aborted) {
Expand All @@ -97,21 +105,68 @@ export function createWSClient<Paths extends {}>(
});
}

try {
while (socket.readyState === WebSocket.OPEN) {
// Wait for the next message
const message: MessageEvent<any> = await new Promise((resolve) => {
socket.addEventListener("message", (event: MessageEvent<any>) => resolve(event));
let reconnectHandler = async () => {
if (!abortSignal?.aborted) {
console.warn("Websocket connection unexpectedly closed, reconnecting");
await wsConnect().then((ws) => {
socket = ws;
});
controller.abort();
controller = new AbortController();
isWSPaused = false;
}
};

// reconnect on unexpected close (i.e. abortSignal not aborted)
socket.addEventListener("close", reconnectHandler);

// // Pause and resume WS connection when
// document.addEventListener("visibilitychange", async () => {
// if (document.hidden) {
// console.debug("Lost focus on web UI. Closing Websocket connection to save resources, resuming later");
// isWSPaused = true;
// socket.close();
// } else {
// console.debug("Resuming Websocket connection");
// reconnectHandler();
// }
// });

// Abortable message listener used to exit the message promise in case the connection
// in broken and we replace with a new connection
function waitForMsg(signal: AbortSignal): Promise<MessageEvent<any>> {
if (signal.aborted) {
return Promise.reject("Signal already aborted");
}
return new Promise((resolve, reject) => {
socket.addEventListener("message", (event: MessageEvent<any>) => resolve(event));
// Listen for abort event on signal
signal.addEventListener("abort", () => {
reject("Received signal to aborted");
});
});
}

// the async loop generator
try {
let message: MessageEvent<any> | undefined;
// the isWSPaused keep the loop active even if the connection got closed. The loop
// only exit if it's not paused and the connection is closed.
while (isWSPaused || socket.readyState === WebSocket.OPEN) {
// Wait for the next message and skip if gets an undefined message (i.e. aborted message await)
message = await waitForMsg(controller.signal).catch((_) => undefined);
if (message === undefined) {
continue;
}
// Yield the received message
yield { error: undefined, data: JSON.parse(message.data), message: message };
}
} catch (error) {
console.error(`Received an unexpected message from the channel on ${finalURL}:`);
console.error(error);
} finally {
// Close the WebSocket connection when the generator is done
// Final close. But let's remove the reconnection handler first
socket.removeEventListener("close", reconnectHandler);
socket.close();
}
},
Expand Down

0 comments on commit 6ae00c0

Please sign in to comment.