Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: writing byte one at a time #320

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/__tests__/__snapshots__/stream-impersonator.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ impersonate-user: johndoe
BarÄö"
`;

exports[`StreamImpersonator handles binary data in body one byte at a time 1`] = `
"POST / HTTP/1.1
accept: application/json
content-type: application/octet-stream
content-length: 4
authorization: Bearer service-account-token
impersonate-user: johndoe

Äö"
`;

exports[`StreamImpersonator handles body separator splitted to separate chunks 1`] = `
"GET / HTTP/1.1
accept: application/json
Expand Down
56 changes: 56 additions & 0 deletions src/__tests__/stream-impersonator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,62 @@ MwIDAQAB
expect(destination.buffer.includes(Buffer.from([0x01, 0x02, 0x03, 0x42, 0x61, 0x72, 0xC3, 0x84, 0xC3, 0xB6]))).toBe(true);
});

it("handles parse errors", () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;

stream.pipe(parser).pipe(destination);

// Writing the headers as text
stream.write("POST ");
stream.write("/");
stream.write(" HTTP/1.1");

expect(() => {
stream.write("\r\r\n\n");
}).toThrowError("Parse Error");
});

it("handles binary data in body one byte at a time", async () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;

const token = jwt.sign(
{
exp: Math.floor(Date.now() / 1000) + 60 * 60,
sub: "johndoe",
aud: [boredServer],
},
jwtPrivateKey,
{ algorithm: "RS256" }
);

stream.pipe(parser).pipe(destination);

// Writing the headers as text
stream.write("POST ");
stream.write("/");
stream.write(" HTTP/1.1");
stream.write("\r\n");
stream.write("Accept: application/json\r\n");
stream.write(`Authorization: Bearer ${token}\r\n`);
stream.write("Content-Type: application/octet-stream\r\nContent-Length: 4\r\n\r\n");

// Writing binary data as four Buffer chunks
// UTF8: Äö
stream.write(Buffer.from([0xC3]));
await delay(1);
stream.write(Buffer.from([0x84]));
await delay(1);
stream.write(Buffer.from([0xC3]));
await delay(1);
stream.write(Buffer.from([0xB6]));

expect(destination.buffer.toString()).toMatchSnapshot();
expect(destination.buffer.toString()).toContain("Äö");
expect(destination.buffer.includes(Buffer.from([0xC3, 0x84, 0xC3, 0xB6]))).toBe(true);
});

it ("handles headers in one chunk, body in another", () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;
Expand Down
114 changes: 114 additions & 0 deletions src/__tests__/stream-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { removeBytesFromBuffersHead } from "../stream-utils";

describe("stream-utils", () => {
describe("removeBytesFromBuffersHead", () => {
describe("no buffers", () => {
describe("zero bytes to remove", () => {
it("removes nothing", () => {
expect(removeBytesFromBuffersHead([], 0)).toEqual([]);
});
});

describe("one byte to remove", () => {
it("removes nothing", () => {
expect(removeBytesFromBuffersHead([], 0)).toEqual([]);
});
});

describe("minutes 1 byte to remove", () => {
it("removes nothing", () => {
expect(removeBytesFromBuffersHead([], -1)).toEqual([]);
});
});
});

describe("one buffer", () => {
describe("has one byte", () => {
it("is removed completely", () => {
const buffer = Buffer.from([0x01]);

expect(removeBytesFromBuffersHead([buffer], 1)).toEqual([]);
});
});

describe("has one byte and removes 2", () => {
it("is removed completely", () => {
const buffer = Buffer.from([0x01]);

expect(removeBytesFromBuffersHead([buffer], 2)).toEqual([]);
});
});

describe("has empty buffer", () => {
it("is removed completely", () => {
const buffer = Buffer.from([]);

expect(removeBytesFromBuffersHead([buffer], 1)).toEqual([]);
});
});

describe("bytes to remove matches last buffer length", () => {
it("is removed completely", () => {
const buffer = Buffer.from([0x01, 0x02, 0x03]);

expect(removeBytesFromBuffersHead([buffer], 3)).toEqual([]);
});
});

describe("bytes to remove doesn't match last buffer length", () => {
it("is removed partially", () => {
const buffer = Buffer.from([0x01, 0x02, 0x03]);

expect(removeBytesFromBuffersHead([buffer], 2)).toEqual([Buffer.from([0x03])]);
});
});
});

describe("many buffers", () => {
describe("bytes to remove matches last buffer length", () => {
it("first buffer is removed completely", () => {
const buffer1 = Buffer.from([0x01, 0x02, 0x03]);
const buffer2 = Buffer.from([0x04, 0x05, 0x06]);

expect(removeBytesFromBuffersHead([buffer1, buffer2], 3)).toEqual([buffer2]);
});
});

describe("bytes to remove is less than last buffer length", () => {
it("is removed partially", () => {
const buffer1 = Buffer.from([0x01, 0x02, 0x03]);
const buffer2 = Buffer.from([0x04, 0x05, 0x06]);

expect(removeBytesFromBuffersHead([buffer1, buffer2], 2)).toEqual([ Buffer.from([0x03]), buffer2]);
});
});

describe("bytes to remove is more than last buffer length", () => {
it("is removed partially", () => {
const buffer1 = Buffer.from([0x01, 0x02, 0x03]);
const buffer2 = Buffer.from([0x04, 0x05, 0x06]);

expect(removeBytesFromBuffersHead([buffer1, buffer2], 4)).toEqual([Buffer.from([0x05, 0x06])]);
});
});

describe("bytes to remove is equal to sum of all buffer lengths", () => {
it("is removed completely", () => {
const buffer1 = Buffer.from([0x01, 0x02, 0x03]);
const buffer2 = Buffer.from([0x04, 0x05, 0x06]);

expect(removeBytesFromBuffersHead([buffer1, buffer2], 6)).toEqual([]);
});
});

describe("bytes to remove is more than the sum of all buffer lengths", () => {
it("is removed completely", () => {
const buffer1 = Buffer.from([0x01, 0x02, 0x03]);
const buffer2 = Buffer.from([0x04, 0x05, 0x06]);

expect(removeBytesFromBuffersHead([buffer1, buffer2], 9999)).toEqual([]);
});
});
});
});
});
21 changes: 11 additions & 10 deletions src/stream-impersonator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { HTTPParser, HTTPParserJS } from "http-parser-js";
import { chunk } from "lodash";
import * as jwt from "jsonwebtoken";
import logger from "./logger";
import { removeBytesFromBuffersHead } from "./stream-utils";

type TokenPayload = {
exp: number;
Expand All @@ -23,7 +24,10 @@ export class StreamImpersonator extends Transform {
private httpParser: HTTPParserJS;
private upgrade = false;
private getSaToken: GetSaToken;
private partialMessage: string = "";

// If the a chunk can't be parsed fully, unparsed bytes are saved to be passed
// when we receive the next chunk
private partialMessage: Buffer[] = [];

constructor(getSaToken: GetSaToken) {
super();
Expand Down Expand Up @@ -100,35 +104,32 @@ export class StreamImpersonator extends Transform {

_transform(
chunk: Buffer,
encoding: BufferEncoding,
_encoding: BufferEncoding,
callback: TransformCallback,
): void {
const chunkStr = chunk.toString();

if (this.upgrade) {
this.push(chunk);

return callback();
}

this.partialMessage += chunkStr;
this.partialMessage.push(chunk);

const handleError = (err: Error) => {
this.partialMessage = "";
this.partialMessage = [];
logger.error("[IMPERSONATOR] Error parsing HTTP data: %s", String(err));
throw err;
};

try {
const bytesParsed = this.httpParser.execute(
Buffer.from(this.partialMessage),
);
const bufferToParse = Buffer.concat(this.partialMessage);
const bytesParsed = this.httpParser.execute(bufferToParse);

if (bytesParsed instanceof Error) {
return handleError(bytesParsed);
}

this.partialMessage = this.partialMessage.slice(bytesParsed);
this.partialMessage = removeBytesFromBuffersHead(this.partialMessage, bytesParsed);
} catch (err) {
return handleError(err as Error);
}
Expand Down
27 changes: 27 additions & 0 deletions src/stream-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Removes bytes from the beginning of the buffers array.
* If bytes to be removed is less than the buffer length, the bytes are removed from the start
* of the buffer.
* @param buffers Array of buffers to reduce
* @param bytes Number of bytes to remove.
* @returns
*/
export const removeBytesFromBuffersHead = (buffers: Buffer[], bytes: number) => {
let bytesToRemove = bytes;

while (bytesToRemove > 0 && buffers.length > 0) {
const firstBuffer = buffers[0];

if (bytesToRemove < firstBuffer.length) {
// Remove 'bytesToRemove' from the start of the last buffer
buffers[0] = firstBuffer.subarray(bytesToRemove);

bytesToRemove = 0;
} else {
buffers.shift();
bytesToRemove -= firstBuffer.length;
}
}

return buffers;
};
Loading