Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: database transactions #48

Merged
merged 4 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
KyselyRoundRepository,
KyselyStrategyProcessingCheckpointRepository,
KyselyStrategyRegistryRepository,
KyselyTransactionManager,
} from "@grants-stack-indexer/repository";
import { ILogger, Logger } from "@grants-stack-indexer/shared";

Expand Down Expand Up @@ -50,6 +51,8 @@ export class SharedDependenciesService {
logger,
);

const transactionManager = new KyselyTransactionManager(kyselyDatabase);

const projectRepository = new KyselyProjectRepository(kyselyDatabase, env.DATABASE_SCHEMA);
const roundRepository = new KyselyRoundRepository(kyselyDatabase, env.DATABASE_SCHEMA);
const applicationRepository = new KyselyApplicationRepository(
Expand Down Expand Up @@ -97,6 +100,7 @@ export class SharedDependenciesService {
donationRepository,
metadataProvider,
applicationPayoutRepository,
transactionManager,
},
registriesRepositories: {
eventRegistryRepository,
Expand Down
2 changes: 2 additions & 0 deletions apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({
})),
KyselyEventRegistryRepository: vi.fn(),
KyselyStrategyProcessingCheckpointRepository: vi.fn(),
KyselyTransactionManager: vi.fn(),
}));

vi.mock("@grants-stack-indexer/pricing", () => ({
Expand Down Expand Up @@ -145,6 +146,7 @@ describe("SharedDependenciesService", () => {
expect(dependencies.core).toHaveProperty("donationRepository");
expect(dependencies.core).toHaveProperty("metadataProvider");
expect(dependencies.core).toHaveProperty("applicationPayoutRepository");
expect(dependencies.core).toHaveProperty("transactionManager");

// Verify registries
expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository");
Expand Down
58 changes: 27 additions & 31 deletions packages/data-flow/src/data-loader/dataLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import {
IDonationRepository,
IProjectRepository,
IRoundRepository,
ITransactionManager,
} from "@grants-stack-indexer/repository";
import { ILogger, stringify } from "@grants-stack-indexer/shared";
import { ILogger } from "@grants-stack-indexer/shared";

import { ExecutionResult, IDataLoader, InvalidChangeset } from "../internal.js";
import { IDataLoader, InvalidChangeset } from "../internal.js";
import {
createApplicationHandlers,
createApplicationPayoutHandlers,
Expand Down Expand Up @@ -42,6 +43,7 @@ export class DataLoader implements IDataLoader {
donation: IDonationRepository;
applicationPayout: IApplicationPayoutRepository;
},
private readonly transactionManager: ITransactionManager,
private readonly logger: ILogger,
) {
this.handlers = {
Expand All @@ -54,40 +56,34 @@ export class DataLoader implements IDataLoader {
}

/** @inheritdoc */
public async applyChanges(changesets: Changeset[]): Promise<ExecutionResult> {
const result: ExecutionResult = {
changesets: [],
numExecuted: 0,
numSuccessful: 0,
numFailed: 0,
errors: [],
};

public async applyChanges(changesets: Changeset[]): Promise<void> {
const invalidTypes = changesets.filter((changeset) => !this.handlers[changeset.type]);
if (invalidTypes.length > 0) {
throw new InvalidChangeset(invalidTypes.map((changeset) => changeset.type));
}

//TODO: research how to manage transactions so we can rollback on error
for (const changeset of changesets) {
result.numExecuted++;
try {
//TODO: inside each handler, we should add zod validation that the args match the expected type
await this.handlers[changeset.type](changeset as never);
result.changesets.push(changeset.type);
result.numSuccessful++;
} catch (error) {
result.numFailed++;
result.errors.push(
`Failed to apply changeset ${changeset.type}: ${
error instanceof Error ? error.message : String(error)
}`,
);
this.logger.error(`${stringify(error, Object.getOwnPropertyNames(error))}`);
break;
}
}
await this.transactionManager.runInTransaction(async (tx) => {
this.logger.debug(`Starting transaction on ${changesets.length} changesets...`, {
className: DataLoader.name,
});
for (const changeset of changesets) {
try {
//TODO: inside each handler, we should add zod validation that the args match the expected type
await this.handlers[changeset.type](changeset as never, tx);
} catch (error) {
this.logger.debug(
`Error applying changeset ${changeset.type}. Rolling back transaction with ${changesets.length} changesets`,
{
className: DataLoader.name,
},
);

return result;
throw error;
}
}
});
this.logger.debug(`Successfully applied ${changesets.length} changesets`, {
className: DataLoader.name,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ export type ApplicationHandlers = {
export const createApplicationHandlers = (
repository: IApplicationRepository,
): ApplicationHandlers => ({
InsertApplication: (async (changeset): Promise<void> => {
await repository.insertApplication(changeset.args);
InsertApplication: (async (changeset, txConnection): Promise<void> => {
await repository.insertApplication(changeset.args, txConnection);
}) satisfies ChangesetHandler<"InsertApplication">,

UpdateApplication: (async (changeset): Promise<void> => {
UpdateApplication: (async (changeset, txConnection): Promise<void> => {
const { chainId, roundId, applicationId, application } = changeset.args;
await repository.updateApplication({ chainId, roundId, id: applicationId }, application);
await repository.updateApplication(
{ chainId, roundId, id: applicationId },
application,
txConnection,
);
}) satisfies ChangesetHandler<"UpdateApplication">,
});
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export type ApplicationPayoutHandlers = {
export const createApplicationPayoutHandlers = (
repository: IApplicationPayoutRepository,
): ApplicationPayoutHandlers => ({
InsertApplicationPayout: (async (changeset): Promise<void> => {
await repository.insertApplicationPayout(changeset.args.applicationPayout);
InsertApplicationPayout: (async (changeset, txConnection): Promise<void> => {
await repository.insertApplicationPayout(changeset.args.applicationPayout, txConnection);
}) satisfies ChangesetHandler<"InsertApplicationPayout">,
});
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ export type DonationHandlers = {
* @returns An object containing all application-related handlers
*/
export const createDonationHandlers = (repository: IDonationRepository): DonationHandlers => ({
InsertDonation: (async (changeset): Promise<void> => {
await repository.insertDonation(changeset.args.donation);
InsertDonation: (async (changeset, txConnection): Promise<void> => {
await repository.insertDonation(changeset.args.donation, txConnection);
}) satisfies ChangesetHandler<"InsertDonation">,

InsertManyDonations: (async (changeset): Promise<void> => {
await repository.insertManyDonations(changeset.args.donations);
InsertManyDonations: (async (changeset, txConnection): Promise<void> => {
await repository.insertManyDonations(changeset.args.donations, txConnection);
}) satisfies ChangesetHandler<"InsertManyDonations">,
});
28 changes: 14 additions & 14 deletions packages/data-flow/src/data-loader/handlers/project.handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,38 @@ export type ProjectHandlers = {
* @returns An object containing all project-related handlers
*/
export const createProjectHandlers = (repository: IProjectRepository): ProjectHandlers => ({
InsertProject: (async (changeset): Promise<void> => {
InsertProject: (async (changeset, txConnection): Promise<void> => {
const { project } = changeset.args;
await repository.insertProject(project);
await repository.insertProject(project, txConnection);
}) satisfies ChangesetHandler<"InsertProject">,

UpdateProject: (async (changeset): Promise<void> => {
UpdateProject: (async (changeset, txConnection): Promise<void> => {
const { chainId, projectId, project } = changeset.args;
await repository.updateProject({ id: projectId, chainId }, project);
await repository.updateProject({ id: projectId, chainId }, project, txConnection);
}) satisfies ChangesetHandler<"UpdateProject">,

InsertPendingProjectRole: (async (changeset): Promise<void> => {
InsertPendingProjectRole: (async (changeset, txConnection): Promise<void> => {
const { pendingProjectRole } = changeset.args;
await repository.insertPendingProjectRole(pendingProjectRole);
await repository.insertPendingProjectRole(pendingProjectRole, txConnection);
}) satisfies ChangesetHandler<"InsertPendingProjectRole">,

DeletePendingProjectRoles: (async (changeset): Promise<void> => {
DeletePendingProjectRoles: (async (changeset, txConnection): Promise<void> => {
const { ids } = changeset.args;
await repository.deleteManyPendingProjectRoles(ids);
await repository.deleteManyPendingProjectRoles(ids, txConnection);
}) satisfies ChangesetHandler<"DeletePendingProjectRoles">,

InsertProjectRole: (async (changeset): Promise<void> => {
InsertProjectRole: (async (changeset, txConnection): Promise<void> => {
const { projectRole } = changeset.args;
await repository.insertProjectRole(projectRole);
await repository.insertProjectRole(projectRole, txConnection);
}) satisfies ChangesetHandler<"InsertProjectRole">,

DeleteAllProjectRolesByRole: (async (changeset): Promise<void> => {
DeleteAllProjectRolesByRole: (async (changeset, txConnection): Promise<void> => {
const { chainId, projectId, role } = changeset.args.projectRole;
await repository.deleteManyProjectRoles(chainId, projectId, role);
await repository.deleteManyProjectRoles(chainId, projectId, role, undefined, txConnection);
}) satisfies ChangesetHandler<"DeleteAllProjectRolesByRole">,

DeleteAllProjectRolesByRoleAndAddress: (async (changeset): Promise<void> => {
DeleteAllProjectRolesByRoleAndAddress: (async (changeset, txConnection): Promise<void> => {
const { chainId, projectId, role, address } = changeset.args.projectRole;
await repository.deleteManyProjectRoles(chainId, projectId, role, address);
await repository.deleteManyProjectRoles(chainId, projectId, role, address, txConnection);
}) satisfies ChangesetHandler<"DeleteAllProjectRolesByRoleAndAddress">,
});
44 changes: 28 additions & 16 deletions packages/data-flow/src/data-loader/handlers/round.handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@ export type RoundHandlers = {
* @returns An object containing all round-related handlers
*/
export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers => ({
InsertRound: (async (changeset): Promise<void> => {
InsertRound: (async (changeset, txConnection): Promise<void> => {
const { round } = changeset.args;
await repository.insertRound(round);
await repository.insertRound(round, txConnection);
}) satisfies ChangesetHandler<"InsertRound">,

UpdateRound: (async (changeset): Promise<void> => {
UpdateRound: (async (changeset, txConnection): Promise<void> => {
const { chainId, roundId, round } = changeset.args;
await repository.updateRound({ id: roundId, chainId }, round);
await repository.updateRound({ id: roundId, chainId }, round, txConnection);
}) satisfies ChangesetHandler<"UpdateRound">,

UpdateRoundByStrategyAddress: (async (changeset): Promise<void> => {
UpdateRoundByStrategyAddress: (async (changeset, txConnection): Promise<void> => {
const { chainId, strategyAddress, round } = changeset.args;
if (round) {
await repository.updateRound({ strategyAddress, chainId: chainId }, round);
await repository.updateRound(
{ strategyAddress, chainId: chainId },
round,
txConnection,
);
}
}) satisfies ChangesetHandler<"UpdateRoundByStrategyAddress">,

IncrementRoundFundedAmount: (async (changeset): Promise<void> => {
IncrementRoundFundedAmount: (async (changeset, txConnection): Promise<void> => {
const { chainId, roundId, fundedAmount, fundedAmountInUsd } = changeset.args;
await repository.incrementRoundFunds(
{
Expand All @@ -43,37 +47,45 @@ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers
},
fundedAmount,
fundedAmountInUsd,
txConnection,
);
}) satisfies ChangesetHandler<"IncrementRoundFundedAmount">,

IncrementRoundTotalDistributed: (async (changeset): Promise<void> => {
IncrementRoundTotalDistributed: (async (changeset, txConnection): Promise<void> => {
const { chainId, roundId, amount } = changeset.args;
await repository.incrementRoundTotalDistributed(
{
chainId,
roundId,
},
amount,
txConnection,
);
}) satisfies ChangesetHandler<"IncrementRoundTotalDistributed">,

InsertPendingRoundRole: (async (changeset): Promise<void> => {
InsertPendingRoundRole: (async (changeset, txConnection): Promise<void> => {
const { pendingRoundRole } = changeset.args;
await repository.insertPendingRoundRole(pendingRoundRole);
await repository.insertPendingRoundRole(pendingRoundRole, txConnection);
}) satisfies ChangesetHandler<"InsertPendingRoundRole">,

DeletePendingRoundRoles: (async (changeset): Promise<void> => {
DeletePendingRoundRoles: (async (changeset, txConnection): Promise<void> => {
const { ids } = changeset.args;
await repository.deleteManyPendingRoundRoles(ids);
await repository.deleteManyPendingRoundRoles(ids, txConnection);
}) satisfies ChangesetHandler<"DeletePendingRoundRoles">,

InsertRoundRole: (async (changeset): Promise<void> => {
InsertRoundRole: (async (changeset, txConnection): Promise<void> => {
const { roundRole } = changeset.args;
await repository.insertRoundRole(roundRole);
await repository.insertRoundRole(roundRole, txConnection);
}) satisfies ChangesetHandler<"InsertRoundRole">,

DeleteAllRoundRolesByRoleAndAddress: (async (changeset): Promise<void> => {
DeleteAllRoundRolesByRoleAndAddress: (async (changeset, txConnection): Promise<void> => {
const { chainId, roundId, role, address } = changeset.args.roundRole;
await repository.deleteManyRoundRolesByRoleAndAddress(chainId, roundId, role, address);
await repository.deleteManyRoundRolesByRoleAndAddress(
chainId,
roundId,
role,
address,
txConnection,
);
}) satisfies ChangesetHandler<"DeleteAllRoundRolesByRoleAndAddress">,
});
3 changes: 2 additions & 1 deletion packages/data-flow/src/data-loader/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Changeset } from "@grants-stack-indexer/repository";
import { Changeset, TransactionConnection } from "@grants-stack-indexer/repository";

export type ChangesetHandler<T extends Changeset["type"]> = (
changeset: Extract<Changeset, { type: T }>,
txConnection?: TransactionConnection,
) => Promise<void>;

export type ChangesetHandlers = {
Expand Down
4 changes: 1 addition & 3 deletions packages/data-flow/src/interfaces/dataLoader.interface.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import type { Changeset } from "@grants-stack-indexer/repository";

import type { ExecutionResult } from "../internal.js";

export interface IDataLoader {
/**
* Applies the changesets to the database.
* @param changesets - The changesets to apply.
* @returns The execution result.
* @throws {InvalidChangeset} if there are changesets with invalid types.
*/
applyChanges(changesets: Changeset[]): Promise<ExecutionResult>;
applyChanges(changesets: Changeset[]): Promise<void>;
}
16 changes: 2 additions & 14 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export class Orchestrator {
donation: this.dependencies.donationRepository,
applicationPayout: this.dependencies.applicationPayoutRepository,
},
this.dependencies.transactionManager,
this.logger,
);
this.eventsQueue = new Queue<ProcessorEvent<ContractName, AnyEvent>>(fetchLimit);
Expand Down Expand Up @@ -145,20 +146,7 @@ export class Orchestrator {
}

const changesets = await this.eventsProcessor.processEvent(event);
const executionResult = await this.dataLoader.applyChanges(changesets);

if (executionResult.numFailed > 0) {
//TODO: should we retry the failed changesets?
this.logger.error(
`Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(
event,
)}`,
{
className: Orchestrator.name,
chainId: this.chainId,
},
);
}
await this.dataLoader.applyChanges(changesets);
} catch (error: unknown) {
// TODO: improve error handling, retries and notify
if (
Expand Down
13 changes: 2 additions & 11 deletions packages/data-flow/src/retroactiveProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export class RetroactiveProcessor {
donation: this.dependencies.donationRepository,
applicationPayout: this.dependencies.applicationPayoutRepository,
},
this.dependencies.transactionManager,
this.logger,
);
}
Expand Down Expand Up @@ -208,17 +209,7 @@ export class RetroactiveProcessor {

event.strategyId = strategyId;
const changesets = await this.eventsProcessor.processEvent(event);
const executionResult = await this.dataLoader.applyChanges(changesets);

if (executionResult.numFailed > 0) {
this.logger.error(
`Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(event)}`,
{
className: RetroactiveProcessor.name,
chainId: this.chainId,
},
);
}
await this.dataLoader.applyChanges(changesets);
} catch (error) {
if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) {
// Expected errors that we can safely ignore
Expand Down
Loading
Loading