diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index d2d79ea..fc29306 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -15,6 +15,7 @@ import { KyselyRoundRepository, KyselyStrategyProcessingCheckpointRepository, KyselyStrategyRegistryRepository, + KyselyTransactionManager, } from "@grants-stack-indexer/repository"; import { ILogger, Logger } from "@grants-stack-indexer/shared"; @@ -50,6 +51,8 @@ export class SharedDependenciesService { logger, ); + const transactionManager = new KyselyTransactionManager(kyselyDatabase); + const projectRepository = new KyselyProjectRepository(kyselyDatabase, env.DATABASE_SCHEMA); const roundRepository = new KyselyRoundRepository(kyselyDatabase, env.DATABASE_SCHEMA); const applicationRepository = new KyselyApplicationRepository( @@ -97,6 +100,7 @@ export class SharedDependenciesService { donationRepository, metadataProvider, applicationPayoutRepository, + transactionManager, }, registriesRepositories: { eventRegistryRepository, diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index df37a6e..cdd3c75 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -42,6 +42,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({ })), KyselyEventRegistryRepository: vi.fn(), KyselyStrategyProcessingCheckpointRepository: vi.fn(), + KyselyTransactionManager: vi.fn(), })); vi.mock("@grants-stack-indexer/pricing", () => ({ @@ -145,6 +146,7 @@ describe("SharedDependenciesService", () => { expect(dependencies.core).toHaveProperty("donationRepository"); expect(dependencies.core).toHaveProperty("metadataProvider"); expect(dependencies.core).toHaveProperty("applicationPayoutRepository"); + expect(dependencies.core).toHaveProperty("transactionManager"); // Verify registries expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository"); diff --git a/packages/data-flow/src/data-loader/dataLoader.ts b/packages/data-flow/src/data-loader/dataLoader.ts index 2f07d1a..d87b0cc 100644 --- a/packages/data-flow/src/data-loader/dataLoader.ts +++ b/packages/data-flow/src/data-loader/dataLoader.ts @@ -5,10 +5,11 @@ import { IDonationRepository, IProjectRepository, IRoundRepository, + ITransactionManager, } from "@grants-stack-indexer/repository"; -import { ILogger, stringify } from "@grants-stack-indexer/shared"; +import { ILogger } from "@grants-stack-indexer/shared"; -import { ExecutionResult, IDataLoader, InvalidChangeset } from "../internal.js"; +import { IDataLoader, InvalidChangeset } from "../internal.js"; import { createApplicationHandlers, createApplicationPayoutHandlers, @@ -42,6 +43,7 @@ export class DataLoader implements IDataLoader { donation: IDonationRepository; applicationPayout: IApplicationPayoutRepository; }, + private readonly transactionManager: ITransactionManager, private readonly logger: ILogger, ) { this.handlers = { @@ -54,40 +56,26 @@ export class DataLoader implements IDataLoader { } /** @inheritdoc */ - public async applyChanges(changesets: Changeset[]): Promise { - const result: ExecutionResult = { - changesets: [], - numExecuted: 0, - numSuccessful: 0, - numFailed: 0, - errors: [], - }; - + public async applyChanges(changesets: Changeset[]): Promise { const invalidTypes = changesets.filter((changeset) => !this.handlers[changeset.type]); if (invalidTypes.length > 0) { throw new InvalidChangeset(invalidTypes.map((changeset) => changeset.type)); } - //TODO: research how to manage transactions so we can rollback on error - for (const changeset of changesets) { - result.numExecuted++; - try { - //TODO: inside each handler, we should add zod validation that the args match the expected type - await this.handlers[changeset.type](changeset as never); - result.changesets.push(changeset.type); - result.numSuccessful++; - } catch (error) { - result.numFailed++; - result.errors.push( - `Failed to apply changeset ${changeset.type}: ${ - error instanceof Error ? error.message : String(error) - }`, - ); - this.logger.error(`${stringify(error, Object.getOwnPropertyNames(error))}`); - break; - } - } + await this.transactionManager.runInTransaction(async (tx) => { + this.logger.debug("Starting transaction..."); + for (const changeset of changesets) { + try { + //TODO: inside each handler, we should add zod validation that the args match the expected type + await this.handlers[changeset.type](changeset as never, tx); + } catch (error) { + this.logger.debug( + `Error applying changeset ${changeset.type}. Rolling back transaction with ${changesets.length} changesets`, + ); - return result; + throw error; + } + } + }); } } diff --git a/packages/data-flow/src/data-loader/handlers/application.handlers.ts b/packages/data-flow/src/data-loader/handlers/application.handlers.ts index 20afd8a..7167efc 100644 --- a/packages/data-flow/src/data-loader/handlers/application.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/application.handlers.ts @@ -19,12 +19,16 @@ export type ApplicationHandlers = { export const createApplicationHandlers = ( repository: IApplicationRepository, ): ApplicationHandlers => ({ - InsertApplication: (async (changeset): Promise => { - await repository.insertApplication(changeset.args); + InsertApplication: (async (changeset, txConnection): Promise => { + await repository.insertApplication(changeset.args, txConnection); }) satisfies ChangesetHandler<"InsertApplication">, - UpdateApplication: (async (changeset): Promise => { + UpdateApplication: (async (changeset, txConnection): Promise => { const { chainId, roundId, applicationId, application } = changeset.args; - await repository.updateApplication({ chainId, roundId, id: applicationId }, application); + await repository.updateApplication( + { chainId, roundId, id: applicationId }, + application, + txConnection, + ); }) satisfies ChangesetHandler<"UpdateApplication">, }); diff --git a/packages/data-flow/src/data-loader/handlers/applicationPayout.handlers.ts b/packages/data-flow/src/data-loader/handlers/applicationPayout.handlers.ts index f14fc24..a96de33 100644 --- a/packages/data-flow/src/data-loader/handlers/applicationPayout.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/applicationPayout.handlers.ts @@ -22,7 +22,7 @@ export type ApplicationPayoutHandlers = { export const createApplicationPayoutHandlers = ( repository: IApplicationPayoutRepository, ): ApplicationPayoutHandlers => ({ - InsertApplicationPayout: (async (changeset): Promise => { - await repository.insertApplicationPayout(changeset.args.applicationPayout); + InsertApplicationPayout: (async (changeset, txConnection): Promise => { + await repository.insertApplicationPayout(changeset.args.applicationPayout, txConnection); }) satisfies ChangesetHandler<"InsertApplicationPayout">, }); diff --git a/packages/data-flow/src/data-loader/handlers/donation.handlers.ts b/packages/data-flow/src/data-loader/handlers/donation.handlers.ts index b42d9e0..841fbd3 100644 --- a/packages/data-flow/src/data-loader/handlers/donation.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/donation.handlers.ts @@ -17,11 +17,11 @@ export type DonationHandlers = { * @returns An object containing all application-related handlers */ export const createDonationHandlers = (repository: IDonationRepository): DonationHandlers => ({ - InsertDonation: (async (changeset): Promise => { - await repository.insertDonation(changeset.args.donation); + InsertDonation: (async (changeset, txConnection): Promise => { + await repository.insertDonation(changeset.args.donation, txConnection); }) satisfies ChangesetHandler<"InsertDonation">, - InsertManyDonations: (async (changeset): Promise => { - await repository.insertManyDonations(changeset.args.donations); + InsertManyDonations: (async (changeset, txConnection): Promise => { + await repository.insertManyDonations(changeset.args.donations, txConnection); }) satisfies ChangesetHandler<"InsertManyDonations">, }); diff --git a/packages/data-flow/src/data-loader/handlers/project.handlers.ts b/packages/data-flow/src/data-loader/handlers/project.handlers.ts index b32a9db..01cb06f 100644 --- a/packages/data-flow/src/data-loader/handlers/project.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/project.handlers.ts @@ -17,38 +17,38 @@ export type ProjectHandlers = { * @returns An object containing all project-related handlers */ export const createProjectHandlers = (repository: IProjectRepository): ProjectHandlers => ({ - InsertProject: (async (changeset): Promise => { + InsertProject: (async (changeset, txConnection): Promise => { const { project } = changeset.args; - await repository.insertProject(project); + await repository.insertProject(project, txConnection); }) satisfies ChangesetHandler<"InsertProject">, - UpdateProject: (async (changeset): Promise => { + UpdateProject: (async (changeset, txConnection): Promise => { const { chainId, projectId, project } = changeset.args; - await repository.updateProject({ id: projectId, chainId }, project); + await repository.updateProject({ id: projectId, chainId }, project, txConnection); }) satisfies ChangesetHandler<"UpdateProject">, - InsertPendingProjectRole: (async (changeset): Promise => { + InsertPendingProjectRole: (async (changeset, txConnection): Promise => { const { pendingProjectRole } = changeset.args; - await repository.insertPendingProjectRole(pendingProjectRole); + await repository.insertPendingProjectRole(pendingProjectRole, txConnection); }) satisfies ChangesetHandler<"InsertPendingProjectRole">, - DeletePendingProjectRoles: (async (changeset): Promise => { + DeletePendingProjectRoles: (async (changeset, txConnection): Promise => { const { ids } = changeset.args; - await repository.deleteManyPendingProjectRoles(ids); + await repository.deleteManyPendingProjectRoles(ids, txConnection); }) satisfies ChangesetHandler<"DeletePendingProjectRoles">, - InsertProjectRole: (async (changeset): Promise => { + InsertProjectRole: (async (changeset, txConnection): Promise => { const { projectRole } = changeset.args; - await repository.insertProjectRole(projectRole); + await repository.insertProjectRole(projectRole, txConnection); }) satisfies ChangesetHandler<"InsertProjectRole">, - DeleteAllProjectRolesByRole: (async (changeset): Promise => { + DeleteAllProjectRolesByRole: (async (changeset, txConnection): Promise => { const { chainId, projectId, role } = changeset.args.projectRole; - await repository.deleteManyProjectRoles(chainId, projectId, role); + await repository.deleteManyProjectRoles(chainId, projectId, role, undefined, txConnection); }) satisfies ChangesetHandler<"DeleteAllProjectRolesByRole">, - DeleteAllProjectRolesByRoleAndAddress: (async (changeset): Promise => { + DeleteAllProjectRolesByRoleAndAddress: (async (changeset, txConnection): Promise => { const { chainId, projectId, role, address } = changeset.args.projectRole; - await repository.deleteManyProjectRoles(chainId, projectId, role, address); + await repository.deleteManyProjectRoles(chainId, projectId, role, address, txConnection); }) satisfies ChangesetHandler<"DeleteAllProjectRolesByRoleAndAddress">, }); diff --git a/packages/data-flow/src/data-loader/handlers/round.handlers.ts b/packages/data-flow/src/data-loader/handlers/round.handlers.ts index cb6c831..d877850 100644 --- a/packages/data-flow/src/data-loader/handlers/round.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/round.handlers.ts @@ -17,24 +17,28 @@ export type RoundHandlers = { * @returns An object containing all round-related handlers */ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers => ({ - InsertRound: (async (changeset): Promise => { + InsertRound: (async (changeset, txConnection): Promise => { const { round } = changeset.args; - await repository.insertRound(round); + await repository.insertRound(round, txConnection); }) satisfies ChangesetHandler<"InsertRound">, - UpdateRound: (async (changeset): Promise => { + UpdateRound: (async (changeset, txConnection): Promise => { const { chainId, roundId, round } = changeset.args; - await repository.updateRound({ id: roundId, chainId }, round); + await repository.updateRound({ id: roundId, chainId }, round, txConnection); }) satisfies ChangesetHandler<"UpdateRound">, - UpdateRoundByStrategyAddress: (async (changeset): Promise => { + UpdateRoundByStrategyAddress: (async (changeset, txConnection): Promise => { const { chainId, strategyAddress, round } = changeset.args; if (round) { - await repository.updateRound({ strategyAddress, chainId: chainId }, round); + await repository.updateRound( + { strategyAddress, chainId: chainId }, + round, + txConnection, + ); } }) satisfies ChangesetHandler<"UpdateRoundByStrategyAddress">, - IncrementRoundFundedAmount: (async (changeset): Promise => { + IncrementRoundFundedAmount: (async (changeset, txConnection): Promise => { const { chainId, roundId, fundedAmount, fundedAmountInUsd } = changeset.args; await repository.incrementRoundFunds( { @@ -43,10 +47,11 @@ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers }, fundedAmount, fundedAmountInUsd, + txConnection, ); }) satisfies ChangesetHandler<"IncrementRoundFundedAmount">, - IncrementRoundTotalDistributed: (async (changeset): Promise => { + IncrementRoundTotalDistributed: (async (changeset, txConnection): Promise => { const { chainId, roundId, amount } = changeset.args; await repository.incrementRoundTotalDistributed( { @@ -54,26 +59,33 @@ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers roundId, }, amount, + txConnection, ); }) satisfies ChangesetHandler<"IncrementRoundTotalDistributed">, - InsertPendingRoundRole: (async (changeset): Promise => { + InsertPendingRoundRole: (async (changeset, txConnection): Promise => { const { pendingRoundRole } = changeset.args; - await repository.insertPendingRoundRole(pendingRoundRole); + await repository.insertPendingRoundRole(pendingRoundRole, txConnection); }) satisfies ChangesetHandler<"InsertPendingRoundRole">, - DeletePendingRoundRoles: (async (changeset): Promise => { + DeletePendingRoundRoles: (async (changeset, txConnection): Promise => { const { ids } = changeset.args; - await repository.deleteManyPendingRoundRoles(ids); + await repository.deleteManyPendingRoundRoles(ids, txConnection); }) satisfies ChangesetHandler<"DeletePendingRoundRoles">, - InsertRoundRole: (async (changeset): Promise => { + InsertRoundRole: (async (changeset, txConnection): Promise => { const { roundRole } = changeset.args; - await repository.insertRoundRole(roundRole); + await repository.insertRoundRole(roundRole, txConnection); }) satisfies ChangesetHandler<"InsertRoundRole">, - DeleteAllRoundRolesByRoleAndAddress: (async (changeset): Promise => { + DeleteAllRoundRolesByRoleAndAddress: (async (changeset, txConnection): Promise => { const { chainId, roundId, role, address } = changeset.args.roundRole; - await repository.deleteManyRoundRolesByRoleAndAddress(chainId, roundId, role, address); + await repository.deleteManyRoundRolesByRoleAndAddress( + chainId, + roundId, + role, + address, + txConnection, + ); }) satisfies ChangesetHandler<"DeleteAllRoundRolesByRoleAndAddress">, }); diff --git a/packages/data-flow/src/data-loader/types/index.ts b/packages/data-flow/src/data-loader/types/index.ts index c8cae2b..9637860 100644 --- a/packages/data-flow/src/data-loader/types/index.ts +++ b/packages/data-flow/src/data-loader/types/index.ts @@ -1,7 +1,8 @@ -import { Changeset } from "@grants-stack-indexer/repository"; +import { Changeset, TransactionConnection } from "@grants-stack-indexer/repository"; export type ChangesetHandler = ( changeset: Extract, + txConnection?: TransactionConnection, ) => Promise; export type ChangesetHandlers = { diff --git a/packages/data-flow/src/interfaces/dataLoader.interface.ts b/packages/data-flow/src/interfaces/dataLoader.interface.ts index 16a28fb..3b78046 100644 --- a/packages/data-flow/src/interfaces/dataLoader.interface.ts +++ b/packages/data-flow/src/interfaces/dataLoader.interface.ts @@ -1,7 +1,5 @@ import type { Changeset } from "@grants-stack-indexer/repository"; -import type { ExecutionResult } from "../internal.js"; - export interface IDataLoader { /** * Applies the changesets to the database. @@ -9,5 +7,5 @@ export interface IDataLoader { * @returns The execution result. * @throws {InvalidChangeset} if there are changesets with invalid types. */ - applyChanges(changesets: Changeset[]): Promise; + applyChanges(changesets: Changeset[]): Promise; } diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 89c9bcf..0265750 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -94,6 +94,7 @@ export class Orchestrator { donation: this.dependencies.donationRepository, applicationPayout: this.dependencies.applicationPayoutRepository, }, + this.dependencies.transactionManager, this.logger, ); this.eventsQueue = new Queue>(fetchLimit); @@ -145,20 +146,7 @@ export class Orchestrator { } const changesets = await this.eventsProcessor.processEvent(event); - const executionResult = await this.dataLoader.applyChanges(changesets); - - if (executionResult.numFailed > 0) { - //TODO: should we retry the failed changesets? - this.logger.error( - `Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify( - event, - )}`, - { - className: Orchestrator.name, - chainId: this.chainId, - }, - ); - } + await this.dataLoader.applyChanges(changesets); } catch (error: unknown) { // TODO: improve error handling, retries and notify if ( diff --git a/packages/data-flow/src/retroactiveProcessor.ts b/packages/data-flow/src/retroactiveProcessor.ts index 1553701..ddb25c6 100644 --- a/packages/data-flow/src/retroactiveProcessor.ts +++ b/packages/data-flow/src/retroactiveProcessor.ts @@ -100,6 +100,7 @@ export class RetroactiveProcessor { donation: this.dependencies.donationRepository, applicationPayout: this.dependencies.applicationPayoutRepository, }, + this.dependencies.transactionManager, this.logger, ); } @@ -208,17 +209,7 @@ export class RetroactiveProcessor { event.strategyId = strategyId; const changesets = await this.eventsProcessor.processEvent(event); - const executionResult = await this.dataLoader.applyChanges(changesets); - - if (executionResult.numFailed > 0) { - this.logger.error( - `Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(event)}`, - { - className: RetroactiveProcessor.name, - chainId: this.chainId, - }, - ); - } + await this.dataLoader.applyChanges(changesets); } catch (error) { if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) { // Expected errors that we can safely ignore diff --git a/packages/data-flow/src/types/index.ts b/packages/data-flow/src/types/index.ts index a83c8d8..563071a 100644 --- a/packages/data-flow/src/types/index.ts +++ b/packages/data-flow/src/types/index.ts @@ -1,24 +1,13 @@ import { ProcessorDependencies } from "@grants-stack-indexer/processors"; import { - Changeset, IApplicationPayoutRepository, IApplicationRepository, IDonationRepository, IProjectRepository, IRoundRepository, + ITransactionManager, } from "@grants-stack-indexer/repository"; -/** - * The result of the execution of the changesets. - */ -export type ExecutionResult = { - changesets: Changeset["type"][]; - numExecuted: number; - numSuccessful: number; - numFailed: number; - errors: string[]; -}; - /** * The core dependencies for the data flow * @@ -35,4 +24,5 @@ export type CoreDependencies = Pick< applicationRepository: IApplicationRepository; donationRepository: IDonationRepository; applicationPayoutRepository: IApplicationPayoutRepository; + transactionManager: ITransactionManager; }; diff --git a/packages/data-flow/test/data-loader/dataLoader.spec.ts b/packages/data-flow/test/data-loader/dataLoader.spec.ts index 167284b..f0f62e7 100644 --- a/packages/data-flow/test/data-loader/dataLoader.spec.ts +++ b/packages/data-flow/test/data-loader/dataLoader.spec.ts @@ -1,4 +1,4 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { Changeset, @@ -7,6 +7,8 @@ import { IDonationRepository, IProjectRepository, IRoundRepository, + ITransactionManager, + TransactionConnection, } from "@grants-stack-indexer/repository"; import { ILogger } from "@grants-stack-indexer/shared"; @@ -14,6 +16,7 @@ import { DataLoader } from "../../src/data-loader/dataLoader.js"; import { InvalidChangeset } from "../../src/internal.js"; describe("DataLoader", () => { + let dataLoader: DataLoader; const mockProjectRepository = { insertProject: vi.fn(), updateProject: vi.fn(), @@ -44,8 +47,14 @@ describe("DataLoader", () => { info: vi.fn(), warn: vi.fn(), }; - const createDataLoader = (): DataLoader => - new DataLoader( + + const mockTx = { query: vi.fn() } as unknown as TransactionConnection; + const mockTransactionManager = { + runInTransaction: async (fn) => await fn(mockTx), + } as ITransactionManager; + + beforeEach(() => { + dataLoader = new DataLoader( { project: mockProjectRepository, round: mockRoundRepository, @@ -53,16 +62,17 @@ describe("DataLoader", () => { donation: mockDonationRepository, applicationPayout: mockApplicationPayoutRepository, }, + mockTransactionManager, logger, ); + }); - beforeEach(() => { + afterEach(() => { vi.clearAllMocks(); }); describe("applyChanges", () => { it("successfully process multiple changesets", async () => { - const dataLoader = createDataLoader(); const changesets = [ { type: "InsertProject", @@ -74,18 +84,21 @@ describe("DataLoader", () => { } as unknown as Changeset, ]; - const result = await dataLoader.applyChanges(changesets); + await dataLoader.applyChanges(changesets); - expect(result.numExecuted).toBe(2); - expect(result.numSuccessful).toBe(2); - expect(result.numFailed).toBe(0); - expect(result.errors).toHaveLength(0); expect(mockProjectRepository.insertProject).toHaveBeenCalledTimes(1); + expect(mockProjectRepository.insertProject).toHaveBeenCalledWith( + { id: "1", name: "Test Project" }, + mockTx, + ); expect(mockRoundRepository.insertRound).toHaveBeenCalledTimes(1); + expect(mockRoundRepository.insertRound).toHaveBeenCalledWith( + { id: "1", name: "Test Round" }, + mockTx, + ); }); it("throw InvalidChangeset when encountering unknown changeset type", async () => { - const dataLoader = createDataLoader(); const changesets = [ { type: "UnknownType", @@ -97,8 +110,7 @@ describe("DataLoader", () => { ); }); - it("stops processing changesets on first error", async () => { - const dataLoader = createDataLoader(); + it("throws an error if the database operation fails", async () => { const error = new Error("Database error"); vi.spyOn(mockProjectRepository, "insertProject").mockRejectedValueOnce(error); @@ -113,13 +125,11 @@ describe("DataLoader", () => { } as unknown as Changeset, ]; - const result = await dataLoader.applyChanges(changesets); + await expect(dataLoader.applyChanges(changesets)).rejects.toThrow(error); - expect(result.numExecuted).toBe(1); - expect(result.numSuccessful).toBe(0); - expect(result.numFailed).toBe(1); - expect(result.errors).toHaveLength(1); - expect(result.errors[0]).toContain("Database error"); + expect(logger.debug).toHaveBeenCalledWith( + `Error applying changeset InsertProject. Rolling back transaction with 2 changesets`, + ); expect(mockRoundRepository.insertRound).not.toHaveBeenCalled(); }); }); diff --git a/packages/data-flow/test/data-loader/handlers/application.handlers.spec.ts b/packages/data-flow/test/data-loader/handlers/application.handlers.spec.ts index 86dad31..1c853d6 100644 --- a/packages/data-flow/test/data-loader/handlers/application.handlers.spec.ts +++ b/packages/data-flow/test/data-loader/handlers/application.handlers.spec.ts @@ -1,6 +1,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import { IApplicationRepository, NewApplication } from "@grants-stack-indexer/repository"; +import { + IApplicationRepository, + NewApplication, + TransactionConnection, +} from "@grants-stack-indexer/repository"; import { ChainId } from "@grants-stack-indexer/shared"; import { createApplicationHandlers } from "../../../src/data-loader/handlers/application.handlers.js"; @@ -10,6 +14,7 @@ describe("Application Handlers", () => { insertApplication: vi.fn(), updateApplication: vi.fn(), } as unknown as IApplicationRepository; + const mockTxConnection = { query: vi.fn() } as unknown as TransactionConnection; const handlers = createApplicationHandlers(mockRepository); @@ -24,7 +29,7 @@ describe("Application Handlers", () => { args: application, }); - expect(mockRepository.insertApplication).toHaveBeenCalledWith(application); + expect(mockRepository.insertApplication).toHaveBeenCalledWith(application, undefined); }); it("handle UpdateApplication changeset", async () => { @@ -38,11 +43,12 @@ describe("Application Handlers", () => { }, } as const; - await handlers.UpdateApplication(update); + await handlers.UpdateApplication(update, mockTxConnection); expect(mockRepository.updateApplication).toHaveBeenCalledWith( { chainId: 1, roundId: "round1", id: "app1" }, { status: "APPROVED" }, + mockTxConnection, ); }); }); diff --git a/packages/data-flow/test/data-loader/handlers/applicationPayout.handlers.spec.ts b/packages/data-flow/test/data-loader/handlers/applicationPayout.handlers.spec.ts new file mode 100644 index 0000000..0d17dfd --- /dev/null +++ b/packages/data-flow/test/data-loader/handlers/applicationPayout.handlers.spec.ts @@ -0,0 +1,42 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { + IApplicationPayoutRepository, + NewApplicationPayout, + TransactionConnection, +} from "@grants-stack-indexer/repository"; + +import { createApplicationPayoutHandlers } from "../../../src/data-loader/handlers/applicationPayout.handlers.js"; + +describe("ApplicationPayout Handlers", () => { + const mockRepository = { + insertApplicationPayout: vi.fn(), + } as IApplicationPayoutRepository; + const mockTxConnection = { query: vi.fn() } as unknown as TransactionConnection; + + const handlers = createApplicationPayoutHandlers(mockRepository); + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("handle InsertApplication changeset", async () => { + const applicationPayout = { + id: "1", + name: "Test Application", + } as unknown as NewApplicationPayout; + + await handlers.InsertApplicationPayout( + { + type: "InsertApplicationPayout", + args: { applicationPayout }, + }, + mockTxConnection, + ); + + expect(mockRepository.insertApplicationPayout).toHaveBeenCalledWith( + applicationPayout, + mockTxConnection, + ); + }); +}); diff --git a/packages/data-flow/test/data-loader/handlers/donation.handlers.spec.ts b/packages/data-flow/test/data-loader/handlers/donation.handlers.spec.ts new file mode 100644 index 0000000..deb562d --- /dev/null +++ b/packages/data-flow/test/data-loader/handlers/donation.handlers.spec.ts @@ -0,0 +1,54 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { + IDonationRepository, + NewDonation, + TransactionConnection, +} from "@grants-stack-indexer/repository"; + +import { createDonationHandlers } from "../../../src/data-loader/handlers/donation.handlers.js"; + +describe("Donation Handlers", () => { + const mockRepository = { + insertDonation: vi.fn(), + insertManyDonations: vi.fn(), + } as IDonationRepository; + const mockTxConnection = { query: vi.fn() } as unknown as TransactionConnection; + + const handlers = createDonationHandlers(mockRepository); + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("handle InsertDonation changeset", async () => { + const donation = { id: "1", name: "Test Donation" } as unknown as NewDonation; + await handlers.InsertDonation({ + type: "InsertDonation", + args: { donation }, + }); + + expect(mockRepository.insertDonation).toHaveBeenCalledWith(donation, undefined); + }); + + it("handle InsertManyDonations changeset", async () => { + const donations = [ + { id: "1", name: "Test Donation" }, + { id: "2", name: "Test Donation 2" }, + { id: "3", name: "Test Donation 3" }, + ] as unknown as NewDonation[]; + + await handlers.InsertManyDonations( + { + type: "InsertManyDonations", + args: { donations }, + }, + mockTxConnection, + ); + + expect(mockRepository.insertManyDonations).toHaveBeenCalledWith( + donations, + mockTxConnection, + ); + }); +}); diff --git a/packages/data-flow/test/data-loader/handlers/project.handlers.spec.ts b/packages/data-flow/test/data-loader/handlers/project.handlers.spec.ts index 5ef0f45..087ece9 100644 --- a/packages/data-flow/test/data-loader/handlers/project.handlers.spec.ts +++ b/packages/data-flow/test/data-loader/handlers/project.handlers.spec.ts @@ -1,6 +1,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import { IProjectRepository, NewProject } from "@grants-stack-indexer/repository"; +import { + IProjectRepository, + NewProject, + TransactionConnection, +} from "@grants-stack-indexer/repository"; import { Address, ChainId } from "@grants-stack-indexer/shared"; import { createProjectHandlers } from "../../../src/data-loader/handlers/project.handlers.js"; @@ -14,6 +18,7 @@ describe("Project Handlers", () => { insertProjectRole: vi.fn(), deleteManyProjectRoles: vi.fn(), } as unknown as IProjectRepository; + const mockTxConnection = { query: vi.fn() } as unknown as TransactionConnection; const handlers = createProjectHandlers(mockRepository); @@ -39,12 +44,15 @@ describe("Project Handlers", () => { projectType: "canonical", } as NewProject; - await handlers.InsertProject({ - type: "InsertProject", - args: { project }, - }); + await handlers.InsertProject( + { + type: "InsertProject", + args: { project }, + }, + mockTxConnection, + ); - expect(mockRepository.insertProject).toHaveBeenCalledWith(project); + expect(mockRepository.insertProject).toHaveBeenCalledWith(project, mockTxConnection); }); it("handle UpdateProject changeset", async () => { @@ -65,6 +73,7 @@ describe("Project Handlers", () => { expect(mockRepository.updateProject).toHaveBeenCalledWith( { id: "project-1", chainId: 1 }, { name: "Updated Project", updatedAtBlock: 200n }, + undefined, ); }); @@ -81,7 +90,10 @@ describe("Project Handlers", () => { args: { pendingProjectRole: pendingRole }, }); - expect(mockRepository.insertPendingProjectRole).toHaveBeenCalledWith(pendingRole); + expect(mockRepository.insertPendingProjectRole).toHaveBeenCalledWith( + pendingRole, + undefined, + ); }); it("handle DeletePendingProjectRoles changeset", async () => { @@ -94,7 +106,10 @@ describe("Project Handlers", () => { await handlers.DeletePendingProjectRoles(changeset); - expect(mockRepository.deleteManyPendingProjectRoles).toHaveBeenCalledWith([1, 2, 3]); + expect(mockRepository.deleteManyPendingProjectRoles).toHaveBeenCalledWith( + [1, 2, 3], + undefined, + ); }); it("handle InsertProjectRole changeset", async () => { @@ -111,7 +126,7 @@ describe("Project Handlers", () => { args: { projectRole }, }); - expect(mockRepository.insertProjectRole).toHaveBeenCalledWith(projectRole); + expect(mockRepository.insertProjectRole).toHaveBeenCalledWith(projectRole, undefined); }); it("handle DeleteAllProjectRolesByRole changeset", async () => { @@ -132,6 +147,8 @@ describe("Project Handlers", () => { changeset.args.projectRole.chainId, changeset.args.projectRole.projectId, changeset.args.projectRole.role, + undefined, + undefined, ); }); @@ -155,6 +172,7 @@ describe("Project Handlers", () => { changeset.args.projectRole.projectId, changeset.args.projectRole.role, changeset.args.projectRole.address, + undefined, ); }); }); diff --git a/packages/data-flow/test/data-loader/handlers/round.handlers.spec.ts b/packages/data-flow/test/data-loader/handlers/round.handlers.spec.ts index 6ddb625..c06a960 100644 --- a/packages/data-flow/test/data-loader/handlers/round.handlers.spec.ts +++ b/packages/data-flow/test/data-loader/handlers/round.handlers.spec.ts @@ -1,6 +1,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import { IRoundRepository, NewRound } from "@grants-stack-indexer/repository"; +import { + IRoundRepository, + NewRound, + TransactionConnection, +} from "@grants-stack-indexer/repository"; import { Address, ChainId } from "@grants-stack-indexer/shared"; import { createRoundHandlers } from "../../../src/data-loader/handlers/round.handlers.js"; @@ -16,6 +20,7 @@ describe("Round Handlers", () => { insertRoundRole: vi.fn(), deleteManyRoundRolesByRoleAndAddress: vi.fn(), } as unknown as IRoundRepository; + const mockTxConnection = { query: vi.fn() } as unknown as TransactionConnection; const handlers = createRoundHandlers(mockRepository); @@ -30,12 +35,15 @@ describe("Round Handlers", () => { matchAmount: 1000n, } as NewRound; - await handlers.InsertRound({ - type: "InsertRound" as const, - args: { round }, - }); + await handlers.InsertRound( + { + type: "InsertRound" as const, + args: { round }, + }, + mockTxConnection, + ); - expect(mockRepository.insertRound).toHaveBeenCalledWith(round); + expect(mockRepository.insertRound).toHaveBeenCalledWith(round, mockTxConnection); }); it("handle UpdateRound changeset", async () => { @@ -56,6 +64,7 @@ describe("Round Handlers", () => { expect(mockRepository.updateRound).toHaveBeenCalledWith( { id: "round-1", chainId: 1 as ChainId }, { matchAmount: 2000n, matchAmountInUsd: "2000" }, + undefined, ); }); @@ -77,6 +86,7 @@ describe("Round Handlers", () => { expect(mockRepository.updateRound).toHaveBeenCalledWith( { chainId: 1 as ChainId, strategyAddress: "0x123" as Address }, { matchAmount: 2000n, matchAmountInUsd: "2000" }, + undefined, ); }); @@ -97,6 +107,7 @@ describe("Round Handlers", () => { { chainId: 1 as ChainId, roundId: "round-1" }, 1000n, "1000", + undefined, ); }); @@ -115,6 +126,7 @@ describe("Round Handlers", () => { expect(mockRepository.incrementRoundTotalDistributed).toHaveBeenCalledWith( { chainId: 1 as ChainId, roundId: "round-1" }, 1000n, + undefined, ); }); @@ -135,6 +147,7 @@ describe("Round Handlers", () => { expect(mockRepository.insertPendingRoundRole).toHaveBeenCalledWith( changeset.args.pendingRoundRole, + undefined, ); }); @@ -148,7 +161,10 @@ describe("Round Handlers", () => { await handlers.DeletePendingRoundRoles(changeset); - expect(mockRepository.deleteManyPendingRoundRoles).toHaveBeenCalledWith([1, 2, 3]); + expect(mockRepository.deleteManyPendingRoundRoles).toHaveBeenCalledWith( + [1, 2, 3], + undefined, + ); }); it("handle InsertRoundRole changeset", async () => { @@ -167,7 +183,10 @@ describe("Round Handlers", () => { await handlers.InsertRoundRole(changeset); - expect(mockRepository.insertRoundRole).toHaveBeenCalledWith(changeset.args.roundRole); + expect(mockRepository.insertRoundRole).toHaveBeenCalledWith( + changeset.args.roundRole, + undefined, + ); }); it("handle DeleteAllRoundRolesByRoleAndAddress changeset", async () => { @@ -190,6 +209,7 @@ describe("Round Handlers", () => { changeset.args.roundRole.roundId, changeset.args.roundRole.role, changeset.args.roundRole.address, + undefined, ); }); }); diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index ec74912..057784b 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -11,6 +11,7 @@ import { IDonationRepository, IProjectRepository, IRoundRepository, + ITransactionManager, } from "@grants-stack-indexer/repository"; import { AlloEvent, @@ -92,6 +93,7 @@ describe("Orchestrator", { sequential: true }, () => { const dependencies: CoreDependencies = { evmProvider: mockEvmProvider, + transactionManager: {} as unknown as ITransactionManager, projectRepository: {} as unknown as IProjectRepository, roundRepository: {} as unknown as IRoundRepository, applicationRepository: {} as unknown as IApplicationRepository, @@ -151,13 +153,9 @@ describe("Orchestrator", { sequential: true }, () => { .mockResolvedValueOnce(mockEvents) .mockResolvedValue([]); eventsProcessorSpy.mockResolvedValue([]); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); }); @@ -246,13 +244,9 @@ describe("Orchestrator", { sequential: true }, () => { return Promise.resolve(); }); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: ["InsertProject", "InsertRoundRole", "DeletePendingRoundRoles"], - numExecuted: 3, - numSuccessful: 3, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); runPromise = orchestrator.run(abortController.signal); @@ -374,13 +368,9 @@ describe("Orchestrator", { sequential: true }, () => { vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); }); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: ["InsertApplication"], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); runPromise = orchestrator.run(abortController.signal); @@ -485,13 +475,9 @@ describe("Orchestrator", { sequential: true }, () => { .mockResolvedValue([]); eventsProcessorSpy.mockResolvedValue([]); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); }); @@ -544,13 +530,9 @@ describe("Orchestrator", { sequential: true }, () => { return Promise.resolve(); }); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: ["InsertPendingRoundRole"], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); runPromise = orchestrator.run(abortController.signal); @@ -579,13 +561,9 @@ describe("Orchestrator", { sequential: true }, () => { .mockResolvedValue([]); eventsProcessorSpy.mockRejectedValueOnce(error).mockResolvedValueOnce([]); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); }); @@ -674,41 +652,50 @@ describe("Orchestrator", { sequential: true }, () => { expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); }); - it.skip("logs DataLoader errors", async () => { - const mockEvent = createMockEvent("Allo", "PoolCreated", 1); - const mockChangesets: Changeset[] = [ - { type: "UpdateProject", args: { chainId, projectId: "1", project: {} } }, + it("logs DataLoader errors", async () => { + const mockEvent = createMockEvent("Registry", "ProfileCreated", 1, undefined); + const changesets = [ + { + type: "InsertPendingRoundRole", + args: { chainId, roundId: "1", roundRole: {} }, + } as unknown as Changeset, ]; - const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); - const dataLoaderSpy = vi.spyOn(orchestrator["dataLoader"], "applyChanges"); + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") .mockResolvedValueOnce([mockEvent]) .mockResolvedValue([]); - vi.spyOn(orchestrator["eventsProcessor"], "processEvent").mockResolvedValue( - mockChangesets, - ); - dataLoaderSpy.mockResolvedValue({ - numFailed: 1, - errors: ["Failed to update project"], - changesets: ["UpdateProject"], - numExecuted: 1, - numSuccessful: 0, + + eventsProcessorSpy.mockResolvedValue(changesets); + + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); }); + const dataLoaderSpy = vi.spyOn(orchestrator["dataLoader"], "applyChanges"); + const error = new Error("Failed to apply changesets"); + dataLoaderSpy.mockRejectedValue(error); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor(() => { - if (dataLoaderSpy.mock.calls.length < 1) throw new Error("Not yet called"); + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); }); - expect(consoleSpy).toHaveBeenCalledWith( - expect.stringContaining("Failed to apply changesets"), - ); - expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining(stringify(mockEvent))); + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(logger.error).toHaveBeenCalledWith(error, { + event: mockEvent, + className: Orchestrator.name, + chainId, + }); expect(dataLoaderSpy).toHaveBeenCalledTimes(1); - expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); }); }); }); diff --git a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts index cb52126..0dd931c 100644 --- a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts +++ b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts @@ -9,6 +9,7 @@ import { IProjectRepository, IRoundRepository, IStrategyProcessingCheckpointRepository, + ITransactionManager, Strategy, } from "@grants-stack-indexer/repository"; import { @@ -138,6 +139,7 @@ describe("RetroactiveProcessor", () => { applicationRepository: {} as IApplicationRepository, donationRepository: {} as IDonationRepository, applicationPayoutRepository: {} as IApplicationPayoutRepository, + transactionManager: {} as ITransactionManager, pricingProvider: { getTokenPrice: vi.fn(), }, @@ -216,13 +218,7 @@ describe("RetroactiveProcessor", () => { .mockResolvedValueOnce([mockEvent]) .mockResolvedValue([]); vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); await processor.processRetroactiveStrategies(); @@ -271,13 +267,7 @@ describe("RetroactiveProcessor", () => { return []; }); vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); await processor.processRetroactiveStrategies(); @@ -325,13 +315,7 @@ describe("RetroactiveProcessor", () => { .mockResolvedValueOnce([mockEvent]) .mockResolvedValue([]); vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); await processor.processRetroactiveStrategies(); @@ -372,13 +356,7 @@ describe("RetroactiveProcessor", () => { vi.spyOn(processor["eventsFetcher"], "fetchEvents").mockResolvedValueOnce([mockEvent]); vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); await processor.processRetroactiveStrategies(); @@ -417,13 +395,7 @@ describe("RetroactiveProcessor", () => { .mockResolvedValue([]); vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); await processor.processRetroactiveStrategies(); @@ -464,13 +436,7 @@ describe("RetroactiveProcessor", () => { .mockRejectedValueOnce(new InvalidEvent(mockEvent1)) .mockResolvedValueOnce([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); await processor.processRetroactiveStrategies();