Skip to content

Commit

Permalink
feat: transaction manager w/callback pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnigir1 committed Dec 31, 2024
1 parent 3b294aa commit ea2c124
Show file tree
Hide file tree
Showing 17 changed files with 193 additions and 106 deletions.
3 changes: 3 additions & 0 deletions packages/repository/src/external.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,7 @@ export {

export type { StrategyProcessingCheckpoint, NewStrategyProcessingCheckpoint } from "./internal.js";

export type { ITransactionManager, TransactionConnection } from "./internal.js";
export { KyselyTransactionManager } from "./internal.js";

export { createKyselyPostgresDb as createKyselyDatabase } from "./internal.js";
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import { NewApplicationPayout } from "../types/applicationPayout.types.js";
import { TransactionConnection } from "../types/transaction.types.js";

export interface IApplicationPayoutRepository {
export interface IApplicationPayoutRepository<
TxConnection extends TransactionConnection = TransactionConnection,
> {
/**
* Inserts a new application payout into the database.
* @param applicationPayout - The new application payout to insert.
* @param tx Optional transaction connection
* @returns A promise that resolves when the application payout is inserted.
*/
insertApplicationPayout(applicationPayout: NewApplicationPayout): Promise<void>;
insertApplicationPayout(
applicationPayout: NewApplicationPayout,
tx?: TxConnection,
): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Address, ChainId } from "@grants-stack-indexer/shared";

import { Application, NewApplication, PartialApplication } from "../types/application.types.js";
import { TransactionConnection } from "../types/transaction.types.js";

export interface IApplicationReadRepository {
/**
Expand Down Expand Up @@ -65,22 +66,27 @@ export interface IApplicationReadRepository {
getApplicationsByRoundId(chainId: ChainId, roundId: string): Promise<Application[]>;
}

export interface IApplicationRepository extends IApplicationReadRepository {
export interface IApplicationRepository<
TxConnection extends TransactionConnection = TransactionConnection,
> extends IApplicationReadRepository {
/**
* Inserts a new application into the repository.
* @param application The new application to insert.
* @param tx Optional transaction connection
* @returns A promise that resolves when the insertion is complete.
*/
insertApplication(application: NewApplication): Promise<void>;
insertApplication(application: NewApplication, tx?: TxConnection): Promise<void>;

/**
* Updates an existing application in the repository.
* @param where An object containing the (id, chainId, and roundId) of the application to update.
* @param application The partial application data to update.
* @param tx Optional transaction connection
* @returns A promise that resolves when the update is complete.
*/
updateApplication(
where: { id: string; chainId: ChainId; roundId: string },
application: PartialApplication,
tx?: TxConnection,
): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import { NewDonation } from "../internal.js";
import { TransactionConnection } from "../types/transaction.types.js";

export interface IDonationRepository {
export interface IDonationRepository<
TxConnection extends TransactionConnection = TransactionConnection,
> {
/**
* Insert a single donation
* @param donation The donation to insert
* @param tx Optional transaction connection
* @returns A promise that resolves when the donation is inserted
*/
insertDonation(donation: NewDonation): Promise<void>;
insertDonation(donation: NewDonation, tx?: TxConnection): Promise<void>;

/**
* Insert many donations
* @param donations The donations to insert
* @param tx Optional transaction connection
* @returns A promise that resolves when the donations are inserted
*/
insertManyDonations(donations: NewDonation[]): Promise<void>;
insertManyDonations(donations: NewDonation[], tx?: TxConnection): Promise<void>;
}
1 change: 1 addition & 0 deletions packages/repository/src/interfaces/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export * from "./applicationPayoutRepository.interface.js";
export * from "./strategyRepository.interface.js";
export * from "./eventsRepository.interface.js";
export * from "./strategyProcessingCheckpointRepository.interface.js";
export * from "./transactionManager.interface.js";
29 changes: 23 additions & 6 deletions packages/repository/src/interfaces/projectRepository.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
Project,
ProjectRoleNames,
} from "../types/project.types.js";
import { TransactionConnection } from "../types/transaction.types.js";

export interface IProjectReadRepository {
/**
Expand Down Expand Up @@ -67,55 +68,71 @@ export interface IProjectReadRepository {
getProjectByAnchorOrThrow(chainId: ChainId, anchorAddress: Address): Promise<Project>;
}

export interface IProjectRepository extends IProjectReadRepository {
export interface IProjectRepository<
TxConnection extends TransactionConnection = TransactionConnection,
> extends IProjectReadRepository {
/**
* Inserts a new project into the repository.
* @param project The new project to be inserted.
* @param tx Optional transaction connection
* @returns A promise that resolves when the insertion is complete.
*/
insertProject(project: NewProject): Promise<void>;
insertProject(project: NewProject, tx?: TxConnection): Promise<void>;

/**
* Updates an existing project in the repository.
* @param where An object containing the id and chainId to identify the project to update.
* @param project The partial project data to update.
* @param tx Optional transaction connection
* @returns A promise that resolves when the update is complete.
*/
updateProject(where: { id: string; chainId: ChainId }, project: PartialProject): Promise<void>;
updateProject(
where: { id: string; chainId: ChainId },
project: PartialProject,
tx?: TxConnection,
): Promise<void>;

/**
* Inserts a new project role into the repository.
* @param projectRole The new project role to be inserted.
* @param tx Optional transaction connection
* @returns A promise that resolves when the insertion is complete.
*/
insertProjectRole(projectRole: NewProjectRole): Promise<void>;
insertProjectRole(projectRole: NewProjectRole, tx?: TxConnection): Promise<void>;

/**
* Deletes multiple project roles based on the provided criteria.
* @param chainId The chain ID of the project roles to delete.
* @param projectId The project ID of the roles to delete.
* @param role The role type to delete.
* @param address Optional address to further filter the roles to delete.
* @param tx Optional transaction connection
* @returns A promise that resolves when the deletion is complete.
*/
deleteManyProjectRoles(
chainId: ChainId,
projectId: string,
role: ProjectRoleNames,
address?: Address,
tx?: TxConnection,
): Promise<void>;

/**
* Inserts a new pending project role into the repository.
* @param pendingProjectRole The new pending project role to be inserted.
* @param tx Optional transaction connection
* @returns A promise that resolves when the insertion is complete.
*/
insertPendingProjectRole(pendingProjectRole: NewPendingProjectRole): Promise<void>;
insertPendingProjectRole(
pendingProjectRole: NewPendingProjectRole,
tx?: TxConnection,
): Promise<void>;

/**
* Deletes multiple pending project roles based on their IDs.
* @param ids An array of IDs of the pending project roles to delete.
* @param tx Optional transaction connection
* @returns A promise that resolves when the deletion is complete.
*/
deleteManyPendingProjectRoles(ids: number[]): Promise<void>;
deleteManyPendingProjectRoles(ids: number[], tx?: TxConnection): Promise<void>;
}
25 changes: 20 additions & 5 deletions packages/repository/src/interfaces/roundRepository.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
RoundRole,
RoundRoleNames,
} from "../types/round.types.js";
import { TransactionConnection } from "../types/transaction.types.js";

export interface IRoundReadRepository {
/**
Expand Down Expand Up @@ -94,30 +95,36 @@ export interface IRoundReadRepository {
getPendingRoundRoles(chainId: ChainId, role: RoundRoleNames): Promise<PendingRoundRole[]>;
}

export interface IRoundRepository extends IRoundReadRepository {
export interface IRoundRepository<
TxConnection extends TransactionConnection = TransactionConnection,
> extends IRoundReadRepository {
/**
* Inserts a new round into the repository.
* @param round The new round to insert.
* @param tx Optional transaction connection
* @returns A promise that resolves when the insertion is complete.
*/
insertRound(round: NewRound): Promise<void>;
insertRound(round: NewRound, tx?: TxConnection): Promise<void>;

/**
* Updates an existing round in the repository.
* @param where An object containing the id and chainId of the round to update.
* @param round The partial round data to update.
* @param tx Optional transaction connection
* @returns A promise that resolves when the update is complete.
*/
updateRound(
where: { id: string; chainId: ChainId } | { chainId: ChainId; strategyAddress: Address },
round: PartialRound,
tx?: TxConnection,
): Promise<void>;

/**
* Increments the funds for a specific round.
* @param where An object containing the chainId and roundId of the round to update.
* @param amount The amount to increment by.
* @param amountInUsd The amount in USD to increment by.
* @param tx Optional transaction connection
* @returns A promise that resolves when the increment is complete.
*/
incrementRoundFunds(
Expand All @@ -127,12 +134,14 @@ export interface IRoundRepository extends IRoundReadRepository {
},
amount: bigint,
amountInUsd: string,
tx?: TxConnection,
): Promise<void>;

/**
* Increments the total distributed amount for a specific round.
* @param where An object containing the chainId and roundId of the round to update.
* @param amount The amount to increment by.
* @param tx Optional transaction connection
* @returns A promise that resolves when the increment is complete.
*/
incrementRoundTotalDistributed(
Expand All @@ -141,41 +150,47 @@ export interface IRoundRepository extends IRoundReadRepository {
roundId: string;
},
amount: bigint,
tx?: TxConnection,
): Promise<void>;

/**
* Inserts a new round role into the repository.
* @param roundRole The new round role to insert.
* @param tx Optional transaction connection
* @returns A promise that resolves when the insertion is complete.
*/
insertRoundRole(roundRole: NewRoundRole): Promise<void>;
insertRoundRole(roundRole: NewRoundRole, tx?: TxConnection): Promise<void>;

/**
* Deletes multiple round roles based on chain ID, round ID, role, and address.
* @param chainId The chain ID of the roles to delete.
* @param roundId The round ID of the roles to delete.
* @param role The role name of the roles to delete.
* @param address The address associated with the roles to delete.
* @param tx Optional transaction connection
* @returns A promise that resolves when the deletion is complete.
*/
deleteManyRoundRolesByRoleAndAddress(
chainId: ChainId,
roundId: string,
role: RoundRoleNames,
address: Address,
tx?: TxConnection,
): Promise<void>;

/**
* Inserts a new pending round role into the repository.
* @param pendingRoundRole The new pending round role to insert.
* @param tx Optional transaction connection
* @returns A promise that resolves when the insertion is complete.
*/
insertPendingRoundRole(pendingRoundRole: NewPendingRoundRole): Promise<void>;
insertPendingRoundRole(pendingRoundRole: NewPendingRoundRole, tx?: TxConnection): Promise<void>;

/**
* Deletes multiple pending round roles by their IDs.
* @param ids An array of IDs of the pending round roles to delete.
* @param tx Optional transaction connection
* @returns A promise that resolves when the deletion is complete.
*/
deleteManyPendingRoundRoles(ids: number[]): Promise<void>;
deleteManyPendingRoundRoles(ids: number[], tx?: TxConnection): Promise<void>;
}
20 changes: 20 additions & 0 deletions packages/repository/src/interfaces/transactionManager.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { TransactionConnection } from "../internal.js";

/**
* The ITransactionManager interface provides a generic transaction management solution using a callback pattern.
*
* The generic type parameter TxConnection extends TransactionConnection to allow for different transaction
* connection implementations while maintaining type safety.
*/
export interface ITransactionManager<
TxConnection extends TransactionConnection = TransactionConnection,
> {
/*
* Provides a transaction connection to the given function.
* If the function throws an error, the transaction will be rolled back.
* If the function returns a promise, the transaction will be committed after the promise is resolved.
*
* Note: only DB calls that use the provided transaction connection will be executed in the transaction.
*/
runInTransaction<T>(fn: (tx: TxConnection) => Promise<T>): Promise<T>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
PartialApplication,
} from "../../internal.js";

export class KyselyApplicationRepository implements IApplicationRepository {
export class KyselyApplicationRepository implements IApplicationRepository<Kysely<Database>> {
constructor(
private readonly db: Kysely<Database>,
private readonly schemaName: string,
Expand Down Expand Up @@ -96,25 +96,23 @@ export class KyselyApplicationRepository implements IApplicationRepository {
}

/* @inheritdoc */
async insertApplication(application: NewApplication): Promise<void> {
async insertApplication(application: NewApplication, tx?: Kysely<Database>): Promise<void> {
const _application = this.formatApplication(application);
const queryBuilder = (tx || this.db).withSchema(this.schemaName);

await this.db
.withSchema(this.schemaName)
.insertInto("applications")
.values(_application)
.execute();
await queryBuilder.insertInto("applications").values(_application).execute();
}

/* @inheritdoc */
async updateApplication(
where: { id: string; chainId: ChainId; roundId: string },
application: PartialApplication,
tx?: Kysely<Database>,
): Promise<void> {
const _application = this.formatApplication(application);
const queryBuilder = (tx || this.db).withSchema(this.schemaName);

await this.db
.withSchema(this.schemaName)
await queryBuilder
.updateTable("applications")
.set(_application)
.where("id", "=", where.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ import { Kysely } from "kysely";

import { Database, IApplicationPayoutRepository, NewApplicationPayout } from "../../internal.js";

export class KyselyApplicationPayoutRepository implements IApplicationPayoutRepository {
export class KyselyApplicationPayoutRepository
implements IApplicationPayoutRepository<Kysely<Database>>
{
constructor(
private readonly db: Kysely<Database>,
private readonly schemaName: string,
) {}

/** @inheritdoc */
async insertApplicationPayout(applicationPayout: NewApplicationPayout): Promise<void> {
await this.db
.withSchema(this.schemaName)
.insertInto("applicationsPayouts")
.values(applicationPayout)
.execute();
async insertApplicationPayout(
applicationPayout: NewApplicationPayout,
tx?: Kysely<Database>,
): Promise<void> {
const queryBuilder = (tx || this.db).withSchema(this.schemaName);
await queryBuilder.insertInto("applicationsPayouts").values(applicationPayout).execute();
}
}
Loading

0 comments on commit ea2c124

Please sign in to comment.