Address PR comment about exception catching
Bug fixes:
- uint32 was not capturing the block timestamp correctly
- the head hash was sometimes updated before the block was added; on premature exit this caused an empty block
jowparks committed Mar 28, 2024
1 parent 0c00751 commit e399ce6
Showing 5 changed files with 88 additions and 48 deletions.
1 change: 1 addition & 0 deletions .env.example
@@ -11,3 +11,4 @@ BUCKET_NAME=ironfish-light-blocks-testnet
 BUCKET_ACCESS_KEY_ID=your-access-key-id
 BUCKET_SECRET_ACCESS_KEY=your-secret-access-key
 UPLOAD_CHUNK_SIZE_MB=1
+MAX_UPLOAD_LAG_MS=86400000
1 change: 1 addition & 0 deletions .env.test
@@ -11,3 +11,4 @@ BUCKET_NAME=ironfish-light-blocks-testnet
 BUCKET_ACCESS_KEY_ID=your-access-key-id
 BUCKET_SECRET_ACCESS_KEY=your-secret-access-key
 UPLOAD_CHUNK_SIZE_MB=1
+MAX_UPLOAD_LAG_MS=86400000
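For reference, the value added in both env files corresponds to a one-day lag; a minimal arithmetic check (the constant name below is illustrative):

```ts
// 1000 ms/s * 60 s/min * 60 min/h * 24 h = 86_400_000 ms, i.e. one day.
const MAX_UPLOAD_LAG_MS = 24 * 60 * 60 * 1000;
console.log(MAX_UPLOAD_LAG_MS === 86400000); // true
```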
2 changes: 1 addition & 1 deletion protos/lightstreamer.proto
@@ -20,7 +20,7 @@ message LightBlock {
   uint64 sequence = 2; // the height of this block
   bytes hash = 3; // the ID (hash) of this block, same as explorer
   bytes previousBlockHash = 4; // the ID (hash) of this block's predecessor
-  uint32 timestamp = 5; // Unix epoch time when the block was mined
+  uint64 timestamp = 5; // Unix epoch time when the block was mined
   repeated LightTransaction transactions = 6; // zero or more compact transactions from this block
   uint64 noteSize = 7; // the size of the notes tree after adding transactions from this block.

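The uint32 -> uint64 change matters because the block timestamp appears to be a millisecond Unix epoch value (the MAX_UPLOAD_LAG_MS comparison later in this commit treats it that way), and millisecond timestamps are far above the 32-bit ceiling of 4,294,967,295. A minimal sketch of the resulting truncation, with hypothetical values:

```ts
// Hypothetical illustration: a millisecond Unix timestamp does not fit in 32 bits.
const blockTimestampMs = 1_711_600_000_000; // roughly March 2024, in milliseconds
const UINT32_MAX = 0xffff_ffff; // 4_294_967_295

// A uint32 protobuf field effectively keeps only the low 32 bits of the value.
const truncated = Number(BigInt(blockTimestampMs) & BigInt(UINT32_MAX));
console.log(truncated); // 2203016192 -- not the original timestamp
```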
2 changes: 1 addition & 1 deletion src/cache/index.ts
@@ -60,12 +60,12 @@ export class LightBlockCache {
          );
        }
        const hash = content.block.hash;
-       await this.db.put("head", hash);
        await this.db.put(
          hash,
          LightBlock.encode(lightBlock(content)).finish(),
        );
        await this.db.put(content.block.sequence.toString(), hash);
+       await this.db.put("head", hash);
      } else if (content.type === "disconnected") {
        logger.warn(`Removing block ${content.block.sequence}...`);
        await this.db.put("head", content.block.previousBlockHash);
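The reordering above addresses the second bug in the commit message: if the process exits after the head pointer is written but before the block body, the cache's head references a hash with no stored block. Writing the block and its sequence index first, and the head pointer last, restores the invariant. A simplified sketch of that ordering (the KV interface and helper below are hypothetical, not the project's API):

```ts
// Hypothetical sketch: never publish the head pointer before the data it references.
interface KV {
  put(key: string | Buffer, value: string | Buffer): Promise<void>;
}

async function addBlock(
  db: KV,
  hash: Buffer,
  sequence: number,
  encodedBlock: Buffer,
): Promise<void> {
  await db.put(hash, encodedBlock); // 1. block body
  await db.put(sequence.toString(), hash); // 2. sequence -> hash index
  await db.put("head", hash); // 3. head pointer last
  // A crash between steps leaves at most an unreferenced block,
  // never a head that points at missing data.
}
```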
130 changes: 84 additions & 46 deletions src/upload/index.ts
@@ -30,6 +30,7 @@ export class LightBlockUpload {
   private cache: LightBlockCache;
   private s3Client: S3Client;
   private chunkSizeMb: number;
+  private maxUploadLagMs: number;
   private bucket: string;
   private latestPath = "latest.json";
   private blockFileName = "blocks";
@@ -51,11 +52,18 @@
     if (!process.env["UPLOAD_CHUNK_SIZE_MB"]) {
       throw new UploadError("UPLOAD_CHUNK_SIZE_MB not set");
     }
+    if (!process.env["MAX_UPLOAD_LAG_MS"]) {
+      throw new UploadError("MAX_UPLOAD_LAG_MS not set");
+    }

     this.chunkSizeMb = parseInt(
       process.env["UPLOAD_CHUNK_SIZE_MB"] as string,
       10,
     );
+    this.maxUploadLagMs = parseInt(
+      process.env["MAX_UPLOAD_LAG_MS"] as string,
+      10,
+    );
     this.bucket = process.env["BUCKET_NAME"];
     this.s3Client = new S3Client({
       region: "auto",
@@ -68,90 +76,108 @@
   }

   async upload(): Promise<void> {
+    // eslint-disable-next-line no-constant-condition
+    while (true) {
+      try {
+        await this.uploadInner();
+      } catch (error) {
+        logger.error(`Upload failed, will retry. Error: ${error}`);
+      }
+    }
+  }
+
+  private async uploadInner(): Promise<void> {
     const latestJson = await this.getObject(this.latestPath);

     let currentUploadSize = 0;
+    let lastUploadTimestamp = 0;
     if (!latestJson) {
       console.warn("No latest json, starting upload from beginning...");
     } else {
       const latest: BlockFile = JSON.parse(latestJson);
       currentUploadSize = (await this.getFileSize(latest.blocks)) || 0;
+      lastUploadTimestamp = latest.timestamp;
     }

-    try {
-      logger.info(
-        `Current file uploaded size: ${this.bytesToMbRounded(
-          currentUploadSize,
-        )} MB, creating new upload...`,
-      );
-      const files = await this.createBlockFiles(
-        this.blockFileName,
-        currentUploadSize,
-      );
-      const prefix = String(files.timestamp) + "/";
+    logger.info(
+      `Current file uploaded size: ${this.bytesToMbRounded(
+        currentUploadSize,
+      )} MB, creating new upload...`,
+    );
+    const files = await this.createBlockFiles(
+      this.blockFileName,
+      currentUploadSize,
+      lastUploadTimestamp,
+    );
+    const prefix = String(files.timestamp) + "/";

-      logger.info(`Upload: begin...`);
+    logger.info(`Upload: begin...`);

-      const uploadedBinary = await this.uploadFile(
-        prefix,
-        files.blocks,
-        "application/octet-stream",
-      );
-      logger.info(`Upload: binary file complete: ${uploadedBinary}`);
+    const uploadedBinary = await this.uploadFile(
+      prefix,
+      files.blocks,
+      "application/octet-stream",
+    );
+    logger.info(`Upload: binary file complete: ${uploadedBinary}`);

-      const gzipManifest = await this.gzipFile(
-        files.manifest,
-        `${files.manifest}.gz`,
-      );
-      const uploadedManifest = await this.uploadFile(
-        prefix,
-        gzipManifest,
-        "application/gzip",
-      );
-      logger.info(`Upload: manifest file complete: ${uploadedManifest}`);
+    const gzipManifest = await this.gzipFile(
+      files.manifest,
+      `${files.manifest}.gz`,
+    );
+    const uploadedManifest = await this.uploadFile(
+      prefix,
+      gzipManifest,
+      "application/gzip",
+    );
+    logger.info(`Upload: manifest file complete: ${uploadedManifest}`);

-      const uploadedLatest = await this.writeLatestTimestamp(
-        uploadedManifest,
-        uploadedBinary,
-        files.timestamp,
-      );
-      await this.uploadFile("", uploadedLatest, "plain/text");
-      logger.info(
-        `Upload: updating latest json file complete: ${uploadedLatest}`,
-      );
-    } catch (error) {
-      logger.error(`Upload failed, will retry. Error: ${error}`);
-    }
+    const uploadedLatest = await this.writeLatestJson(
+      uploadedManifest,
+      uploadedBinary,
+      files.timestamp,
+    );
+    await this.uploadFile("", uploadedLatest, "application/json");
+    logger.info(
+      `Upload: updating latest json file complete: ${uploadedLatest}`,
+    );

-    void this.upload();
   }

   async createBlockFiles(
     outputFileName: string,
     previousSize: number,
+    lastUploadTimestamp: number,
   ): Promise<BlockFile> {
     this.deleteFileIfExists(outputFileName);
     const manifestFileName = `${outputFileName}.manifest`;
     this.deleteFileIfExists(manifestFileName);

-    let i = 1;
+    let currentSequence = 1;
     let currentByte = 0;
     const nextUploadSize = this.chunkSizeMb * 1024 * 1024 + previousSize;
     const outputFile = fs.createWriteStream(outputFileName);
     const manifestFile = fs.createWriteStream(manifestFileName);

     // eslint-disable-next-line no-constant-condition
     while (true) {
-      const block = await this.cache.getBlockBySequence(i);
+      const block = await this.cache.getBlockBySequence(currentSequence);
       // end of chain, initial upload
       if (block == null && previousSize === 0) break;
       if (block == null) {
+        const currentTimestamp = Date.now();
+        const hoursSinceLastUpload =
+          (currentTimestamp - lastUploadTimestamp) / (1000 * 60 * 60);
         logger.info(
           `${this.bytesToMbRounded(
             outputFile.bytesWritten,
           )}/${this.bytesToMbRounded(
             nextUploadSize,
-          )} MB written, sequence: ${i}, waiting for next block...`,
+          )} MB written, sequence: ${currentSequence}, hours since last upload: ${hoursSinceLastUpload.toFixed(
+            2,
+          )}/${
+            this.maxUploadLagMs / (1000 * 60 * 60)
+          }, waiting for next block...`,
         );
         await this.waitForNextBlock();
         continue;
@@ -160,14 +186,26 @@
       const blockBuffer = LightBlock.encode(block).finish();
       outputFile.write(blockBuffer);
       manifestFile.write(
-        `${i},${currentByte},${currentByte + blockBuffer.byteLength - 1}\n`,
+        `${currentSequence},${currentByte},${
+          currentByte + blockBuffer.byteLength - 1
+        }\n`,
       );
       currentByte += blockBuffer.byteLength;

       if (!!previousSize && outputFile.bytesWritten >= nextUploadSize) {
         logger.info("Chunk size reached, finishing file creation...");
         break;
       }
+      if (
+        !!lastUploadTimestamp &&
+        block.timestamp >= lastUploadTimestamp + this.maxUploadLagMs
+      ) {
+        logger.info(
+          "More than 1 day since last upload, finishing file creation...",
+        );
+        break;
+      }
-      i++;
+      currentSequence++;
     }

     outputFile.end();
@@ -176,7 +214,7 @@
     logger.info(
       `New file upload created, size ${this.bytesToMbRounded(
         outputFile.bytesWritten,
-      )} MB, blocks: ${i - 1}`,
+      )} MB, blocks: ${currentSequence - 1}`,
     );
     return {
       blocks: outputFileName,
@@ -185,7 +223,7 @@
     };
   }

-  async writeLatestTimestamp(
+  async writeLatestJson(
     manifest: string,
     blocks: string,
     timestamp: number,
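Two behavioral notes on the refactor above. First, the retry now lives in upload() as a while (true) loop around uploadInner(), replacing the old pattern of an inner try/catch plus a recursive void this.upload() call at the end of the method; an exception can no longer skip the tail of the method, and retries no longer depend on re-entering the same function. Second, createBlockFiles() now finishes a chunk either when it reaches the configured size or when the newest block's timestamp is more than MAX_UPLOAD_LAG_MS past the previous upload. A simplified sketch of that cut policy (names are illustrative, and the special case where the very first upload ignores the size check is omitted):

```ts
// Simplified sketch of the chunk-cut policy introduced in this commit.
function shouldFinishChunk(
  bytesWritten: number,
  chunkSizeBytes: number,
  blockTimestampMs: number,
  lastUploadTimestampMs: number,
  maxUploadLagMs: number,
): boolean {
  const bigEnough = bytesWritten >= chunkSizeBytes;
  const tooStale =
    lastUploadTimestampMs > 0 &&
    blockTimestampMs >= lastUploadTimestampMs + maxUploadLagMs;
  return bigEnough || tooStale;
}

// Example: only 0.5 MB written, but the block is 25 h newer than the last upload -> cut.
console.log(
  shouldFinishChunk(
    512 * 1024,
    1024 * 1024,
    1_711_690_000_000,
    1_711_600_000_000,
    86_400_000,
  ),
); // true
```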
