Skip to content

Commit

Permalink
feat: use multiple transactions for parallel blocks processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Romsters committed Dec 20, 2023
1 parent e2d2a8f commit f44e956
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 18 deletions.
1 change: 1 addition & 0 deletions packages/worker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ BLOCKCHAIN_RPC_URL=http://localhost:3050

WAIT_FOR_BLOCKS_INTERVAL=1000
BLOCKS_PROCESSING_BATCH_SIZE=10
NUMBER_OF_BLOCKS_PER_DB_TRANSACTION=10

BATCHES_PROCESSING_POLLING_INTERVAL=60000
DELETE_BALANCES_INTERVAL=300000
Expand Down
24 changes: 20 additions & 4 deletions packages/worker/src/block/block.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ConfigService } from "@nestjs/config";
import { InjectMetric } from "@willsoto/nestjs-prometheus";
import { Histogram } from "prom-client";
import { MoreThanOrEqual, LessThanOrEqual, Between, FindOptionsWhere } from "typeorm";
import { UnitOfWork } from "../unitOfWork";
import { IDbTransaction, UnitOfWork } from "../unitOfWork";
import { BlockchainService } from "../blockchain/blockchain.service";
import { BlockInfo, BlockWatcher } from "./block.watcher";
import { BalanceService } from "../balance/balance.service";
Expand All @@ -14,6 +14,7 @@ import { Block } from "../entities";
import { TransactionProcessor } from "../transaction";
import { LogProcessor } from "../log";
import { validateBlocksLinking } from "./block.utils";
import splitIntoChunks from "../utils/splitIntoChunks";
import {
BLOCKS_BATCH_PROCESSING_DURATION_METRIC_NAME,
BLOCK_PROCESSING_DURATION_METRIC_NAME,
Expand All @@ -29,6 +30,7 @@ export class BlockProcessor {
private readonly fromBlock: number;
private readonly toBlock: number;
private readonly disableBlocksRevert: boolean;
private readonly numberOfBlocksPerDbTransaction: number;

public constructor(
private readonly unitOfWork: UnitOfWork,
Expand All @@ -52,6 +54,7 @@ export class BlockProcessor {
this.fromBlock = configService.get<number>("blocks.fromBlock");
this.toBlock = configService.get<number>("blocks.toBlock");
this.disableBlocksRevert = configService.get<boolean>("blocks.disableBlocksRevert");
this.numberOfBlocksPerDbTransaction = configService.get<number>("blocks.numberOfBlocksPerDbTransaction");
}

public async processNextBlocksRange(): Promise<boolean> {
Expand Down Expand Up @@ -108,12 +111,25 @@ export class BlockProcessor {
}

const stopDurationMeasuring = this.blocksBatchProcessingDurationMetric.startTimer();
let dbTransactions: IDbTransaction[] = [];

try {
await this.unitOfWork.useTransaction(async () => {
await Promise.all(blocksToProcess.map((blockInfo) => this.addBlock(blockInfo)));
});
const blocksToProcessChunks = splitIntoChunks(blocksToProcess, this.numberOfBlocksPerDbTransaction);

dbTransactions = blocksToProcessChunks.map((blocksToProcessChunk) =>
this.unitOfWork.useTransaction(async () => {
await Promise.all(blocksToProcessChunk.map((blockInfo) => this.addBlock(blockInfo)));
}, true)
);
await Promise.all(dbTransactions.map((t) => t.waitForExecution()));

// sequentially commit transactions to preserve blocks order in DB
for (const dbTransaction of dbTransactions) {
await dbTransaction.commit();
}
stopDurationMeasuring({ status: "success" });
} catch (error) {
await Promise.all(dbTransactions.map((dbTransaction) => dbTransaction.ensureRollbackIfNotCommitted()));
stopDurationMeasuring({ status: "error" });
throw error;
}
Expand Down
3 changes: 2 additions & 1 deletion packages/worker/src/blocksRevert/blocksRevert.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class BlocksRevertService {
}
}

await this.unitOfWork.useTransaction(async () => {
const dbTransaction = this.unitOfWork.useTransaction(async () => {
this.logger.log("Reverting counters", { lastCorrectBlockNumber });
await this.counterService.revert(lastCorrectBlockNumber);

Expand All @@ -66,6 +66,7 @@ export class BlocksRevertService {
this.blockRepository.delete({ number: MoreThan(lastCorrectBlockNumber) }),
]);
});
await dbTransaction.waitForExecution();

this.logger.log("Blocks revert completed", { lastCorrectBlockNumber });
} catch (error) {
Expand Down
1 change: 1 addition & 0 deletions packages/worker/src/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ describe("config", () => {
fromBlock: 0,
toBlock: null,
disableBlocksRevert: false,
numberOfBlocksPerDbTransaction: 50,
},
batches: {
batchesProcessingPollingInterval: 60000,
Expand Down
2 changes: 2 additions & 0 deletions packages/worker/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export default () => {
USE_WEBSOCKETS_FOR_TRANSACTIONS,
WAIT_FOR_BLOCKS_INTERVAL,
BLOCKS_PROCESSING_BATCH_SIZE,
NUMBER_OF_BLOCKS_PER_DB_TRANSACTION,
BATCHES_PROCESSING_POLLING_INTERVAL,
DELETE_BALANCES_INTERVAL,
COUNTERS_PROCESSING_POLLING_INTERVAL,
Expand Down Expand Up @@ -49,6 +50,7 @@ export default () => {
fromBlock: parseInt(FROM_BLOCK, 10) || 0,
toBlock: parseInt(TO_BLOCK, 10) || null,
disableBlocksRevert: DISABLE_BLOCKS_REVERT === "true",
numberOfBlocksPerDbTransaction: parseInt(NUMBER_OF_BLOCKS_PER_DB_TRANSACTION, 10) || 50,
},
batches: {
batchesProcessingPollingInterval: parseInt(BATCHES_PROCESSING_POLLING_INTERVAL, 10) || 60000,
Expand Down
3 changes: 2 additions & 1 deletion packages/worker/src/counter/counter.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ export class CounterProcessor<T extends CountableEntity> {
const counters = calculateCounters(this.tableName, records, this.criteriaList);
const newLastProcessedRecordNumber = Number(records[records.length - 1].number);

await this.unitOfWork.useTransaction(() =>
const dbTransaction = this.unitOfWork.useTransaction(() =>
this.counterRepository.incrementCounters(counters, newLastProcessedRecordNumber)
);
await dbTransaction.waitForExecution();

this.lastProcessedRecordNumber = newLastProcessedRecordNumber;
return records.length === this.recordsBatchSize;
Expand Down
74 changes: 62 additions & 12 deletions packages/worker/src/unitOfWork/unitOfWork.provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import { AsyncLocalStorage } from "node:async_hooks";

export declare type IsolationLevel = "READ UNCOMMITTED" | "READ COMMITTED" | "REPEATABLE READ" | "SERIALIZABLE";

export interface IDbTransaction {
waitForExecution: () => Promise<void>;
commit: () => Promise<void>;
ensureRollbackIfNotCommitted: () => Promise<void>;
}

@Injectable()
export class UnitOfWork {
private readonly logger: Logger;
Expand All @@ -28,14 +34,56 @@ export class UnitOfWork {
return queryRunner?.manager || this.entityManager;
}

public async useTransaction(
public useTransaction(
action: () => Promise<void>,
preventAutomaticCommit = false,
logContext?: Record<string, string | number>,
isolationLevel?: IsolationLevel
): Promise<void> {
): IDbTransaction {
const queryRunner: QueryRunner = this.dataSource.createQueryRunner();

await this.asyncLocalStorage.run(
let isReleased = false;

const release = async () => {
this.logger.debug({ message: "Releasing the unit of work", ...logContext });
await queryRunner.release();
};

const commit = async () => {
if (isReleased) {
throw new Error("The transaction cannot be committed as it connection is released");
}
isReleased = true;
try {
this.logger.debug({ message: "Committing the transaction", ...logContext });
const stopDbCommitDurationMeasuring = this.dbCommitDurationMetric.startTimer();
await queryRunner.commitTransaction();
stopDbCommitDurationMeasuring();
} catch (error) {
this.logger.error(
{ message: "Error while committing the transaction. Rolling back", ...logContext },
error.stack
);
await queryRunner.rollbackTransaction();
throw error;
} finally {
await release();
}
};

const rollback = async () => {
if (isReleased) {
return;
}
isReleased = true;
try {
await queryRunner.rollbackTransaction();
} finally {
await release();
}
};

const executionPromise = this.asyncLocalStorage.run(
{
queryRunner,
},
Expand All @@ -45,23 +93,25 @@ export class UnitOfWork {

try {
await action();

this.logger.debug({ message: "Committing the transaction", ...logContext });
const stopDbCommitDurationMeasuring = this.dbCommitDurationMetric.startTimer();
await queryRunner.commitTransaction();
stopDbCommitDurationMeasuring();
} catch (error) {
this.logger.error(
{ message: "Error while processing the transaction. Rolling back", ...logContext },
error.stack
);
await queryRunner.rollbackTransaction();
await rollback();
throw error;
} finally {
this.logger.debug({ message: "Releasing the unit of work", ...logContext });
await queryRunner.release();
}

if (!preventAutomaticCommit) {
await commit();
}
}
);

return {
waitForExecution: () => executionPromise,
commit,
ensureRollbackIfNotCommitted: rollback,
};
}
}
14 changes: 14 additions & 0 deletions packages/worker/src/utils/splitIntoChunks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export default <T>(array: T[], chunkSize = 10) => {
const chunks = [];
let currentChunk = 0;
for (let i = 0; i < array.length; i++) {
if (chunks[currentChunk] && chunks[currentChunk].length === chunkSize) {
currentChunk++;
}
if (!chunks[currentChunk]) {
chunks[currentChunk] = [];
}
chunks[currentChunk].push(array[i]);
}
return chunks;
};

0 comments on commit f44e956

Please sign in to comment.