diff --git a/packages/worker/.env.example b/packages/worker/.env.example index 7402762a37..911f75ea07 100644 --- a/packages/worker/.env.example +++ b/packages/worker/.env.example @@ -12,6 +12,7 @@ BLOCKCHAIN_RPC_URL=http://localhost:3050 WAIT_FOR_BLOCKS_INTERVAL=1000 BLOCKS_PROCESSING_BATCH_SIZE=10 +NUMBER_OF_BLOCKS_PER_DB_TRANSACTION=10 BATCHES_PROCESSING_POLLING_INTERVAL=60000 DELETE_BALANCES_INTERVAL=300000 diff --git a/packages/worker/src/block/block.processor.ts b/packages/worker/src/block/block.processor.ts index 880e4db655..020dafa15d 100644 --- a/packages/worker/src/block/block.processor.ts +++ b/packages/worker/src/block/block.processor.ts @@ -4,7 +4,7 @@ import { ConfigService } from "@nestjs/config"; import { InjectMetric } from "@willsoto/nestjs-prometheus"; import { Histogram } from "prom-client"; import { MoreThanOrEqual, LessThanOrEqual, Between, FindOptionsWhere } from "typeorm"; -import { UnitOfWork } from "../unitOfWork"; +import { IDbTransaction, UnitOfWork } from "../unitOfWork"; import { BlockchainService } from "../blockchain/blockchain.service"; import { BlockInfo, BlockWatcher } from "./block.watcher"; import { BalanceService } from "../balance/balance.service"; @@ -14,6 +14,7 @@ import { Block } from "../entities"; import { TransactionProcessor } from "../transaction"; import { LogProcessor } from "../log"; import { validateBlocksLinking } from "./block.utils"; +import splitIntoChunks from "../utils/splitIntoChunks"; import { BLOCKS_BATCH_PROCESSING_DURATION_METRIC_NAME, BLOCK_PROCESSING_DURATION_METRIC_NAME, @@ -29,6 +30,7 @@ export class BlockProcessor { private readonly fromBlock: number; private readonly toBlock: number; private readonly disableBlocksRevert: boolean; + private readonly numberOfBlocksPerDbTransaction: number; public constructor( private readonly unitOfWork: UnitOfWork, @@ -52,6 +54,7 @@ export class BlockProcessor { this.fromBlock = configService.get("blocks.fromBlock"); this.toBlock = configService.get("blocks.toBlock"); this.disableBlocksRevert = configService.get("blocks.disableBlocksRevert"); + this.numberOfBlocksPerDbTransaction = configService.get("blocks.numberOfBlocksPerDbTransaction"); } public async processNextBlocksRange(): Promise { @@ -108,12 +111,25 @@ export class BlockProcessor { } const stopDurationMeasuring = this.blocksBatchProcessingDurationMetric.startTimer(); + let dbTransactions: IDbTransaction[] = []; + try { - await this.unitOfWork.useTransaction(async () => { - await Promise.all(blocksToProcess.map((blockInfo) => this.addBlock(blockInfo))); - }); + const blocksToProcessChunks = splitIntoChunks(blocksToProcess, this.numberOfBlocksPerDbTransaction); + + dbTransactions = blocksToProcessChunks.map((blocksToProcessChunk) => + this.unitOfWork.useTransaction(async () => { + await Promise.all(blocksToProcessChunk.map((blockInfo) => this.addBlock(blockInfo))); + }, true) + ); + await Promise.all(dbTransactions.map((t) => t.waitForExecution())); + + // sequentially commit transactions to preserve blocks order in DB + for (const dbTransaction of dbTransactions) { + await dbTransaction.commit(); + } stopDurationMeasuring({ status: "success" }); } catch (error) { + await Promise.all(dbTransactions.map((dbTransaction) => dbTransaction.ensureRollbackIfNotCommitted())); stopDurationMeasuring({ status: "error" }); throw error; } diff --git a/packages/worker/src/blocksRevert/blocksRevert.service.ts b/packages/worker/src/blocksRevert/blocksRevert.service.ts index 602d299007..497cd4fb5d 100644 --- a/packages/worker/src/blocksRevert/blocksRevert.service.ts +++ b/packages/worker/src/blocksRevert/blocksRevert.service.ts @@ -56,7 +56,7 @@ export class BlocksRevertService { } } - await this.unitOfWork.useTransaction(async () => { + const dbTransaction = this.unitOfWork.useTransaction(async () => { this.logger.log("Reverting counters", { lastCorrectBlockNumber }); await this.counterService.revert(lastCorrectBlockNumber); @@ -66,6 +66,7 @@ export class BlocksRevertService { this.blockRepository.delete({ number: MoreThan(lastCorrectBlockNumber) }), ]); }); + await dbTransaction.waitForExecution(); this.logger.log("Blocks revert completed", { lastCorrectBlockNumber }); } catch (error) { diff --git a/packages/worker/src/config.spec.ts b/packages/worker/src/config.spec.ts index 96bcd8ec64..0d1150f410 100644 --- a/packages/worker/src/config.spec.ts +++ b/packages/worker/src/config.spec.ts @@ -30,6 +30,7 @@ describe("config", () => { fromBlock: 0, toBlock: null, disableBlocksRevert: false, + numberOfBlocksPerDbTransaction: 50, }, batches: { batchesProcessingPollingInterval: 60000, diff --git a/packages/worker/src/config.ts b/packages/worker/src/config.ts index 1582950bcd..2bb1d81963 100644 --- a/packages/worker/src/config.ts +++ b/packages/worker/src/config.ts @@ -11,6 +11,7 @@ export default () => { USE_WEBSOCKETS_FOR_TRANSACTIONS, WAIT_FOR_BLOCKS_INTERVAL, BLOCKS_PROCESSING_BATCH_SIZE, + NUMBER_OF_BLOCKS_PER_DB_TRANSACTION, BATCHES_PROCESSING_POLLING_INTERVAL, DELETE_BALANCES_INTERVAL, COUNTERS_PROCESSING_POLLING_INTERVAL, @@ -49,6 +50,7 @@ export default () => { fromBlock: parseInt(FROM_BLOCK, 10) || 0, toBlock: parseInt(TO_BLOCK, 10) || null, disableBlocksRevert: DISABLE_BLOCKS_REVERT === "true", + numberOfBlocksPerDbTransaction: parseInt(NUMBER_OF_BLOCKS_PER_DB_TRANSACTION, 10) || 50, }, batches: { batchesProcessingPollingInterval: parseInt(BATCHES_PROCESSING_POLLING_INTERVAL, 10) || 60000, diff --git a/packages/worker/src/counter/counter.processor.ts b/packages/worker/src/counter/counter.processor.ts index 97129ac840..34d97306fb 100644 --- a/packages/worker/src/counter/counter.processor.ts +++ b/packages/worker/src/counter/counter.processor.ts @@ -61,9 +61,10 @@ export class CounterProcessor { const counters = calculateCounters(this.tableName, records, this.criteriaList); const newLastProcessedRecordNumber = Number(records[records.length - 1].number); - await this.unitOfWork.useTransaction(() => + const dbTransaction = this.unitOfWork.useTransaction(() => this.counterRepository.incrementCounters(counters, newLastProcessedRecordNumber) ); + await dbTransaction.waitForExecution(); this.lastProcessedRecordNumber = newLastProcessedRecordNumber; return records.length === this.recordsBatchSize; diff --git a/packages/worker/src/unitOfWork/unitOfWork.provider.ts b/packages/worker/src/unitOfWork/unitOfWork.provider.ts index 948a62c371..824643e663 100644 --- a/packages/worker/src/unitOfWork/unitOfWork.provider.ts +++ b/packages/worker/src/unitOfWork/unitOfWork.provider.ts @@ -7,6 +7,12 @@ import { AsyncLocalStorage } from "node:async_hooks"; export declare type IsolationLevel = "READ UNCOMMITTED" | "READ COMMITTED" | "REPEATABLE READ" | "SERIALIZABLE"; +export interface IDbTransaction { + waitForExecution: () => Promise; + commit: () => Promise; + ensureRollbackIfNotCommitted: () => Promise; +} + @Injectable() export class UnitOfWork { private readonly logger: Logger; @@ -28,14 +34,56 @@ export class UnitOfWork { return queryRunner?.manager || this.entityManager; } - public async useTransaction( + public useTransaction( action: () => Promise, + preventAutomaticCommit = false, logContext?: Record, isolationLevel?: IsolationLevel - ): Promise { + ): IDbTransaction { const queryRunner: QueryRunner = this.dataSource.createQueryRunner(); - await this.asyncLocalStorage.run( + let isReleased = false; + + const release = async () => { + this.logger.debug({ message: "Releasing the unit of work", ...logContext }); + await queryRunner.release(); + }; + + const commit = async () => { + if (isReleased) { + throw new Error("The transaction cannot be committed as it connection is released"); + } + isReleased = true; + try { + this.logger.debug({ message: "Committing the transaction", ...logContext }); + const stopDbCommitDurationMeasuring = this.dbCommitDurationMetric.startTimer(); + await queryRunner.commitTransaction(); + stopDbCommitDurationMeasuring(); + } catch (error) { + this.logger.error( + { message: "Error while committing the transaction. Rolling back", ...logContext }, + error.stack + ); + await queryRunner.rollbackTransaction(); + throw error; + } finally { + await release(); + } + }; + + const rollback = async () => { + if (isReleased) { + return; + } + isReleased = true; + try { + await queryRunner.rollbackTransaction(); + } finally { + await release(); + } + }; + + const executionPromise = this.asyncLocalStorage.run( { queryRunner, }, @@ -45,23 +93,25 @@ export class UnitOfWork { try { await action(); - - this.logger.debug({ message: "Committing the transaction", ...logContext }); - const stopDbCommitDurationMeasuring = this.dbCommitDurationMetric.startTimer(); - await queryRunner.commitTransaction(); - stopDbCommitDurationMeasuring(); } catch (error) { this.logger.error( { message: "Error while processing the transaction. Rolling back", ...logContext }, error.stack ); - await queryRunner.rollbackTransaction(); + await rollback(); throw error; - } finally { - this.logger.debug({ message: "Releasing the unit of work", ...logContext }); - await queryRunner.release(); + } + + if (!preventAutomaticCommit) { + await commit(); } } ); + + return { + waitForExecution: () => executionPromise, + commit, + ensureRollbackIfNotCommitted: rollback, + }; } } diff --git a/packages/worker/src/utils/splitIntoChunks.ts b/packages/worker/src/utils/splitIntoChunks.ts new file mode 100644 index 0000000000..b1f55a4121 --- /dev/null +++ b/packages/worker/src/utils/splitIntoChunks.ts @@ -0,0 +1,14 @@ +export default (array: T[], chunkSize = 10) => { + const chunks = []; + let currentChunk = 0; + for (let i = 0; i < array.length; i++) { + if (chunks[currentChunk] && chunks[currentChunk].length === chunkSize) { + currentChunk++; + } + if (!chunks[currentChunk]) { + chunks[currentChunk] = []; + } + chunks[currentChunk].push(array[i]); + } + return chunks; +};