Skip to content

Commit

Permalink
fix(server): encode the SSE messages as bytes instead (trpc#6361)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Alex / KATT <[email protected]>
  • Loading branch information
danamajid and KATT authored Jan 7, 2025
1 parent 4275ac7 commit 87ea49e
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 37 deletions.
42 changes: 24 additions & 18 deletions packages/server/src/unstable-core-do-not-import/stream/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ export const suppressLogs = () => {
console.error = error;
};
};

const textDecoder = new TextDecoder();

test('e2e, server-sent events (SSE)', async () => {
async function* data(lastEventId: string | undefined) {
let i = lastEventId ? Number(lastEventId) : 0;
Expand All @@ -46,16 +49,17 @@ test('e2e, server-sent events (SSE)', async () => {
const stream = sseStreamProducer({
data: data(lastEventId ?? undefined),
serialize: (v) => SuperJSON.serialize(v),
}).pipeThrough(
// debug stream
new TransformStream({
transform: (chunk, controller) => {
// console.debug('debug', chunk);
written.push(chunk);
controller.enqueue(chunk);
},
}),
);
})
.pipeThrough(new TextDecoderStream())
.pipeThrough(
// debug stream
new TransformStream({
transform(chunk, controller) {
written.push(chunk);
controller.enqueue(chunk);
},
}),
);

return new Response(stream, {
headers: sseHeaders,
Expand Down Expand Up @@ -196,14 +200,16 @@ test('SSE on serverless - emit and disconnect early', async () => {
data: data(asNumber, reqAbortCtrl.signal),
serialize: (v) => SuperJSON.serialize(v),
emitAndEndImmediately: true,
}).pipeThrough(
new TransformStream({
transform(chunk, controller) {
requestTrace.written.push(chunk);
controller.enqueue(chunk);
},
}),
);
})
.pipeThrough(new TextDecoderStream())
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
requestTrace.written.push(chunk);
controller.enqueue(chunk);
},
}),
);

return new Response(stream, {
headers: sseHeaders,
Expand Down
41 changes: 22 additions & 19 deletions packages/server/src/unstable-core-do-not-import/stream/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,27 +176,30 @@ export function sseStreamProducer<TValue = unknown>(
};
}
}

const stream = readableStreamFrom(generatorWithErrorHandling());

return stream.pipeThrough(
new TransformStream({
transform(chunk, controller: TransformStreamDefaultController<string>) {
if ('event' in chunk) {
controller.enqueue(`event: ${chunk.event}\n`);
}
if ('data' in chunk) {
controller.enqueue(`data: ${chunk.data}\n`);
}
if ('id' in chunk) {
controller.enqueue(`id: ${chunk.id}\n`);
}
if ('comment' in chunk) {
controller.enqueue(`: ${chunk.comment}\n`);
}
controller.enqueue('\n\n');
},
}),
);
return stream
.pipeThrough(
new TransformStream({
transform(chunk, controller: TransformStreamDefaultController<string>) {
if ('event' in chunk) {
controller.enqueue(`event: ${chunk.event}\n`);
}
if ('data' in chunk) {
controller.enqueue(`data: ${chunk.data}\n`);
}
if ('id' in chunk) {
controller.enqueue(`id: ${chunk.id}\n`);
}
if ('comment' in chunk) {
controller.enqueue(`: ${chunk.comment}\n`);
}
controller.enqueue('\n\n');
},
}),
)
.pipeThrough(new TextEncoderStream());
}

interface ConsumerStreamResultBase<TConfig extends ConsumerConfig> {
Expand Down

0 comments on commit 87ea49e

Please sign in to comment.