Skip to content

Commit

Permalink
feat: move subscribe to top-level
Browse files Browse the repository at this point in the history
  • Loading branch information
drochetti committed Oct 4, 2023
1 parent c2ebc7b commit 813e3de
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 43 deletions.
87 changes: 46 additions & 41 deletions libs/client/src/function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,50 @@ export async function run<Input, Output>(
return await responseHandler(response);
}

/**
* Subscribes to updates for a specific request in the queue.
*
* @param id - The ID or URL of the function web endpoint.
* @param options - Options to configure how the request is run and how updates are received.
* @returns A promise that resolves to the result of the request once it's completed.
*/
export async function subscribe<Input, Output>(
id: string,
options: RunOptions<Input> & QueueSubscribeOptions = {}
): Promise<Output> {
const { request_id: requestId } = await queue.submit(id, options);
if (options.onEnqueue) {
options.onEnqueue(requestId);
}
return new Promise<Output>((resolve, reject) => {
let timeoutId: ReturnType<typeof setTimeout>;
const pollInterval = options.pollInterval ?? 1000;
const poll = async () => {
try {
const requestStatus = await queue.status(id, requestId);
if (options.onQueueUpdate) {
options.onQueueUpdate(requestStatus);
}
if (requestStatus.status === 'COMPLETED') {
clearTimeout(timeoutId);
try {
const result = await queue.result<Output>(id, requestId);
resolve(result);
} catch (error) {
reject(error);
}
return;
}
timeoutId = setTimeout(poll, pollInterval);
} catch (error) {
clearTimeout(timeoutId);
reject(error);
}
};
poll().catch(reject);
});
}

/**
* Options for subscribing to the request queue.
*/
Expand Down Expand Up @@ -168,11 +212,7 @@ interface Queue {
result<Output>(id: string, requestId: string): Promise<Output>;

/**
* Subscribes to updates for a specific request in the queue.
*
* @param id - The ID or URL of the function web endpoint.
* @param options - Options to configure how the request is run and how updates are received.
* @returns A promise that resolves to the result of the request once it's completed.
* @deprecated Use `fal.subscribe` instead.
*/
subscribe<Input, Output>(
id: string,
Expand Down Expand Up @@ -204,40 +244,5 @@ export const queue: Queue = {
path: `/fal/queue/requests/${requestId}/response`,
});
},
async subscribe<Input, Output>(
id: string,
options: RunOptions<Input> & QueueSubscribeOptions = {}
): Promise<Output> {
const { request_id: requestId } = await queue.submit(id, options);
if (options.onEnqueue) {
options.onEnqueue(requestId);
}
return new Promise<Output>((resolve, reject) => {
let timeoutId: ReturnType<typeof setTimeout>;
const pollInterval = options.pollInterval ?? 1000;
const poll = async () => {
try {
const requestStatus = await queue.status(id, requestId);
if (options.onQueueUpdate) {
options.onQueueUpdate(requestStatus);
}
if (requestStatus.status === 'COMPLETED') {
clearTimeout(timeoutId);
try {
const result = await queue.result<Output>(id, requestId);
resolve(result);
} catch (error) {
reject(error);
}
return;
}
timeoutId = setTimeout(poll, pollInterval);
} catch (error) {
clearTimeout(timeoutId);
reject(error);
}
};
poll().catch(reject);
});
},
subscribe,
};
2 changes: 1 addition & 1 deletion libs/client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export { config, getConfig } from './config';
export { queue, run } from './function';
export { queue, run, subscribe } from './function';
export { withMiddleware } from './middleware';
export { ApiError, ValidationError } from './response';
export type { RequestMiddleware } from './middleware';
Expand Down
2 changes: 1 addition & 1 deletion libs/client/src/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export async function defaultResponseHandler<Output>(
body,
});
}
throw new Error(`HTTP ${status}: ${statusText}`);
throw new ApiError({ message: `HTTP ${status}: ${statusText}`, status });
}
if (contentType?.includes('application/json')) {
return response.json() as Promise<Output>;
Expand Down

0 comments on commit 813e3de

Please sign in to comment.