Skip to content

Commit

Permalink
fix: queue streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
drochetti committed Jul 27, 2024
1 parent 13ca3a1 commit 4c4055d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 36 deletions.
9 changes: 7 additions & 2 deletions apps/demo-nextjs-app-router/app/queue/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ function Error(props: ErrorProps) {
);
}

const DEFAULT_ENDPOINT_ID = 'fal-ai/fast-sdxl';
const DEFAULT_INPUT = `{
"prompt": "A beautiful sunset over the ocean"
}`;

export default function Home() {
// Input state
const [endpointId, setEndpointId] = useState<string>('');
const [input, setInput] = useState<string>('{}');
const [endpointId, setEndpointId] = useState<string>(DEFAULT_ENDPOINT_ID);
const [input, setInput] = useState<string>(DEFAULT_INPUT);
// Result state
const [loading, setLoading] = useState(false);
const [error, setError] = useState<Error | null>(null);
Expand Down
1 change: 1 addition & 0 deletions apps/demo-nextjs-app-router/app/streaming/audio/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export default function AudioStreamingDemo() {
text: prompt,
},
connectionMode: 'server',
accept: 'audio/*',
}
);
setStreamStatus('running');
Expand Down
2 changes: 1 addition & 1 deletion libs/client/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@fal-ai/serverless-client",
"description": "The fal serverless JS/TS client",
"version": "0.14.0-alpha.7",
"version": "0.14.0-alpha.8",
"license": "MIT",
"repository": {
"type": "git",
Expand Down
43 changes: 31 additions & 12 deletions libs/client/src/function.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { getTemporaryAuthToken } from './auth';
import { dispatchRequest } from './request';
import { storageImpl } from './storage';
import { FalStream } from './streaming';
import { FalStream, StreamingConnectionMode } from './streaming';
import {
CompletedQueueStatus,
EnqueueResult,
Expand Down Expand Up @@ -199,6 +198,12 @@ type QueueSubscribeOptions = {
}
| {
mode: 'streaming';

/**
* The connection mode to use for streaming updates. It defaults to `server`.
* Set to `client` if your server proxy doesn't support streaming.
*/
connectionMode?: StreamingConnectionMode;
}
);

Expand Down Expand Up @@ -228,6 +233,14 @@ type QueueStatusOptions = BaseQueueOptions & {
logs?: boolean;
};

type QueueStatusStreamOptions = QueueStatusOptions & {
/**
* The connection mode to use for streaming updates. It defaults to `server`.
* Set to `client` if your server proxy doesn't support streaming.
*/
connectionMode?: StreamingConnectionMode;
};

/**
* Represents a request queue with methods for submitting requests,
* checking their status, retrieving results, and subscribing to updates.
Expand Down Expand Up @@ -263,7 +276,7 @@ interface Queue {
*/
streamStatus(
endpointId: string,
options: QueueStatusOptions
options: QueueStatusStreamOptions
): Promise<FalStream<unknown, QueueStatus>>;

/**
Expand Down Expand Up @@ -340,24 +353,26 @@ export const queue: Queue = {

async streamStatus(
endpointId: string,
{ requestId, logs = false }: QueueStatusOptions
{ requestId, logs = false, connectionMode }: QueueStatusStreamOptions
): Promise<FalStream<unknown, QueueStatus>> {
const appId = parseAppId(endpointId);
const prefix = appId.namespace ? `${appId.namespace}/` : '';
const token = await getTemporaryAuthToken(endpointId);

const queryParams = {
logs: logs ? '1' : '0',
};

const url = buildUrl(`${prefix}${appId.owner}/${appId.alias}`, {
subdomain: 'queue',
path: `/requests/${requestId}/status/stream`,
query: queryParams,
});

const queryParams = new URLSearchParams({
fal_jwt_token: token,
logs: logs ? '1' : '0',
});

return new FalStream<unknown, QueueStatus>(`${url}?${queryParams}`, {
input: {},
return new FalStream<unknown, QueueStatus>(endpointId, {
url,
method: 'get',
connectionMode,
queryParams,
});
},

Expand All @@ -375,6 +390,10 @@ export const queue: Queue = {
const status = await queue.streamStatus(endpointId, {
requestId,
logs: options.logs,
connectionMode:
'connectionMode' in options
? (options.connectionMode as StreamingConnectionMode)
: undefined,
});
const logs: RequestLog[] = [];
if (timeout) {
Expand Down
42 changes: 21 additions & 21 deletions libs/client/src/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ import { dispatchRequest } from './request';
import { ApiError, defaultResponseHandler } from './response';
import { storageImpl } from './storage';

type StreamingConnectionMode = 'client' | 'server';
export type StreamingConnectionMode = 'client' | 'server';

/**
* The stream API options. It requires the API input and also
* offers configuration options.
*/
type StreamOptions<Input> = {
/**
* The endpoint URL. If not provided, it will be generated from the
* `endpointId` and the `queryParams`.
*/
readonly url?: string;

/**
* The API input payload.
*/
Expand Down Expand Up @@ -51,7 +57,8 @@ type StreamOptions<Input> = {
* or through your own server, either when running on NodeJS or when
* using a proxy that supports streaming.
*
* It defaults to `client` when running in the browser or `server` otherwise.
* It defaults to `server`. Set to `client` if your server proxy doesn't
* support streaming.
*/
readonly connectionMode?: StreamingConnectionMode;
};
Expand Down Expand Up @@ -85,10 +92,12 @@ export class FalStream<Input, Output> {

constructor(endpointId: string, options: StreamOptions<Input>) {
this.endpointId = endpointId;
this.url = buildUrl(endpointId, {
path: '/stream',
query: options.queryParams,
});
this.url =
options.url ??
buildUrl(endpointId, {
path: '/stream',
query: options.queryParams,
});
this.options = options;
this.donePromise = new Promise<Output>((resolve, reject) => {
if (this.streamClosed) {
Expand All @@ -114,32 +123,24 @@ export class FalStream<Input, Output> {

private start = async () => {
const { endpointId, options } = this;
const {
input,
method = 'post',
connectionMode = typeof window === 'undefined' ? 'server' : 'client',
queryParams = {},
} = options;
const { input, method = 'post', connectionMode = 'server' } = options;
try {
if (connectionMode === 'client') {
// if we are in the browser, we need to get a temporary token
// to authenticate the request
const token = await getTemporaryAuthToken(endpointId);
const { fetch = global.fetch } = getConfig();
const url = buildUrl(endpointId, {
path: '/stream',
query: {
...queryParams,
fal_jwt_token: token,
},
});
const response = await fetch(url, {
const parsedUrl = new URL(this.url);
console.log(this.url);
parsedUrl.searchParams.set('fal_jwt_token', token);
const response = await fetch(parsedUrl.toString(), {
method: method.toUpperCase(),
headers: {
accept: options.accept ?? 'text/event-stream',
'content-type': 'application/json',
},
body: input && method !== 'get' ? JSON.stringify(input) : undefined,
signal: this.abortController.signal,
});
return await this.handleResponse(response);
}
Expand Down Expand Up @@ -182,7 +183,6 @@ export class FalStream<Input, Output> {

// any response that is not a text/event-stream will be handled as a binary stream
if (response.headers.get('content-type') !== 'text/event-stream') {
// pass the binary chunks to this.emit('data', chunk)
const reader = body.getReader();
const emitRawChunk = () => {
reader.read().then(({ done, value }) => {
Expand Down

0 comments on commit 4c4055d

Please sign in to comment.