From 3c01bc5a57609be517a11bf1f61f3c19f9923f8e Mon Sep 17 00:00:00 2001 From: Panu Horsmalahti Date: Thu, 16 Jan 2025 13:31:28 +0200 Subject: [PATCH] fix: writing byte one at a time --- .../stream-impersonator.test.ts.snap | 11 ++ src/__tests__/stream-impersonator.test.ts | 56 +++++++++ src/__tests__/stream-utils.ts | 114 ++++++++++++++++++ src/stream-impersonator.ts | 21 ++-- src/stream-utils.ts | 27 +++++ 5 files changed, 219 insertions(+), 10 deletions(-) create mode 100644 src/__tests__/stream-utils.ts create mode 100644 src/stream-utils.ts diff --git a/src/__tests__/__snapshots__/stream-impersonator.test.ts.snap b/src/__tests__/__snapshots__/stream-impersonator.test.ts.snap index d9f014e..4438758 100644 --- a/src/__tests__/__snapshots__/stream-impersonator.test.ts.snap +++ b/src/__tests__/__snapshots__/stream-impersonator.test.ts.snap @@ -47,6 +47,17 @@ impersonate-user: johndoe BarÄö" `; +exports[`StreamImpersonator handles binary data in body one byte at a time 1`] = ` +"POST / HTTP/1.1 +accept: application/json +content-type: application/octet-stream +content-length: 4 +authorization: Bearer service-account-token +impersonate-user: johndoe + +Äö" +`; + exports[`StreamImpersonator handles body separator splitted to separate chunks 1`] = ` "GET / HTTP/1.1 accept: application/json diff --git a/src/__tests__/stream-impersonator.test.ts b/src/__tests__/stream-impersonator.test.ts index 56cd2e2..8922b15 100644 --- a/src/__tests__/stream-impersonator.test.ts +++ b/src/__tests__/stream-impersonator.test.ts @@ -213,6 +213,62 @@ MwIDAQAB expect(destination.buffer.includes(Buffer.from([0x01, 0x02, 0x03, 0x42, 0x61, 0x72, 0xC3, 0x84, 0xC3, 0xB6]))).toBe(true); }); + it("handles parse errors", () => { + parser.boredServer = boredServer; + parser.publicKey = jwtPublicKey; + + stream.pipe(parser).pipe(destination); + + // Writing the headers as text + stream.write("POST "); + stream.write("/"); + stream.write(" HTTP/1.1"); + + expect(() => { + stream.write("\r\r\n\n"); + }).toThrowError("Parse Error"); + }); + + it("handles binary data in body one byte at a time", async () => { + parser.boredServer = boredServer; + parser.publicKey = jwtPublicKey; + + const token = jwt.sign( + { + exp: Math.floor(Date.now() / 1000) + 60 * 60, + sub: "johndoe", + aud: [boredServer], + }, + jwtPrivateKey, + { algorithm: "RS256" } + ); + + stream.pipe(parser).pipe(destination); + + // Writing the headers as text + stream.write("POST "); + stream.write("/"); + stream.write(" HTTP/1.1"); + stream.write("\r\n"); + stream.write("Accept: application/json\r\n"); + stream.write(`Authorization: Bearer ${token}\r\n`); + stream.write("Content-Type: application/octet-stream\r\nContent-Length: 4\r\n\r\n"); + + // Writing binary data as four Buffer chunks + // UTF8: Äö + stream.write(Buffer.from([0xC3])); + await delay(1); + stream.write(Buffer.from([0x84])); + await delay(1); + stream.write(Buffer.from([0xC3])); + await delay(1); + stream.write(Buffer.from([0xB6])); + + expect(destination.buffer.toString()).toMatchSnapshot(); + expect(destination.buffer.toString()).toContain("Äö"); + expect(destination.buffer.includes(Buffer.from([0xC3, 0x84, 0xC3, 0xB6]))).toBe(true); + }); + it ("handles headers in one chunk, body in another", () => { parser.boredServer = boredServer; parser.publicKey = jwtPublicKey; diff --git a/src/__tests__/stream-utils.ts b/src/__tests__/stream-utils.ts new file mode 100644 index 0000000..d523957 --- /dev/null +++ b/src/__tests__/stream-utils.ts @@ -0,0 +1,114 @@ +import { removeBytesFromBuffersHead } from "../stream-utils"; + +describe("stream-utils", () => { + describe("removeBytesFromBuffersHead", () => { + describe("no buffers", () => { + describe("zero bytes to remove", () => { + it("removes nothing", () => { + expect(removeBytesFromBuffersHead([], 0)).toEqual([]); + }); + }); + + describe("one byte to remove", () => { + it("removes nothing", () => { + expect(removeBytesFromBuffersHead([], 0)).toEqual([]); + }); + }); + + describe("minutes 1 byte to remove", () => { + it("removes nothing", () => { + expect(removeBytesFromBuffersHead([], -1)).toEqual([]); + }); + }); + }); + + describe("one buffer", () => { + describe("has one byte", () => { + it("is removed completely", () => { + const buffer = Buffer.from([0x01]); + + expect(removeBytesFromBuffersHead([buffer], 1)).toEqual([]); + }); + }); + + describe("has one byte and removes 2", () => { + it("is removed completely", () => { + const buffer = Buffer.from([0x01]); + + expect(removeBytesFromBuffersHead([buffer], 2)).toEqual([]); + }); + }); + + describe("has empty buffer", () => { + it("is removed completely", () => { + const buffer = Buffer.from([]); + + expect(removeBytesFromBuffersHead([buffer], 1)).toEqual([]); + }); + }); + + describe("bytes to remove matches last buffer length", () => { + it("is removed completely", () => { + const buffer = Buffer.from([0x01, 0x02, 0x03]); + + expect(removeBytesFromBuffersHead([buffer], 3)).toEqual([]); + }); + }); + + describe("bytes to remove doesn't match last buffer length", () => { + it("is removed partially", () => { + const buffer = Buffer.from([0x01, 0x02, 0x03]); + + expect(removeBytesFromBuffersHead([buffer], 2)).toEqual([Buffer.from([0x03])]); + }); + }); + }); + + describe("many buffers", () => { + describe("bytes to remove matches last buffer length", () => { + it("first buffer is removed completely", () => { + const buffer1 = Buffer.from([0x01, 0x02, 0x03]); + const buffer2 = Buffer.from([0x04, 0x05, 0x06]); + + expect(removeBytesFromBuffersHead([buffer1, buffer2], 3)).toEqual([buffer2]); + }); + }); + + describe("bytes to remove is less than last buffer length", () => { + it("is removed partially", () => { + const buffer1 = Buffer.from([0x01, 0x02, 0x03]); + const buffer2 = Buffer.from([0x04, 0x05, 0x06]); + + expect(removeBytesFromBuffersHead([buffer1, buffer2], 2)).toEqual([ Buffer.from([0x03]), buffer2]); + }); + }); + + describe("bytes to remove is more than last buffer length", () => { + it("is removed partially", () => { + const buffer1 = Buffer.from([0x01, 0x02, 0x03]); + const buffer2 = Buffer.from([0x04, 0x05, 0x06]); + + expect(removeBytesFromBuffersHead([buffer1, buffer2], 4)).toEqual([Buffer.from([0x05, 0x06])]); + }); + }); + + describe("bytes to remove is equal to sum of all buffer lengths", () => { + it("is removed completely", () => { + const buffer1 = Buffer.from([0x01, 0x02, 0x03]); + const buffer2 = Buffer.from([0x04, 0x05, 0x06]); + + expect(removeBytesFromBuffersHead([buffer1, buffer2], 6)).toEqual([]); + }); + }); + + describe("bytes to remove is more than the sum of all buffer lengths", () => { + it("is removed completely", () => { + const buffer1 = Buffer.from([0x01, 0x02, 0x03]); + const buffer2 = Buffer.from([0x04, 0x05, 0x06]); + + expect(removeBytesFromBuffersHead([buffer1, buffer2], 9999)).toEqual([]); + }); + }); + }); + }); +}); diff --git a/src/stream-impersonator.ts b/src/stream-impersonator.ts index 1a92a6e..1fe6005 100644 --- a/src/stream-impersonator.ts +++ b/src/stream-impersonator.ts @@ -3,6 +3,7 @@ import { HTTPParser, HTTPParserJS } from "http-parser-js"; import { chunk } from "lodash"; import * as jwt from "jsonwebtoken"; import logger from "./logger"; +import { removeBytesFromBuffersHead } from "./stream-utils"; type TokenPayload = { exp: number; @@ -23,7 +24,10 @@ export class StreamImpersonator extends Transform { private httpParser: HTTPParserJS; private upgrade = false; private getSaToken: GetSaToken; - private partialMessage: string = ""; + + // If the a chunk can't be parsed fully, unparsed bytes are saved to be passed + // when we receive the next chunk + private partialMessage: Buffer[] = []; constructor(getSaToken: GetSaToken) { super(); @@ -100,35 +104,32 @@ export class StreamImpersonator extends Transform { _transform( chunk: Buffer, - encoding: BufferEncoding, + _encoding: BufferEncoding, callback: TransformCallback, ): void { - const chunkStr = chunk.toString(); - if (this.upgrade) { this.push(chunk); return callback(); } - this.partialMessage += chunkStr; + this.partialMessage.push(chunk); const handleError = (err: Error) => { - this.partialMessage = ""; + this.partialMessage = []; logger.error("[IMPERSONATOR] Error parsing HTTP data: %s", String(err)); throw err; }; try { - const bytesParsed = this.httpParser.execute( - Buffer.from(this.partialMessage), - ); + const bufferToParse = Buffer.concat(this.partialMessage); + const bytesParsed = this.httpParser.execute(bufferToParse); if (bytesParsed instanceof Error) { return handleError(bytesParsed); } - this.partialMessage = this.partialMessage.slice(bytesParsed); + this.partialMessage = removeBytesFromBuffersHead(this.partialMessage, bytesParsed); } catch (err) { return handleError(err as Error); } diff --git a/src/stream-utils.ts b/src/stream-utils.ts new file mode 100644 index 0000000..06d6d24 --- /dev/null +++ b/src/stream-utils.ts @@ -0,0 +1,27 @@ +/** + * Removes bytes from the beginning of the buffers array. + * If bytes to be removed is less than the buffer length, the bytes are removed from the start + * of the buffer. + * @param buffers Array of buffers to reduce + * @param bytes Number of bytes to remove. + * @returns + */ +export const removeBytesFromBuffersHead = (buffers: Buffer[], bytes: number) => { + let bytesToRemove = bytes; + + while (bytesToRemove > 0 && buffers.length > 0) { + const firstBuffer = buffers[0]; + + if (bytesToRemove < firstBuffer.length) { + // Remove 'bytesToRemove' from the start of the last buffer + buffers[0] = firstBuffer.subarray(bytesToRemove); + + bytesToRemove = 0; + } else { + buffers.shift(); + bytesToRemove -= firstBuffer.length; + } + } + + return buffers; +};