diff --git a/packages/worker/package.json b/packages/worker/package.json index bf5368de2d..cfbb583e14 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -19,7 +19,7 @@ "test": "jest", "test:watch": "jest --watch", "test:cov": "jest --coverage", - "test:ci": "echo tests are disabled on ci temporarily", + "test:ci": "jest --coverage", "test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand", "test:e2e": "jest --config ./test/jest-e2e.json", "typeorm": "typeorm-ts-node-commonjs", diff --git a/packages/worker/src/block/block.processor.spec.ts b/packages/worker/src/block/block.processor.spec.ts index eda82a36ea..7e6fc7440c 100644 --- a/packages/worker/src/block/block.processor.spec.ts +++ b/packages/worker/src/block/block.processor.spec.ts @@ -21,6 +21,9 @@ describe("BlockProcessor", () => { let blockProcessor: BlockProcessor; let blockWatcherMock: BlockWatcher; let unitOfWorkMock: UnitOfWork; + let waitForTransactionExecutionMock: jest.Mock; + let commitTransactionMock: jest.Mock; + let ensureRollbackIfNotCommittedTransactionMock: jest.Mock; let blockchainServiceMock: BlockchainService; let transactionProcessorMock: TransactionProcessor; let logProcessorMock: LogProcessor; @@ -110,8 +113,15 @@ describe("BlockProcessor", () => { }; beforeEach(async () => { + waitForTransactionExecutionMock = jest.fn(); + commitTransactionMock = jest.fn(); + ensureRollbackIfNotCommittedTransactionMock = jest.fn(); unitOfWorkMock = mock({ - useTransaction: jest.fn().mockImplementation((action: () => Promise) => action()), + useTransaction: jest.fn().mockImplementation((action: () => Promise) => ({ + waitForExecution: waitForTransactionExecutionMock.mockResolvedValue(action()), + commit: commitTransactionMock.mockResolvedValue(null), + ensureRollbackIfNotCommitted: ensureRollbackIfNotCommittedTransactionMock.mockResolvedValue(null), + })), }); blockWatcherMock = mock({ getNextBlocksToProcess: jest.fn().mockResolvedValue([]), @@ -492,9 +502,11 @@ describe("BlockProcessor", () => { expect(startBlocksBatchDurationMetricMock).toHaveBeenCalledTimes(1); }); - it("uses transaction when adding blocks", async () => { + it("uses transaction with disabled automatic commit when adding blocks", async () => { await blockProcessor.processNextBlocksRange(); expect(unitOfWorkMock.useTransaction).toHaveBeenCalledTimes(1); + expect((unitOfWorkMock.useTransaction as jest.Mock).mock.calls[0][1]).toBe(true); + expect(waitForTransactionExecutionMock).toBeCalledTimes(1); }); it("starts the duration metric", async () => { @@ -524,6 +536,11 @@ describe("BlockProcessor", () => { ); }); + it("commits db transactions after execution", async () => { + await blockProcessor.processNextBlocksRange(); + expect(commitTransactionMock).toBeCalledTimes(1); + }); + describe("when processing fails with an error", () => { beforeEach(() => { jest.spyOn(blockRepositoryMock, "add").mockRejectedValue(new Error("getBlock error")); @@ -574,6 +591,16 @@ describe("BlockProcessor", () => { expect(balanceServiceMock.clearTrackedState).toHaveBeenCalledTimes(1); } }); + + it("does not commit db transactions", async () => { + await Promise.allSettled([blockProcessor.processNextBlocksRange()]); + expect(commitTransactionMock).not.toBeCalled(); + }); + + it("ensures all the db transactions for a given batch of blocks are reverted if not committed", async () => { + await Promise.allSettled([blockProcessor.processNextBlocksRange()]); + expect(ensureRollbackIfNotCommittedTransactionMock).toBeCalledTimes(1); + }); }); describe("when block does not contain transactions", () => { diff --git a/packages/worker/src/blocksRevert/blocksRevert.service.spec.ts b/packages/worker/src/blocksRevert/blocksRevert.service.spec.ts index 0f9dcfa0cf..e415487368 100644 --- a/packages/worker/src/blocksRevert/blocksRevert.service.spec.ts +++ b/packages/worker/src/blocksRevert/blocksRevert.service.spec.ts @@ -17,6 +17,7 @@ describe("BlocksRevertService", () => { let blockRepositoryMock: BlockRepository; let counterServiceMock: CounterService; let unitOfWorkMock: UnitOfWork; + let waitForTransactionExecutionMock: jest.Mock; let revertDurationMetricMock: jest.Mock; let stopRevertDurationMetricMock: jest.Mock; @@ -24,8 +25,13 @@ describe("BlocksRevertService", () => { let stopRevertDetectMetricMock: jest.Mock; beforeEach(async () => { + waitForTransactionExecutionMock = jest.fn(); unitOfWorkMock = mock({ - useTransaction: jest.fn().mockImplementation((action: () => Promise) => action()), + useTransaction: jest.fn().mockImplementation((action: () => Promise) => ({ + waitForExecution: waitForTransactionExecutionMock.mockResolvedValue(action()), + commit: jest.fn().mockResolvedValue(null), + ensureRollbackIfNotCommitted: jest.fn().mockResolvedValue(null), + })), }); blockchainServiceMock = mock({ getL1BatchDetails: jest.fn().mockResolvedValue(null), diff --git a/packages/worker/src/counter/counter.processor.spec.ts b/packages/worker/src/counter/counter.processor.spec.ts index c5d51ad09e..d16e8f1138 100644 --- a/packages/worker/src/counter/counter.processor.spec.ts +++ b/packages/worker/src/counter/counter.processor.spec.ts @@ -20,6 +20,7 @@ describe("CounterProcessor", () => { let repositoryMock: Repository; let counterRepositoryMock: CounterRepository; let unitOfWorkMock: UnitOfWork; + let waitForTransactionExecutionMock: jest.Mock; let counterProcessor: CounterProcessor; beforeEach(() => { @@ -31,8 +32,13 @@ describe("CounterProcessor", () => { decrementCounters: jest.fn().mockResolvedValue(null), getLastProcessedRecordNumber: jest.fn().mockResolvedValue(-1), }); + waitForTransactionExecutionMock = jest.fn(); unitOfWorkMock = mock({ - useTransaction: jest.fn().mockImplementation((fn) => fn()), + useTransaction: jest.fn().mockImplementation((fn) => ({ + waitForExecution: waitForTransactionExecutionMock.mockResolvedValue(fn()), + commit: jest.fn().mockResolvedValue(null), + ensureRollbackIfNotCommitted: jest.fn().mockResolvedValue(null), + })), }); counterProcessor = new CounterProcessor( Transaction, @@ -121,6 +127,7 @@ describe("CounterProcessor", () => { await counterProcessorWithNoCriteria.processNextRecordsBatch(); expect(unitOfWorkMock.useTransaction).toBeCalledTimes(1); + expect(waitForTransactionExecutionMock).toBeCalledTimes(1); expect(counterRepositoryMock.incrementCounters).toBeCalledTimes(1); expect(counterRepositoryMock.incrementCounters).toBeCalledWith( [ @@ -167,6 +174,7 @@ describe("CounterProcessor", () => { await counterProcessor.processNextRecordsBatch(); expect(unitOfWorkMock.useTransaction).toBeCalledTimes(1); + expect(waitForTransactionExecutionMock).toBeCalledTimes(1); expect(counterRepositoryMock.incrementCounters).toBeCalledTimes(1); expect(counterRepositoryMock.incrementCounters).toBeCalledWith( [ @@ -231,6 +239,7 @@ describe("CounterProcessor", () => { await counterProcessor.processNextRecordsBatch(); expect(unitOfWorkMock.useTransaction).toBeCalledTimes(1); + expect(waitForTransactionExecutionMock).toBeCalledTimes(1); expect(counterRepositoryMock.incrementCounters).toBeCalledTimes(1); expect(counterRepositoryMock.incrementCounters).toBeCalledWith( [ diff --git a/packages/worker/src/rpcProvider/webSocketProviderExtended.ts b/packages/worker/src/rpcProvider/webSocketProviderExtended.ts index d5f949f37f..0d777bc4b7 100644 --- a/packages/worker/src/rpcProvider/webSocketProviderExtended.ts +++ b/packages/worker/src/rpcProvider/webSocketProviderExtended.ts @@ -86,7 +86,7 @@ export class WebSocketProviderExtended extends providers.WebSocketProvider { message: "No response for the ping request. Web socket connection will be terminated", context: WebSocketProviderExtended.name, }); - //this._websocket.terminate(); + this._websocket.terminate(); }, expectedPongBack); if (Object.keys(this._requests).length > pendingRequestsLimit) { diff --git a/packages/worker/src/unitOfWork/unitOfWork.provider.spec.ts b/packages/worker/src/unitOfWork/unitOfWork.provider.spec.ts index 9c1a81becf..6b509a437a 100644 --- a/packages/worker/src/unitOfWork/unitOfWork.provider.spec.ts +++ b/packages/worker/src/unitOfWork/unitOfWork.provider.spec.ts @@ -59,28 +59,32 @@ describe("UnitOfWork", () => { describe("when UnitOfWork instance has queryRunner in scope", () => { it("returns entity manager from queryRunner", async () => { expect.assertions(1); - await unitOfWork.useTransaction(async () => { - const manager = unitOfWork.getTransactionManager(); - expect(manager).toEqual(entityManager); - }); + await unitOfWork + .useTransaction(async () => { + const manager = unitOfWork.getTransactionManager(); + expect(manager).toEqual(entityManager); + }) + .waitForExecution(); }); it("returns the same entity manager from queryRunner for nested transaction calls", async () => { expect.assertions(3); - await unitOfWork.useTransaction(async () => { - const manager1 = unitOfWork.getTransactionManager(); - expect(manager1).toEqual(entityManager); - - await (async () => { - const manager2 = unitOfWork.getTransactionManager(); - expect(manager2).toEqual(entityManager); + await unitOfWork + .useTransaction(async () => { + const manager1 = unitOfWork.getTransactionManager(); + expect(manager1).toEqual(entityManager); await (async () => { - const manager3 = unitOfWork.getTransactionManager(); - expect(manager3).toEqual(entityManager); + const manager2 = unitOfWork.getTransactionManager(); + expect(manager2).toEqual(entityManager); + + await (async () => { + const manager3 = unitOfWork.getTransactionManager(); + expect(manager3).toEqual(entityManager); + })(); })(); - })(); - }); + }) + .waitForExecution(); }); describe("when there are multiple concurrent transactions", () => { @@ -108,15 +112,19 @@ describe("UnitOfWork", () => { let manager2; transactionActions.push( - unitOfWork.useTransaction(async () => { - manager1 = unitOfWork.getTransactionManager(); - }) + unitOfWork + .useTransaction(async () => { + manager1 = unitOfWork.getTransactionManager(); + }) + .waitForExecution() ); transactionActions.push( - unitOfWork.useTransaction(async () => { - manager2 = unitOfWork.getTransactionManager(); - }) + unitOfWork + .useTransaction(async () => { + manager2 = unitOfWork.getTransactionManager(); + }) + .waitForExecution() ); await Promise.all(transactionActions); @@ -136,71 +144,125 @@ describe("UnitOfWork", () => { }); describe("useTransaction", () => { - // eslint-disable-next-line @typescript-eslint/no-empty-function - const emptyAction = async () => {}; + const emptyAction = jest.fn().mockResolvedValue(null); it("connects the query runner", async () => { - await unitOfWork.useTransaction(emptyAction); + const transaction = unitOfWork.useTransaction(emptyAction); + await transaction.waitForExecution(); expect(queryRunner.connect).toHaveBeenCalledTimes(1); }); it("starts the transaction with the specified isolation level", async () => { - await unitOfWork.useTransaction(emptyAction, null, "SERIALIZABLE"); + const transaction = unitOfWork.useTransaction(emptyAction, false, null, "SERIALIZABLE"); + await transaction.waitForExecution(); expect(queryRunner.startTransaction).toHaveBeenCalledTimes(1); expect(queryRunner.startTransaction).toHaveBeenCalledWith("SERIALIZABLE"); }); - it("starts db commit duration metric", async () => { - await unitOfWork.useTransaction(emptyAction); - expect(startDbCommitDurationMetricMock).toHaveBeenCalledTimes(1); + describe("when preventAutomaticCommit is set to false", () => { + it("starts db commit duration metric", async () => { + const transaction = unitOfWork.useTransaction(emptyAction); + await transaction.waitForExecution(); + expect(startDbCommitDurationMetricMock).toHaveBeenCalledTimes(1); + }); + + it("commits the transaction", async () => { + const transaction = unitOfWork.useTransaction(emptyAction); + await transaction.waitForExecution(); + expect(queryRunner.commitTransaction).toHaveBeenCalledTimes(1); + }); + + it("releases the transaction", async () => { + const transaction = unitOfWork.useTransaction(emptyAction); + await transaction.waitForExecution(); + expect(queryRunner.release).toHaveBeenCalledTimes(1); + }); + + it("stops db commit duration metric", async () => { + const transaction = unitOfWork.useTransaction(emptyAction); + await transaction.waitForExecution(); + expect(stopDbCommitDurationMetricMock).toHaveBeenCalledTimes(1); + }); }); - it("commits the transaction", async () => { - await unitOfWork.useTransaction(emptyAction); - expect(queryRunner.commitTransaction).toHaveBeenCalledTimes(1); + describe("when preventAutomaticCommit is set to true", () => { + it("does not commit transaction automatically", async () => { + const transaction = unitOfWork.useTransaction(emptyAction, true); + await transaction.waitForExecution(); + expect(queryRunner.commitTransaction).not.toHaveBeenCalled(); + }); + + it("commits transaction when commit is called from outside", async () => { + const transaction = unitOfWork.useTransaction(emptyAction, true); + await transaction.waitForExecution(); + await transaction.commit(); + expect(queryRunner.commitTransaction).toHaveBeenCalledTimes(1); + }); + + it("reverts transaction when ensureRollbackIfNotCommitted is called from outside", async () => { + const transaction = unitOfWork.useTransaction(emptyAction, true); + await transaction.waitForExecution(); + await transaction.ensureRollbackIfNotCommitted(); + expect(queryRunner.rollbackTransaction).toHaveBeenCalledTimes(1); + }); }); - it("releases the transaction", async () => { - await unitOfWork.useTransaction(emptyAction); - expect(queryRunner.release).toHaveBeenCalledTimes(1); + it("throws an error when commit is called for already committed transaction", async () => { + const transaction = unitOfWork.useTransaction(emptyAction); + await transaction.waitForExecution(); + await expect(transaction.commit()).rejects.toThrowError( + new Error("The transaction cannot be committed as it connection is released") + ); }); - it("stops db commit duration metric", async () => { - await unitOfWork.useTransaction(emptyAction); - expect(stopDbCommitDurationMetricMock).toHaveBeenCalledTimes(1); + it("does not try to rollback already committed transaction", async () => { + const transaction = unitOfWork.useTransaction(emptyAction); + await transaction.waitForExecution(); + await transaction.ensureRollbackIfNotCommitted(); + expect(queryRunner.rollbackTransaction).not.toBeCalled(); }); describe("when action throws an error", () => { - const errorAction = () => { - throw new Error("DB call error"); - }; + const error = new Error("DB call error"); + const errorAction = jest.fn().mockRejectedValue(error); it("rollbacks the transaction", async () => { - expect.assertions(1); - - try { - await unitOfWork.useTransaction(errorAction); - } catch { - expect(queryRunner.rollbackTransaction).toHaveBeenCalledTimes(1); - } + const transaction = unitOfWork.useTransaction(errorAction); + await Promise.allSettled([transaction.waitForExecution()]); + expect(queryRunner.rollbackTransaction).toHaveBeenCalledTimes(1); }); it("releases the transaction", async () => { - try { - await unitOfWork.useTransaction(errorAction); - } catch { - expect(queryRunner.release).toHaveBeenCalledTimes(1); - } + const transaction = unitOfWork.useTransaction(errorAction); + await Promise.allSettled([transaction.waitForExecution()]); + expect(queryRunner.release).toHaveBeenCalledTimes(1); }); it("throws generated error", async () => { - expect.assertions(2); - - try { - await unitOfWork.useTransaction(errorAction); - } catch (error) { - expect(error).toBeInstanceOf(Error); - expect(error.message).toBe("DB call error"); - } + const transaction = unitOfWork.useTransaction(errorAction); + await expect(transaction.waitForExecution()).rejects.toThrowError(error); + }); + }); + + describe("when commit transaction fails", () => { + beforeEach(() => { + jest.spyOn(queryRunner, "commitTransaction").mockRejectedValue(new Error("Failed to commit")); + }); + + it("rollbacks the transaction", async () => { + const transaction = unitOfWork.useTransaction(jest.fn()); + await Promise.allSettled([transaction.waitForExecution()]); + expect(queryRunner.rollbackTransaction).toHaveBeenCalledTimes(1); + }); + + it("releases the transaction", async () => { + const transaction = unitOfWork.useTransaction(jest.fn()); + await Promise.allSettled([transaction.waitForExecution()]); + expect(queryRunner.release).toHaveBeenCalledTimes(1); + }); + + it("throws error", async () => { + const transaction = unitOfWork.useTransaction(jest.fn()); + await expect(transaction.waitForExecution()).rejects.toThrowError(new Error("Failed to commit")); }); }); }); diff --git a/packages/worker/src/utils/splitIntoChunks.spec.ts b/packages/worker/src/utils/splitIntoChunks.spec.ts new file mode 100644 index 0000000000..dff61bbe1b --- /dev/null +++ b/packages/worker/src/utils/splitIntoChunks.spec.ts @@ -0,0 +1,16 @@ +import splitIntoChunks from "./splitIntoChunks"; + +describe("splitIntoChunks", () => { + it("splits array into chunks", () => { + const chunks = splitIntoChunks([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3); + expect(chunks).toEqual([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]); + }); + + it("splits array into chunks with size 10 by default", () => { + const chunks = splitIntoChunks([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]); + expect(chunks).toEqual([ + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + [11, 12], + ]); + }); +});