diff --git a/api/src/modules/api-events/api-events.module.ts b/api/src/modules/api-events/api-events.module.ts index f1f22773a..95222b92c 100644 --- a/api/src/modules/api-events/api-events.module.ts +++ b/api/src/modules/api-events/api-events.module.ts @@ -1,4 +1,4 @@ -import { Logger, Module } from '@nestjs/common'; +import { Global, Logger, Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { ApiEvent } from 'modules/api-events/api-event.entity'; @@ -11,6 +11,7 @@ import { ApiEventsService } from 'modules/api-events/api-events.service'; export const logger: Logger = new Logger('ApiEvents'); +@Global() @Module({ imports: [ TypeOrmModule.forFeature([ diff --git a/api/src/modules/events/app-events.module.ts b/api/src/modules/events/app-events.module.ts index cf197c191..02e15bd16 100644 --- a/api/src/modules/events/app-events.module.ts +++ b/api/src/modules/events/app-events.module.ts @@ -19,6 +19,7 @@ import { ImportProgressTrackerFactory } from 'modules/events/import-data-progres ImportProgressEmitter, ImportProgressTrackerFactory, ImportProgressSocket, + CqrsModule, ], }) export class AppEventsModule {} diff --git a/api/src/modules/events/import-data-events/import-data.event-handler.ts b/api/src/modules/events/import-data-events/import-data.event-handler.ts index 9bc53e48f..7781140e5 100644 --- a/api/src/modules/events/import-data-events/import-data.event-handler.ts +++ b/api/src/modules/events/import-data-events/import-data.event-handler.ts @@ -4,9 +4,9 @@ import { ApiEventsService } from 'modules/api-events/api-events.service'; import { Task } from 'modules/tasks/task.entity'; export enum IMPORT_DATA_EVENTS { - started = API_EVENT_KINDS.data__importStarted__v1alpha1, - failed = API_EVENT_KINDS.data__importFailed__v1alpha1, - succeeded = API_EVENT_KINDS.data__importSucceeded__v1alpha1, + STARTED = API_EVENT_KINDS.data__importStarted__v1alpha1, + FAILED = API_EVENT_KINDS.data__importFailed__v1alpha1, + SUCCEED = API_EVENT_KINDS.data__importSucceeded__v1alpha1, } export class ImportDataEvent implements IEvent { diff --git a/api/src/modules/import-data/import-data.module.ts b/api/src/modules/import-data/import-data.module.ts index 6c62a9537..5af773ed8 100644 --- a/api/src/modules/import-data/import-data.module.ts +++ b/api/src/modules/import-data/import-data.module.ts @@ -25,6 +25,7 @@ import { ImportMailService } from 'modules/import-data/import-mail/import-mail.s import { NotificationsModule } from 'modules/notifications/notifications.module'; import { ExcelValidatorService } from 'modules/import-data/sourcing-data/validation/excel-validator.service'; import { SourcingDataDbCleaner } from 'modules/import-data/sourcing-data/sourcing-data.db-cleaner'; +import { ImportDataEventHandler } from '../events/import-data-events/import-data.event-handler'; // TODO: Move EUDR related stuff to EUDR modules @@ -67,6 +68,7 @@ import { SourcingDataDbCleaner } from 'modules/import-data/sourcing-data/sourcin ImportMailService, ExcelValidatorService, SourcingDataDbCleaner, + ImportDataEventHandler, { provide: 'FILE_UPLOAD_SIZE_LIMIT', useValue: config.get('fileUploads.sizeLimit'), diff --git a/api/src/modules/import-data/workers/import-data.consumer.ts b/api/src/modules/import-data/workers/import-data.consumer.ts index a3ac36e74..8df217105 100644 --- a/api/src/modules/import-data/workers/import-data.consumer.ts +++ b/api/src/modules/import-data/workers/import-data.consumer.ts @@ -14,6 +14,11 @@ import { Task, TASK_STATUS, TASK_TYPE } from 'modules/tasks/task.entity'; import { importQueueName } from 'modules/import-data/workers/import-queue.name'; import { ImportProgressSocket } from 'modules/events/import-data-progress/import-progress.socket'; import { ImportMailService } from 'modules/import-data/import-mail/import-mail.service'; +import { EventBus } from '@nestjs/cqrs'; +import { + IMPORT_DATA_EVENTS, + ImportDataEvent, +} from '../../events/import-data-events/import-data.event-handler'; @Processor(importQueueName) export class ImportDataConsumer { @@ -24,6 +29,7 @@ export class ImportDataConsumer { public readonly tasksService: TasksService, public readonly importSocket: ImportProgressSocket, public readonly importMail: ImportMailService, + public readonly eventBus: EventBus, ) {} @OnQueueError() @@ -33,7 +39,7 @@ export class ImportDataConsumer { ); } - // TODO: Handle events finished and failed cases + // TODO: we probably want to handle success and failures using CQRS @OnQueueFailed() async onJobFailed(job: Job, err: any): Promise { @@ -47,11 +53,16 @@ export class ImportDataConsumer { newErrors: err.validationErrors, }); this.importSocket.emitImportFailureToSocket({ error: err }); + this.eventBus.publish( + new ImportDataEvent(task.id, IMPORT_DATA_EVENTS.FAILED, err), + ); this.logger.error( `Import Failed for file: ${job.data.xlsxFileData.filename} for task: ${task.id}: ${err}`, ); + // TODO: If the error is not related to the file, we should not send an error report (as it will be empty), we should send a generic error message + const errorReport: string = await this.tasksService.getTaskErrorReport( task.id, { @@ -75,6 +86,12 @@ export class ImportDataConsumer { }); this.importSocket.emitImportCompleteToSocket({ status: 'completed' }); + this.eventBus.publish( + new ImportDataEvent(task.id, IMPORT_DATA_EVENTS.SUCCEED, { + file: job.data.xlsxFileData.originalname, + userId: task.user.id, + }), + ); await this.importMail.sendImportSuccessMail({ email: task.user.email, fileName: job.data.xlsxFileData.originalname, @@ -87,7 +104,12 @@ export class ImportDataConsumer { @Process('excel-import-job') async readImportDataJob(job: Job): Promise { - await this.importDataService.processImportJob(job); + this.eventBus.publish( + new ImportDataEvent(job.data.taskId, IMPORT_DATA_EVENTS.STARTED, { + file: job.data.xlsxFileData.originalname, + }), + ); + return this.importDataService.processImportJob(job); } private isJobStalled(err: Error): boolean {