From 41ff0d7c1ad4b402f771053f9dde1961759177f1 Mon Sep 17 00:00:00 2001 From: empassaro <113031808+empassaro@users.noreply.github.com> Date: Wed, 5 Jun 2024 17:01:18 +0200 Subject: [PATCH] [SELC-5034] feature: added logic to discriminate event type in cdc module (#288) Co-authored-by: andrea-putzu --- .../onboarding/event/NotificationService.java | 61 ++++++++++++ .../event/OnboardingCdcService.java | 38 ++------ .../event/entity/util/QueueEvent.java | 6 ++ .../main/openapi/onboarding_functions.json | 11 +++ .../src/main/resources/application.properties | 1 + .../event/NotificationServiceTest.java | 92 +++++++++++++++++++ .../functions/NotificationFunctions.java | 6 +- .../onboarding-cdc/env/dev/terraform.tfvars | 4 + .../onboarding-cdc/env/prod/terraform.tfvars | 4 + .../onboarding-cdc/env/uat/terraform.tfvars | 4 + 10 files changed, 198 insertions(+), 29 deletions(-) create mode 100644 apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/NotificationService.java create mode 100644 apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/entity/util/QueueEvent.java create mode 100644 apps/onboarding-cdc/src/test/java/it/pagopa/selfcare/onboarding/event/NotificationServiceTest.java diff --git a/apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/NotificationService.java b/apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/NotificationService.java new file mode 100644 index 000000000..d0602b8fa --- /dev/null +++ b/apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/NotificationService.java @@ -0,0 +1,61 @@ +package it.pagopa.selfcare.onboarding.event; + +import io.smallrye.mutiny.Uni; +import it.pagopa.selfcare.onboarding.event.entity.Onboarding; +import it.pagopa.selfcare.onboarding.event.entity.util.QueueEvent; +import it.pagopa.selfcare.onboarding.event.mapper.OnboardingMapper; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi; +import org.openapi.quarkus.onboarding_functions_json.model.OrchestrationResponse; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Objects; + +@ApplicationScoped +public class NotificationService { + @Inject + @RestClient + NotificationsApi notificationsApi; + private final OnboardingMapper onboardingMapper; + private final Integer retryMinBackOff; + private final Integer retryMaxBackOff; + private final Integer maxRetry; + private final Integer minutesThresholdForUpdateNotification; + + public NotificationService(OnboardingMapper onboardingMapper, + @ConfigProperty(name = "onboarding-cdc.retry.min-backoff") Integer retryMinBackOff, + @ConfigProperty(name = "onboarding-cdc.retry.max-backoff") Integer retryMaxBackOff, + @ConfigProperty(name = "onboarding-cdc.retry") Integer maxRetry, + @ConfigProperty(name = "onboarding-cdc.minutes-threshold-for-update-notification") Integer minutesThresholdForUpdateNotification) { + this.onboardingMapper = onboardingMapper; + this.retryMinBackOff = retryMinBackOff; + this.retryMaxBackOff = retryMaxBackOff; + this.maxRetry = maxRetry; + this.minutesThresholdForUpdateNotification = minutesThresholdForUpdateNotification; + } + + public Uni invokeNotificationApi(Onboarding onboarding) { + assert onboarding != null; + QueueEvent queueEvent = determineEventType(onboarding); + return notificationsApi.apiNotificationPost(queueEvent.name(), onboardingMapper.toEntity(onboarding)) + .onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofHours(retryMaxBackOff)).atMost(maxRetry); + } + + private QueueEvent determineEventType(Onboarding onboarding) { + return switch (onboarding.getStatus()) { + case COMPLETED -> (isOverUpdateThreshold(onboarding.getUpdatedAt(), onboarding.getActivatedAt())) ? QueueEvent.UPDATE : QueueEvent.ADD; + case DELETED -> QueueEvent.UPDATE; + default -> throw new IllegalArgumentException("Onboarding status not supported"); + }; + } + + private boolean isOverUpdateThreshold(LocalDateTime updatedAt, LocalDateTime activatedAt) { + return Objects.nonNull(updatedAt) + && Objects.nonNull(activatedAt) + && updatedAt.isAfter(activatedAt.plusMinutes(minutesThresholdForUpdateNotification)); + } +} diff --git a/apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/OnboardingCdcService.java b/apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/OnboardingCdcService.java index e054386b4..cbefb41e4 100644 --- a/apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/OnboardingCdcService.java +++ b/apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/OnboardingCdcService.java @@ -17,32 +17,24 @@ import io.smallrye.mutiny.Multi; import it.pagopa.selfcare.onboarding.event.constant.CdcStartAtConstant; import it.pagopa.selfcare.onboarding.event.entity.Onboarding; -import it.pagopa.selfcare.onboarding.event.mapper.OnboardingMapper; import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; import lombok.extern.slf4j.Slf4j; import org.bson.BsonDocument; import org.bson.conversions.Bson; import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.eclipse.microprofile.rest.client.inject.RestClient; -import org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi; -import java.time.Duration; import java.util.*; import static com.mongodb.client.model.Projections.fields; import static com.mongodb.client.model.Projections.include; +import static it.pagopa.selfcare.onboarding.common.OnboardingStatus.COMPLETED; +import static it.pagopa.selfcare.onboarding.common.OnboardingStatus.DELETED; import static java.util.Arrays.asList; @Startup @Slf4j @ApplicationScoped public class OnboardingCdcService { - - @Inject - @RestClient - NotificationsApi notificationsApi; - private final OnboardingMapper onboardingMapper; private static final String COLLECTION_NAME = "onboardings"; private static final String OPERATION_NAME = "ONBOARDING-CDC-OnboardingsUpdate"; private static final String EVENT_NAME = "ONBOARDING-CDC"; @@ -52,25 +44,18 @@ public class OnboardingCdcService { private final TableClient tableClient; private final String mongodbDatabase; private final ReactiveMongoClient mongoClient; - private final Integer retryMinBackOff; - private final Integer retryMaxBackOff; - private final Integer maxRetry; + private final NotificationService notificationService; - public OnboardingCdcService(OnboardingMapper onboardingMapper, ReactiveMongoClient mongoClient, + public OnboardingCdcService(ReactiveMongoClient mongoClient, @ConfigProperty(name = "quarkus.mongodb.database") String mongodbDatabase, - @ConfigProperty(name = "onboarding-cdc.retry.min-backoff") Integer retryMinBackOff, - @ConfigProperty(name = "onboarding-cdc.retry.max-backoff") Integer retryMaxBackOff, - @ConfigProperty(name = "onboarding-cdc.retry") Integer maxRetry, TelemetryClient telemetryClient, - TableClient tableClient) { - this.onboardingMapper = onboardingMapper; + TableClient tableClient, + NotificationService notificationService) { this.mongoClient = mongoClient; this.mongodbDatabase = mongodbDatabase; - this.maxRetry = maxRetry; - this.retryMaxBackOff = retryMaxBackOff; - this.retryMinBackOff = retryMinBackOff; this.telemetryClient = telemetryClient; this.tableClient = tableClient; + this.notificationService = notificationService; telemetryClient.getContext().getOperation().setName(OPERATION_NAME); initOrderStream(); } @@ -87,7 +72,7 @@ private void initOrderStream() { if (Objects.nonNull(cdcStartAtEntity)) resumeToken = (String) cdcStartAtEntity.getProperty(CdcStartAtConstant.CDC_START_AT_PROPERTY); } catch (TableServiceException e) { - log.warn("Table StarAt not found, it is starting from now ..."); + log.warn("Table StartAt not found, it is starting from now ..."); } } @@ -100,7 +85,7 @@ private void initOrderStream() { Bson match = Aggregates.match(Filters.and( Filters.in("operationType", asList("update", "replace", "insert")), - Filters.in("fullDocument.status", Arrays.asList("COMPLETED", "DELETED")))); + Filters.in("fullDocument.status", Arrays.asList(COMPLETED.name(), DELETED.name())))); Bson project = Aggregates.project(fields(include("_id", "ns", "documentKey", "fullDocument"))); List pipeline = Arrays.asList(match, project); @@ -123,16 +108,13 @@ private ReactiveMongoCollection getCollection() { } protected void consumerOnboardingEvent(ChangeStreamDocument document) { - - assert document.getFullDocument() != null; assert document.getDocumentKey() != null; log.info("Starting consumerOnboardingEvent ... "); log.info("Sending Onboarding notification having id {}", document.getFullDocument().getId()); - notificationsApi.apiNotificationPost(onboardingMapper.toEntity(document.getFullDocument())) - .onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofHours(retryMaxBackOff)).atMost(maxRetry) + notificationService.invokeNotificationApi(document.getFullDocument()) .subscribe().with( result -> { log.info("Onboarding notification having id: {} successfully sent", document.getDocumentKey().toJson()); diff --git a/apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/entity/util/QueueEvent.java b/apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/entity/util/QueueEvent.java new file mode 100644 index 000000000..319a91480 --- /dev/null +++ b/apps/onboarding-cdc/src/main/java/it/pagopa/selfcare/onboarding/event/entity/util/QueueEvent.java @@ -0,0 +1,6 @@ +package it.pagopa.selfcare.onboarding.event.entity.util; + +public enum QueueEvent { + ADD, + UPDATE +} diff --git a/apps/onboarding-cdc/src/main/openapi/onboarding_functions.json b/apps/onboarding-cdc/src/main/openapi/onboarding_functions.json index 292282ced..177da4b0b 100644 --- a/apps/onboarding-cdc/src/main/openapi/onboarding_functions.json +++ b/apps/onboarding-cdc/src/main/openapi/onboarding_functions.json @@ -85,6 +85,17 @@ ], "summary": "", "description": "", + "parameters": [ + { + "name": "queueEvent", + "in": "query", + "description": "Query Event type", + "required": false, + "schema": { + "type": "string" + } + } + ], "requestBody": { "content": { "application/json": { diff --git a/apps/onboarding-cdc/src/main/resources/application.properties b/apps/onboarding-cdc/src/main/resources/application.properties index 78b0aee9f..e359691ad 100644 --- a/apps/onboarding-cdc/src/main/resources/application.properties +++ b/apps/onboarding-cdc/src/main/resources/application.properties @@ -15,6 +15,7 @@ onboarding-cdc.storage.connection-string=${STORAGE_CONNECTION_STRING:UseDevelopm onboarding-cdc.retry.min-backoff=${ONBOARDING-CDC-RETRY-MIN-BACKOFF:1} onboarding-cdc.retry.max-backoff=${ONBOARDING-CDC-RETRY-MAX-BACKOFF:2} onboarding-cdc.retry=${ONBOARDING-CDC-RETRY:3} +onboarding-cdc.minutes-threshold-for-update-notification=${ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION:5} quarkus.openapi-generator.codegen.spec.onboarding_functions_json.mutiny=true quarkus.openapi-generator.codegen.spec.onboarding_functions_json.type-mappings.DateTime=java.time.LocalDateTime diff --git a/apps/onboarding-cdc/src/test/java/it/pagopa/selfcare/onboarding/event/NotificationServiceTest.java b/apps/onboarding-cdc/src/test/java/it/pagopa/selfcare/onboarding/event/NotificationServiceTest.java new file mode 100644 index 000000000..fc202c01e --- /dev/null +++ b/apps/onboarding-cdc/src/test/java/it/pagopa/selfcare/onboarding/event/NotificationServiceTest.java @@ -0,0 +1,92 @@ +package it.pagopa.selfcare.onboarding.event; + +import io.quarkus.test.InjectMock; +import io.quarkus.test.junit.QuarkusTest; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; +import it.pagopa.selfcare.onboarding.common.OnboardingStatus; +import it.pagopa.selfcare.onboarding.event.entity.Onboarding; +import it.pagopa.selfcare.onboarding.event.entity.util.QueueEvent; +import it.pagopa.selfcare.onboarding.event.mapper.OnboardingMapper; +import jakarta.inject.Inject; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi; +import org.openapi.quarkus.onboarding_functions_json.model.OrchestrationResponse; + +import java.time.LocalDateTime; + +import static org.mockito.Mockito.*; + +@QuarkusTest +public class NotificationServiceTest { + @Mock + private OnboardingMapper onboardingMapper; + @InjectMock + @RestClient + private NotificationsApi notificationsApi; + @Inject + private NotificationService notificationService; + + @Test + @DisplayName("Should handle Invoke Notification API Success passing event ADD") + public void shouldHandleInvokeNotificationApiSuccessForQueueEventAdd() { + Onboarding onboarding = new Onboarding(); + onboarding.setStatus(OnboardingStatus.COMPLETED); + onboarding.setUpdatedAt(LocalDateTime.now()); + onboarding.setActivatedAt(LocalDateTime.now()); + + when(notificationsApi.apiNotificationPost(any(), any())) + .thenReturn(Uni.createFrom().item(new OrchestrationResponse())); + + UniAssertSubscriber subscriber = notificationService + .invokeNotificationApi(onboarding) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + subscriber.assertCompleted().awaitItem(); + + verify(notificationsApi, times(1)).apiNotificationPost(eq(QueueEvent.ADD.name()), any()); + } + + @Test + @DisplayName("Should handle Invoke Notification API Success passing event UPDATE") + public void shouldHandleInvokeNotificationApiSuccessForQueueEventUpdate() { + Onboarding onboarding = new Onboarding(); + onboarding.setStatus(OnboardingStatus.COMPLETED); + onboarding.setUpdatedAt(LocalDateTime.now().plusMinutes(10)); // 5 minutes should be the threshold + onboarding.setActivatedAt(LocalDateTime.now()); + + when(notificationsApi.apiNotificationPost(any(), any())) + .thenReturn(Uni.createFrom().item(new OrchestrationResponse())); + + UniAssertSubscriber subscriber = notificationService + .invokeNotificationApi(onboarding) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + subscriber.assertCompleted().awaitItem(); + + verify(notificationsApi, times(1)).apiNotificationPost(eq(QueueEvent.UPDATE.name()), any()); + } + + @Test + @DisplayName("Should handle Invoke Notification API Success passing event UPDATE with status DELETED") + public void shouldHandleInvokeNotificationApiSuccessForQueueEventUpdateWithStatusDeleted() { + Onboarding onboarding = new Onboarding(); + onboarding.setStatus(OnboardingStatus.DELETED); + onboarding.setUpdatedAt(LocalDateTime.now()); // 5 minutes should be the threshold + onboarding.setActivatedAt(LocalDateTime.now()); + + when(notificationsApi.apiNotificationPost(any(), any())) + .thenReturn(Uni.createFrom().item(new OrchestrationResponse())); + + UniAssertSubscriber subscriber = notificationService + .invokeNotificationApi(onboarding) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + subscriber.assertCompleted().awaitItem(); + + verify(notificationsApi, times(1)).apiNotificationPost(eq(QueueEvent.UPDATE.name()), any()); + } +} \ No newline at end of file diff --git a/apps/onboarding-functions/src/main/java/it/pagopa/selfcare/onboarding/functions/NotificationFunctions.java b/apps/onboarding-functions/src/main/java/it/pagopa/selfcare/onboarding/functions/NotificationFunctions.java index fccc0a00d..6af9b57d8 100644 --- a/apps/onboarding-functions/src/main/java/it/pagopa/selfcare/onboarding/functions/NotificationFunctions.java +++ b/apps/onboarding-functions/src/main/java/it/pagopa/selfcare/onboarding/functions/NotificationFunctions.java @@ -85,13 +85,17 @@ public HttpResponseMessage resendNotification( .build(); } + final String queueEventString = request.getQueryParameters().get("queueEvent"); + final QueueEvent queueEvent = Objects.isNull(queueEventString) ? QueueEvent.UPDATE : QueueEvent.valueOf(queueEventString); + + final Optional onboarding = onboardingService.getOnboarding(onboardingId); if(onboarding.isEmpty()) { return request.createResponseBuilder(HttpStatus.NOT_FOUND) .body("Onboarding with ID: " + onboardingId + " not found") .build(); } - notificationEventService.send(context, onboarding.get(), QueueEvent.UPDATE); + notificationEventService.send(context, onboarding.get(), queueEvent); return request.createResponseBuilder(HttpStatus.OK).build(); } } diff --git a/infra/container_apps/onboarding-cdc/env/dev/terraform.tfvars b/infra/container_apps/onboarding-cdc/env/dev/terraform.tfvars index 9ef32986f..caf574cc9 100644 --- a/infra/container_apps/onboarding-cdc/env/dev/terraform.tfvars +++ b/infra/container_apps/onboarding-cdc/env/dev/terraform.tfvars @@ -46,6 +46,10 @@ app_settings = [ { name = "ONBOARDING-CDC-MONGODB-WATCH-ENABLED" value = "true" + }, + { + name = "ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION" + value = "5" } ] diff --git a/infra/container_apps/onboarding-cdc/env/prod/terraform.tfvars b/infra/container_apps/onboarding-cdc/env/prod/terraform.tfvars index d40c9da05..ecdc14388 100644 --- a/infra/container_apps/onboarding-cdc/env/prod/terraform.tfvars +++ b/infra/container_apps/onboarding-cdc/env/prod/terraform.tfvars @@ -33,6 +33,10 @@ app_settings = [ { name = "ONBOARDING_FUNCTIONS_URL" value = "https://selc-d-onboarding-fn.azurewebsites.net" + }, + { + name = "ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION" + value = "5" } ] diff --git a/infra/container_apps/onboarding-cdc/env/uat/terraform.tfvars b/infra/container_apps/onboarding-cdc/env/uat/terraform.tfvars index 0860903b2..f400974d8 100644 --- a/infra/container_apps/onboarding-cdc/env/uat/terraform.tfvars +++ b/infra/container_apps/onboarding-cdc/env/uat/terraform.tfvars @@ -35,6 +35,10 @@ app_settings = [ { name = "ONBOARDING_FUNCTIONS_URL" value = "https://selc-d-onboarding-fn.azurewebsites.net" + }, + { + name = "ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION" + value = "5" } ]