Skip to content

Commit

Permalink
feat(jspi): enable http_request via JSPI
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdickinson committed Nov 19, 2024
1 parent dc7edae commit 8345362
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 138 deletions.
2 changes: 2 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ build_worker_node out='worker/node' args='[]':
"platform": "node",
"polyfills": {
"./src/polyfills/deno-capabilities.ts": "./src/polyfills/node-capabilities.ts",
"./src/polyfills/deno-minimatch.ts": "./src/polyfills/node-minimatch.ts",
"./src/polyfills/node-fs.ts": "node:fs/promises",
"./src/polyfills/deno-wasi.ts": "./src/polyfills/node-wasi.ts",
}
Expand All @@ -170,6 +171,7 @@ build_worker_browser out='worker/browser' args='[]':
},
"polyfills": {
"./src/polyfills/deno-capabilities.ts": "./src/polyfills/browser-capabilities.ts",
"./src/polyfills/deno-minimatch.ts": "./src/polyfills/node-minimatch.ts",
"./src/polyfills/node-fs.ts": "./src/polyfills/browser-fs.ts",
"./src/polyfills/deno-wasi.ts": "./src/polyfills/browser-wasi.ts",
}
Expand Down
93 changes: 1 addition & 92 deletions src/background-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@ import {
} from './call-context.ts';
import {
type InternalConfig,
MemoryOptions,
PluginOutput,
SAB_BASE_OFFSET,
SharedArrayBufferSection,
} from './interfaces.ts';
import { readBodyUpTo } from './utils.ts';
import { WORKER_URL } from './worker-url.ts';
import { Worker } from 'node:worker_threads';
import { CAPABILITIES } from './polyfills/deno-capabilities.ts';
import { EXTISM_ENV } from './foreground-plugin.ts';
import { matches } from './polyfills/deno-minimatch.ts';
import { HttpContext } from './http-context.ts'

// Firefox has not yet implemented Atomics.waitAsync, but we can polyfill
// it using a worker as a one-off.
Expand Down Expand Up @@ -496,95 +494,6 @@ class RingBufferWriter {
}
}

class HttpContext {
fetch: typeof fetch;
lastStatusCode: number;
lastHeaders: Record<string, string> | null;
allowedHosts: string[];
memoryOptions: MemoryOptions;

constructor(
_fetch: typeof fetch,
allowedHosts: string[],
memoryOptions: MemoryOptions,
allowResponseHeaders: boolean,
) {
this.fetch = _fetch;
this.allowedHosts = allowedHosts;
this.lastStatusCode = 0;
this.memoryOptions = memoryOptions;
this.lastHeaders = allowResponseHeaders ? {} : null;
}

contribute(functions: Record<string, Record<string, any>>) {
functions[EXTISM_ENV] ??= {};
functions[EXTISM_ENV].http_request = (callContext: CallContext, reqaddr: bigint, bodyaddr: bigint) =>
this.makeRequest(callContext, reqaddr, bodyaddr);
functions[EXTISM_ENV].http_status_code = () => this.lastStatusCode;
functions[EXTISM_ENV].http_headers = (callContext: CallContext) => {
if (this.lastHeaders === null) {
return 0n;
}
return callContext.store(JSON.stringify(this.lastHeaders));
};
}

async makeRequest(callContext: CallContext, reqaddr: bigint, bodyaddr: bigint) {
if (this.lastHeaders !== null) {
this.lastHeaders = {};
}
this.lastStatusCode = 0;

const req = callContext.read(reqaddr);
if (req === null) {
return 0n;
}

const { headers, header, url: rawUrl, method: m } = req.json();
const method = m ?? 'GET';
const url = new URL(rawUrl);

const isAllowed = this.allowedHosts.some((allowedHost) => {
return allowedHost === url.hostname || matches(url.hostname, allowedHost);
});

if (!isAllowed) {
throw new Error(`Call error: HTTP request to "${url}" is not allowed (no allowedHosts match "${url.hostname}")`);
}

const body = bodyaddr === 0n || method === 'GET' || method === 'HEAD' ? null : callContext.read(bodyaddr)?.bytes();
const fetch = this.fetch;
const response = await fetch(rawUrl, {
headers: headers || header,
method,
...(body ? { body: body.slice() } : {}),
});

this.lastStatusCode = response.status;

if (this.lastHeaders !== null) {
this.lastHeaders = Object.fromEntries(response.headers);
}

try {
const bytes = this.memoryOptions.maxHttpResponseBytes
? await readBodyUpTo(response, this.memoryOptions.maxHttpResponseBytes)
: new Uint8Array(await response.arrayBuffer());

const result = callContext.store(bytes);

return result;
} catch (err) {
if (err instanceof Error) {
const ptr = callContext.store(new TextEncoder().encode(err.message));
callContext[ENV].log_error(ptr);
return 0n;
}
return 0n;
}
}
}

export async function createBackgroundPlugin(
opts: InternalConfig,
names: string[],
Expand Down
69 changes: 56 additions & 13 deletions src/foreground-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ import { BEGIN, CallContext, END, ENV, GET_BLOCK, RESET, SET_HOST_CONTEXT, STORE
import { type InternalConfig, InternalWasi, PluginOutput } from './interfaces.ts';
import { CAPABILITIES } from './polyfills/deno-capabilities.ts';
import { loadWasi } from './polyfills/deno-wasi.ts';
import { HttpContext } from './http-context.ts';

export const EXTISM_ENV = 'extism:host/env';

type InstantiatedModule = [WebAssembly.Module, WebAssembly.Instance];

interface SuspendingCtor {
new (fn: CallableFunction): any;
new(fn: CallableFunction): any;
}

const AsyncFunction = (async () => {}).constructor;
const AsyncFunction = (async () => { }).constructor;
const Suspending: SuspendingCtor | undefined = (WebAssembly as any).Suspending;
const promising: CallableFunction | undefined = (WebAssembly as any).promising;

Expand Down Expand Up @@ -140,7 +141,7 @@ export async function createForegroundPlugin(
const isAsync = func.constructor === AsyncFunction;
suspendsOnInvoke ||= isAsync;
const wrapped = func.bind(null, context);
imports[namespace][name] = isAsync ? new (WebAssembly as any).Suspending(wrapped) : wrapped;
imports[namespace][name] = isAsync ? new Suspending!(wrapped) : wrapped;
}
}

Expand All @@ -158,12 +159,14 @@ export async function createForegroundPlugin(
const seen: Map<WebAssembly.Module, WebAssembly.Instance> = new Map();
const wasiList: InternalWasi[] = [];

const instance = await instantiateModule(['main'], modules[mainIndex], imports, opts, wasiList, names, modules, seen);
const mutableFlags = { suspendsOnInvoke }
const instance = await instantiateModule(context, ['main'], modules[mainIndex], imports, opts, wasiList, names, modules, seen, mutableFlags);

return new ForegroundPlugin(opts, context, [modules[mainIndex], instance], wasiList, suspendsOnInvoke);
return new ForegroundPlugin(opts, context, [modules[mainIndex], instance], wasiList, mutableFlags.suspendsOnInvoke);
}

async function instantiateModule(
context: CallContext,
current: string[],
module: WebAssembly.Module,
imports: Record<string, Record<string, any>>,
Expand All @@ -172,6 +175,7 @@ async function instantiateModule(
names: string[],
modules: WebAssembly.Module[],
linked: Map<WebAssembly.Module, WebAssembly.Instance | null>,
mutableFlags: { suspendsOnInvoke: boolean }
) {
linked.set(module, null);

Expand Down Expand Up @@ -216,6 +220,45 @@ async function instantiateModule(
);
}

// XXX(chrisdickinson): This is a bit of a hack, admittedly. So what's going on here?
//
// JSPI is going on here. Let me explain: at the time of writing, the js-sdk supports
// JSPI by detecting AsyncFunction use in the `functions` parameter. When we detect an
// async function in imports we _must_ mark all exported Wasm functions as "promising" --
// that is, they might call a host function that suspends the stack.
//
// If we were to mark extism's http_request as async, we would _always_ set exports as
// "promising". This adds unnecessary overhead for folks who aren't using `http_request`.
// Instead, we detect if any of the manifest items *import* `http_request`. If they
// haven't overridden the default CallContext implementation, we provide an HttpContext
// on-demand.
//
// Unfortuantely this duplicates a little bit of logic-- in particular, we have to bind
// CallContext to each of the HttpContext contributions (See "REBIND" below.)
if (
module === EXTISM_ENV &&
name === 'http_request' &&
promising &&
imports[module][name] === context[ENV].http_request
) {
const httpContext = new HttpContext(
opts.fetch,
opts.allowedHosts,
opts.memory,
opts.allowHttpResponseHeaders
);

mutableFlags.suspendsOnInvoke = true

const contributions = {} as any
httpContext.contribute(contributions)
for (const [key, entry] of Object.entries(contributions[EXTISM_ENV] as { [k: string]: CallableFunction })) {
// REBIND:
imports[module][key] = (entry as any).bind(null, context)
}
imports[module][name] = new Suspending!(imports[module][name])
}

switch (kind) {
case `function`: {
instantiationImports[module] ??= {};
Expand Down Expand Up @@ -246,11 +289,11 @@ async function instantiateModule(

// If the dependency provides "_start", treat it as a WASI Command module; instantiate it (and its subtree) directly.
const instance = providerExports.find((xs) => xs.name === '_start')
? await instantiateModule([...current, module], provider, imports, opts, wasiList, names, modules, new Map())
? await instantiateModule(context, [...current, module], provider, imports, opts, wasiList, names, modules, new Map(), mutableFlags)
: !linked.has(provider)
? (await instantiateModule([...current, module], provider, imports, opts, wasiList, names, modules, linked),
linked.get(provider))
: linked.get(provider);
? (await instantiateModule(context, [...current, module], provider, imports, opts, wasiList, names, modules, linked, mutableFlags),
linked.get(provider))
: linked.get(provider);

if (!instance) {
// circular import, either make a trampoline or bail
Expand Down Expand Up @@ -291,10 +334,10 @@ async function instantiateModule(
const guestType = instance.exports.hs_init
? 'haskell'
: instance.exports._initialize
? 'reactor'
: instance.exports._start
? 'command'
: 'none';
? 'reactor'
: instance.exports._start
? 'command'
: 'none';

if (wasi) {
await wasi?.initialize(instance);
Expand Down
129 changes: 129 additions & 0 deletions src/http-context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import {
CallContext,
ENV,
} from './call-context.ts';
import {
MemoryOptions,
} from './interfaces.ts';
import { EXTISM_ENV } from './foreground-plugin.ts';
import { matches } from './polyfills/deno-minimatch.ts';

export class HttpContext {
fetch: typeof fetch;
lastStatusCode: number;
lastHeaders: Record<string, string> | null;
allowedHosts: string[];
memoryOptions: MemoryOptions;

constructor(
_fetch: typeof fetch,
allowedHosts: string[],
memoryOptions: MemoryOptions,
allowResponseHeaders: boolean,
) {
this.fetch = _fetch;
this.allowedHosts = allowedHosts;
this.lastStatusCode = 0;
this.memoryOptions = memoryOptions;
this.lastHeaders = allowResponseHeaders ? {} : null;
}

contribute(functions: Record<string, Record<string, any>>) {
functions[EXTISM_ENV] ??= {};
functions[EXTISM_ENV].http_request = (callContext: CallContext, reqaddr: bigint, bodyaddr: bigint) =>
this.makeRequest(callContext, reqaddr, bodyaddr);
functions[EXTISM_ENV].http_status_code = () => this.lastStatusCode;
functions[EXTISM_ENV].http_headers = (callContext: CallContext) => {
if (this.lastHeaders === null) {
return 0n;
}
return callContext.store(JSON.stringify(this.lastHeaders));
};
}

async makeRequest(callContext: CallContext, reqaddr: bigint, bodyaddr: bigint) {
if (this.lastHeaders !== null) {
this.lastHeaders = {};
}
this.lastStatusCode = 0;

const req = callContext.read(reqaddr);
if (req === null) {
return 0n;
}

const { headers, header, url: rawUrl, method: m } = req.json();
const method = m ?? 'GET';
const url = new URL(rawUrl);

const isAllowed = this.allowedHosts.some((allowedHost) => {
return allowedHost === url.hostname || matches(url.hostname, allowedHost);
});

if (!isAllowed) {
throw new Error(`Call error: HTTP request to "${url}" is not allowed (no allowedHosts match "${url.hostname}")`);
}

const body = bodyaddr === 0n || method === 'GET' || method === 'HEAD' ? null : callContext.read(bodyaddr)?.bytes();
const fetch = this.fetch;
const response = await fetch(rawUrl, {
headers: headers || header,
method,
...(body ? { body: body.slice() } : {}),
});

this.lastStatusCode = response.status;

if (this.lastHeaders !== null) {
this.lastHeaders = Object.fromEntries(response.headers);
}

try {
const bytes = this.memoryOptions.maxHttpResponseBytes
? await readBodyUpTo(response, this.memoryOptions.maxHttpResponseBytes)
: new Uint8Array(await response.arrayBuffer());

const result = callContext.store(bytes);

return result;
} catch (err) {
if (err instanceof Error) {
const ptr = callContext.store(new TextEncoder().encode(err.message));
callContext[ENV].log_error(ptr);
return 0n;
}
return 0n;
}
}
}

async function readBodyUpTo(response: Response, maxBytes: number): Promise<Uint8Array> {
const reader = response.body?.getReader();
if (!reader) {
return new Uint8Array(0);
}

let receivedLength = 0;
const chunks = [];

while (receivedLength < maxBytes) {
const { done, value } = await reader.read();
if (done) {
break;
}
chunks.push(value);
receivedLength += value.length;
if (receivedLength >= maxBytes) {
throw new Error(`Response body exceeded ${maxBytes} bytes`);
}
}

const limitedResponseBody = new Uint8Array(receivedLength);
let position = 0;
for (const chunk of chunks) {
limitedResponseBody.set(chunk, position);
position += chunk.length;
}

return limitedResponseBody;
}
Loading

0 comments on commit 8345362

Please sign in to comment.