Skip to content

Commit

Permalink
buffer as data encoding (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
jowparks authored Apr 9, 2024
1 parent a0d6fed commit b9f5be1
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 46 deletions.
2 changes: 1 addition & 1 deletion src/cache/index.test.slow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe("LightBlockCache creating cache", () => {
const cacheBlocks = lightBlockCache.cacheBlocks();
await Promise.race([cacheBlocks, delay()]);
const head = await lightBlockCache.getHead();
const block = await lightBlockCache.getBlockByHash(head!.toString("hex"));
const block = await lightBlockCache.getLightBlock(head!);
expect(block).toHaveProperty("protoVersion");
expect(block).toHaveProperty("sequence");
expect(block).toHaveProperty("hash");
Expand Down
20 changes: 9 additions & 11 deletions src/cache/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { LightBlock } from "../models/lightstreamer";
import { blockFixture } from "../../test/fixtures";
import { lightBlockCache } from ".";

describe("LightBlockCache", () => {
const fakeHash = "hash1";

beforeAll(async () => {
await lightBlockCache.open();
});
Expand All @@ -14,24 +11,25 @@ describe("LightBlockCache", () => {
});

it("storing and retrieving block is successful", async () => {
const encoded = LightBlock.encode(blockFixture).finish();
await lightBlockCache.put(fakeHash, encoded);
await lightBlockCache.put(blockFixture.sequence.toString(), fakeHash);
await lightBlockCache.putLightBlock(blockFixture);

const hashBlock = await lightBlockCache.getBlockByHash(fakeHash);
const hashBlock = await lightBlockCache.getLightBlock(blockFixture.hash);
expect(hashBlock).toEqual(blockFixture);

const sequenceBlock = await lightBlockCache.getBlockBySequence(
const sequenceBlock = await lightBlockCache.getLightBlockBySequence(
blockFixture.sequence,
);
expect(sequenceBlock).toEqual(blockFixture);
});

it("storing and retrieving hash is successful", async () => {
await lightBlockCache.put("head", Buffer.from("deedbeef", "hex"));
await lightBlockCache.putHead(Buffer.from("deadbeef", "hex"), 1000);

const head = await lightBlockCache.getHead();
expect(head?.toString("hex")).toEqual("deadbeef");

const block = await lightBlockCache.getHead();
expect(block!.toString("hex")).toEqual("deedbeef");
const sequence = await lightBlockCache.getHeadSequence();
expect(sequence).toEqual(1000);
});

it("finality sequence is always behind head sequence by specified amount", async () => {
Expand Down
66 changes: 39 additions & 27 deletions src/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,31 +83,29 @@ export class LightBlockCache {
} else if (content.type === "disconnected") {
logger.warn(`Removing block ${content.block.sequence}...`);
const block = lightBlock(content);
await this.db.put("head", block.previousBlockHash);
await this.db.put("headSequence", (block.sequence - 1).toString());
await this.db.del(block.sequence);
await this.db.del(block.hash);
await this.putHead(block.previousBlockHash, block.sequence - 1);
await this.del(block.sequence.toString());
await this.del(block.hash.toString("hex"));
}
}
}
}

private async rollbackHead(): Promise<void> {
let head = (await this.getHeadSequence()) - 1;
if (!head) {
let headSequence = (await this.getHeadSequence()) - 1;
if (!headSequence) {
logger.error("Head sequence is not set. Cannot rollback.");
return;
}
let block = null;
while (!block) {
block = await this.getBlockBySequence(head);
block = await this.getLightBlockBySequence(headSequence);
if (!block) {
head -= 1;
headSequence -= 1;
}
}
await this.db.put("headSequence", head.toString());
await this.db.put("head", block.hash);
logger.info(`Rolled back head to block sequence ${head}`);
await this.putHead(block.hash, headSequence);
logger.info(`Rolled back head to block sequence ${headSequence}`);
}

async cacheBlock(block: LightBlock): Promise<void> {
Expand All @@ -118,14 +116,12 @@ export class LightBlockCache {
);
}
const hash = block.hash;
await this.db.put(hash, LightBlock.encode(block).finish());
await this.db.put(block.sequence.toString(), hash);
await this.putLightBlock(block);
const finalizedSequence = await this.getFinalizedBlockSequence();
if (block.sequence - this.finalityBlockCount > finalizedSequence) {
this.putFinalizedBlockSequence(block.sequence - this.finalityBlockCount);
}
await this.db.put("head", hash);
await this.db.put("headSequence", block.sequence.toString());
await this.putHead(hash, block.sequence);
}

async getFinalizedBlockSequence(): Promise<number> {
Expand All @@ -135,23 +131,32 @@ export class LightBlockCache {
: this.finalityBlockCount + 1;
}

async getHead(): Promise<Buffer | null> {
const head = await this.get("head");
return head ? Buffer.from(head) : null;
async putFinalizedBlockSequence(sequence: number): Promise<void> {
await this.put("finalizedBlockSequence", Buffer.from(sequence.toString()));
}

async putFinalizedBlockSequence(sequence: number): Promise<void> {
await this.db.put("finalizedBlockSequence", sequence.toString());
async putHead(hash: Buffer, sequence: number): Promise<void> {
await this.put("head", hash);
await this.put("headSequence", Buffer.from(sequence.toString()));
}

async getBlockByHash(hash: string): Promise<LightBlock | null> {
const block = await this.get(hash);
return block ? LightBlock.decode(block) : null;
async getLightBlock(hash: Buffer): Promise<LightBlock | null> {
try {
const data = await this.get(hash.toString("hex"));
if (!data) return null;
return LightBlock.decode(data);
} catch (e) {
return null;
}
}

async getBlockBySequence(sequence: number): Promise<LightBlock | null> {
async getLightBlockBySequence(sequence: number): Promise<LightBlock | null> {
const hash = await this.get(sequence.toString());
return hash ? await this.getBlockByHash(hash.toString()) : null;
return hash ? await this.getLightBlock(hash) : null;
}

async getHead(): Promise<Buffer | null> {
return this.get("head");
}

async getHeadSequence(): Promise<number> {
Expand All @@ -160,7 +165,7 @@ export class LightBlockCache {
return Number(head.toString());
}

async get(key: string): Promise<Uint8Array | null> {
async get(key: string): Promise<Buffer | null> {
try {
const data = await this.db.get(key);
return data;
Expand All @@ -169,10 +174,17 @@ export class LightBlockCache {
}
}

async put(key: string, value: Uint8Array | string): Promise<void> {
private async put(key: string, value: Buffer): Promise<void> {
await this.db.put(key, value);
}

async putLightBlock(block: LightBlock): Promise<void> {
const key = block.hash.toString("hex");
const value = LightBlock.encode(block).finish();
await this.put(block.sequence.toString(), block.hash);
await this.put(key, Buffer.from(value));
}

async del(key: string): Promise<void> {
await this.db.del(key);
}
Expand Down
6 changes: 3 additions & 3 deletions src/controllers/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ export class BlockController {

let block = null;
if (hash) {
block = await lightBlockCache.getBlockByHash(String(hash));
block = await lightBlockCache.getLightBlock(Buffer.from(hash, "hex"));
} else if (sequence) {
block = await lightBlockCache.getBlockBySequence(Number(sequence));
block = await lightBlockCache.getLightBlockBySequence(Number(sequence));
}

if (block) {
Expand Down Expand Up @@ -127,7 +127,7 @@ export class BlockController {
// Placeholder logic: Fetch blocks from cache or database
const blocks: LightBlock[] = [];
for (let i = start; i <= end; i++) {
const block = await lightBlockCache.getBlockBySequence(i);
const block = await lightBlockCache.getLightBlockBySequence(i);
if (block) {
blocks.push(block);
}
Expand Down
2 changes: 1 addition & 1 deletion src/upload/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe("LightBlockUpload", () => {

it("should gzip blocks/manifest, last block should match", async () => {
const tempFile = path.join(os.tmpdir(), "test");
const firstBlock = (await lightBlockCache.getBlockBySequence(
const firstBlock = (await lightBlockCache.getLightBlockBySequence(
1,
)) as LightBlock;
const triggerUploadTime =
Expand Down
2 changes: 1 addition & 1 deletion src/upload/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ export class LightBlockUpload {

// eslint-disable-next-line no-constant-condition
while (true) {
const block = await this.cache.getBlockBySequence(currentSequence);
const block = await this.cache.getLightBlockBySequence(currentSequence);
const finalizedSequence = await this.cache.getFinalizedBlockSequence();
if (!block || block.sequence > finalizedSequence) {
const currentTimestamp = Date.now();
Expand Down
Binary file removed test/dbfixture/cache/leveldb/000003.log
Binary file not shown.
Binary file added test/dbfixture/cache/leveldb/000007.ldb
Binary file not shown.
Empty file.
2 changes: 1 addition & 1 deletion test/dbfixture/cache/leveldb/CURRENT
Original file line number Diff line number Diff line change
@@ -1 +1 @@
MANIFEST-000002
MANIFEST-000006
6 changes: 5 additions & 1 deletion test/dbfixture/cache/leveldb/LOG
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
2024/03/29-16:12:39.733354 170903000 Delete type=3 #1
2024/04/08-20:26:12.744956 17269f000 Recovering log #5
2024/04/08-20:26:12.745468 17269f000 Level-0 table #7: started
2024/04/08-20:26:12.745659 17269f000 Level-0 table #7: 6076 bytes OK
2024/04/08-20:26:12.746007 17269f000 Delete type=0 #5
2024/04/08-20:26:12.746084 17269f000 Delete type=3 #4
3 changes: 3 additions & 0 deletions test/dbfixture/cache/leveldb/LOG.old
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
2024/04/08-20:25:05.384949 178807000 Recovering log #3
2024/04/08-20:25:05.385875 178807000 Delete type=0 #3
2024/04/08-20:25:05.385905 178807000 Delete type=3 #2
Binary file removed test/dbfixture/cache/leveldb/MANIFEST-000002
Binary file not shown.
Binary file added test/dbfixture/cache/leveldb/MANIFEST-000006
Binary file not shown.

0 comments on commit b9f5be1

Please sign in to comment.