Skip to content

Commit

Permalink
fix: tests for unit of work changes (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
Romsters authored Jan 3, 2024
1 parent 64c1026 commit 82491ac
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 66 deletions.
2 changes: 1 addition & 1 deletion packages/worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"test": "jest",
"test:watch": "jest --watch",
"test:cov": "jest --coverage",
"test:ci": "echo tests are disabled on ci temporarily",
"test:ci": "jest --coverage",
"test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand",
"test:e2e": "jest --config ./test/jest-e2e.json",
"typeorm": "typeorm-ts-node-commonjs",
Expand Down
31 changes: 29 additions & 2 deletions packages/worker/src/block/block.processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ describe("BlockProcessor", () => {
let blockProcessor: BlockProcessor;
let blockWatcherMock: BlockWatcher;
let unitOfWorkMock: UnitOfWork;
let waitForTransactionExecutionMock: jest.Mock;
let commitTransactionMock: jest.Mock;
let ensureRollbackIfNotCommittedTransactionMock: jest.Mock;
let blockchainServiceMock: BlockchainService;
let transactionProcessorMock: TransactionProcessor;
let logProcessorMock: LogProcessor;
Expand Down Expand Up @@ -110,8 +113,15 @@ describe("BlockProcessor", () => {
};

beforeEach(async () => {
waitForTransactionExecutionMock = jest.fn();
commitTransactionMock = jest.fn();
ensureRollbackIfNotCommittedTransactionMock = jest.fn();
unitOfWorkMock = mock<UnitOfWork>({
useTransaction: jest.fn().mockImplementation((action: () => Promise<void>) => action()),
useTransaction: jest.fn().mockImplementation((action: () => Promise<void>) => ({
waitForExecution: waitForTransactionExecutionMock.mockResolvedValue(action()),
commit: commitTransactionMock.mockResolvedValue(null),
ensureRollbackIfNotCommitted: ensureRollbackIfNotCommittedTransactionMock.mockResolvedValue(null),
})),
});
blockWatcherMock = mock<BlockWatcher>({
getNextBlocksToProcess: jest.fn().mockResolvedValue([]),
Expand Down Expand Up @@ -492,9 +502,11 @@ describe("BlockProcessor", () => {
expect(startBlocksBatchDurationMetricMock).toHaveBeenCalledTimes(1);
});

it("uses transaction when adding blocks", async () => {
it("uses transaction with disabled automatic commit when adding blocks", async () => {
await blockProcessor.processNextBlocksRange();
expect(unitOfWorkMock.useTransaction).toHaveBeenCalledTimes(1);
expect((unitOfWorkMock.useTransaction as jest.Mock).mock.calls[0][1]).toBe(true);
expect(waitForTransactionExecutionMock).toBeCalledTimes(1);
});

it("starts the duration metric", async () => {
Expand Down Expand Up @@ -524,6 +536,11 @@ describe("BlockProcessor", () => {
);
});

it("commits db transactions after execution", async () => {
await blockProcessor.processNextBlocksRange();
expect(commitTransactionMock).toBeCalledTimes(1);
});

describe("when processing fails with an error", () => {
beforeEach(() => {
jest.spyOn(blockRepositoryMock, "add").mockRejectedValue(new Error("getBlock error"));
Expand Down Expand Up @@ -574,6 +591,16 @@ describe("BlockProcessor", () => {
expect(balanceServiceMock.clearTrackedState).toHaveBeenCalledTimes(1);
}
});

it("does not commit db transactions", async () => {
await Promise.allSettled([blockProcessor.processNextBlocksRange()]);
expect(commitTransactionMock).not.toBeCalled();
});

it("ensures all the db transactions for a given batch of blocks are reverted if not committed", async () => {
await Promise.allSettled([blockProcessor.processNextBlocksRange()]);
expect(ensureRollbackIfNotCommittedTransactionMock).toBeCalledTimes(1);
});
});

describe("when block does not contain transactions", () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ describe("BlocksRevertService", () => {
let blockRepositoryMock: BlockRepository;
let counterServiceMock: CounterService;
let unitOfWorkMock: UnitOfWork;
let waitForTransactionExecutionMock: jest.Mock;
let revertDurationMetricMock: jest.Mock;
let stopRevertDurationMetricMock: jest.Mock;

let revertDetectMetricMock: jest.Mock;
let stopRevertDetectMetricMock: jest.Mock;

beforeEach(async () => {
waitForTransactionExecutionMock = jest.fn();
unitOfWorkMock = mock<UnitOfWork>({
useTransaction: jest.fn().mockImplementation((action: () => Promise<void>) => action()),
useTransaction: jest.fn().mockImplementation((action: () => Promise<void>) => ({
waitForExecution: waitForTransactionExecutionMock.mockResolvedValue(action()),
commit: jest.fn().mockResolvedValue(null),
ensureRollbackIfNotCommitted: jest.fn().mockResolvedValue(null),
})),
});
blockchainServiceMock = mock<BlockchainService>({
getL1BatchDetails: jest.fn().mockResolvedValue(null),
Expand Down
11 changes: 10 additions & 1 deletion packages/worker/src/counter/counter.processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ describe("CounterProcessor", () => {
let repositoryMock: Repository<Transaction>;
let counterRepositoryMock: CounterRepository;
let unitOfWorkMock: UnitOfWork;
let waitForTransactionExecutionMock: jest.Mock;
let counterProcessor: CounterProcessor<Transaction>;

beforeEach(() => {
Expand All @@ -31,8 +32,13 @@ describe("CounterProcessor", () => {
decrementCounters: jest.fn().mockResolvedValue(null),
getLastProcessedRecordNumber: jest.fn().mockResolvedValue(-1),
});
waitForTransactionExecutionMock = jest.fn();
unitOfWorkMock = mock<UnitOfWork>({
useTransaction: jest.fn().mockImplementation((fn) => fn()),
useTransaction: jest.fn().mockImplementation((fn) => ({
waitForExecution: waitForTransactionExecutionMock.mockResolvedValue(fn()),
commit: jest.fn().mockResolvedValue(null),
ensureRollbackIfNotCommitted: jest.fn().mockResolvedValue(null),
})),
});
counterProcessor = new CounterProcessor<Transaction>(
Transaction,
Expand Down Expand Up @@ -121,6 +127,7 @@ describe("CounterProcessor", () => {
await counterProcessorWithNoCriteria.processNextRecordsBatch();

expect(unitOfWorkMock.useTransaction).toBeCalledTimes(1);
expect(waitForTransactionExecutionMock).toBeCalledTimes(1);
expect(counterRepositoryMock.incrementCounters).toBeCalledTimes(1);
expect(counterRepositoryMock.incrementCounters).toBeCalledWith(
[
Expand Down Expand Up @@ -167,6 +174,7 @@ describe("CounterProcessor", () => {
await counterProcessor.processNextRecordsBatch();

expect(unitOfWorkMock.useTransaction).toBeCalledTimes(1);
expect(waitForTransactionExecutionMock).toBeCalledTimes(1);
expect(counterRepositoryMock.incrementCounters).toBeCalledTimes(1);
expect(counterRepositoryMock.incrementCounters).toBeCalledWith(
[
Expand Down Expand Up @@ -231,6 +239,7 @@ describe("CounterProcessor", () => {
await counterProcessor.processNextRecordsBatch();

expect(unitOfWorkMock.useTransaction).toBeCalledTimes(1);
expect(waitForTransactionExecutionMock).toBeCalledTimes(1);
expect(counterRepositoryMock.incrementCounters).toBeCalledTimes(1);
expect(counterRepositoryMock.incrementCounters).toBeCalledWith(
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class WebSocketProviderExtended extends providers.WebSocketProvider {
message: "No response for the ping request. Web socket connection will be terminated",
context: WebSocketProviderExtended.name,
});
//this._websocket.terminate();
this._websocket.terminate();
}, expectedPongBack);

if (Object.keys(this._requests).length > pendingRequestsLimit) {
Expand Down
182 changes: 122 additions & 60 deletions packages/worker/src/unitOfWork/unitOfWork.provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,28 +59,32 @@ describe("UnitOfWork", () => {
describe("when UnitOfWork instance has queryRunner in scope", () => {
it("returns entity manager from queryRunner", async () => {
expect.assertions(1);
await unitOfWork.useTransaction(async () => {
const manager = unitOfWork.getTransactionManager();
expect(manager).toEqual(entityManager);
});
await unitOfWork
.useTransaction(async () => {
const manager = unitOfWork.getTransactionManager();
expect(manager).toEqual(entityManager);
})
.waitForExecution();
});

it("returns the same entity manager from queryRunner for nested transaction calls", async () => {
expect.assertions(3);
await unitOfWork.useTransaction(async () => {
const manager1 = unitOfWork.getTransactionManager();
expect(manager1).toEqual(entityManager);

await (async () => {
const manager2 = unitOfWork.getTransactionManager();
expect(manager2).toEqual(entityManager);
await unitOfWork
.useTransaction(async () => {
const manager1 = unitOfWork.getTransactionManager();
expect(manager1).toEqual(entityManager);

await (async () => {
const manager3 = unitOfWork.getTransactionManager();
expect(manager3).toEqual(entityManager);
const manager2 = unitOfWork.getTransactionManager();
expect(manager2).toEqual(entityManager);

await (async () => {
const manager3 = unitOfWork.getTransactionManager();
expect(manager3).toEqual(entityManager);
})();
})();
})();
});
})
.waitForExecution();
});

describe("when there are multiple concurrent transactions", () => {
Expand Down Expand Up @@ -108,15 +112,19 @@ describe("UnitOfWork", () => {
let manager2;

transactionActions.push(
unitOfWork.useTransaction(async () => {
manager1 = unitOfWork.getTransactionManager();
})
unitOfWork
.useTransaction(async () => {
manager1 = unitOfWork.getTransactionManager();
})
.waitForExecution()
);

transactionActions.push(
unitOfWork.useTransaction(async () => {
manager2 = unitOfWork.getTransactionManager();
})
unitOfWork
.useTransaction(async () => {
manager2 = unitOfWork.getTransactionManager();
})
.waitForExecution()
);
await Promise.all(transactionActions);

Expand All @@ -136,71 +144,125 @@ describe("UnitOfWork", () => {
});

describe("useTransaction", () => {
// eslint-disable-next-line @typescript-eslint/no-empty-function
const emptyAction = async () => {};
const emptyAction = jest.fn().mockResolvedValue(null);
it("connects the query runner", async () => {
await unitOfWork.useTransaction(emptyAction);
const transaction = unitOfWork.useTransaction(emptyAction);
await transaction.waitForExecution();
expect(queryRunner.connect).toHaveBeenCalledTimes(1);
});

it("starts the transaction with the specified isolation level", async () => {
await unitOfWork.useTransaction(emptyAction, null, "SERIALIZABLE");
const transaction = unitOfWork.useTransaction(emptyAction, false, null, "SERIALIZABLE");
await transaction.waitForExecution();
expect(queryRunner.startTransaction).toHaveBeenCalledTimes(1);
expect(queryRunner.startTransaction).toHaveBeenCalledWith("SERIALIZABLE");
});

it("starts db commit duration metric", async () => {
await unitOfWork.useTransaction(emptyAction);
expect(startDbCommitDurationMetricMock).toHaveBeenCalledTimes(1);
describe("when preventAutomaticCommit is set to false", () => {
it("starts db commit duration metric", async () => {
const transaction = unitOfWork.useTransaction(emptyAction);
await transaction.waitForExecution();
expect(startDbCommitDurationMetricMock).toHaveBeenCalledTimes(1);
});

it("commits the transaction", async () => {
const transaction = unitOfWork.useTransaction(emptyAction);
await transaction.waitForExecution();
expect(queryRunner.commitTransaction).toHaveBeenCalledTimes(1);
});

it("releases the transaction", async () => {
const transaction = unitOfWork.useTransaction(emptyAction);
await transaction.waitForExecution();
expect(queryRunner.release).toHaveBeenCalledTimes(1);
});

it("stops db commit duration metric", async () => {
const transaction = unitOfWork.useTransaction(emptyAction);
await transaction.waitForExecution();
expect(stopDbCommitDurationMetricMock).toHaveBeenCalledTimes(1);
});
});

it("commits the transaction", async () => {
await unitOfWork.useTransaction(emptyAction);
expect(queryRunner.commitTransaction).toHaveBeenCalledTimes(1);
describe("when preventAutomaticCommit is set to true", () => {
it("does not commit transaction automatically", async () => {
const transaction = unitOfWork.useTransaction(emptyAction, true);
await transaction.waitForExecution();
expect(queryRunner.commitTransaction).not.toHaveBeenCalled();
});

it("commits transaction when commit is called from outside", async () => {
const transaction = unitOfWork.useTransaction(emptyAction, true);
await transaction.waitForExecution();
await transaction.commit();
expect(queryRunner.commitTransaction).toHaveBeenCalledTimes(1);
});

it("reverts transaction when ensureRollbackIfNotCommitted is called from outside", async () => {
const transaction = unitOfWork.useTransaction(emptyAction, true);
await transaction.waitForExecution();
await transaction.ensureRollbackIfNotCommitted();
expect(queryRunner.rollbackTransaction).toHaveBeenCalledTimes(1);
});
});

it("releases the transaction", async () => {
await unitOfWork.useTransaction(emptyAction);
expect(queryRunner.release).toHaveBeenCalledTimes(1);
it("throws an error when commit is called for already committed transaction", async () => {
const transaction = unitOfWork.useTransaction(emptyAction);
await transaction.waitForExecution();
await expect(transaction.commit()).rejects.toThrowError(
new Error("The transaction cannot be committed as it connection is released")
);
});

it("stops db commit duration metric", async () => {
await unitOfWork.useTransaction(emptyAction);
expect(stopDbCommitDurationMetricMock).toHaveBeenCalledTimes(1);
it("does not try to rollback already committed transaction", async () => {
const transaction = unitOfWork.useTransaction(emptyAction);
await transaction.waitForExecution();
await transaction.ensureRollbackIfNotCommitted();
expect(queryRunner.rollbackTransaction).not.toBeCalled();
});

describe("when action throws an error", () => {
const errorAction = () => {
throw new Error("DB call error");
};
const error = new Error("DB call error");
const errorAction = jest.fn().mockRejectedValue(error);

it("rollbacks the transaction", async () => {
expect.assertions(1);

try {
await unitOfWork.useTransaction(errorAction);
} catch {
expect(queryRunner.rollbackTransaction).toHaveBeenCalledTimes(1);
}
const transaction = unitOfWork.useTransaction(errorAction);
await Promise.allSettled([transaction.waitForExecution()]);
expect(queryRunner.rollbackTransaction).toHaveBeenCalledTimes(1);
});

it("releases the transaction", async () => {
try {
await unitOfWork.useTransaction(errorAction);
} catch {
expect(queryRunner.release).toHaveBeenCalledTimes(1);
}
const transaction = unitOfWork.useTransaction(errorAction);
await Promise.allSettled([transaction.waitForExecution()]);
expect(queryRunner.release).toHaveBeenCalledTimes(1);
});

it("throws generated error", async () => {
expect.assertions(2);

try {
await unitOfWork.useTransaction(errorAction);
} catch (error) {
expect(error).toBeInstanceOf(Error);
expect(error.message).toBe("DB call error");
}
const transaction = unitOfWork.useTransaction(errorAction);
await expect(transaction.waitForExecution()).rejects.toThrowError(error);
});
});

describe("when commit transaction fails", () => {
beforeEach(() => {
jest.spyOn(queryRunner, "commitTransaction").mockRejectedValue(new Error("Failed to commit"));
});

it("rollbacks the transaction", async () => {
const transaction = unitOfWork.useTransaction(jest.fn());
await Promise.allSettled([transaction.waitForExecution()]);
expect(queryRunner.rollbackTransaction).toHaveBeenCalledTimes(1);
});

it("releases the transaction", async () => {
const transaction = unitOfWork.useTransaction(jest.fn());
await Promise.allSettled([transaction.waitForExecution()]);
expect(queryRunner.release).toHaveBeenCalledTimes(1);
});

it("throws error", async () => {
const transaction = unitOfWork.useTransaction(jest.fn());
await expect(transaction.waitForExecution()).rejects.toThrowError(new Error("Failed to commit"));
});
});
});
Expand Down
Loading

0 comments on commit 82491ac

Please sign in to comment.