Skip to content

Commit

Permalink
add api events for import
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeh committed Dec 19, 2024
1 parent 0b73d00 commit f3ca811
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 6 deletions.
3 changes: 2 additions & 1 deletion api/src/modules/api-events/api-events.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Logger, Module } from '@nestjs/common';
import { Global, Logger, Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';

import { ApiEvent } from 'modules/api-events/api-event.entity';
Expand All @@ -11,6 +11,7 @@ import { ApiEventsService } from 'modules/api-events/api-events.service';

export const logger: Logger = new Logger('ApiEvents');

@Global()
@Module({
imports: [
TypeOrmModule.forFeature([
Expand Down
1 change: 1 addition & 0 deletions api/src/modules/events/app-events.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { ImportProgressTrackerFactory } from 'modules/events/import-data-progres
ImportProgressEmitter,
ImportProgressTrackerFactory,
ImportProgressSocket,
CqrsModule,
],
})
export class AppEventsModule {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { ApiEventsService } from 'modules/api-events/api-events.service';
import { Task } from 'modules/tasks/task.entity';

export enum IMPORT_DATA_EVENTS {
started = API_EVENT_KINDS.data__importStarted__v1alpha1,
failed = API_EVENT_KINDS.data__importFailed__v1alpha1,
succeeded = API_EVENT_KINDS.data__importSucceeded__v1alpha1,
STARTED = API_EVENT_KINDS.data__importStarted__v1alpha1,
FAILED = API_EVENT_KINDS.data__importFailed__v1alpha1,
SUCCEED = API_EVENT_KINDS.data__importSucceeded__v1alpha1,
}

export class ImportDataEvent implements IEvent {
Expand Down
2 changes: 2 additions & 0 deletions api/src/modules/import-data/import-data.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { ImportMailService } from 'modules/import-data/import-mail/import-mail.s
import { NotificationsModule } from 'modules/notifications/notifications.module';
import { ExcelValidatorService } from 'modules/import-data/sourcing-data/validation/excel-validator.service';
import { SourcingDataDbCleaner } from 'modules/import-data/sourcing-data/sourcing-data.db-cleaner';
import { ImportDataEventHandler } from '../events/import-data-events/import-data.event-handler';

// TODO: Move EUDR related stuff to EUDR modules

Expand Down Expand Up @@ -67,6 +68,7 @@ import { SourcingDataDbCleaner } from 'modules/import-data/sourcing-data/sourcin
ImportMailService,
ExcelValidatorService,
SourcingDataDbCleaner,
ImportDataEventHandler,
{
provide: 'FILE_UPLOAD_SIZE_LIMIT',
useValue: config.get('fileUploads.sizeLimit'),
Expand Down
26 changes: 24 additions & 2 deletions api/src/modules/import-data/workers/import-data.consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import { Task, TASK_STATUS, TASK_TYPE } from 'modules/tasks/task.entity';
import { importQueueName } from 'modules/import-data/workers/import-queue.name';
import { ImportProgressSocket } from 'modules/events/import-data-progress/import-progress.socket';
import { ImportMailService } from 'modules/import-data/import-mail/import-mail.service';
import { EventBus } from '@nestjs/cqrs';
import {
IMPORT_DATA_EVENTS,
ImportDataEvent,
} from '../../events/import-data-events/import-data.event-handler';

@Processor(importQueueName)
export class ImportDataConsumer {
Expand All @@ -24,6 +29,7 @@ export class ImportDataConsumer {
public readonly tasksService: TasksService,
public readonly importSocket: ImportProgressSocket,
public readonly importMail: ImportMailService,
public readonly eventBus: EventBus,
) {}

@OnQueueError()
Expand All @@ -33,7 +39,7 @@ export class ImportDataConsumer {
);
}

// TODO: Handle events finished and failed cases
// TODO: we probably want to handle success and failures using CQRS

@OnQueueFailed()
async onJobFailed(job: Job<ExcelImportJob>, err: any): Promise<void> {
Expand All @@ -47,11 +53,16 @@ export class ImportDataConsumer {
newErrors: err.validationErrors,
});
this.importSocket.emitImportFailureToSocket({ error: err });
this.eventBus.publish(
new ImportDataEvent(task.id, IMPORT_DATA_EVENTS.FAILED, err),
);

this.logger.error(
`Import Failed for file: ${job.data.xlsxFileData.filename} for task: ${task.id}: ${err}`,
);

// TODO: If the error is not related to the file, we should not send an error report (as it will be empty), we should send a generic error message

const errorReport: string = await this.tasksService.getTaskErrorReport(
task.id,
{
Expand All @@ -75,6 +86,12 @@ export class ImportDataConsumer {
});

this.importSocket.emitImportCompleteToSocket({ status: 'completed' });
this.eventBus.publish(
new ImportDataEvent(task.id, IMPORT_DATA_EVENTS.SUCCEED, {
file: job.data.xlsxFileData.originalname,
userId: task.user.id,
}),
);
await this.importMail.sendImportSuccessMail({
email: task.user.email,
fileName: job.data.xlsxFileData.originalname,
Expand All @@ -87,7 +104,12 @@ export class ImportDataConsumer {

@Process('excel-import-job')
async readImportDataJob(job: Job<ExcelImportJob>): Promise<void> {
await this.importDataService.processImportJob(job);
this.eventBus.publish(
new ImportDataEvent(job.data.taskId, IMPORT_DATA_EVENTS.STARTED, {
file: job.data.xlsxFileData.originalname,
}),
);
return this.importDataService.processImportJob(job);
}

private isJobStalled(err: Error): boolean {
Expand Down

0 comments on commit f3ca811

Please sign in to comment.