R2 upload for LightBlock in single file (#54)
* R2 upload for wallet server chunks

* handle upload head

* updating upload mechanism to split functionality

* refactoring LightBlockUpload and writing tests

* gzip by stream, plus tests validating streaming unzip

* add .env.test

* upload lightblocks

* test for gzip, manifest, and lightblock decoding

* rework per PR comments: upload to new directory and replace latest.json

* add example script for downloading a block

* address PR comment about exception catching

bug fixes:
- uint32 was not capturing the block timestamp correctly
- the head hash was sometimes updated before the block was added; on premature exit this caused an empty block

* bug fix

* db fixture cleanup

* db fixture readme
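
The "gzip by stream" item above refers to compressing with Node streams rather than buffering whole files in memory. A minimal sketch of the technique (file names are illustrative, not the uploader's actual paths):

```ts
import fs from "fs";
import zlib from "zlib";
import { pipeline } from "stream/promises";

// Compress a large blocks file chunk by chunk, never holding it fully in memory.
async function gzipFile(src: string, dest: string) {
  await pipeline(
    fs.createReadStream(src),
    zlib.createGzip(),
    fs.createWriteStream(dest),
  );
}
```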
jowparks authored Mar 28, 2024
1 parent 011b3f5 commit b57e081
Showing 26 changed files with 1,707 additions and 36 deletions.
8 changes: 8 additions & 0 deletions .env.example
@@ -4,3 +4,11 @@ NODE_AUTH_TOKEN=your-auth-token
BUILD_CACHE=true
CACHE_PATH=./block-cache
BLOCK_RANGE_MAX=1000

UPLOAD_BLOCKS=true
BUCKET_ENDPOINT=https://foo.r2.cloudflarestorage.com
BUCKET_NAME=ironfish-light-blocks-testnet
BUCKET_ACCESS_KEY_ID=your-access-key-id
BUCKET_SECRET_ACCESS_KEY=your-secret-access-key
UPLOAD_CHUNK_SIZE_MB=1
MAX_UPLOAD_LAG_MS=86400000
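
A small sketch of how these two tuning knobs might be read (the variable handling is assumed, not taken from the server code):

```ts
// UPLOAD_CHUNK_SIZE_MB=1 would mean roughly 1 MiB upload parts;
// MAX_UPLOAD_LAG_MS=86400000 is 24 hours.
const chunkSizeBytes =
  Number(process.env.UPLOAD_CHUNK_SIZE_MB ?? "1") * 1024 * 1024;
const maxUploadLagMs = Number(process.env.MAX_UPLOAD_LAG_MS ?? "86400000");
```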
14 changes: 14 additions & 0 deletions .env.test
@@ -0,0 +1,14 @@
NODE_HOST=node.domain.com
NODE_PORT=8020
NODE_AUTH_TOKEN=your-auth-token
BUILD_CACHE=true
CACHE_PATH=./block-cache
BLOCK_RANGE_MAX=1000

UPLOAD_BLOCKS=true
BUCKET_ENDPOINT=https://foo.r2.cloudflarestorage.com
BUCKET_NAME=ironfish-light-blocks-testnet
BUCKET_ACCESS_KEY_ID=your-access-key-id
BUCKET_SECRET_ACCESS_KEY=your-secret-access-key
UPLOAD_CHUNK_SIZE_MB=1
MAX_UPLOAD_LAG_MS=86400000
4 changes: 3 additions & 1 deletion .gitignore
@@ -8,4 +8,6 @@ node_modules/
*-cache*
*testdb*
*yarn-error.log
test/*
test/*
blocks*
latest.json
4 changes: 4 additions & 0 deletions README.md
@@ -69,3 +69,7 @@ All that is needed to generate a compliant gRPC client using your favorite gRPC
## Environment variables

See [.env.example](.env.example) for the environment variables that can be used with the server. The repository works with standard environment variables as well as with [dotenv](https://www.npmjs.com/package/dotenv).

## Test fixture database

To create a reusable database, the cache build was hardcoded to stop at 10 blocks. The resulting database files (`block-cache/`) were copied to the test fixture directory (`test/dbfixture/`). A global setup copies those files into a gitignored folder (`test/cache/`) for use when running tests, which prevents tests from updating the database and producing changelogs. To update the fixture, run the cache with a hardcoded stop point (currently 10 blocks) and overwrite the files in the `test/dbfixture/` directory.
4 changes: 4 additions & 0 deletions example/README.md
@@ -147,3 +147,7 @@ Once we have the most recent block that is on the main chain, we can roll back t
- `AccountsManager.handleReorg()`
- `BlockCache.handleReorg()`
- Merkle tree: `revertToNoteSize`

## Downloading Block Cache data

- See the [download script](./scripts/download_block.ts) for an example of how to download blocks
74 changes: 74 additions & 0 deletions example/scripts/download_block.ts
@@ -0,0 +1,74 @@
import axios from "axios";
import fs from "fs";
import zlib from "zlib";
import readline from "readline";
import { LightBlock } from "../../src/models/lightstreamer";
import { promisify } from "util";
import { Readable, finished } from "stream";

const R2_URL = "https://pub-a64f19884be64edaa4f3326fa5c9a39a.r2.dev/";
const BLOCK_NUMBER = 420;

const finishedPromise = promisify(finished); // Convert callback to promise

async function downloadFile(url: string, path: string) {
  const response = await axios.get(url, { responseType: "stream" });
  const writer = fs.createWriteStream(path);
  response.data.pipe(writer);
  return new Promise((resolve, reject) => {
    writer.on("finish", resolve);
    writer.on("error", reject);
  });
}

async function decompressGzip(inputPath: string, outputPath: string) {
  const gzip = zlib.createGunzip();
  const input = fs.createReadStream(inputPath);
  const output = fs.createWriteStream(outputPath);
  input.pipe(gzip).pipe(output);
  await finishedPromise(output);
}

async function findBlockRange(manifestPath: string, blockNumber: number) {
  // Each manifest line is "block,start,end", where start/end are byte
  // offsets of that block within the uploaded blocks file.
  const fileStream = fs.createReadStream(manifestPath);
  const rl = readline.createInterface({ input: fileStream });
  for await (const line of rl) {
    const [block, start, end] = line.split(",");
    if (parseInt(block, 10) === blockNumber) {
      return { start: parseInt(start, 10), end: parseInt(end, 10) };
    }
  }
  throw new Error(`Block ${blockNumber} not found in manifest`);
}

async function downloadBlock(
  url: string,
  range: { start: number; end: number },
) {
  // Fetch only the bytes for the requested block via an HTTP Range header.
  const response = await axios.get(url, {
    headers: { Range: `bytes=${range.start}-${range.end}` },
    responseType: "stream",
  });
  const data = await streamToBuffer(response.data);
  return data;
}

async function streamToBuffer(readableStream: Readable): Promise<Buffer> {
  const chunks: Buffer[] = [];
  for await (const chunk of readableStream) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}

async function main() {
  await downloadFile(R2_URL + "latest.json", "latest.json");
  const latest = JSON.parse(fs.readFileSync("latest.json", "utf-8"));
  await downloadFile(R2_URL + latest.manifest, "blocks.manifest.gz");
  await decompressGzip("blocks.manifest.gz", "blocks.manifest");
  const range = await findBlockRange("blocks.manifest", BLOCK_NUMBER);
  const block = await downloadBlock(R2_URL + latest.blocks, range);
  console.log(JSON.stringify(LightBlock.decode(block), null, 2));
}

main().catch(console.error);
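
To try the script end to end, run it with something like `npx ts-node example/scripts/download_block.ts` (command assumed; use whichever TypeScript runner the repo is set up with). It fetches `latest.json`, follows that file's `manifest` and `blocks` keys to the gzipped manifest and the blocks file, and prints block 420 as JSON.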
1 change: 1 addition & 0 deletions jest.config.js
@@ -5,4 +5,5 @@ module.exports = {
moduleNameMapper: {
"^@/(.*)$": "<rootDir>/src/$1",
},
globalSetup: "<rootDir>/jest.setup.js",
};
46 changes: 46 additions & 0 deletions jest.setup.js
@@ -0,0 +1,46 @@
const fs = require("fs");
const path = require("path");
require("dotenv").config({ path: ".env.test" });

// Create working copy of test database
const sourceDir = path.join(__dirname, "test", "dbfixture", "cache");
const targetDir = path.join(__dirname, "test", "cache");

function copyDirectory(source, target) {
  // Ensure the target directory exists before copying into it.
  fs.mkdirSync(target, { recursive: true });
  return new Promise((resolve, reject) => {
    fs.readdir(source, (err, files) => {
      if (err) return reject(err);

      const promises = files.map((file) => {
        return new Promise((resolve, reject) => {
          const sourceFile = path.join(source, file);
          const targetFile = path.join(target, file);

          fs.stat(sourceFile, (err, stats) => {
            if (err) return reject(err);

            if (stats.isDirectory()) {
              fs.mkdir(targetFile, { recursive: true }, (err) => {
                if (err) return reject(err);
                resolve(copyDirectory(sourceFile, targetFile));
              });
            } else {
              fs.copyFile(sourceFile, targetFile, (err) => {
                if (err) return reject(err);
                resolve();
              });
            }
          });
        });
      });

      Promise.all(promises)
        .then(() => resolve())
        .catch((error) => reject(error));
    });
  });
}

module.exports = async () => {
  await copyDirectory(sourceDir, targetDir);
};
3 changes: 2 additions & 1 deletion package.json
@@ -13,7 +13,7 @@
"scripts": {
"dev": "nodemon --config ./nodemon.json --exec 'yarn build && yarn start'",
"lint": "eslint --ext .ts .",
"test": "yarn build && NODE_OPTIONS=\"$NODE_OPTIONS --experimental-vm-modules\" CACHE_PATH='test/cache' BUILD_CACHE=false jest src/* --forceExit --runInBand",
"test": "yarn build && CACHE_PATH='test/cache' UPLOAD_BLOCKS=false BUILD_CACHE=false jest src/* --forceExit --runInBand",
"format": "prettier . --write",
"prebuild": "node bin/proto && rimraf dist",
"tsoa-gen": "tsoa spec-and-routes",
@@ -52,6 +52,7 @@
"typescript": "5.1.6"
},
"dependencies": {
"@aws-sdk/client-s3": "3",
"express": "^4.18.3",
"jest": "^29.7.0",
"source-map-support": "0.5.21",
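
R2 is S3-compatible, which is why the new `@aws-sdk/client-s3` dependency is all the uploader needs. A minimal sketch of a put against the endpoint variables from `.env.example` (object key and file path are illustrative; this is not the actual `LightBlockUpload` code):

```ts
import fs from "fs";
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({
  region: "auto", // R2 accepts "auto"; the SDK requires some region value
  endpoint: process.env.BUCKET_ENDPOINT,
  credentials: {
    accessKeyId: process.env.BUCKET_ACCESS_KEY_ID ?? "",
    secretAccessKey: process.env.BUCKET_SECRET_ACCESS_KEY ?? "",
  },
});

// Upload one object, e.g. the manifest or latest.json.
async function uploadObject(key: string, filePath: string) {
  await s3.send(
    new PutObjectCommand({
      Bucket: process.env.BUCKET_NAME,
      Key: key,
      Body: fs.readFileSync(filePath),
    }),
  );
}
```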
18 changes: 9 additions & 9 deletions protos/lightstreamer.proto
@@ -5,7 +5,7 @@ message Empty {}

message BlockID {
optional uint64 sequence = 1;
optional string hash = 2;
optional bytes hash = 2;
}

// BlockRange specifies a series of blocks from start to end inclusive.
@@ -18,38 +18,38 @@ message BlockRange {
message LightBlock {
uint32 protoVersion = 1; // the version of this wire format, for storage
uint64 sequence = 2; // the height of this block
string hash = 3; // the ID (hash) of this block, same as explorer
string previousBlockHash = 4; // the ID (hash) of this block's predecessor
uint32 timestamp = 5; // Unix epoch time when the block was mined
bytes hash = 3; // the ID (hash) of this block, same as explorer
bytes previousBlockHash = 4; // the ID (hash) of this block's predecessor
uint64 timestamp = 5; // Unix epoch time when the block was mined
repeated LightTransaction transactions = 6; // zero or more compact transactions from this block
uint64 noteSize = 7; // the size of the notes tree after adding transactions from this block.

}

message LightTransaction {
uint64 index = 1; // do we need this field?
string hash = 2;
bytes hash = 2;

repeated LightSpend spends = 4;
repeated LightOutput outputs = 5;
}


message LightSpend {
string nf = 2;
bytes nf = 2;
}

message LightOutput {
string note = 1; // NoteEncrypted, serialized
bytes note = 1; // NoteEncrypted, serialized
}

message Transaction {
// built, encrypted transaction
string data = 1;
bytes data = 1;
}

message SendResponse {
string hash = 1;
bytes hash = 1;
bool accepted = 2;
}

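
Two things change in this schema: hash-like fields move from `string` to `bytes` (surfacing as `Buffer` in decoded objects, as the updated tests below expect), and `timestamp` widens from `uint32` to `uint64`, matching the "uint32 not capturing block timestamp correctly" fix in the commit message. A consumer-side sketch (the helper name is hypothetical):

```ts
import { LightBlock } from "../../src/models/lightstreamer";

function describeBlock(raw: Buffer) {
  const block = LightBlock.decode(raw);
  // Hash fields are now raw bytes, so hex-encode them for display:
  console.log("hash:", Buffer.from(block.hash).toString("hex"));
  // timestamp is now uint64; values above the uint32 max (4294967295),
  // e.g. millisecond epoch timestamps, no longer get truncated:
  console.log("timestamp:", new Date(Number(block.timestamp)).toISOString());
}
```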
1 change: 0 additions & 1 deletion src/cache/index.test.slow.ts
@@ -1,4 +1,3 @@
import { expect, it, describe, afterAll } from "vitest";
import { lightBlockCache } from ".";

function delay(ms = 1000) {
14 changes: 11 additions & 3 deletions src/cache/index.ts
@@ -23,7 +23,7 @@ function getCachePath(): string {
return path.join(".", folderName);
}

class LightBlockCache {
export class LightBlockCache {
private db: LevelUp;
private cacheDir: string;

@@ -60,15 +60,15 @@ class LightBlockCache {
);
}
const hash = content.block.hash;
await this.db.put("head", hash);
await this.db.put(
hash,
LightBlock.encode(lightBlock(content)).finish(),
);
await this.db.put(content.block.sequence.toString(), hash);
await this.db.put("head", hash);
} else if (content.type === "disconnected") {
logger.warn(`Removing block ${content.block.sequence}...`);
await this.db.put("head", content.block.previous.toString());
await this.db.put("head", content.block.previousBlockHash);
await this.db.del(content.block.sequence);
await this.db.del(content.block.hash);
}
@@ -87,6 +87,14 @@ class LightBlockCache {
return LightBlock.decode(block);
}

async getHeadSequence(): Promise<number> {
const head = await this.get("head");
if (!head) return 0;
const block = await this.getBlockByHash(head.toString());
if (!block) return 0;
return block.sequence;
}

async get(key: string): Promise<Uint8Array | null> {
try {
const data = await this.db.get(key);
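
The reordering above is the fix for the second bug in the commit message: `head` used to be advanced before the block itself was written, so a premature exit could leave `head` pointing at a hash with no stored block. Paraphrasing the invariant from the diff:

```ts
// Inside LightBlockCache.cacheBlocks (annotated excerpt, not standalone code):
// write the block and its sequence index first...
await this.db.put(hash, LightBlock.encode(lightBlock(content)).finish());
await this.db.put(content.block.sequence.toString(), hash);
// ...and only then advance "head", so a crash between the writes can never
// leave "head" referencing a block that was never stored.
await this.db.put("head", hash);
```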
19 changes: 4 additions & 15 deletions src/server.test.ts
@@ -9,15 +9,15 @@ import { LightBlock } from "./models/lightstreamer";
const expectedBlockObject = {
protoVersion: expect.any(Number),
sequence: 1,
hash: expect.any(String),
previousBlockHash: expect.any(String),
hash: expect.any(Buffer),
previousBlockHash: expect.any(Buffer),
timestamp: expect.any(Number),
transactions: expect.arrayContaining([
expect.objectContaining({
hash: expect.any(String),
hash: expect.any(Buffer),
outputs: expect.arrayContaining([
expect.objectContaining({
note: expect.any(String),
note: expect.any(Buffer),
}),
]),
}),
@@ -30,17 +30,6 @@ afterAll(async () => {
server.close();
});

describe("GET /latest-block", () => {
it("should return the latest block successfully", async () => {
const response = await request(app).get("/latest-block");
expect(response.statusCode).toBe(200);
expect(response.body).toMatchObject({
sequence: expect.any(Number),
hash: expect.any(String),
});
});
});

describe("GET /block", () => {
it("should return the correct block for a given identifier", async () => {
// Assuming an identifier and corresponding block fixture exist
6 changes: 6 additions & 0 deletions src/server.ts
@@ -8,12 +8,18 @@ import swaggerUi from "swagger-ui-express";
import * as openApiDocument from "./swagger/swagger.json";
import { logger } from "./utils/logger";
import { lightBlockCache } from "./cache";
import { lightBlockUpload } from "./upload";

if (process.env["BUILD_CACHE"] === "true") {
logger.info("Building block cache...");
void lightBlockCache.cacheBlocks();
}

if (process.env["UPLOAD_BLOCKS"] === "true") {
logger.info("Starting uploader...");
void lightBlockUpload.upload();
}

export const app = express();
app.use(bodyParser.json());
