From 2b1b30376da70103428535d3803be4decfec1f9b Mon Sep 17 00:00:00 2001
From: Joe
Date: Thu, 21 Mar 2024 20:28:53 -0700
Subject: [PATCH] gzip by stream, writes tests validating streaming unzip

---
 .gitignore               |   3 +-
 src/cache/index.ts       |  12 ----
 src/upload/index.test.ts |  44 ++++++++++++++
 src/upload/index.ts      | 122 ++++++++++++++++++++++++---------------
 4 files changed, 122 insertions(+), 59 deletions(-)

diff --git a/.gitignore b/.gitignore
index a0a4fc3..ea866d9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,4 +8,5 @@ node_modules/
 *-cache*
 *testdb*
 *yarn-error.log
-test/*
\ No newline at end of file
+test/*
+blocks.gz
\ No newline at end of file
diff --git a/src/cache/index.ts b/src/cache/index.ts
index 9b3cf2d..fc6fef5 100644
--- a/src/cache/index.ts
+++ b/src/cache/index.ts
@@ -95,18 +95,6 @@ export class LightBlockCache {
     return block.sequence;
   }
 
-  async getUploadHead(): Promise<number> {
-    const head = await this.get("uploadHead");
-    if (!head) return 0;
-    const block = await this.getBlockByHash(head.toString());
-    if (!block) return 0;
-    return block.sequence;
-  }
-
-  async putUploadHead(hash: string): Promise<void> {
-    await this.put("uploadHead", hash);
-  }
-
   async get(key: string): Promise<Buffer | null> {
     try {
       const data = await this.db.get(key);
diff --git a/src/upload/index.test.ts b/src/upload/index.test.ts
index 795b261..8104d4e 100644
--- a/src/upload/index.test.ts
+++ b/src/upload/index.test.ts
@@ -1,3 +1,8 @@
+import fs from "fs";
+import os from "os";
+import path from "path";
+
+import { createGunzip } from "zlib";
 import { lightBlockCache } from "../cache";
 import { lightBlockUpload } from "./index";
 import {
@@ -5,6 +10,7 @@ import {
   ListObjectsV2Command,
   S3Client,
 } from "@aws-sdk/client-s3";
+import { LightBlock } from "../models/lightstreamer";
 
 describe("LightBlockUpload", () => {
   beforeAll(() => {
@@ -47,4 +53,42 @@ describe("LightBlockUpload", () => {
       { start: 1001, end: 2000 },
     ]);
   });
+
+  it("should gzip blocks as expected", async () => {
+    const tempFile = path.join(os.tmpdir(), "test.gz");
+    await lightBlockUpload.gzipBlocks(1, 0.001, tempFile);
+
+    const gunzip = createGunzip();
+    const inputFile = fs.createReadStream(tempFile);
+
+    inputFile.pipe(gunzip);
+
+    let leftover = Buffer.alloc(0);
+    gunzip.on("data", (chunk) => {
+      let data = Buffer.concat([leftover, chunk]);
+
+      while (data.length >= 4) {
+        const blockLength = data.readUInt32BE(0);
+        if (data.length >= 4 + blockLength) {
+          const blockData = data.subarray(4, 4 + blockLength);
+          expect(() => LightBlock.decode(blockData)).not.toThrow();
+          data = data.subarray(4 + blockLength);
+        } else {
+          break;
+        }
+      }
+
+      leftover = data;
+    });
+
+    gunzip.on("end", () => {
+      expect(leftover.length).toBe(0);
+    });
+
+    gunzip.on("error", (err) => {
+      expect(() => {
+        throw new Error(err.message);
+      }).not.toThrow();
+    });
+  });
 });
diff --git a/src/upload/index.ts b/src/upload/index.ts
index 980bdcf..a8b9d09 100644
--- a/src/upload/index.ts
+++ b/src/upload/index.ts
@@ -1,4 +1,5 @@
-import { gzipSync } from "zlib";
+import fs from "fs";
+import { createGzip } from "zlib";
 import {
   ListObjectsV2Command,
   PutObjectCommand,
@@ -10,7 +11,7 @@ import { logger } from "@/utils/logger";
 
 class UploadError extends Error {}
 
-type BlockRange = {
+export type BlockRange = {
   start: number;
   end: number;
 };
@@ -18,6 +19,7 @@ type BlockRange = {
 export class LightBlockUpload {
   private cache: LightBlockCache;
   private s3Client: S3Client;
+  private chunkSizeMb: number;
   private bucket: string;
 
   constructor(cache: LightBlockCache) {
@@ -34,6 +36,13 @@ export class LightBlockUpload {
     if (!process.env["BUCKET_NAME"]) {
       throw new UploadError("BUCKET_NAME not set");
     }
+    if (!process.env["UPLOAD_CHUNK_SIZE_MB"]) {
+      throw new UploadError("UPLOAD_CHUNK_SIZE_MB not set");
+    }
+    this.chunkSizeMb = parseInt(
+      process.env["UPLOAD_CHUNK_SIZE_MB"] as string,
+      10,
+    );
     this.bucket = process.env["BUCKET_NAME"];
     this.s3Client = new S3Client({
       region: "auto",
@@ -46,45 +55,84 @@ export class LightBlockUpload {
   }
 
   async upload(): Promise<void> {
+    const existingUploads = await this.existingUploads();
+    let maxUploaded = existingUploads.reduce((max, range) => {
+      return range.end > max ? range.end : max;
+    }, 0);
+    let head = await this.cache.getHeadSequence();
+    if (!head) head = 0;
+
     // eslint-disable-next-line no-constant-condition
     while (true) {
-      const existingUploads = await this.existingUploads();
-      const blocks = await this.uploadManifest(existingUploads);
-      for (const block of blocks) {
-        logger.info(`Gzipping blocks ${block.start} to ${block.end}`);
-        const gzip = await this.gzipBlocks(block.start, block.end);
-
-        logger.info(`Uploading blocks ${block.start} to ${block.end}`);
-        const key = this.uploadName(block);
-        await this.uploadBlocks(gzip, key);
-        const uploadHead = await this.cache.getBlockBySequence(block.end);
-        if (uploadHead) {
-          await this.cache.putUploadHead(uploadHead.hash);
-        }
-      }
-      await new Promise((resolve) => setTimeout(resolve, 10000));
+      logger.info(
+        `Gzipping blocks from ${maxUploaded + 1} for ${
+          this.chunkSizeMb
+        } MB upload`,
+      );
+      const fileName = "blocks.gz";
+      const range = await this.gzipBlocks(
+        maxUploaded + 1,
+        this.chunkSizeMb * 1024 * 1024,
+        fileName,
+      );
+
+      logger.info(`Uploading blocks ${range.start} to ${range.end}`);
+      const key = this.uploadName(range);
+      await this.uploadBlocks(fileName, key);
+      maxUploaded = range.end;
     }
   }
 
-  async gzipBlocks(start: number, end: number): Promise<Buffer> {
-    let data = "";
-    for (let i = start; i <= end; i++) {
-      const block = await this.cache.getBlockBySequence(i);
-      if (block) {
-        data +=
-          Buffer.from(LightBlock.encode(block).finish()).toString("hex") + "\n";
+  async gzipBlocks(
+    start: number,
+    chunkSizeBytes: number,
+    outputFileName: string,
+  ): Promise<BlockRange> {
+    const gzip = createGzip();
+    const outputFile = fs.createWriteStream(outputFileName);
+    gzip.pipe(outputFile);
+    let i = start;
+    let block;
+    let warned = false;
+
+    // eslint-disable-next-line no-constant-condition
+    while (true) {
+      block = await this.cache.getBlockBySequence(i);
+      if (block == null) {
+        if (!warned) {
+          logger.warn(
+            `At end of chain at block ${i}, waiting for new blocks to continue filling gzip for upload.`,
+          );
+          warned = true;
+        }
+        await new Promise((resolve) => setTimeout(resolve, 60000)); // Wait for 1 minute
+        continue;
       }
+      const blockBuffer = LightBlock.encode(block).finish();
+      const lengthBuffer = Buffer.alloc(4);
+      lengthBuffer.writeUInt32BE(blockBuffer.byteLength, 0);
+      gzip.write(lengthBuffer);
+      gzip.write(blockBuffer);
+      if (outputFile.bytesWritten >= chunkSizeBytes) {
+        break;
+      }
+      i++;
     }
-    return gzipSync(data);
+    gzip.end();
+    await new Promise((resolve) => outputFile.on("finish", resolve));
+    return { start, end: i };
   }
 
-  async uploadBlocks(buffer: Buffer, key: string): Promise<void> {
+  async uploadBlocks(fileName: string, key: string): Promise<void> {
+    const fileStream = fs.createReadStream(fileName);
+    const fileSize = fs.statSync(fileName).size;
+
     const command = new PutObjectCommand({
       Bucket: this.bucket,
       ContentType: "application/gzip",
-      ContentLength: buffer.byteLength,
+      ContentLength: fileSize,
       Key: key,
-      Body: buffer,
+      Body: fileStream,
     });
 
     await this.s3Client.send(command);
@@ -115,24 +163,6 @@ export class LightBlockUpload {
     }
     throw new UploadError("Invalid upload name: " + uploadName);
   }
-
-  async uploadManifest(existingUploads: BlockRange[]): Promise<BlockRange[]> {
-    const head = await this.cache.getHeadSequence();
-    if (!head) return [];
-
-    const headBlockSequence = parseInt(head.toString());
-    const backfillBlocks = [];
-    for (let start = 1; start <= headBlockSequence; start += 1000) {
-      if (headBlockSequence - start < 1000) {
-        continue;
-      }
-      const end = Math.min(start + 999, headBlockSequence);
-      if (!existingUploads.includes({ start, end })) {
-        backfillBlocks.push({ start, end });
-      }
-    }
-    return backfillBlocks;
-  }
 }
 
 export const lightBlockUpload = new LightBlockUpload(lightBlockCache);