-
Notifications
You must be signed in to change notification settings - Fork 20
/
actor.ts
144 lines (136 loc) · 4.05 KB
/
actor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import { Timer } from "./performance";
import type { IsTransferrable, Timing } from "./types";
import { onAbort, withTimeout } from "./utils";
/**
 * Counter used to mint request ids. It lives at module scope, so every `Actor`
 * created from this module draws from the same sequence. `Actor.send`
 * pre-increments it, so the first id handed out is 1 and an id is never 0.
 */
let id = 0;
/** Asks the receiving side to abort the request it is serving under `id`. */
interface Cancel {
type: "cancel";
id: number;
}
/** Reply to a `Request`; the receiver matches it to a pending callback by `id`. */
interface Response {
type: "response";
id: number;
// Stringified failure from the serving side; the receiver wraps it in a new `Error`.
error?: string;
// Value the handler's promise resolved to (absent on failure).
response?: any;
// Result of `timer.finish(...)` on the serving side for this request.
timings: Timing;
}
/** Asks the receiving side to call the dispatcher method `name` with `args`. */
interface Request {
type: "request";
// The serving side only posts a reply when `id` is truthy and the handler
// returned a truthy value.
id?: number;
name: string;
args: any[];
}
/** Every message shape exchanged between two `Actor`s, discriminated by `type`. */
type Message = Cancel | Response | Request;
/**
 * Maps every key of `T` to its own type when that member is a function whose
 * return type is assignable to `R`, and to `never` otherwise.
 */
type MethodsReturning<T, R> = {
[K in keyof T]: T[K] extends (...args: any) => R ? T[K] : never;
};
/**
 * The tuple `T` with its final element removed. Falls back to `any[]` when `T`
 * does not match the "some elements followed by one more" pattern.
 */
type Head<T extends any[]> = T extends [...infer Head, any] ? Head : any[];
/**
 * Utility for sending messages to a remote instance of `<T>` running in a web worker
 * from the main thread, or in the main thread running from a web worker.
 *
 * One `Actor` plays both roles over a single channel: it issues requests with
 * `send`, and it serves incoming requests by calling the same-named method on
 * the `dispatcher` given to the constructor.
 */
export default class Actor<T> {
  /** Settlement callbacks for requests this side has sent, keyed by request id. */
  callbacks: {
    [id: number]: (
      error: Error | undefined,
      message: any,
      timings: Timing,
    ) => void;
  };
  /** Abort controllers for requests this side is currently serving, keyed by request id. */
  cancels: { [id: number]: AbortController };
  /** Channel that messages are posted to and received from. */
  dest: Worker;
  /** Timeout in milliseconds passed to `withTimeout` for every `send`. */
  timeoutMs: number;

  /**
   * @param dest Channel to talk over. Its `onmessage` handler is replaced.
   * @param dispatcher Object whose methods serve incoming requests. Each
   *   handler is called with the request's `args` followed by an
   *   `AbortController` and a `Timer`.
   * @param timeoutMs Timeout applied to every `send`.
   */
  constructor(dest: Worker, dispatcher: any, timeoutMs: number = 20_000) {
    this.callbacks = {};
    this.cancels = {};
    this.dest = dest;
    this.timeoutMs = timeoutMs;
    this.dest.onmessage = async ({ data }) => {
      const message: Message = data;
      if (message.type === "cancel") {
        // Forget the controller before aborting so it cannot be aborted twice.
        const cancel = this.cancels[message.id];
        delete this.cancels[message.id];
        cancel?.abort();
      } else if (message.type === "response") {
        // A missing callback means the request was already aborted locally
        // (see `send`); the late reply is dropped.
        const callback = this.callbacks[message.id];
        delete this.callbacks[message.id];
        if (callback) {
          callback(
            message.error ? new Error(message.error) : undefined,
            message.response,
            message.timings,
          );
        }
      } else if (message.type === "request") {
        const timer = new Timer("worker");
        const url = `${message.name}_${message.id}`;
        const abortController = new AbortController();

        let request: unknown;
        try {
          const handler: unknown = dispatcher?.[message.name];
          if (typeof handler !== "function") {
            throw new Error(`No handler named "${message.name}"`);
          }
          // Bind `this` to the dispatcher so ordinary methods work, not only
          // arrow-function properties.
          request = handler.apply(dispatcher, [
            ...message.args,
            abortController,
            timer,
          ]);
        } catch (e: unknown) {
          // An unknown handler or a synchronous throw must still produce a
          // reply, otherwise the requester waits for its full timeout. With
          // no id there is nobody to reply to, so let the error surface.
          if (!message.id) throw e;
          this.postError(message.id, e, timer.finish(url));
          return;
        }

        // No reply is sent for requests without an id, or when the handler
        // returned nothing to wait on.
        if (!message.id || !request) return;

        const requestId = message.id;
        this.cancels[requestId] = abortController;
        try {
          const response = await request;
          const transferrables = (response as IsTransferrable)?.transferrables;
          this.postMessage(
            {
              id: requestId,
              type: "response",
              response,
              timings: timer.finish(url),
            },
            transferrables,
          );
        } catch (e: unknown) {
          this.postError(requestId, e, timer.finish(url));
        } finally {
          // Always drop the controller, even if posting the reply failed.
          delete this.cancels[requestId];
        }
      }
    };
  }

  /**
   * Posts `message` to the other side.
   *
   * @param message Message to deliver.
   * @param transferrables Objects passed as the transfer list of
   *   `dest.postMessage`; defaults to an empty list.
   */
  postMessage(message: Message, transferrables?: Transferable[]) {
    this.dest.postMessage(message, transferrables ?? []);
  }

  /**
   * Posts an error reply for request `id`. `e` is stringified, falling back
   * to `"error"` when that produces an empty string or `e` is nullish.
   */
  private postError(id: number, e: unknown, timings: Timing): void {
    this.postMessage({
      id,
      type: "response",
      error: String(e ?? "") || "error",
      timings,
    });
  }

  /**
   * Invokes a method by name with a set of arguments in the remote context.
   *
   * @param name Name of the method on the remote dispatcher.
   * @param transferrables Objects to pass as the transfer list with the request.
   * @param abortController Registered with `onAbort`; the registered callback
   *   drops the pending callback and posts a `cancel` message to the remote side.
   * @param timer When given, the timings carried by the reply are passed to
   *   its `addAll`.
   * @param args Arguments for the remote method, typed as that method's
   *   parameters without the final one.
   * @returns The result of `withTimeout` applied to a promise that resolves
   *   with the remote result, or rejects with an `Error` built from the
   *   remote error string.
   */
  send<
    R,
    M extends MethodsReturning<T, Promise<R>>,
    K extends keyof M & string,
    P extends Head<Parameters<M[K]>>,
  >(
    name: K,
    transferrables: Transferable[],
    abortController: AbortController,
    timer?: Timer,
    ...args: P
  ): Promise<R> {
    const thisId = ++id;
    const value: Promise<R> = new Promise((resolve, reject) => {
      // Post first: if posting throws, the promise rejects and no callback
      // is left behind.
      this.postMessage(
        { id: thisId, type: "request", name, args },
        transferrables,
      );
      this.callbacks[thisId] = (error, result, timings) => {
        timer?.addAll(timings);
        if (error) reject(error);
        else resolve(result);
      };
    });
    onAbort(abortController, () => {
      delete this.callbacks[thisId];
      this.postMessage({ id: thisId, type: "cancel" });
    });
    return withTimeout(this.timeoutMs, value, abortController);
  }
}