Skip to content

Commit

Permalink
gzip by stream; add tests validating streaming unzip
Browse files Browse the repository at this point in the history
  • Loading branch information
jowparks committed Mar 22, 2024
1 parent 3f5ed8a commit 2b1b303
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 59 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ node_modules/
*-cache*
*testdb*
*yarn-error.log
test/*
test/*
blocks.gz
12 changes: 0 additions & 12 deletions src/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,6 @@ export class LightBlockCache {
return block.sequence;
}

/**
 * Resolves the sequence number of the most recently uploaded block.
 * Looks up the stored "uploadHead" hash in the cache, then resolves that
 * hash to its block; returns 0 when no head is recorded or the block is
 * missing from the cache.
 */
async getUploadHead(): Promise<number> {
  const storedHash = await this.get("uploadHead");
  if (!storedHash) {
    return 0;
  }
  const headBlock = await this.getBlockByHash(storedHash.toString());
  return headBlock ? headBlock.sequence : 0;
}

/**
 * Records `hash` as the upload head — the hash of the last block that was
 * uploaded — under the "uploadHead" cache key, for `getUploadHead` to read.
 */
async putUploadHead(hash: string): Promise<void> {
await this.put("uploadHead", hash);
}

async get(key: string): Promise<Uint8Array | null> {
try {
const data = await this.db.get(key);
Expand Down
44 changes: 44 additions & 0 deletions src/upload/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import fs from "fs";
import os from "os";
import path from "path";

import { createGunzip } from "zlib";
import { lightBlockCache } from "../cache";
import { lightBlockUpload } from "./index";
import {
PutObjectCommand,
ListObjectsV2Command,
S3Client,
} from "@aws-sdk/client-s3";
import { LightBlock } from "../models/lightstreamer";

describe("LightBlockUpload", () => {
beforeAll(() => {
Expand Down Expand Up @@ -47,4 +53,42 @@ describe("LightBlockUpload", () => {
{ start: 1001, end: 2000 },
]);
});

it("should gzip blocks as expected", async () => {
  const tempFile = path.join(os.tmpdir(), "test.gz");
  // Second arg is the chunk size; a tiny value stops after the first block.
  await lightBlockUpload.gzipBlocks(1, 0.001, tempFile);

  const gunzip = createGunzip();
  const inputFile = fs.createReadStream(tempFile);

  // Parse the length-prefixed framing: each record is a 4-byte big-endian
  // length followed by a LightBlock-encoded payload. Bytes left over at
  // stream end indicate a truncated/corrupt record.
  let leftover = Buffer.alloc(0);
  gunzip.on("data", (chunk) => {
    let data = Buffer.concat([leftover, chunk]);

    while (data.length >= 4) {
      const blockLength = data.readUInt32BE(0);
      if (data.length < 4 + blockLength) {
        break; // incomplete record; wait for more data
      }
      const blockData = data.subarray(4, 4 + blockLength);
      expect(() => LightBlock.decode(blockData)).not.toThrow();
      data = data.subarray(4 + blockLength);
    }

    leftover = data;
  });

  // BUG FIX: the original test attached 'end'/'error' handlers but never
  // awaited the stream, so the async test resolved before those handlers
  // (and their expectations) ever ran. Await completion explicitly so a
  // decode error or trailing bytes actually fail the test.
  await new Promise<void>((resolve, reject) => {
    gunzip.on("end", resolve);
    gunzip.on("error", reject);
    inputFile.on("error", reject);
    inputFile.pipe(gunzip);
  });

  expect(leftover.length).toBe(0);
});
});
122 changes: 76 additions & 46 deletions src/upload/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { gzipSync } from "zlib";
import fs from "fs";
import { createGzip } from "zlib";
import {
ListObjectsV2Command,
PutObjectCommand,
Expand All @@ -10,14 +11,15 @@ import { logger } from "@/utils/logger";

class UploadError extends Error {}

type BlockRange = {
export type BlockRange = {
start: number;
end: number;
};

export class LightBlockUpload {
private cache: LightBlockCache;
private s3Client: S3Client;
private chunkSizeMb: number;
private bucket: string;

constructor(cache: LightBlockCache) {
Expand All @@ -34,6 +36,13 @@ export class LightBlockUpload {
if (!process.env["BUCKET_NAME"]) {
throw new UploadError("BUCKET_NAME not set");
}
if (!process.env["UPLOAD_CHUNK_SIZE_MB"]) {
throw new UploadError("UPLOAD_CHUNK_SIZE_MB not set");
}
this.chunkSizeMb = parseInt(
process.env["UPLOAD_CHUNK_SIZE_MB"] as string,
10,
);
this.bucket = process.env["BUCKET_NAME"];
this.s3Client = new S3Client({
region: "auto",
Expand All @@ -46,45 +55,84 @@ export class LightBlockUpload {
}

async upload(): Promise<void> {
const existingUploads = await this.existingUploads();
let maxUploaded = existingUploads.reduce((max, range) => {
return range.end > max ? range.end : max;
}, 0);
let head = await this.cache.getHeadSequence();
if (!head) head = 0;

// eslint-disable-next-line no-constant-condition
while (true) {
const existingUploads = await this.existingUploads();
const blocks = await this.uploadManifest(existingUploads);
for (const block of blocks) {
logger.info(`Gzipping blocks ${block.start} to ${block.end}`);
const gzip = await this.gzipBlocks(block.start, block.end);

logger.info(`Uploading blocks ${block.start} to ${block.end}`);
const key = this.uploadName(block);
await this.uploadBlocks(gzip, key);
const uploadHead = await this.cache.getBlockBySequence(block.end);
if (uploadHead) {
await this.cache.putUploadHead(uploadHead.hash);
}
}
await new Promise((resolve) => setTimeout(resolve, 10000));
logger.info(
`Gzipping blocks for ${maxUploaded + 1} for ${
this.chunkSizeMb
} MB upload`,
);
const fileName = "blocks.gz";
const range = await this.gzipBlocks(
maxUploaded + 1,
this.chunkSizeMb * 1024 * 1024,
fileName,
);

logger.info(`Uploading blocks ${range.start} to ${range.end}`);
const key = this.uploadName(range);
await this.uploadBlocks(fileName, key);
maxUploaded = range.end;
}
}

/**
 * Streams blocks starting at sequence `start` into a gzip file, framing
 * each block as a 4-byte big-endian length prefix followed by the
 * LightBlock-encoded payload, until at least `chunkSizeBytes` compressed
 * bytes have been written to `outputFileName`.
 *
 * If the cache runs out of blocks (end of chain), this waits and retries
 * every minute until enough new blocks arrive to fill the chunk.
 *
 * @param start first block sequence number to include
 * @param chunkSizeBytes target compressed size of the output file
 * @param outputFileName path of the gzip file to create
 * @returns the inclusive range of block sequences written
 */
async gzipBlocks(
  start: number,
  chunkSizeBytes: number,
  outputFileName: string,
): Promise<BlockRange> {
  const gzip = createGzip();
  const outputFile = fs.createWriteStream(outputFileName);
  gzip.pipe(outputFile);
  let i = start;
  let warned = false;

  // eslint-disable-next-line no-constant-condition
  while (true) {
    const block = await this.cache.getBlockBySequence(i);
    if (block == null) {
      // BUG FIX: the guard was inverted (`if (warned)`), so with `warned`
      // initialized to false the message never logged and the flag was
      // never set. Warn once on first hitting the end of the chain.
      if (!warned) {
        logger.warn(
          `At end of chain at block ${i}, filling gzip for upload as blocks are added.`,
        );
        warned = true;
      }
      await new Promise((resolve) => setTimeout(resolve, 60000)); // Wait for 1 minute
      continue;
    }
    const blockBuffer = LightBlock.encode(block).finish();
    const lengthBuffer = Buffer.alloc(4);
    lengthBuffer.writeUInt32BE(blockBuffer.byteLength, 0);
    gzip.write(lengthBuffer);
    gzip.write(blockBuffer);
    // bytesWritten reflects compressed output flushed to the file so far;
    // stop once the chunk target is reached.
    if (outputFile.bytesWritten >= chunkSizeBytes) {
      break;
    }
    i++;
  }
  gzip.end();
  // Wait for the file stream to flush fully before reporting the range.
  await new Promise((resolve) => outputFile.on("finish", resolve));
  return { start, end: i };
}

async uploadBlocks(buffer: Buffer, key: string): Promise<void> {
async uploadBlocks(fileName: string, key: string): Promise<void> {
const fileStream = fs.createReadStream(fileName);
const fileSize = fs.statSync(fileName).size;

const command = new PutObjectCommand({
Bucket: this.bucket,
ContentType: "application/gzip",
ContentLength: buffer.byteLength,
ContentLength: fileSize,
Key: key,
Body: buffer,
Body: fileStream,
});

await this.s3Client.send(command);
Expand Down Expand Up @@ -115,24 +163,6 @@ export class LightBlockUpload {
}
throw new UploadError("Invalid upload name: " + uploadName);
}

async uploadManifest(existingUploads: BlockRange[]): Promise<BlockRange[]> {
const head = await this.cache.getHeadSequence();
if (!head) return [];

const headBlockSequence = parseInt(head.toString());
const backfillBlocks = [];
for (let start = 1; start <= headBlockSequence; start += 1000) {
if (headBlockSequence - start < 1000) {
continue;
}
const end = Math.min(start + 999, headBlockSequence);
if (!existingUploads.includes({ start, end })) {
backfillBlocks.push({ start, end });
}
}
return backfillBlocks;
}
}

export const lightBlockUpload = new LightBlockUpload(lightBlockCache);

0 comments on commit 2b1b303

Please sign in to comment.