Skip to content

Commit

Permalink
correctly serialize headers and timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
turbocrime committed Feb 22, 2025
1 parent 11b89b2 commit 9d32ff2
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 19 deletions.
5 changes: 5 additions & 0 deletions .changeset/curly-icons-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@penumbra-zone/transport-dom': patch
---

convey timeout in request headers
5 changes: 5 additions & 0 deletions .changeset/khaki-pandas-worry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@penumbra-zone/transport-dom': patch
---

exclude non-transferable type member from internal message types
40 changes: 25 additions & 15 deletions packages/transport-dom/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
TransportMessage,
TransportStream,
} from './messages.js';
import { normalizeHeader } from './util.js';

import ReadableStream from './ReadableStream.from.js';

Expand Down Expand Up @@ -157,15 +158,12 @@ export const createChannelTransport = ({
port ??= await connect();

const requestId = crypto.randomUUID();

const requestFailure = new AbortController();
const deadline = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;

const response = Promise.race([
rejectOnSignal(
transportFailure.signal,
requestFailure.signal,
timeoutMs > 0 ? AbortSignal.timeout(timeoutMs) : undefined,
signal,
),
rejectOnSignal(transportFailure.signal, requestFailure.signal, deadline, signal),
new Promise<TransportMessage>((resolve, reject) => {
pending.set(requestId, (tev: TransportEvent) => {
if (isTransportMessage(tev, requestId)) {
Expand All @@ -188,7 +186,12 @@ export const createChannelTransport = ({
signal?.addEventListener('abort', () =>
port?.postMessage({ requestId, abort: true } satisfies TransportAbort),
);
port.postMessage({ requestId, message, header } satisfies TransportMessage);

port.postMessage({
requestId,
message,
header: normalizeHeader(timeoutMs, header),
} satisfies TransportMessage);
}
break;
default:
Expand Down Expand Up @@ -227,14 +230,10 @@ export const createChannelTransport = ({
const requestId = crypto.randomUUID();

const requestFailure = new AbortController();
const deadline = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;

const response = Promise.race([
rejectOnSignal(
transportFailure.signal,
requestFailure.signal,
timeoutMs > 0 ? AbortSignal.timeout(timeoutMs) : undefined,
signal,
),
rejectOnSignal(transportFailure.signal, requestFailure.signal, deadline, signal),
new Promise<TransportStream>((resolve, reject) => {
pending.set(requestId, (tev: TransportEvent) => {
if (isTransportStream(tev, requestId)) {
Expand Down Expand Up @@ -263,7 +262,11 @@ export const createChannelTransport = ({
// confirm the input stream ended after one message with content
if (done && typeof value === 'object' && value !== null) {
const message = Any.pack(new method.I(value as object)).toJson(jsonOptions);
port.postMessage({ requestId, message, header } satisfies TransportMessage);
port.postMessage({
requestId,
message,
header: normalizeHeader(timeoutMs, header),
} satisfies TransportMessage);
} else {
throw new ConnectError(
'MethodKind.ServerStreaming expects a single request message',
Expand All @@ -282,7 +285,14 @@ export const createChannelTransport = ({
cont.enqueue(Any.pack(new method.I(chunk)).toJson(jsonOptions)),
}),
);
port.postMessage({ requestId, stream, header } satisfies TransportStream, [stream]);
port.postMessage(
{
requestId,
stream,
header: normalizeHeader(timeoutMs, header),
} satisfies TransportStream,
[stream],
);
}
break;
default:
Expand Down
7 changes: 3 additions & 4 deletions packages/transport-dom/src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { JsonValue } from '@bufbuild/protobuf';
export interface TransportError<I extends string | undefined> extends Partial<TransportEvent> {
requestId: I extends string ? string : string | undefined;
error: JsonValue;
metadata?: HeadersInit;
metadata?: Extract<HeadersInit, JsonValue>;
}

// transport content
Expand All @@ -14,9 +14,8 @@ export type TransportData = TransportMessage | TransportStream;

export interface TransportEvent<I extends string = string> {
requestId: I;
header?: HeadersInit;
trailer?: HeadersInit;
// contextValues?: object;
header?: Extract<HeadersInit, JsonValue>;
trailer?: Extract<HeadersInit, JsonValue>;
}

export interface TransportAbort<I = string> extends TransportEvent<I extends string ? I : never> {
Expand Down
14 changes: 14 additions & 0 deletions packages/transport-dom/src/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Collect timeout and user-provided headers to a JSON-serializable object.
*/
export const normalizeHeader = (timeoutMs?: number, userProvidedHeaders?: HeadersInit) => {
const headers = new Headers(userProvidedHeaders ?? {});

if (timeoutMs) {
headers.set('headerTimeout', `${timeoutMs}`);
}

const entries = Array.from(headers);

return entries.length ? Object.fromEntries(entries) : undefined;
};

0 comments on commit 9d32ff2

Please sign in to comment.