diff --git a/src/server.ts b/src/server.ts
index 8593954..df80937 100644
--- a/src/server.ts
+++ b/src/server.ts
@@ -17,7 +17,7 @@ if (process.env["BUILD_CACHE"] === "true") {
 
 if (process.env["UPLOAD_BLOCKS"] === "true") {
   logger.info("Starting uploader...");
-  void lightBlockUpload.watchAndUpload();
+  void lightBlockUpload.upload();
 }
 
 export const app = express();
diff --git a/src/upload/index.test.ts b/src/upload/index.test.ts
index 5e8b707..795b261 100644
--- a/src/upload/index.test.ts
+++ b/src/upload/index.test.ts
@@ -1,70 +1,50 @@
-import { LightBlockUpload } from "./index";
+import { lightBlockCache } from "../cache";
+import { lightBlockUpload } from "./index";
 import {
-  S3Client,
   PutObjectCommand,
   ListObjectsV2Command,
+  S3Client,
 } from "@aws-sdk/client-s3";
-import { LightBlockCache } from "../cache";
-import { LightBlock } from "../models/lightstreamer";
-
-jest.mock("@aws-sdk/client-s3", () => {
-  return {
-    S3Client: jest.fn().mockImplementation(() => {
-      return {
-        send: jest.fn().mockImplementation(() => {
-          return { Contents: [] };
-        }),
-      };
-    }),
-    ListObjectsV2Command: jest.fn().mockImplementation(() => {
-      return {};
-    }),
-    PutObjectCommand: jest.fn().mockImplementation(() => {
-      return {};
-    }),
-  };
-});
 
 describe("LightBlockUpload", () => {
-  let lightBlockUpload: LightBlockUpload;
-  let mockCache: jest.Mocked<LightBlockCache>;
-  let mockS3Client: jest.Mocked<S3Client>;
-
   beforeAll(() => {
-    mockCache = new LightBlockCache() as jest.Mocked<LightBlockCache>;
-    mockS3Client = new S3Client({}) as jest.Mocked<S3Client>;
-    lightBlockUpload = new LightBlockUpload(mockCache);
+    jest.spyOn(S3Client.prototype, "send").mockImplementation((command) => {
+      if (command instanceof ListObjectsV2Command) {
+        return Promise.resolve({
+          Contents: [
+            { Key: lightBlockUpload.uploadName({ start: 1, end: 1000 }) },
+            { Key: lightBlockUpload.uploadName({ start: 1001, end: 2000 }) },
+          ],
+        });
+      } else if (command instanceof PutObjectCommand) {
+        return Promise.resolve({
+          /* empty response: upload() only awaits success, ignores the body */
+        });
+      } else {
+        throw new Error(
+          `Command mock not implemented: ${command.constructor.name}`,
+        );
+      }
+    });
   });
 
-  afterAll(() => {
+  afterAll(async () => {
     jest.resetAllMocks();
+    await lightBlockCache.close();
   });
 
-  it("should throw an error if environment variables are not set", () => {
-    delete process.env["BUCKET_ENDPOINT"];
-    expect(() => new LightBlockUpload(mockCache)).toThrow();
+  it("upload name creation should be reversible", () => {
+    const blockRange = { start: 1, end: 1000 };
+    const key = lightBlockUpload.uploadName(blockRange);
+    const newBlockRange = lightBlockUpload.parseUploadName(key);
+    expect(blockRange).toEqual(newBlockRange);
   });
 
-  it("should upload blocks", async () => {
-    const mockBlock = LightBlock.fromJSON({
-      sequence: 1000,
-      hash: "test-hash",
-      previousBlockHash: "test-previous-hash",
-      timestamp: 123456789,
-      transactions: [],
-      noteSize: 0,
-    });
-    mockCache.getBlockBySequence.mockResolvedValue(mockBlock);
-    mockCache.getHeadSequence.mockResolvedValue(1001);
-    mockCache.getUploadHead.mockResolvedValue(0);
-
-    const mockSend = jest.fn();
-    mockS3Client.send = mockSend;
-
-    await lightBlockUpload.watchAndUpload();
-
-    expect(mockSend).toHaveBeenCalledWith(expect.any(ListObjectsV2Command));
-    expect(mockSend).toHaveBeenCalledWith(expect.any(PutObjectCommand));
-    expect(mockCache.putUploadHead).toHaveBeenCalledWith("test-hash");
+  it("existing uploads should return block ranges", async () => {
+    const ranges = await lightBlockUpload.existingUploads();
+    expect(ranges).toEqual([
+      { start: 1, end: 1000 },
+      { start: 1001, end: 2000 },
+    ]);
   });
 });
diff --git a/src/upload/index.ts b/src/upload/index.ts
index f49a6cb..980bdcf 100644
--- a/src/upload/index.ts
+++ b/src/upload/index.ts
@@ -8,7 +8,7 @@ import { LightBlockCache, lightBlockCache } from "@/cache";
 import { LightBlock } from "@/models/lightstreamer";
 import { logger } from "@/utils/logger";
 
-class UploaderError extends Error {}
+class UploadError extends Error {}
 
 type BlockRange = {
   start: number;
@@ -23,16 +23,16 @@ export class LightBlockUpload {
   constructor(cache: LightBlockCache) {
     this.cache = cache;
     if (!process.env["BUCKET_ENDPOINT"]) {
-      throw new UploaderError("BUCKET_ENDPOINT not set");
+      throw new UploadError("BUCKET_ENDPOINT not set");
     }
     if (!process.env["BUCKET_ACCESS_KEY_ID"]) {
-      throw new UploaderError("BUCKET_ACCESS_KEY_ID not set");
+      throw new UploadError("BUCKET_ACCESS_KEY_ID not set");
     }
     if (!process.env["BUCKET_SECRET_ACCESS_KEY"]) {
-      throw new UploaderError("BUCKET_SECRET_ACCESS_KEY not set");
+      throw new UploadError("BUCKET_SECRET_ACCESS_KEY not set");
     }
     if (!process.env["BUCKET_NAME"]) {
-      throw new UploaderError("BUCKET_NAME not set");
+      throw new UploadError("BUCKET_NAME not set");
     }
     this.bucket = process.env["BUCKET_NAME"];
     this.s3Client = new S3Client({
@@ -45,16 +45,17 @@ export class LightBlockUpload {
     });
   }
 
-  async watchAndUpload(): Promise<void> {
+  async upload(): Promise<void> {
     // eslint-disable-next-line no-constant-condition
     while (true) {
-      const blocks = await this.blocksToUpload();
+      const existingUploads = await this.existingUploads();
+      const blocks = await this.uploadManifest(existingUploads);
       for (const block of blocks) {
         logger.info(`Gzipping blocks ${block.start} to ${block.end}`);
         const gzip = await this.gzipBlocks(block.start, block.end);
         logger.info(`Uploading blocks ${block.start} to ${block.end}`);
-        const key = this.uploadName(block.start, block.end);
+        const key = this.uploadName(block);
         await this.uploadBlocks(gzip, key);
         const uploadHead = await this.cache.getBlockBySequence(block.end);
         if (uploadHead) {
@@ -65,7 +66,7 @@ export class LightBlockUpload {
     }
   }
 
-  private async gzipBlocks(start: number, end: number): Promise<Buffer> {
+  async gzipBlocks(start: number, end: number): Promise<Buffer> {
     let data = "";
     for (let i = start; i <= end; i++) {
       const block = await this.cache.getBlockBySequence(i);
@@ -74,11 +75,10 @@ export class LightBlockUpload {
           Buffer.from(LightBlock.encode(block).finish()).toString("hex") + "\n";
       }
     }
-
     return gzipSync(data);
   }
 
-  private async uploadBlocks(buffer: Buffer, key: string): Promise<void> {
+  async uploadBlocks(buffer: Buffer, key: string): Promise<void> {
     const command = new PutObjectCommand({
       Bucket: this.bucket,
       ContentType: "application/gzip",
@@ -90,33 +90,45 @@ export class LightBlockUpload {
     await this.s3Client.send(command);
   }
 
-  private uploadName(start: number, end: number): string {
-    return `blocks_${start.toString().padStart(10, "0")}_${end
+  async existingUploads(): Promise<BlockRange[]> {
+    const { Contents } = await this.s3Client.send(
+      new ListObjectsV2Command({ Bucket: this.bucket }),
+    );
+
+    if (!Contents) return [];
+    const keys = Contents.map((item) => item.Key).filter(Boolean) as string[];
+    return keys.map((key) => {
+      return this.parseUploadName(key);
+    });
+  }
+
+  uploadName(range: BlockRange): string {
+    return `blocks_${range.start.toString().padStart(10, "0")}_${range.end
       .toString()
       .padStart(10, "0")}.gz`;
   }
 
-  private async blocksToUpload(): Promise<BlockRange[]> {
+  parseUploadName(uploadName: string): BlockRange {
+    const match = uploadName.match(/blocks_(\d+)_(\d+)\.gz/);
+    if (match) {
+      return { start: parseInt(match[1], 10), end: parseInt(match[2], 10) };
+    }
+    throw new UploadError("Invalid upload name: " + uploadName);
+  }
+
+  async uploadManifest(existingUploads: BlockRange[]): Promise<BlockRange[]> {
     const head = await this.cache.getHeadSequence();
     if (!head) return [];
     const headBlockSequence = parseInt(head.toString());
-
-    const { Contents } = await this.s3Client.send(
-      new ListObjectsV2Command({ Bucket: this.bucket }),
-    );
-
-    if (!Contents) return [];
-    const keys = Contents.map((item) => item.Key).filter(Boolean) as string[];
     const backfillBlocks = [];
-    for (let i = 0; i <= headBlockSequence; i += 1000) {
-      if (headBlockSequence - i < 1000) {
+    for (let start = 1; start <= headBlockSequence; start += 1000) {
+      if (headBlockSequence - start < 1000) {
         continue;
       }
-      const end = Math.min(i + 999, headBlockSequence);
-      const key = this.uploadName(i, end);
-      if (!keys.includes(key)) {
-        backfillBlocks.push({ start: i, end });
+      const end = Math.min(start + 999, headBlockSequence);
+      // NOTE: Array.includes compares object identity and would never match a
+      // fresh literal, so membership must be checked field-by-field.
+      if (!existingUploads.some((r) => r.start === start && r.end === end)) {
+        backfillBlocks.push({ start, end });
       }
     }
     return backfillBlocks;
diff --git a/test/cache/leveldb/000116.ldb b/test/cache/leveldb/000116.ldb
new file mode 100644
index 0000000..fd24aaf
Binary files /dev/null and b/test/cache/leveldb/000116.ldb differ
diff --git a/test/cache/leveldb/000120.ldb b/test/cache/leveldb/000120.ldb
new file mode 100644
index 0000000..47ec487
Binary files /dev/null and b/test/cache/leveldb/000120.ldb differ
diff --git a/test/cache/leveldb/000123.log b/test/cache/leveldb/000123.log
new file mode 100644
index 0000000..0461cb7
Binary files /dev/null and b/test/cache/leveldb/000123.log differ
diff --git a/test/cache/leveldb/CURRENT b/test/cache/leveldb/CURRENT
index 61135ee..d9766b1 100644
--- a/test/cache/leveldb/CURRENT
+++ b/test/cache/leveldb/CURRENT
@@ -1 +1 @@
-MANIFEST-000231
+MANIFEST-000122
diff --git a/test/cache/leveldb/LOG b/test/cache/leveldb/LOG
index e69de29..6d8d746 100644
--- a/test/cache/leveldb/LOG
+++ b/test/cache/leveldb/LOG
@@ -0,0 +1,3 @@
+2024/03/19-09:37:17.297631 173e2b000 Recovering log #121
+2024/03/19-09:37:17.298491 173e2b000 Delete type=3 #119
+2024/03/19-09:37:17.298537 173e2b000 Delete type=0 #121
diff --git a/test/cache/leveldb/LOG.old b/test/cache/leveldb/LOG.old
index 9b28481..f8abcef 100644
--- a/test/cache/leveldb/LOG.old
+++ b/test/cache/leveldb/LOG.old
@@ -1,3 +1,5 @@
-2024/03/20-17:13:07.794174 171fcb000 Recovering log #230
-2024/03/20-17:13:07.795044 171fcb000 Delete type=3 #229
-2024/03/20-17:13:07.795090 171fcb000 Delete type=0 #230
+2024/03/19-09:37:16.985331 172e13000 Recovering log #118
+2024/03/19-09:37:16.985449 172e13000 Level-0 table #120: started
+2024/03/19-09:37:16.985657 172e13000 Level-0 table #120: 1766 bytes OK
+2024/03/19-09:37:16.986076 172e13000 Delete type=3 #117
+2024/03/19-09:37:16.986118 172e13000 Delete type=0 #118
diff --git a/test/cache/leveldb/MANIFEST-000122 b/test/cache/leveldb/MANIFEST-000122
new file mode 100644
index 0000000..e16ab56
Binary files /dev/null and b/test/cache/leveldb/MANIFEST-000122 differ