Skip to content

Commit

Permalink
fix: reconnect token state
Browse files Browse the repository at this point in the history
  • Loading branch information
drochetti committed Nov 29, 2023
1 parent 145159a commit 175bb11
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
2 changes: 1 addition & 1 deletion libs/client/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@fal-ai/serverless-client",
"description": "The fal serverless JS/TS client",
"version": "0.6.0",
"version": "0.6.1",
"license": "MIT",
"repository": {
"type": "git",
Expand Down
29 changes: 24 additions & 5 deletions libs/client/src/realtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ const WebSocketErrorCodes = {
const connectionManager = (() => {
const connections = new Map<string, WebSocket>();
const tokens = new Map<string, string>();
const isAuthInProgress = new Map<string, true>();

return {
token(app: string) {
Expand Down Expand Up @@ -150,18 +151,33 @@ const connectionManager = (() => {
remove(connectionKey: string) {
connections.delete(connectionKey);
},
isAuthInProgress(app: string) {
return isAuthInProgress.has(app);
},
setAuthInProgress(app: string, inProgress: boolean) {
if (inProgress) {
isAuthInProgress.set(app, true);
} else {
isAuthInProgress.delete(app);
}
},
};
})();

async function getConnection(app: string, key: string): Promise<WebSocket> {
if (connectionManager.isAuthInProgress(app)) {
throw new Error('Authentication in progress');
}
const url = buildRealtimeUrl(app);

if (connectionManager.has(key)) {
return connectionManager.get(key) as WebSocket;
}
let token = connectionManager.token(app);
if (!token) {
connectionManager.setAuthInProgress(app, true);
token = await connectionManager.refreshToken(app);
connectionManager.setAuthInProgress(app, false);
}
const ws = new WebSocket(`${url}?fal_jwt_token=${token}`);
connectionManager.set(key, ws);
Expand Down Expand Up @@ -203,7 +219,7 @@ export const realtimeImpl: RealtimeClient = {
return NoOpConnection;
}

const enqueueMessages: Input[] = [];
let pendingMessage: Input | undefined = undefined;

let reconnecting = false;
let ws: WebSocket | null = null;
Expand All @@ -217,7 +233,7 @@ export const realtimeImpl: RealtimeClient = {
})
);
} else {
enqueueMessages.push(input);
pendingMessage = input;
if (!reconnecting) {
reconnecting = true;
reconnect();
Expand All @@ -231,14 +247,17 @@ export const realtimeImpl: RealtimeClient = {
if (ws && ws.readyState === WebSocket.OPEN) {
return;
}
if (connectionManager.isAuthInProgress(app)) {
return;
}
getConnection(app, connectionKey)
.then((connection) => {
ws = connection;
ws.onopen = () => {
reconnecting = false;
if (enqueueMessages.length > 0) {
enqueueMessages.forEach((input) => send(input));
enqueueMessages.length = 0;
if (pendingMessage) {
send(pendingMessage);
pendingMessage = undefined;
}
};
ws.onclose = (event) => {
Expand Down

0 comments on commit 175bb11

Please sign in to comment.