Skip to content

Commit

Permalink
feat: database transactions (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnigir1 authored Jan 3, 2025
1 parent 3b294aa commit 0a645ba
Show file tree
Hide file tree
Showing 38 changed files with 549 additions and 363 deletions.
4 changes: 4 additions & 0 deletions apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
KyselyRoundRepository,
KyselyStrategyProcessingCheckpointRepository,
KyselyStrategyRegistryRepository,
KyselyTransactionManager,
} from "@grants-stack-indexer/repository";
import { ILogger, Logger } from "@grants-stack-indexer/shared";

Expand Down Expand Up @@ -50,6 +51,8 @@ export class SharedDependenciesService {
logger,
);

const transactionManager = new KyselyTransactionManager(kyselyDatabase);

const projectRepository = new KyselyProjectRepository(kyselyDatabase, env.DATABASE_SCHEMA);
const roundRepository = new KyselyRoundRepository(kyselyDatabase, env.DATABASE_SCHEMA);
const applicationRepository = new KyselyApplicationRepository(
Expand Down Expand Up @@ -97,6 +100,7 @@ export class SharedDependenciesService {
donationRepository,
metadataProvider,
applicationPayoutRepository,
transactionManager,
},
registriesRepositories: {
eventRegistryRepository,
Expand Down
2 changes: 2 additions & 0 deletions apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({
})),
KyselyEventRegistryRepository: vi.fn(),
KyselyStrategyProcessingCheckpointRepository: vi.fn(),
KyselyTransactionManager: vi.fn(),
}));

vi.mock("@grants-stack-indexer/pricing", () => ({
Expand Down Expand Up @@ -145,6 +146,7 @@ describe("SharedDependenciesService", () => {
expect(dependencies.core).toHaveProperty("donationRepository");
expect(dependencies.core).toHaveProperty("metadataProvider");
expect(dependencies.core).toHaveProperty("applicationPayoutRepository");
expect(dependencies.core).toHaveProperty("transactionManager");

// Verify registries
expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository");
Expand Down
58 changes: 27 additions & 31 deletions packages/data-flow/src/data-loader/dataLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import {
IDonationRepository,
IProjectRepository,
IRoundRepository,
ITransactionManager,
} from "@grants-stack-indexer/repository";
import { ILogger, stringify } from "@grants-stack-indexer/shared";
import { ILogger } from "@grants-stack-indexer/shared";

import { ExecutionResult, IDataLoader, InvalidChangeset } from "../internal.js";
import { IDataLoader, InvalidChangeset } from "../internal.js";
import {
createApplicationHandlers,
createApplicationPayoutHandlers,
Expand Down Expand Up @@ -42,6 +43,7 @@ export class DataLoader implements IDataLoader {
donation: IDonationRepository;
applicationPayout: IApplicationPayoutRepository;
},
private readonly transactionManager: ITransactionManager,
private readonly logger: ILogger,
) {
this.handlers = {
Expand All @@ -54,40 +56,34 @@ export class DataLoader implements IDataLoader {
}

/** @inheritdoc */
public async applyChanges(changesets: Changeset[]): Promise<ExecutionResult> {
const result: ExecutionResult = {
changesets: [],
numExecuted: 0,
numSuccessful: 0,
numFailed: 0,
errors: [],
};

public async applyChanges(changesets: Changeset[]): Promise<void> {
const invalidTypes = changesets.filter((changeset) => !this.handlers[changeset.type]);
if (invalidTypes.length > 0) {
throw new InvalidChangeset(invalidTypes.map((changeset) => changeset.type));
}

//TODO: research how to manage transactions so we can rollback on error
for (const changeset of changesets) {
result.numExecuted++;
try {
//TODO: inside each handler, we should add zod validation that the args match the expected type
await this.handlers[changeset.type](changeset as never);
result.changesets.push(changeset.type);
result.numSuccessful++;
} catch (error) {
result.numFailed++;
result.errors.push(
`Failed to apply changeset ${changeset.type}: ${
error instanceof Error ? error.message : String(error)
}`,
);
this.logger.error(`${stringify(error, Object.getOwnPropertyNames(error))}`);
break;
}
}
await this.transactionManager.runInTransaction(async (tx) => {
this.logger.debug(`Starting transaction on ${changesets.length} changesets...`, {
className: DataLoader.name,
});
for (const changeset of changesets) {
try {
//TODO: inside each handler, we should add zod validation that the args match the expected type
await this.handlers[changeset.type](changeset as never, tx);
} catch (error) {
this.logger.debug(
`Error applying changeset ${changeset.type}. Rolling back transaction with ${changesets.length} changesets`,
{
className: DataLoader.name,
},
);

return result;
throw error;
}
}
});
this.logger.debug(`Successfully applied ${changesets.length} changesets`, {
className: DataLoader.name,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ export type ApplicationHandlers = {
export const createApplicationHandlers = (
repository: IApplicationRepository,
): ApplicationHandlers => ({
InsertApplication: (async (changeset): Promise<void> => {
await repository.insertApplication(changeset.args);
InsertApplication: (async (changeset, txConnection): Promise<void> => {
await repository.insertApplication(changeset.args, txConnection);
}) satisfies ChangesetHandler<"InsertApplication">,

UpdateApplication: (async (changeset): Promise<void> => {
UpdateApplication: (async (changeset, txConnection): Promise<void> => {
const { chainId, roundId, applicationId, application } = changeset.args;
await repository.updateApplication({ chainId, roundId, id: applicationId }, application);
await repository.updateApplication(
{ chainId, roundId, id: applicationId },
application,
txConnection,
);
}) satisfies ChangesetHandler<"UpdateApplication">,
});
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export type ApplicationPayoutHandlers = {
export const createApplicationPayoutHandlers = (
repository: IApplicationPayoutRepository,
): ApplicationPayoutHandlers => ({
InsertApplicationPayout: (async (changeset): Promise<void> => {
await repository.insertApplicationPayout(changeset.args.applicationPayout);
InsertApplicationPayout: (async (changeset, txConnection): Promise<void> => {
await repository.insertApplicationPayout(changeset.args.applicationPayout, txConnection);
}) satisfies ChangesetHandler<"InsertApplicationPayout">,
});
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ export type DonationHandlers = {
* @returns An object containing all application-related handlers
*/
export const createDonationHandlers = (repository: IDonationRepository): DonationHandlers => ({
InsertDonation: (async (changeset): Promise<void> => {
await repository.insertDonation(changeset.args.donation);
InsertDonation: (async (changeset, txConnection): Promise<void> => {
await repository.insertDonation(changeset.args.donation, txConnection);
}) satisfies ChangesetHandler<"InsertDonation">,

InsertManyDonations: (async (changeset): Promise<void> => {
await repository.insertManyDonations(changeset.args.donations);
InsertManyDonations: (async (changeset, txConnection): Promise<void> => {
await repository.insertManyDonations(changeset.args.donations, txConnection);
}) satisfies ChangesetHandler<"InsertManyDonations">,
});
28 changes: 14 additions & 14 deletions packages/data-flow/src/data-loader/handlers/project.handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,38 @@ export type ProjectHandlers = {
* @returns An object containing all project-related handlers
*/
export const createProjectHandlers = (repository: IProjectRepository): ProjectHandlers => ({
InsertProject: (async (changeset): Promise<void> => {
InsertProject: (async (changeset, txConnection): Promise<void> => {
const { project } = changeset.args;
await repository.insertProject(project);
await repository.insertProject(project, txConnection);
}) satisfies ChangesetHandler<"InsertProject">,

UpdateProject: (async (changeset): Promise<void> => {
UpdateProject: (async (changeset, txConnection): Promise<void> => {
const { chainId, projectId, project } = changeset.args;
await repository.updateProject({ id: projectId, chainId }, project);
await repository.updateProject({ id: projectId, chainId }, project, txConnection);
}) satisfies ChangesetHandler<"UpdateProject">,

InsertPendingProjectRole: (async (changeset): Promise<void> => {
InsertPendingProjectRole: (async (changeset, txConnection): Promise<void> => {
const { pendingProjectRole } = changeset.args;
await repository.insertPendingProjectRole(pendingProjectRole);
await repository.insertPendingProjectRole(pendingProjectRole, txConnection);
}) satisfies ChangesetHandler<"InsertPendingProjectRole">,

DeletePendingProjectRoles: (async (changeset): Promise<void> => {
DeletePendingProjectRoles: (async (changeset, txConnection): Promise<void> => {
const { ids } = changeset.args;
await repository.deleteManyPendingProjectRoles(ids);
await repository.deleteManyPendingProjectRoles(ids, txConnection);
}) satisfies ChangesetHandler<"DeletePendingProjectRoles">,

InsertProjectRole: (async (changeset): Promise<void> => {
InsertProjectRole: (async (changeset, txConnection): Promise<void> => {
const { projectRole } = changeset.args;
await repository.insertProjectRole(projectRole);
await repository.insertProjectRole(projectRole, txConnection);
}) satisfies ChangesetHandler<"InsertProjectRole">,

DeleteAllProjectRolesByRole: (async (changeset): Promise<void> => {
DeleteAllProjectRolesByRole: (async (changeset, txConnection): Promise<void> => {
const { chainId, projectId, role } = changeset.args.projectRole;
await repository.deleteManyProjectRoles(chainId, projectId, role);
await repository.deleteManyProjectRoles(chainId, projectId, role, undefined, txConnection);
}) satisfies ChangesetHandler<"DeleteAllProjectRolesByRole">,

DeleteAllProjectRolesByRoleAndAddress: (async (changeset): Promise<void> => {
DeleteAllProjectRolesByRoleAndAddress: (async (changeset, txConnection): Promise<void> => {
const { chainId, projectId, role, address } = changeset.args.projectRole;
await repository.deleteManyProjectRoles(chainId, projectId, role, address);
await repository.deleteManyProjectRoles(chainId, projectId, role, address, txConnection);
}) satisfies ChangesetHandler<"DeleteAllProjectRolesByRoleAndAddress">,
});
44 changes: 28 additions & 16 deletions packages/data-flow/src/data-loader/handlers/round.handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@ export type RoundHandlers = {
* @returns An object containing all round-related handlers
*/
export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers => ({
InsertRound: (async (changeset): Promise<void> => {
InsertRound: (async (changeset, txConnection): Promise<void> => {
const { round } = changeset.args;
await repository.insertRound(round);
await repository.insertRound(round, txConnection);
}) satisfies ChangesetHandler<"InsertRound">,

UpdateRound: (async (changeset): Promise<void> => {
UpdateRound: (async (changeset, txConnection): Promise<void> => {
const { chainId, roundId, round } = changeset.args;
await repository.updateRound({ id: roundId, chainId }, round);
await repository.updateRound({ id: roundId, chainId }, round, txConnection);
}) satisfies ChangesetHandler<"UpdateRound">,

UpdateRoundByStrategyAddress: (async (changeset): Promise<void> => {
UpdateRoundByStrategyAddress: (async (changeset, txConnection): Promise<void> => {
const { chainId, strategyAddress, round } = changeset.args;
if (round) {
await repository.updateRound({ strategyAddress, chainId: chainId }, round);
await repository.updateRound(
{ strategyAddress, chainId: chainId },
round,
txConnection,
);
}
}) satisfies ChangesetHandler<"UpdateRoundByStrategyAddress">,

IncrementRoundFundedAmount: (async (changeset): Promise<void> => {
IncrementRoundFundedAmount: (async (changeset, txConnection): Promise<void> => {
const { chainId, roundId, fundedAmount, fundedAmountInUsd } = changeset.args;
await repository.incrementRoundFunds(
{
Expand All @@ -43,37 +47,45 @@ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers
},
fundedAmount,
fundedAmountInUsd,
txConnection,
);
}) satisfies ChangesetHandler<"IncrementRoundFundedAmount">,

IncrementRoundTotalDistributed: (async (changeset): Promise<void> => {
IncrementRoundTotalDistributed: (async (changeset, txConnection): Promise<void> => {
const { chainId, roundId, amount } = changeset.args;
await repository.incrementRoundTotalDistributed(
{
chainId,
roundId,
},
amount,
txConnection,
);
}) satisfies ChangesetHandler<"IncrementRoundTotalDistributed">,

InsertPendingRoundRole: (async (changeset): Promise<void> => {
InsertPendingRoundRole: (async (changeset, txConnection): Promise<void> => {
const { pendingRoundRole } = changeset.args;
await repository.insertPendingRoundRole(pendingRoundRole);
await repository.insertPendingRoundRole(pendingRoundRole, txConnection);
}) satisfies ChangesetHandler<"InsertPendingRoundRole">,

DeletePendingRoundRoles: (async (changeset): Promise<void> => {
DeletePendingRoundRoles: (async (changeset, txConnection): Promise<void> => {
const { ids } = changeset.args;
await repository.deleteManyPendingRoundRoles(ids);
await repository.deleteManyPendingRoundRoles(ids, txConnection);
}) satisfies ChangesetHandler<"DeletePendingRoundRoles">,

InsertRoundRole: (async (changeset): Promise<void> => {
InsertRoundRole: (async (changeset, txConnection): Promise<void> => {
const { roundRole } = changeset.args;
await repository.insertRoundRole(roundRole);
await repository.insertRoundRole(roundRole, txConnection);
}) satisfies ChangesetHandler<"InsertRoundRole">,

DeleteAllRoundRolesByRoleAndAddress: (async (changeset): Promise<void> => {
DeleteAllRoundRolesByRoleAndAddress: (async (changeset, txConnection): Promise<void> => {
const { chainId, roundId, role, address } = changeset.args.roundRole;
await repository.deleteManyRoundRolesByRoleAndAddress(chainId, roundId, role, address);
await repository.deleteManyRoundRolesByRoleAndAddress(
chainId,
roundId,
role,
address,
txConnection,
);
}) satisfies ChangesetHandler<"DeleteAllRoundRolesByRoleAndAddress">,
});
3 changes: 2 additions & 1 deletion packages/data-flow/src/data-loader/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Changeset } from "@grants-stack-indexer/repository";
import { Changeset, TransactionConnection } from "@grants-stack-indexer/repository";

export type ChangesetHandler<T extends Changeset["type"]> = (
changeset: Extract<Changeset, { type: T }>,
txConnection?: TransactionConnection,
) => Promise<void>;

export type ChangesetHandlers = {
Expand Down
4 changes: 1 addition & 3 deletions packages/data-flow/src/interfaces/dataLoader.interface.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import type { Changeset } from "@grants-stack-indexer/repository";

import type { ExecutionResult } from "../internal.js";

export interface IDataLoader {
/**
* Applies the changesets to the database.
* @param changesets - The changesets to apply.
* @returns The execution result.
* @throws {InvalidChangeset} if there are changesets with invalid types.
*/
applyChanges(changesets: Changeset[]): Promise<ExecutionResult>;
applyChanges(changesets: Changeset[]): Promise<void>;
}
16 changes: 2 additions & 14 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export class Orchestrator {
donation: this.dependencies.donationRepository,
applicationPayout: this.dependencies.applicationPayoutRepository,
},
this.dependencies.transactionManager,
this.logger,
);
this.eventsQueue = new Queue<ProcessorEvent<ContractName, AnyEvent>>(fetchLimit);
Expand Down Expand Up @@ -145,20 +146,7 @@ export class Orchestrator {
}

const changesets = await this.eventsProcessor.processEvent(event);
const executionResult = await this.dataLoader.applyChanges(changesets);

if (executionResult.numFailed > 0) {
//TODO: should we retry the failed changesets?
this.logger.error(
`Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(
event,
)}`,
{
className: Orchestrator.name,
chainId: this.chainId,
},
);
}
await this.dataLoader.applyChanges(changesets);
} catch (error: unknown) {
// TODO: improve error handling, retries and notify
if (
Expand Down
13 changes: 2 additions & 11 deletions packages/data-flow/src/retroactiveProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export class RetroactiveProcessor {
donation: this.dependencies.donationRepository,
applicationPayout: this.dependencies.applicationPayoutRepository,
},
this.dependencies.transactionManager,
this.logger,
);
}
Expand Down Expand Up @@ -208,17 +209,7 @@ export class RetroactiveProcessor {

event.strategyId = strategyId;
const changesets = await this.eventsProcessor.processEvent(event);
const executionResult = await this.dataLoader.applyChanges(changesets);

if (executionResult.numFailed > 0) {
this.logger.error(
`Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(event)}`,
{
className: RetroactiveProcessor.name,
chainId: this.chainId,
},
);
}
await this.dataLoader.applyChanges(changesets);
} catch (error) {
if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) {
// Expected errors that we can safely ignore
Expand Down
Loading

0 comments on commit 0a645ba

Please sign in to comment.