Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Integrate pg-boss for job scheduling #478

Closed
3 changes: 2 additions & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@
"passport-github2": "^0.1.12",
"passport-gitlab2": "^5.0.0",
"passport-google-oauth20": "^2.0.0",
"pg-boss": "^10.1.4",
"redis": "^4.6.13",
"rxjs": "^7.8.1",
"socket.io": "^4.7.5",
"uuid": "^9.0.1"
},
"devDependencies": {
"reflect-metadata": "^0.2.2",
"@nestjs/cli": "^10.0.0",
"@nestjs/schematics": "^10.0.0",
"@nestjs/testing": "^10.0.0",
Expand All @@ -67,6 +67,7 @@
"jest-mock-extended": "^3.0.5",
"prettier": "^3.0.0",
"prisma": "5.19.1",
"reflect-metadata": "^0.2.2",
"source-map-support": "^0.5.21",
"supertest": "^6.3.3",
"ts-jest": "^29.1.0",
Expand Down
62 changes: 62 additions & 0 deletions apps/api/src/common/job-handler.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { PG_BOSS } from '@/provider/pgboss.provider'
import { Inject, Injectable, LoggerService } from '@nestjs/common'
import PgBoss from 'pg-boss'

@Injectable()
export class JobHandlerService {
private readonly logger: LoggerService

constructor(
@Inject(PG_BOSS) private boss: PgBoss,
@Inject('Logger') logger: LoggerService
) {
this.logger = logger
}

async registerJob<T, V>(
queue: string,
callback: (job: V) => Promise<T>
): Promise<void> {
try {
await this.boss.work(queue, async ([job]) => {
try {
const jobData = job.data as V
await callback(jobData)
await this.boss.complete(queue, job.id)
} catch (error) {
this.logger.error(`Error processing job in queue ${queue}`)
throw new Error(`Error processing job in queue ${queue}`)
}
})
this.logger.log(`Registered job handler for queue ${queue}`)
} catch (error) {
this.logger.error(`Error registering job handler for queue ${queue}`)
throw new Error(`Error registering job handler for queue ${queue}`)
}
}

async scheduleJob<V extends object>(
name: string,
cron: string,
data: V,
options?: PgBoss.ScheduleOptions
): Promise<void> {
try {
const jobId = await this.boss.schedule(name, cron, data, options)
this.logger.log(`Scheduled job in queue ${name} with ID: ${jobId}`)
} catch (error) {
this.logger.error(`Error scheduling job in queue ${name}`)
throw new Error(`Error scheduling job in queue ${name}: ${error.message}`)
}
}

async stop(): Promise<void> {
try {
await this.boss.stop()
this.logger.log('PgBoss stopped successfully')
} catch (error) {
this.logger.error(`Error stopping PgBoss: ${error.message}`)
throw new Error(`Error stopping PgBoss: ${error.message}`)
}
}
}
15 changes: 15 additions & 0 deletions apps/api/src/provider/pgboss.provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// pgboss.provider.ts
import { Provider } from '@nestjs/common'
import PgBoss from 'pg-boss'

export const PG_BOSS = 'PG_BOSS'

export const PgBossProvider: Provider = {
provide: PG_BOSS,
useFactory: async () => {
const connectionString = process.env.DATABASE_CONNECTION_STRING
const boss = new PgBoss(connectionString)
await boss.start()
return boss
}
}
7 changes: 6 additions & 1 deletion apps/api/src/provider/provider.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Global, Module } from '@nestjs/common'
import { REDIS_CLIENT, RedisProvider } from './redis.provider'
import { MINIO_CLIENT, MinioProvider } from './minio.provider'
import { PG_BOSS, PgBossProvider } from './pgboss.provider'

@Global()
@Module({
Expand All @@ -12,8 +13,12 @@ import { MINIO_CLIENT, MinioProvider } from './minio.provider'
{
provide: MINIO_CLIENT,
useValue: MinioProvider
},
{
provide: PG_BOSS,
useValue: PgBossProvider
}
],
providers: [RedisProvider, MinioProvider]
providers: [RedisProvider, MinioProvider, PgBossProvider]
})
export class ProviderModule {}
4 changes: 4 additions & 0 deletions apps/api/src/secret/controller/secret.controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { RedisClientType } from 'redis'
import { ProviderModule } from '@/provider/provider.module'
import { AuthorityCheckerService } from '@/common/authority-checker.service'
import { CommonModule } from '@/common/common.module'
import { PG_BOSS } from '@/provider/pgboss.provider'
import PgBoss from 'pg-boss'

describe('SecretController', () => {
let controller: SecretController
Expand All @@ -32,6 +34,8 @@ describe('SecretController', () => {
.useValue(mockDeep<RedisClientType>())
.overrideProvider(PrismaService)
.useValue(mockDeep<PrismaService>())
.overrideProvider(PG_BOSS)
.useValue(mockDeep<PgBoss>())
.compile()

controller = module.get<SecretController>(SecretController)
Expand Down
4 changes: 4 additions & 0 deletions apps/api/src/secret/service/secret.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { RedisClientType } from 'redis'
import { ProviderModule } from '@/provider/provider.module'
import { AuthorityCheckerService } from '@/common/authority-checker.service'
import { CommonModule } from '@/common/common.module'
import { PG_BOSS } from '@/provider/pgboss.provider'
import PgBoss from 'pg-boss'

describe('SecretService', () => {
let service: SecretService
Expand All @@ -30,6 +32,8 @@ describe('SecretService', () => {
.useValue(mockDeep<RedisClientType>())
.overrideProvider(PrismaService)
.useValue(mockDeep<PrismaService>())
.overrideProvider(PG_BOSS)
.useValue(mockDeep<PgBoss>())
.compile()

service = module.get<SecretService>(SecretService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { mockDeep } from 'jest-mock-extended'
import { ProviderModule } from '@/provider/provider.module'
import { AuthorityCheckerService } from '@/common/authority-checker.service'
import { CommonModule } from '@/common/common.module'
import { PG_BOSS } from '@/provider/pgboss.provider'
import PgBoss from 'pg-boss'

describe('VariableController', () => {
let controller: VariableController
Expand All @@ -30,6 +32,8 @@ describe('VariableController', () => {
})
.overrideProvider(REDIS_CLIENT)
.useValue(mockDeep<RedisClientType>())
.overrideProvider(PG_BOSS)
.useValue(mockDeep<PgBoss>())
.compile()

controller = module.get<VariableController>(VariableController)
Expand Down
4 changes: 4 additions & 0 deletions apps/api/src/variable/service/variable.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { mockDeep } from 'jest-mock-extended'
import { ProviderModule } from '@/provider/provider.module'
import { AuthorityCheckerService } from '@/common/authority-checker.service'
import { CommonModule } from '@/common/common.module'
import { PG_BOSS } from '@/provider/pgboss.provider'
import PgBoss from 'pg-boss'

describe('VariableService', () => {
let service: VariableService
Expand All @@ -28,6 +30,8 @@ describe('VariableService', () => {
})
.overrideProvider(REDIS_CLIENT)
.useValue(mockDeep<RedisClientType>())
.overrideProvider(PG_BOSS)
.useValue(mockDeep<PgBoss>())
.compile()

service = module.get<VariableService>(VariableService)
Expand Down
Loading
Loading