diff --git a/libs/client/package.json b/libs/client/package.json index 4c50a30..e919801 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/serverless-client", "description": "The fal serverless JS/TS client", - "version": "0.6.0", + "version": "0.6.1", "license": "MIT", "repository": { "type": "git", diff --git a/libs/client/src/realtime.ts b/libs/client/src/realtime.ts index fd19d3d..f0ecf5f 100644 --- a/libs/client/src/realtime.ts +++ b/libs/client/src/realtime.ts @@ -120,6 +120,7 @@ const WebSocketErrorCodes = { const connectionManager = (() => { const connections = new Map(); const tokens = new Map(); + const isAuthInProgress = new Map(); return { token(app: string) { @@ -150,10 +151,23 @@ const connectionManager = (() => { remove(connectionKey: string) { connections.delete(connectionKey); }, + isAuthInProgress(app: string) { + return isAuthInProgress.has(app); + }, + setAuthInProgress(app: string, inProgress: boolean) { + if (inProgress) { + isAuthInProgress.set(app, true); + } else { + isAuthInProgress.delete(app); + } + }, }; })(); async function getConnection(app: string, key: string): Promise { + if (connectionManager.isAuthInProgress(app)) { + throw new Error('Authentication in progress'); + } const url = buildRealtimeUrl(app); if (connectionManager.has(key)) { @@ -161,7 +175,9 @@ async function getConnection(app: string, key: string): Promise { } let token = connectionManager.token(app); if (!token) { + connectionManager.setAuthInProgress(app, true); token = await connectionManager.refreshToken(app); + connectionManager.setAuthInProgress(app, false); } const ws = new WebSocket(`${url}?fal_jwt_token=${token}`); connectionManager.set(key, ws); @@ -203,7 +219,7 @@ export const realtimeImpl: RealtimeClient = { return NoOpConnection; } - const enqueueMessages: Input[] = []; + let pendingMessage: Input | undefined = undefined; let reconnecting = false; let ws: WebSocket | null = null; @@ -217,7 +233,7 @@ export const realtimeImpl: RealtimeClient = { }) ); } else { - enqueueMessages.push(input); + pendingMessage = input; if (!reconnecting) { reconnecting = true; reconnect(); @@ -231,14 +247,17 @@ export const realtimeImpl: RealtimeClient = { if (ws && ws.readyState === WebSocket.OPEN) { return; } + if (connectionManager.isAuthInProgress(app)) { + return; + } getConnection(app, connectionKey) .then((connection) => { ws = connection; ws.onopen = () => { reconnecting = false; - if (enqueueMessages.length > 0) { - enqueueMessages.forEach((input) => send(input)); - enqueueMessages.length = 0; + if (pendingMessage) { + send(pendingMessage); + pendingMessage = undefined; } }; ws.onclose = (event) => {