diff --git a/.gitignore b/.gitignore index ea866d9..00affc6 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,6 @@ node_modules/ *testdb* *yarn-error.log test/* -blocks.gz \ No newline at end of file +blocks +blocks.gz +blocks.manifest \ No newline at end of file diff --git a/blocks.manifest.gz b/blocks.manifest.gz new file mode 100644 index 0000000..36fa477 Binary files /dev/null and b/blocks.manifest.gz differ diff --git a/protos/lightstreamer.proto b/protos/lightstreamer.proto index da63a30..5165d95 100644 --- a/protos/lightstreamer.proto +++ b/protos/lightstreamer.proto @@ -5,7 +5,7 @@ message Empty {} message BlockID { optional uint64 sequence = 1; - optional string hash = 2; + optional bytes hash = 2; } // BlockRange specifies a series of blocks from start to end inclusive. @@ -18,8 +18,8 @@ message BlockRange { message LightBlock { uint32 protoVersion = 1; // the version of this wire format, for storage uint64 sequence = 2; // the height of this block - string hash = 3; // the ID (hash) of this block, same as explorer - string previousBlockHash = 4; // the ID (hash) of this block's predecessor + bytes hash = 3; // the ID (hash) of this block, same as explorer + bytes previousBlockHash = 4; // the ID (hash) of this block's predecessor uint32 timestamp = 5; // Unix epoch time when the block was mined repeated LightTransaction transactions = 6; // zero or more compact transactions from this block uint64 noteSize = 7; // the size of the notes tree after adding transactions from this block. @@ -28,7 +28,7 @@ message LightBlock { message LightTransaction { uint64 index = 1; // do we need this field? 
- string hash = 2; + bytes hash = 2; repeated LightSpend spends = 4; repeated LightOutput outputs = 5; @@ -36,20 +36,20 @@ message LightTransaction { message LightSpend { - string nf = 2; + bytes nf = 2; } message LightOutput { - string note = 1; // NoteEncrypted, serialized + bytes note = 1; // NoteEncrypted, serialized } message Transaction { // built, encrypted transaction - string data = 1; + bytes data = 1; } message SendResponse { - string hash = 1; + bytes hash = 1; bool accepted = 2; } diff --git a/src/server.test.ts b/src/server.test.ts index 2da00d6..b99aa82 100644 --- a/src/server.test.ts +++ b/src/server.test.ts @@ -9,15 +9,15 @@ import { LightBlock } from "./models/lightstreamer"; const expectedBlockObject = { protoVersion: expect.any(Number), sequence: 1, - hash: expect.any(String), - previousBlockHash: expect.any(String), + hash: expect.any(Buffer), + previousBlockHash: expect.any(Buffer), timestamp: expect.any(Number), transactions: expect.arrayContaining([ expect.objectContaining({ - hash: expect.any(String), + hash: expect.any(Buffer), outputs: expect.arrayContaining([ expect.objectContaining({ - note: expect.any(String), + note: expect.any(Buffer), }), ]), }), diff --git a/src/upload/index.test.ts b/src/upload/index.test.ts index 8104d4e..5b90bc7 100644 --- a/src/upload/index.test.ts +++ b/src/upload/index.test.ts @@ -1,68 +1,36 @@ import fs from "fs"; import os from "os"; import path from "path"; +import { open } from "fs/promises"; import { createGunzip } from "zlib"; import { lightBlockCache } from "../cache"; import { lightBlockUpload } from "./index"; -import { - PutObjectCommand, - ListObjectsV2Command, - S3Client, -} from "@aws-sdk/client-s3"; import { LightBlock } from "../models/lightstreamer"; +import { createInterface } from "readline"; describe("LightBlockUpload", () => { - beforeAll(() => { - jest.spyOn(S3Client.prototype, "send").mockImplementation((command) => { - if (command instanceof ListObjectsV2Command) { - return 
Promise.resolve({ - Contents: [ - { Key: lightBlockUpload.uploadName({ start: 1, end: 1000 }) }, - { Key: lightBlockUpload.uploadName({ start: 1001, end: 2000 }) }, - ], - }); - } else if (command instanceof PutObjectCommand) { - return Promise.resolve({ - /* your mock PutObjectCommand response */ - }); - } else { - throw new Error( - `Command mock not implemented: ${command.constructor.name}`, - ); - } - }); - }); - afterAll(async () => { jest.resetAllMocks(); await lightBlockCache.close(); }); - it("upload name creation should be reversible", () => { - const blockRange = { start: 1, end: 1000 }; - const key = lightBlockUpload.uploadName(blockRange); - const newBlockRange = lightBlockUpload.parseUploadName(key); - expect(blockRange).toEqual(newBlockRange); - }); - - it("existing uploads should return block ranges", async () => { - const ranges = await lightBlockUpload.existingUploads(); - expect(ranges).toEqual([ - { start: 1, end: 1000 }, - { start: 1001, end: 2000 }, - ]); - }); + it("should gzip blocks/manifest, last block should match", async () => { + const tempFile = path.join(os.tmpdir(), "test"); + const blockFile = await lightBlockUpload.createBlockFiles(tempFile, 0); - it("should gzip blocks as expected", async () => { - const tempFile = path.join(os.tmpdir(), "test.gz"); - await lightBlockUpload.gzipBlocks(1, 0.001, tempFile); + const tempGz = path.join(os.tmpdir(), "test.gz"); + const tempManifestGz = path.join(os.tmpdir(), "test.manifest.gz"); + await lightBlockUpload.gzipFile(blockFile.file, tempGz); + await lightBlockUpload.gzipFile(blockFile.manifest, tempManifestGz); const gunzip = createGunzip(); - const inputFile = fs.createReadStream(tempFile); - + const inputFile = fs.createReadStream(tempGz); inputFile.pipe(gunzip); + let lastBlock: LightBlock | undefined; + + // Verify unzipped data is correct let leftover = Buffer.alloc(0); gunzip.on("data", (chunk) => { let data = Buffer.concat([leftover, chunk]); @@ -71,7 +39,7 @@ 
describe("LightBlockUpload", () => { const blockLength = data.readUInt32BE(0); if (data.length >= 4 + blockLength) { const blockData = data.subarray(4, 4 + blockLength); - expect(() => LightBlock.decode(blockData)).not.toThrow(); + lastBlock = LightBlock.decode(blockData); data = data.subarray(4 + blockLength); } else { break; @@ -90,5 +58,38 @@ describe("LightBlockUpload", () => { throw new Error(err.message); }).not.toThrow(); }); + + // Now get blocks via the manifest and raw file + const tempGunzipped = path.join(os.tmpdir(), "test-gunzipped"); + let lastManifestLine: string | undefined; + await new Promise((resolve) => { + fs.createReadStream(tempManifestGz) + .pipe(createGunzip()) + .pipe(fs.createWriteStream(tempGunzipped)) + .on("finish", resolve); + }); + + const rl = createInterface({ + input: fs.createReadStream(tempGunzipped), + output: process.stdout, + terminal: false, + }); + + for await (const line of rl) { + lastManifestLine = line; + } + const splitLastLine = lastManifestLine!.split(","); + const byteStart = parseInt(splitLastLine![1]); + const byteEnd = parseInt(splitLastLine![2]); + + const fileDescriptor = await open(tempFile, "r"); + const buffer = Buffer.alloc(byteEnd - byteStart + 1); + await fileDescriptor.read(buffer, 0, byteEnd - byteStart + 1, byteStart); + console.log("biuff", buffer.toString("hex")); + const lastBlockManifest = LightBlock.decode(buffer); + + // verify block info gotten from binary/manifest is same as gzip + expect(lastBlock?.sequence).toEqual(lastBlockManifest?.sequence); + expect(lastBlock?.hash).toEqual(lastBlockManifest?.hash); }); }); diff --git a/src/upload/index.ts b/src/upload/index.ts index caf1a07..965af15 100644 --- a/src/upload/index.ts +++ b/src/upload/index.ts @@ -1,10 +1,12 @@ -import fs from "fs"; -import { createGzip } from "zlib"; import { - ListObjectsV2Command, + HeadObjectCommand, + NotFound, PutObjectCommand, S3Client, } from "@aws-sdk/client-s3"; +import * as zlib from "zlib"; +import fs from 
"fs"; + import { LightBlockCache, lightBlockCache } from "@/cache"; import { LightBlock } from "@/models/lightstreamer"; import { logger } from "@/utils/logger"; @@ -16,6 +18,11 @@ export type BlockRange = { end: number; }; +export type BlockFile = { + file: string; + manifest: string; +}; + export class LightBlockUpload { private cache: LightBlockCache; private s3Client: S3Client; @@ -55,113 +62,135 @@ export class LightBlockUpload { } async upload(): Promise { - const existingUploads = await this.existingUploads(); - let maxUploaded = existingUploads.reduce((max, range) => { - return range.end > max ? range.end : max; - }, 0); - let head = await this.cache.getHeadSequence(); - if (!head) head = 0; - // eslint-disable-next-line no-constant-condition - while (true) { - logger.info( - `Gzipping blocks for ${maxUploaded + 1} for ${ - this.chunkSizeMb - } MB upload`, - ); - const fileName = "blocks.gz"; - const range = await this.gzipBlocks( - maxUploaded + 1, - this.chunkSizeMb * 1024 * 1024, - fileName, - ); + const fileName = "blocks"; + const currentUploadSize = (await this.getFileSize(fileName)) || 0; + logger.info( + `Current file uploaded size: ${currentUploadSize}, creating new upload...`, + ); + const files = await this.createBlockFiles(fileName, currentUploadSize); - logger.info(`Uploading blocks ${range.start} to ${range.end}`); - const key = this.uploadName(range); - await this.uploadBlocks(fileName, key); - maxUploaded = range.end; - } + logger.info(`Upload: begin...`); + const gzip = await this.gzipFile(files.file, `${files.file}.gz`); + await this.uploadFile(gzip, "application/gzip"); + logger.info(`Upload: gzip file complete`); + + await this.uploadFile(files.file, "application/octet-stream"); + logger.info(`Upload: binary file complete`); + + const gzipManifest = await this.gzipFile( + files.manifest, + `${files.manifest}.gz`, + ); + await this.uploadFile(gzipManifest, "application/gzip"); + logger.info(`Upload: manifest file complete`); + + await 
this.upload(); } - async gzipBlocks( - start: number, - chunkSizeBytes: number, + async createBlockFiles( outputFileName: string, - ): Promise { - const gzip = createGzip(); + previousSize: number, + ): Promise { + this.deleteFileIfExists(outputFileName); + const manifestFileName = `${outputFileName}.manifest`; + this.deleteFileIfExists(manifestFileName); + + let i = 1; + let currentByte = 0; const outputFile = fs.createWriteStream(outputFileName); - gzip.pipe(outputFile); - let i = start; - let block; - let warned = false; + const manifestFile = fs.createWriteStream(manifestFileName); // eslint-disable-next-line no-constant-condition while (true) { - block = await this.cache.getBlockBySequence(i); + const block = await this.cache.getBlockBySequence(i); + if (block == null && previousSize === 0) break; if (block == null) { - if (!warned) { - logger.warn( - `At end of chain at block ${i}, filling gzip for upload as blocks are added.`, - ); - warned = true; - } - await new Promise((resolve) => setTimeout(resolve, 60000)); // Wait for 1 minute + await this.waitForNextBlock(); continue; } + const blockBuffer = LightBlock.encode(block).finish(); - const lengthBuffer = Buffer.alloc(4); - lengthBuffer.writeUInt32BE(blockBuffer.byteLength, 0); - gzip.write(lengthBuffer); - gzip.write(blockBuffer); - if (outputFile.bytesWritten >= chunkSizeBytes) { + outputFile.write(blockBuffer); + manifestFile.write( + `${i},${currentByte},${currentByte + blockBuffer.byteLength - 1}\n`, + ); + currentByte += blockBuffer.byteLength; + + if ( + !!previousSize && + outputFile.bytesWritten >= + Math.max(this.chunkSizeMb * 1024 * 1024 + previousSize) + ) { break; } i++; } - gzip.end(); - await new Promise((resolve) => outputFile.on("finish", resolve)); - return { start, end: i }; + + outputFile.end(); + manifestFile.end(); + + logger.info( + `New file upload created, size ${ + outputFile.bytesWritten / 1024 / 1024 + } MB, blocks: ${i - 1}`, + ); + return { file: outputFileName, manifest: 
manifestFileName }; } - async uploadBlocks(fileName: string, key: string): Promise { + private deleteFileIfExists(fileName: string): void { + if (fs.existsSync(fileName)) { + fs.unlinkSync(fileName); + } + } + + private async waitForNextBlock(): Promise { + await new Promise((resolve) => setTimeout(resolve, 60000)); // Wait for 1 minute + } + + async gzipFile(inputFile: string, outputFile: string): Promise { + const writeStream = fs.createWriteStream(outputFile); + const readStream = fs.createReadStream(inputFile); + const gzip = zlib.createGzip(); + readStream.pipe(gzip).pipe(writeStream); + + await new Promise((resolve, reject) => { + writeStream.on("finish", resolve); + writeStream.on("error", reject); + }); + logger.info(`Gzipping file complete: ${outputFile}`); + return outputFile; + } + + async uploadFile(fileName: string, contentType: string): Promise { + // due to consistency model of S3, should be safe to overwrite upload const fileStream = fs.createReadStream(fileName); const fileSize = fs.statSync(fileName).size; - const command = new PutObjectCommand({ Bucket: this.bucket, - ContentType: "application/gzip", + ContentType: contentType, ContentLength: fileSize, - Key: key, + Key: fileName, Body: fileStream, }); - await this.s3Client.send(command); } - async existingUploads(): Promise { - const { Contents } = await this.s3Client.send( - new ListObjectsV2Command({ Bucket: this.bucket }), - ); - - if (!Contents) return []; - const keys = Contents.map((item) => item.Key).filter(Boolean) as string[]; - return keys.map((key) => { - return this.parseUploadName(key); + async getFileSize(key: string): Promise { + const command = new HeadObjectCommand({ + Bucket: this.bucket, + Key: key, + }); - } - uploadName(range: BlockRange): string { - return `blocks_${range.start.toString().padStart(10, "0")}_${range.end - .toString() - .padStart(10, "0")}.gz`; - } - - parseUploadName(uploadName: string): BlockRange { - const match = uploadName.match(/blocks_(\d+)_(\d+)\.gz/); 
- if (match) { - return { start: parseInt(match[1], 10), end: parseInt(match[2], 10) }; + try { + const { ContentLength } = await this.s3Client.send(command); + return ContentLength || null; + } catch (error) { + if (error instanceof NotFound) { + return null; + } + throw error; } - throw new UploadError("Invalid upload name: " + uploadName); } } diff --git a/src/utils/lightBlock.ts b/src/utils/lightBlock.ts index fc9fe23..4baf1c3 100644 --- a/src/utils/lightBlock.ts +++ b/src/utils/lightBlock.ts @@ -31,14 +31,14 @@ export function lightBlock( serialized = rpcTransaction.serialized; const transaction = new Transaction(Buffer.from(serialized, "hex")); for (const spend of transaction.spends) { - lightSpends.push({ nf: spend.nullifier.toString("hex") }); + lightSpends.push({ nf: spend.nullifier }); } for (const note of transaction.notes) { - lightOutputs.push({ note: note.serialize().toString("hex") }); + lightOutputs.push({ note: note.serialize() }); } lightTransactions.push({ index, - hash: rpcTransaction.hash, + hash: Buffer.from(rpcTransaction.hash, "hex"), spends: lightSpends, outputs: lightOutputs, }); @@ -46,8 +46,8 @@ export function lightBlock( return { protoVersion: 1, sequence: response.block.sequence, - hash: response.block.hash, - previousBlockHash: previousBlockHash, + hash: Buffer.from(response.block.hash, "hex"), + previousBlockHash: Buffer.from(previousBlockHash, "hex"), timestamp: response.block.timestamp, transactions: lightTransactions, noteSize: response.block.noteSize,