Skip to content

Commit

Permalink
refactoring LightBlockUpload and writing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jowparks committed Mar 21, 2024
1 parent d7347f6 commit 3f5ed8a
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 86 deletions.
2 changes: 1 addition & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ if (process.env["BUILD_CACHE"] === "true") {

if (process.env["UPLOAD_BLOCKS"] === "true") {
logger.info("Starting uploader...");
void lightBlockUpload.watchAndUpload();
void lightBlockUpload.upload();
}

export const app = express();
Expand Down
88 changes: 34 additions & 54 deletions src/upload/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,70 +1,50 @@
import { LightBlockUpload } from "./index";
import { lightBlockCache } from "../cache";
import { lightBlockUpload } from "./index";
import {
S3Client,
PutObjectCommand,
ListObjectsV2Command,
S3Client,
} from "@aws-sdk/client-s3";
import { LightBlockCache } from "../cache";
import { LightBlock } from "../models/lightstreamer";

jest.mock("@aws-sdk/client-s3", () => {
return {
S3Client: jest.fn().mockImplementation(() => {
return {
send: jest.fn().mockImplementation(() => {
return { Contents: [] };
}),
};
}),
ListObjectsV2Command: jest.fn().mockImplementation(() => {
return {};
}),
PutObjectCommand: jest.fn().mockImplementation(() => {
return {};
}),
};
});

describe("LightBlockUpload", () => {
let lightBlockUpload: LightBlockUpload;
let mockCache: jest.Mocked<LightBlockCache>;
let mockS3Client: jest.Mocked<S3Client>;

beforeAll(() => {
mockCache = new LightBlockCache() as jest.Mocked<LightBlockCache>;
mockS3Client = new S3Client({}) as jest.Mocked<S3Client>;
lightBlockUpload = new LightBlockUpload(mockCache);
jest.spyOn(S3Client.prototype, "send").mockImplementation((command) => {
if (command instanceof ListObjectsV2Command) {
return Promise.resolve({
Contents: [
{ Key: lightBlockUpload.uploadName({ start: 1, end: 1000 }) },
{ Key: lightBlockUpload.uploadName({ start: 1001, end: 2000 }) },
],
});
} else if (command instanceof PutObjectCommand) {
return Promise.resolve({
/* your mock PutObjectCommand response */
});
} else {
throw new Error(
`Command mock not implemented: ${command.constructor.name}`,
);
}
});
});

afterAll(() => {
afterAll(async () => {
jest.resetAllMocks();
await lightBlockCache.close();
});

it("should throw an error if environment variables are not set", () => {
delete process.env["BUCKET_ENDPOINT"];
expect(() => new LightBlockUpload(mockCache)).toThrow();
it("upload name creation should be reversible", () => {
const blockRange = { start: 1, end: 1000 };
const key = lightBlockUpload.uploadName(blockRange);
const newBlockRange = lightBlockUpload.parseUploadName(key);
expect(blockRange).toEqual(newBlockRange);
});

it("should upload blocks", async () => {
const mockBlock = LightBlock.fromJSON({
sequence: 1000,
hash: "test-hash",
previousBlockHash: "test-previous-hash",
timestamp: 123456789,
transactions: [],
noteSize: 0,
});
mockCache.getBlockBySequence.mockResolvedValue(mockBlock);
mockCache.getHeadSequence.mockResolvedValue(1001);
mockCache.getUploadHead.mockResolvedValue(0);

const mockSend = jest.fn();
mockS3Client.send = mockSend;

await lightBlockUpload.watchAndUpload();

expect(mockSend).toHaveBeenCalledWith(expect.any(ListObjectsV2Command));
expect(mockSend).toHaveBeenCalledWith(expect.any(PutObjectCommand));
expect(mockCache.putUploadHead).toHaveBeenCalledWith("test-hash");
it("existing uploads should return block ranges", async () => {
const ranges = await lightBlockUpload.existingUploads();
expect(ranges).toEqual([
{ start: 1, end: 1000 },
{ start: 1001, end: 2000 },
]);
});
});
66 changes: 39 additions & 27 deletions src/upload/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { LightBlockCache, lightBlockCache } from "@/cache";
import { LightBlock } from "@/models/lightstreamer";
import { logger } from "@/utils/logger";

class UploaderError extends Error {}
class UploadError extends Error {}

type BlockRange = {
start: number;
Expand All @@ -23,16 +23,16 @@ export class LightBlockUpload {
constructor(cache: LightBlockCache) {
this.cache = cache;
if (!process.env["BUCKET_ENDPOINT"]) {
throw new UploaderError("BUCKET_ENDPOINT not set");
throw new UploadError("BUCKET_ENDPOINT not set");
}
if (!process.env["BUCKET_ACCESS_KEY_ID"]) {
throw new UploaderError("BUCKET_ACCESS_KEY_ID not set");
throw new UploadError("BUCKET_ACCESS_KEY_ID not set");
}
if (!process.env["BUCKET_SECRET_ACCESS_KEY"]) {
throw new UploaderError("BUCKET_SECRET_ACCESS_KEY not set");
throw new UploadError("BUCKET_SECRET_ACCESS_KEY not set");
}
if (!process.env["BUCKET_NAME"]) {
throw new UploaderError("BUCKET_NAME not set");
throw new UploadError("BUCKET_NAME not set");
}
this.bucket = process.env["BUCKET_NAME"];
this.s3Client = new S3Client({
Expand All @@ -45,16 +45,17 @@ export class LightBlockUpload {
});
}

async watchAndUpload(): Promise<void> {
async upload(): Promise<void> {
// eslint-disable-next-line no-constant-condition
while (true) {
const blocks = await this.blocksToUpload();
const existingUploads = await this.existingUploads();
const blocks = await this.uploadManifest(existingUploads);
for (const block of blocks) {
logger.info(`Gzipping blocks ${block.start} to ${block.end}`);
const gzip = await this.gzipBlocks(block.start, block.end);

logger.info(`Uploading blocks ${block.start} to ${block.end}`);
const key = this.uploadName(block.start, block.end);
const key = this.uploadName(block);
await this.uploadBlocks(gzip, key);
const uploadHead = await this.cache.getBlockBySequence(block.end);
if (uploadHead) {
Expand All @@ -65,7 +66,7 @@ export class LightBlockUpload {
}
}

private async gzipBlocks(start: number, end: number): Promise<Buffer> {
async gzipBlocks(start: number, end: number): Promise<Buffer> {
let data = "";
for (let i = start; i <= end; i++) {
const block = await this.cache.getBlockBySequence(i);
Expand All @@ -74,11 +75,10 @@ export class LightBlockUpload {
Buffer.from(LightBlock.encode(block).finish()).toString("hex") + "\n";
}
}

return gzipSync(data);
}

private async uploadBlocks(buffer: Buffer, key: string): Promise<void> {
async uploadBlocks(buffer: Buffer, key: string): Promise<void> {
const command = new PutObjectCommand({
Bucket: this.bucket,
ContentType: "application/gzip",
Expand All @@ -90,33 +90,45 @@ export class LightBlockUpload {
await this.s3Client.send(command);
}

private uploadName(start: number, end: number): string {
return `blocks_${start.toString().padStart(10, "0")}_${end
async existingUploads(): Promise<BlockRange[]> {
const { Contents } = await this.s3Client.send(
new ListObjectsV2Command({ Bucket: this.bucket }),
);

if (!Contents) return [];
const keys = Contents.map((item) => item.Key).filter(Boolean) as string[];
return keys.map((key) => {
return this.parseUploadName(key);
});
}

uploadName(range: BlockRange): string {
return `blocks_${range.start.toString().padStart(10, "0")}_${range.end
.toString()
.padStart(10, "0")}.gz`;
}

private async blocksToUpload(): Promise<BlockRange[]> {
parseUploadName(uploadName: string): BlockRange {
const match = uploadName.match(/blocks_(\d+)_(\d+)\.gz/);
if (match) {
return { start: parseInt(match[1], 10), end: parseInt(match[2], 10) };
}
throw new UploadError("Invalid upload name: " + uploadName);
}

async uploadManifest(existingUploads: BlockRange[]): Promise<BlockRange[]> {
const head = await this.cache.getHeadSequence();
if (!head) return [];

const headBlockSequence = parseInt(head.toString());

const { Contents } = await this.s3Client.send(
new ListObjectsV2Command({ Bucket: this.bucket }),
);

if (!Contents) return [];
const keys = Contents.map((item) => item.Key).filter(Boolean) as string[];
const backfillBlocks = [];
for (let i = 0; i <= headBlockSequence; i += 1000) {
if (headBlockSequence - i < 1000) {
for (let start = 1; start <= headBlockSequence; start += 1000) {
if (headBlockSequence - start < 1000) {
continue;
}
const end = Math.min(i + 999, headBlockSequence);
const key = this.uploadName(i, end);
if (!keys.includes(key)) {
backfillBlocks.push({ start: i, end });
const end = Math.min(start + 999, headBlockSequence);
if (!existingUploads.includes({ start, end })) {
backfillBlocks.push({ start, end });
}
}
return backfillBlocks;
Expand Down
Binary file added test/cache/leveldb/000116.ldb
Binary file not shown.
Binary file added test/cache/leveldb/000120.ldb
Binary file not shown.
Binary file added test/cache/leveldb/000123.log
Binary file not shown.
2 changes: 1 addition & 1 deletion test/cache/leveldb/CURRENT
Original file line number Diff line number Diff line change
@@ -1 +1 @@
MANIFEST-000231
MANIFEST-000122
3 changes: 3 additions & 0 deletions test/cache/leveldb/LOG
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
2024/03/19-09:37:17.297631 173e2b000 Recovering log #121
2024/03/19-09:37:17.298491 173e2b000 Delete type=3 #119
2024/03/19-09:37:17.298537 173e2b000 Delete type=0 #121
8 changes: 5 additions & 3 deletions test/cache/leveldb/LOG.old
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
2024/03/20-17:13:07.794174 171fcb000 Recovering log #230
2024/03/20-17:13:07.795044 171fcb000 Delete type=3 #229
2024/03/20-17:13:07.795090 171fcb000 Delete type=0 #230
2024/03/19-09:37:16.985331 172e13000 Recovering log #118
2024/03/19-09:37:16.985449 172e13000 Level-0 table #120: started
2024/03/19-09:37:16.985657 172e13000 Level-0 table #120: 1766 bytes OK
2024/03/19-09:37:16.986076 172e13000 Delete type=3 #117
2024/03/19-09:37:16.986118 172e13000 Delete type=0 #118
Binary file added test/cache/leveldb/MANIFEST-000122
Binary file not shown.

0 comments on commit 3f5ed8a

Please sign in to comment.