Skip to content

Commit

Permalink
updating upload mechanism to split functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
jowparks committed Mar 21, 2024
1 parent 3c23bab commit d7347f6
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 100 deletions.
1 change: 1 addition & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ module.exports = {
moduleNameMapper: {
"^@/(.*)$": "<rootDir>/src/$1",
},
setupFiles: ["<rootDir>/jest.setup.js"],
};
1 change: 1 addition & 0 deletions jest.setup.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
require("dotenv").config({ path: ".env.test" });
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"scripts": {
"dev": "nodemon --config ./nodemon.json --exec 'yarn build && yarn start'",
"lint": "eslint --ext .ts .",
"test": "yarn build && NODE_OPTIONS=\"$NODE_OPTIONS --experimental-vm-modules\" CACHE_PATH='test/cache' BUILD_CACHE=false jest src/* --forceExit --runInBand",
"test": "yarn build && CACHE_PATH='test/cache' UPLOAD_BLOCKS=false BUILD_CACHE=false jest src/* --forceExit --runInBand",
"format": "prettier . --write",
"prebuild": "node bin/proto && rimraf dist",
"tsoa-gen": "tsoa spec-and-routes",
Expand Down
11 changes: 0 additions & 11 deletions src/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,6 @@ afterAll(async () => {
server.close();
});

describe("GET /latest-block", () => {
it("should return the latest block successfully", async () => {
const response = await request(app).get("/latest-block");
expect(response.statusCode).toBe(200);
expect(response.body).toMatchObject({
sequence: expect.any(Number),
hash: expect.any(String),
});
});
});

describe("GET /block", () => {
it("should return the correct block for a given identifier", async () => {
// Assuming an identifier and corresponding block fixture exist
Expand Down
118 changes: 61 additions & 57 deletions src/upload/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,66 +1,70 @@
// import { LightBlockUpload } from './index';
// import { S3Client, PutObjectCommand, ListObjectsV2Command } from "@aws-sdk/client-s3";
// import { LightBlockCache } from "../cache";
// import { LightBlock } from "../models/lightstreamer";
import { LightBlockUpload } from "./index";
import {
S3Client,
PutObjectCommand,
ListObjectsV2Command,
} from "@aws-sdk/client-s3";
import { LightBlockCache } from "../cache";
import { LightBlock } from "../models/lightstreamer";

// jest.mock("@aws-sdk/client-s3", () => {
// return {
// S3Client: jest.fn().mockImplementation(() => {
// return {
// send: jest.fn()
// };
// })
// };
// });
// jest.mock("../cache");
// jest.mock("../models/lightstreamer");
jest.mock("@aws-sdk/client-s3", () => {
return {
S3Client: jest.fn().mockImplementation(() => {
return {
send: jest.fn().mockImplementation(() => {
return { Contents: [] };
}),
};
}),
ListObjectsV2Command: jest.fn().mockImplementation(() => {
return {};
}),
PutObjectCommand: jest.fn().mockImplementation(() => {
return {};
}),
};
});

// describe('LightBlockUpload', () => {
// let lightBlockUpload: LightBlockUpload;
// let mockCache: jest.Mocked<LightBlockCache>;
// let mockS3Client: jest.Mocked<S3Client>;
describe("LightBlockUpload", () => {
let lightBlockUpload: LightBlockUpload;
let mockCache: jest.Mocked<LightBlockCache>;
let mockS3Client: jest.Mocked<S3Client>;

// beforeAll(() => {
// process.env["BUCKET_ENDPOINT"] = 'test-endpoint';
// process.env["BUCKET_ACCESS_KEY_ID"] = 'test-access-key-id';
// process.env["BUCKET_SECRET_ACCESS_KEY"] = 'test-secret-access-key';
// process.env["BUCKET_NAME"] = 'test-bucket-name';
beforeAll(() => {
mockCache = new LightBlockCache() as jest.Mocked<LightBlockCache>;
mockS3Client = new S3Client({}) as jest.Mocked<S3Client>;
lightBlockUpload = new LightBlockUpload(mockCache);
});

// mockCache = new LightBlockCache() as jest.Mocked<LightBlockCache>;
// mockS3Client = new S3Client({}) as jest.Mocked<S3Client>;
afterAll(() => {
jest.resetAllMocks();
});

// lightBlockUpload = new LightBlockUpload(mockCache);
// });
it("should throw an error if environment variables are not set", () => {
delete process.env["BUCKET_ENDPOINT"];
expect(() => new LightBlockUpload(mockCache)).toThrow();
});

// afterAll(() => {
// jest.resetAllMocks();
// });
it("should upload blocks", async () => {
const mockBlock = LightBlock.fromJSON({
sequence: 1000,
hash: "test-hash",
previousBlockHash: "test-previous-hash",
timestamp: 123456789,
transactions: [],
noteSize: 0,
});
mockCache.getBlockBySequence.mockResolvedValue(mockBlock);
mockCache.getHeadSequence.mockResolvedValue(1001);
mockCache.getUploadHead.mockResolvedValue(0);

// it('should throw an error if environment variables are not set', () => {
// delete process.env["BUCKET_ENDPOINT"];
// expect(() => new LightBlockUpload(mockCache)).toThrow('BUCKET_ENDPOINT not set');
// });
const mockSend = jest.fn();
mockS3Client.send = mockSend;

// it('should upload blocks', async () => {
// const mockBlock = LightBlock.fromJSON({
// sequence: 1000,
// hash: 'test-hash',
// previousBlockHash: 'test-previous-hash',
// timestamp: 123456789,
// transactions: [],
// noteSize: 0
// });
// mockCache.getBlockBySequence.mockResolvedValue(mockBlock);
// mockCache.getHeadSequence.mockResolvedValue(1001);
// mockCache.getUploadHead.mockResolvedValue(0);
await lightBlockUpload.watchAndUpload();

// const mockSend = jest.fn();
// mockS3Client.send = mockSend;

// await lightBlockUpload.watchAndUpload();

// expect(mockSend).toHaveBeenCalledWith(expect.any(ListObjectsV2Command));
// expect(mockSend).toHaveBeenCalledWith(expect.any(PutObjectCommand));
// expect(mockCache.putUploadHead).toHaveBeenCalledWith('test-hash');
// });
// });
expect(mockSend).toHaveBeenCalledWith(expect.any(ListObjectsV2Command));
expect(mockSend).toHaveBeenCalledWith(expect.any(PutObjectCommand));
expect(mockCache.putUploadHead).toHaveBeenCalledWith("test-hash");
});
});
51 changes: 29 additions & 22 deletions src/upload/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import { logger } from "@/utils/logger";

class UploaderError extends Error {}

type BlockRange = {
start: number;
end: number;
};

export class LightBlockUpload {
private cache: LightBlockCache;
private s3Client: S3Client;
Expand Down Expand Up @@ -41,48 +46,48 @@ export class LightBlockUpload {
}

async watchAndUpload(): Promise<void> {
await this.backfill();

let lastBlockNumber = 0;

// eslint-disable-next-line no-constant-condition
while (true) {
const head = await this.cache.getHeadSequence();
const uploadHead = await this.cache.getUploadHead();
if (head && head - uploadHead > 1000 && head > 1000) {
lastBlockNumber = head;
await this.uploadBlocks(lastBlockNumber - 999, lastBlockNumber);
const blocks = await this.blocksToUpload();
for (const block of blocks) {
logger.info(`Gzipping blocks ${block.start} to ${block.end}`);
const gzip = await this.gzipBlocks(block.start, block.end);

logger.info(`Uploading blocks ${block.start} to ${block.end}`);
const key = this.uploadName(block.start, block.end);
await this.uploadBlocks(gzip, key);
const uploadHead = await this.cache.getBlockBySequence(block.end);
if (uploadHead) {
await this.cache.putUploadHead(uploadHead.hash);
}
}
await new Promise((resolve) => setTimeout(resolve, 10000));
}
}

// returns hash of last block uploaded
private async uploadBlocks(start: number, end: number): Promise<void> {
logger.info(`Uploading blocks ${start} to ${end}`);

private async gzipBlocks(start: number, end: number): Promise<Buffer> {
let data = "";
let blockHash = null;
for (let i = start; i <= end; i++) {
const block = await this.cache.getBlockBySequence(i);
if (block) {
data +=
Buffer.from(LightBlock.encode(block).finish()).toString("hex") + "\n";
blockHash = block?.hash;
}
}

const buffer = gzipSync(data);
return gzipSync(data);
}

private async uploadBlocks(buffer: Buffer, key: string): Promise<void> {
const command = new PutObjectCommand({
Bucket: this.bucket,
ContentType: "application/gzip",
ContentLength: buffer.byteLength,
Key: this.uploadName(start, end),
Key: key,
Body: buffer,
});

await this.s3Client.send(command);
if (blockHash) await this.cache.putUploadHead(blockHash);
}

private uploadName(start: number, end: number): string {
Expand All @@ -91,28 +96,30 @@ export class LightBlockUpload {
.padStart(10, "0")}.gz`;
}

private async backfill(): Promise<void> {
private async blocksToUpload(): Promise<BlockRange[]> {
const head = await this.cache.getHeadSequence();
if (!head) return;
if (!head) return [];

const headBlockSequence = parseInt(head.toString());

const { Contents } = await this.s3Client.send(
new ListObjectsV2Command({ Bucket: this.bucket }),
);

if (!Contents) return;
if (!Contents) return [];
const keys = Contents.map((item) => item.Key).filter(Boolean) as string[];
const backfillBlocks = [];
for (let i = 0; i <= headBlockSequence; i += 1000) {
if (headBlockSequence - i < 1000) {
continue;
}
const end = Math.min(i + 999, headBlockSequence);
const key = this.uploadName(i, end);
if (!keys.includes(key)) {
await this.uploadBlocks(i, end);
backfillBlocks.push({ start: i, end });
}
}
return backfillBlocks;
}
}

Expand Down
Binary file removed test/cache/leveldb/000116.ldb
Binary file not shown.
Binary file removed test/cache/leveldb/000120.ldb
Binary file not shown.
Binary file removed test/cache/leveldb/000123.log
Binary file not shown.
2 changes: 1 addition & 1 deletion test/cache/leveldb/CURRENT
Original file line number Diff line number Diff line change
@@ -1 +1 @@
MANIFEST-000122
MANIFEST-000231
3 changes: 0 additions & 3 deletions test/cache/leveldb/LOG
Original file line number Diff line number Diff line change
@@ -1,3 +0,0 @@
2024/03/19-09:37:17.297631 173e2b000 Recovering log #121
2024/03/19-09:37:17.298491 173e2b000 Delete type=3 #119
2024/03/19-09:37:17.298537 173e2b000 Delete type=0 #121
8 changes: 3 additions & 5 deletions test/cache/leveldb/LOG.old
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
2024/03/19-09:37:16.985331 172e13000 Recovering log #118
2024/03/19-09:37:16.985449 172e13000 Level-0 table #120: started
2024/03/19-09:37:16.985657 172e13000 Level-0 table #120: 1766 bytes OK
2024/03/19-09:37:16.986076 172e13000 Delete type=3 #117
2024/03/19-09:37:16.986118 172e13000 Delete type=0 #118
2024/03/20-17:13:07.794174 171fcb000 Recovering log #230
2024/03/20-17:13:07.795044 171fcb000 Delete type=3 #229
2024/03/20-17:13:07.795090 171fcb000 Delete type=0 #230
Binary file removed test/cache/leveldb/MANIFEST-000122
Binary file not shown.

0 comments on commit d7347f6

Please sign in to comment.