Skip to content

Commit

Permalink
Simplified handling of EOS
Browse files Browse the repository at this point in the history
  • Loading branch information
corrideat committed Mar 15, 2024
1 parent 88a3e95 commit a6422eb
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 41 deletions.
4 changes: 1 addition & 3 deletions src/encodeMultipartMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ export type TDecodedMultipartMessage = {
parts?: TIterable<TDecodedMultipartMessage>;
};

const textEncoder = new TextEncoder();

export const liberalBoundaryMatchRegex = /;\s*boundary=(?:"([^"]+)"|([^;",]+))/;

const multipartBoundaryAlphabet =
Expand Down Expand Up @@ -55,6 +53,7 @@ async function* asyncEncoderGenerator(
msg: TIterable<TDecodedMultipartMessage>,
ws: WritableStream,
): AsyncGenerator<void> {
const textEncoder = new TextEncoder();
const encodedBoundary = textEncoder.encode(`\r\n--${boundary}`);

if (Array.isArray(msg) && msg.length < 1) {
Expand Down Expand Up @@ -215,7 +214,6 @@ const encodeMultipartMessage = (

controller.enqueue(readResult.value);
} catch (readError) {
console.error('Read error:', readError);
controller.error(readError);
return;
}
Expand Down
74 changes: 36 additions & 38 deletions src/parseMultipartMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ enum EState {
EPILOGUE,
}

const textEncoder = new TextEncoder();

const newline = textEncoder.encode('\r\n');
const LWSPchar = [0x09, 0x20];

export type TMultipartMessage = {
headers: Headers;
body?: Uint8Array | null;
Expand All @@ -47,18 +42,28 @@ async function* parseMultipartMessage<T extends TTypedArray | ArrayBuffer>(
throw new Error('Invalid boundary delimiter');
}

const textEncoder = new TextEncoder();
const LWSPchar = [0x09, 0x20];
const newline = new Uint8Array([0x0d, 0x0a]); // '\r\n'

const boundaryDelimiter = textEncoder.encode(`\r\n--${boundary}`);

let buffer = new Uint8Array();
let state: EState = EState.PREAMBLE;
let eosReached = false;

const reader = stream.getReader();

try {
while (state !== EState.EPILOGUE) {
const { done, value } = await reader.read();

if (!done) {
if (done) {
if (buffer.length === 0 || eosReached) {
throw new Error('Invalid message');
}
eosReached = true;
} else {
buffer = mergeTypedArrays(
buffer,
ArrayBuffer.isView(value)
Expand All @@ -71,15 +76,15 @@ async function* parseMultipartMessage<T extends TTypedArray | ArrayBuffer>(
);
}

while (buffer.length) {
while (buffer.length > 0) {
let boundaryIndex: number = NaN;

if (state === EState.PREAMBLE) {
// Special handling of empty preamble
boundaryIndex =
findIndex(buffer, boundaryDelimiter.slice(2)) - 2;

if (boundaryIndex === -3 && !done) {
if (boundaryIndex === -3) {
// If the boundary isn't found in the current buffer, we
// need to read more data
break;
Expand All @@ -90,7 +95,7 @@ async function* parseMultipartMessage<T extends TTypedArray | ArrayBuffer>(
boundaryIndex = findIndex(buffer, boundaryDelimiter);
}

if (boundaryIndex === -1 && !done) {
if (boundaryIndex === -1) {
// If the boundary isn't found in the current buffer, we need to read more data
break;
}
Expand All @@ -100,32 +105,29 @@ async function* parseMultipartMessage<T extends TTypedArray | ArrayBuffer>(
// Transport padding
// Maximum acceptable transport padding
// set to 32 bytes
const nextIndexCRLF = findIndex(
buffer.subarray(nextIndex, nextIndex + 32),
newline,
);
const nextIndexCRLF = done
? 0
: findIndex(
buffer.subarray(nextIndex, nextIndex + 32),
newline,
);

if (!done) {
if (
nextIndexCRLF === -1 &&
buffer.length - nextIndex < 32
) {
break;
}
if (nextIndexCRLF === -1 && buffer.length - nextIndex < 32) {
break;
}

if (
nextIndexCRLF === -1 ||
!Array.from(
buffer.subarray(
nextIndex + Math.min(2, nextIndexCRLF),
nextIndex + nextIndexCRLF,
),
).every((v) => LWSPchar.includes(v))
) {
throw new Error(
`Invalid boundary at index ${boundaryIndex}`,
);
}
if (
nextIndexCRLF === -1 ||
!Array.from(
buffer.subarray(
nextIndex + Math.min(2, nextIndexCRLF),
nextIndex + nextIndexCRLF,
),
).every((v) => LWSPchar.includes(v))
) {
throw new Error(
`Invalid boundary at index ${boundaryIndex}`,
);
}

// Possibly reached the end of the multipart message
Expand All @@ -145,7 +147,7 @@ async function* parseMultipartMessage<T extends TTypedArray | ArrayBuffer>(
throw new Error(
`Invalid boundary at index ${boundaryIndex} (${boundary}): ${buffer[
nextIndex + 1
].toString(16)}`,
]?.toString(16)}`,
);
}
}
Expand Down Expand Up @@ -209,10 +211,6 @@ async function* parseMultipartMessage<T extends TTypedArray | ArrayBuffer>(
break;
}

if (done) {
throw new Error('Invalid message');
}

buffer = buffer.subarray(nextIndexCRLF + nextIndex + 2);
}
}
Expand Down

0 comments on commit a6422eb

Please sign in to comment.