Skip to content

Commit

Permalink
Update ThreadAbortSignal API to use classes. (#816)
Browse files Browse the repository at this point in the history
  • Loading branch information
lemonmade authored Aug 16, 2024
1 parent 5f11163 commit ecd7322
Show file tree
Hide file tree
Showing 12 changed files with 291 additions and 139 deletions.
49 changes: 49 additions & 0 deletions .changeset/orange-rockets-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
'@quilted/threads': major
'@quilted/quilt': patch
---

Changed `ThreadAbortSignal` utilities to be class-based instead of being a collection of utility functions. This change aligns the API more closely with `AbortController` in the browser, which is created with `new AbortController()`.

Previously, you used `createThreadAbortSignal()` to serialize an `AbortSignal` to pass over a thread, and `acceptThreadAbortSignal()` to turn it into a “live” `AbortSignal`. With the new API, you will do the same steps, but with `ThreadAbortSignal.serialize()` and `new ThreadAbortSignal`:

```ts
import {
createThreadAbortSignal,
acceptThreadAbortSignal,
} from '@quilted/threads';

const abortController = new AbortController();
const serializedAbortSignal = createThreadAbortSignal(abortController.signal);
const liveAbortSignal = acceptThreadAbortSignal(serializedAbortSignal);

await fetch('/', {signal: liveAbortSignal});

// Becomes:

import { ThreadAbortSignal } from '@quilted/threads';\

const abortController = new AbortController();
const serializedAbortSignal = ThreadAbortSignal.serialize(abortController.signal);
const liveAbortSignal = new ThreadAbortSignal(serializedAbortSignal);

await fetch('/', {signal: liveAbortSignal});
```

Additionally, the new `ThreadAbortSignal` class assumes you are not doing manual memory management by default. If your target environment does not support automatic memory management of transferred functions, you will need to manually pass the `retain` and `release` functions to the new APIs:

```ts
import {retain, release, ThreadAbortSignal} from '@quilted/threads';

const abortController = new AbortController();
const serializedAbortSignal = ThreadAbortSignal.serialize(
abortController.signal,
{retain, release},
);
const liveAbortSignal = new ThreadAbortSignal(serializedAbortSignal, {
retain,
release,
});

await fetch('/', {signal: liveAbortSignal});
```
6 changes: 3 additions & 3 deletions packages/quilt/source/threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ export {
markAsTransferable,
isMemoryManageable,
createThread,
createThreadAbortSignal,
acceptThreadAbortSignal,
ThreadAbortSignal,
createBasicEncoder,
createThreadFromIframe,
createThreadFromInsideIframe,
Expand All @@ -20,7 +19,8 @@ export type {
Thread,
ThreadOptions,
ThreadTarget,
ThreadAbortSignal,
ThreadAbortSignalOptions,
ThreadAbortSignalSerialization,
ThreadEncoder,
ThreadEncoderApi,
ThreadEncodable,
Expand Down
66 changes: 55 additions & 11 deletions packages/threads/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,37 +237,81 @@ Once an object is fully released, any attempt to call its proxied functions will

#### [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal)

[`AbortSignal`s](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) allow you to communicate that an asynchronous operation should stop. Because all methods exposed through `@quilted/threads` are asynchronous, you may find many uses
for `AbortSignal`s. However, it can be a bit tricky to communicate an abort signal across threads yourself. To make this easier, this library provides a pair of utilities to create a "thread-safe" `AbortSignal` on one thread, and to "accept" that signal on another thread. In the thread sending a signal, use the `createThreadAbortSignal()` function from this library, passing it an `AbortSignal`:
[`AbortSignal`s](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) allow you to communicate that an asynchronous operation should stop. Because all methods exposed through `@quilted/threads` are asynchronous, you may find many uses for `AbortSignal`s. However, it can be a bit tricky to communicate an abort signal across threads yourself. To make this easier, this library provides utilities to create a serialized `AbortSignal` on one thread, and to convert that serialized version into a “live” `AbortSignal` on another thread. In the thread sending a signal, use the `ThreadAbortSignal.serialize()` method to serialize your `AbortSignal`:

```ts
import {createThreadFromWebWorker, ThreadAbortSignal} from '@quilted/threads';

const worker = new Worker('worker.js');
const thread = createThreadFromWebWorker(worker);

const abort = new AbortController();
await thread.calculateResult({
signal: ThreadAbortSignal.serialize(abort.signal),
});
```

On the receiving thread, use `new ThreadAbortSignal()` to turn it back into a live `AbortSignal`, in the current thread’s JavaScript environment:

```ts
import {
createThreadFromWebWorker,
createThreadAbortSignal,
ThreadAbortSignal,
type ThreadAbortSignalSerialization,
} from '@quilted/threads';

const thread = createThreadFromWebWorker(self, {
expose: {calculateResult},
});

function calculateResult({
signal: threadSignal,
}: {
signal: ThreadAbortSignalSerialization;
}) {
const signal = new ThreadAbortSignal(threadSignal);
return await figureOutResult({signal});
}
```

If you are using `@quilted/threads`’ manual memory management approach, you must explicitly pass `retain` and `release` functions to `ThreadAbortSignal.serialize()` and `new ThreadAbortSignal()` methods:

```ts
import {
retain,
release,
createThreadFromWebWorker,
ThreadAbortSignal,
} from '@quilted/threads';

const worker = new Worker('worker.js');
const thread = createThreadFromWebWorker(worker);

const abort = new AbortController();
await thread.calculateResult({signal: createThreadSignal(abort.signal)});
```
await thread.calculateResult({
signal: ThreadAbortSignal.serialize(abort.signal, {retain, release}),
});

On the receiving thread, use the `acceptThreadAbortSignal()` to turn it back into a "live" `AbortSignal`, in the current thread’s JavaScript environment:
// In the worker:

```ts
import {
retain,
release,
createThreadFromWebWorker,
acceptThreadAbortSignal,
type ThreadAbortSignal,
ThreadAbortSignal,
type ThreadAbortSignalSerialization,
} from '@quilted/threads';

const thread = createThreadFromWebWorker(self, {
expose: {calculateResult},
});

function calculateResult({signal: threadSignal}: {signal: ThreadAbortSignal}) {
const signal = acceptThreadAbortSignal(threadSignal);
function calculateResult({
signal: threadSignal,
}: {
signal: ThreadAbortSignalSerialization;
}) {
const signal = new ThreadAbortSignal(threadSignal, {retain, release});
return await figureOutResult({signal});
}
```
Expand Down
173 changes: 173 additions & 0 deletions packages/threads/source/ThreadAbortSignal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/**
* A representation of an `AbortSignal` that can be serialized between
* two threads.
*/
export interface ThreadAbortSignalSerialization {
/**
* Whether the signal was already aborted at the time it was
* sent to the sibling thread.
*/
readonly aborted: boolean;

/**
* A function to connect the signal between the two threads. This
* function should be called by the sibling thread when the abort
* state changes (including changes since the thread-safe abort signal
* was created).
*/
start?(listener: (aborted: boolean) => void): void;
}

export interface ThreadAbortSignalOptions {
/**
* An optional function to call in order to manually retain the memory
* associated with the `start` function of the serialized signal.
* You only need to use this when using a strategy for serializing the
* abort signal that requires manual memory management.
*/
retain?(value: unknown): void;

/**
* An optional function to call in order to manually release the memory
* associated with the `start` function of the serialized signal.
* You only need to use this when using a strategy for serializing the
* abort signal that requires manual memory management.
*/
release?(value: unknown): void;
}

/**
* Converts a serialized `AbortSignal` into a “live” one, which you can
* use to cancel operations in the current environment. When the signal aborts,
* all memory associated with the signal will be released automatically.
*/
export class ThreadAbortSignal implements AbortSignal {
#abortController: AbortController | undefined;
#abortSignal: AbortSignal;
#onabort: AbortSignal['onabort'] | null = null;

// Proxy properties
get aborted(): boolean {
return this.#abortSignal.aborted;
}

get reason(): any {
return this.#abortSignal.reason;
}

get onabort() {
return this.#onabort;
}

set onabort(value) {
if (this.#onabort) {
this.#abortSignal.removeEventListener('abort', this.#onabort);
}

this.#onabort = value;

if (value) {
this.#abortSignal.addEventListener('abort', value);
}
}

constructor(
signal: AbortSignal | ThreadAbortSignalSerialization | undefined,
{retain, release}: ThreadAbortSignalOptions = {},
) {
if (isAbortSignal(signal)) {
this.#abortSignal = signal;
} else {
this.#abortController = new AbortController();
this.#abortSignal = this.#abortController.signal;

const {aborted, start} = signal ?? {};

if (aborted) {
this.#abortController.abort();
} else if (start) {
retain?.(start);

start((aborted) => {
if (aborted) this.#abortController!.abort();
});

if (release) {
this.#abortSignal.addEventListener('abort', () => release(start), {
once: true,
});
}
}
}
}

// Proxy methods
addEventListener(...args: Parameters<AbortSignal['addEventListener']>) {
return this.#abortSignal.addEventListener(...args);
}

removeEventListener(...args: Parameters<AbortSignal['removeEventListener']>) {
return this.#abortSignal.removeEventListener(...args);
}

dispatchEvent(...args: Parameters<AbortSignal['dispatchEvent']>): boolean {
return this.#abortSignal.dispatchEvent(...args);
}

throwIfAborted() {
return this.#abortSignal.throwIfAborted();
}

/**
* Converts an `AbortSignal` into a version of that signal that can
* be transferred to a target `Thread`. The resulting object can be
* serialized using the RPC utilities provided in this library, and
* passed to `new ThreadAbortSignal()` to be converted into a “live”
* `AbortSignal`.
*/
static serialize(
signal: AbortSignal,
{retain, release}: ThreadAbortSignalOptions = {},
): ThreadAbortSignalSerialization {
if (signal.aborted) {
return {
aborted: true,
};
}

const listeners = new Set<(aborted: boolean) => void>();

signal.addEventListener(
'abort',
() => {
for (const listener of listeners) {
listener(signal.aborted);
release?.(listener);
}

listeners.clear();
},
{once: true},
);

return {
aborted: false,
start(listener) {
if (signal.aborted) {
listener(true);
} else {
retain?.(listener);
listeners.add(listener);
}
},
};
}
}

function isAbortSignal(value: unknown): value is AbortSignal {
return (
value != null &&
typeof (value as any).aborted === 'boolean' &&
typeof (value as any).addEventListener === 'function'
);
}
3 changes: 0 additions & 3 deletions packages/threads/source/abort-signal.ts

This file was deleted.

48 changes: 0 additions & 48 deletions packages/threads/source/abort-signal/accept.ts

This file was deleted.

Loading

0 comments on commit ecd7322

Please sign in to comment.