Skip to content

Commit

Permalink
[SELC-5034] feature: added logic to discriminate event type in cdc mo…
Browse files Browse the repository at this point in the history
…dule (#288)

Co-authored-by: andrea-putzu <[email protected]>
  • Loading branch information
empassaro and andrea-putzu authored Jun 5, 2024
1 parent bb7d9c1 commit 41ff0d7
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package it.pagopa.selfcare.onboarding.event;

import io.smallrye.mutiny.Uni;
import it.pagopa.selfcare.onboarding.event.entity.Onboarding;
import it.pagopa.selfcare.onboarding.event.entity.util.QueueEvent;
import it.pagopa.selfcare.onboarding.event.mapper.OnboardingMapper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi;
import org.openapi.quarkus.onboarding_functions_json.model.OrchestrationResponse;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;

@ApplicationScoped
public class NotificationService {
@Inject
@RestClient
NotificationsApi notificationsApi;
private final OnboardingMapper onboardingMapper;
private final Integer retryMinBackOff;
private final Integer retryMaxBackOff;
private final Integer maxRetry;
private final Integer minutesThresholdForUpdateNotification;

public NotificationService(OnboardingMapper onboardingMapper,
@ConfigProperty(name = "onboarding-cdc.retry.min-backoff") Integer retryMinBackOff,
@ConfigProperty(name = "onboarding-cdc.retry.max-backoff") Integer retryMaxBackOff,
@ConfigProperty(name = "onboarding-cdc.retry") Integer maxRetry,
@ConfigProperty(name = "onboarding-cdc.minutes-threshold-for-update-notification") Integer minutesThresholdForUpdateNotification) {
this.onboardingMapper = onboardingMapper;
this.retryMinBackOff = retryMinBackOff;
this.retryMaxBackOff = retryMaxBackOff;
this.maxRetry = maxRetry;
this.minutesThresholdForUpdateNotification = minutesThresholdForUpdateNotification;
}

public Uni<OrchestrationResponse> invokeNotificationApi(Onboarding onboarding) {
assert onboarding != null;
QueueEvent queueEvent = determineEventType(onboarding);
return notificationsApi.apiNotificationPost(queueEvent.name(), onboardingMapper.toEntity(onboarding))
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofHours(retryMaxBackOff)).atMost(maxRetry);
}

private QueueEvent determineEventType(Onboarding onboarding) {
return switch (onboarding.getStatus()) {
case COMPLETED -> (isOverUpdateThreshold(onboarding.getUpdatedAt(), onboarding.getActivatedAt())) ? QueueEvent.UPDATE : QueueEvent.ADD;
case DELETED -> QueueEvent.UPDATE;
default -> throw new IllegalArgumentException("Onboarding status not supported");
};
}

private boolean isOverUpdateThreshold(LocalDateTime updatedAt, LocalDateTime activatedAt) {
return Objects.nonNull(updatedAt)
&& Objects.nonNull(activatedAt)
&& updatedAt.isAfter(activatedAt.plusMinutes(minutesThresholdForUpdateNotification));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,24 @@
import io.smallrye.mutiny.Multi;
import it.pagopa.selfcare.onboarding.event.constant.CdcStartAtConstant;
import it.pagopa.selfcare.onboarding.event.entity.Onboarding;
import it.pagopa.selfcare.onboarding.event.mapper.OnboardingMapper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi;

import java.time.Duration;
import java.util.*;

import static com.mongodb.client.model.Projections.fields;
import static com.mongodb.client.model.Projections.include;
import static it.pagopa.selfcare.onboarding.common.OnboardingStatus.COMPLETED;
import static it.pagopa.selfcare.onboarding.common.OnboardingStatus.DELETED;
import static java.util.Arrays.asList;

@Startup
@Slf4j
@ApplicationScoped
public class OnboardingCdcService {

@Inject
@RestClient
NotificationsApi notificationsApi;
private final OnboardingMapper onboardingMapper;
private static final String COLLECTION_NAME = "onboardings";
private static final String OPERATION_NAME = "ONBOARDING-CDC-OnboardingsUpdate";
private static final String EVENT_NAME = "ONBOARDING-CDC";
Expand All @@ -52,25 +44,18 @@ public class OnboardingCdcService {
private final TableClient tableClient;
private final String mongodbDatabase;
private final ReactiveMongoClient mongoClient;
private final Integer retryMinBackOff;
private final Integer retryMaxBackOff;
private final Integer maxRetry;
private final NotificationService notificationService;

public OnboardingCdcService(OnboardingMapper onboardingMapper, ReactiveMongoClient mongoClient,
public OnboardingCdcService(ReactiveMongoClient mongoClient,
@ConfigProperty(name = "quarkus.mongodb.database") String mongodbDatabase,
@ConfigProperty(name = "onboarding-cdc.retry.min-backoff") Integer retryMinBackOff,
@ConfigProperty(name = "onboarding-cdc.retry.max-backoff") Integer retryMaxBackOff,
@ConfigProperty(name = "onboarding-cdc.retry") Integer maxRetry,
TelemetryClient telemetryClient,
TableClient tableClient) {
this.onboardingMapper = onboardingMapper;
TableClient tableClient,
NotificationService notificationService) {
this.mongoClient = mongoClient;
this.mongodbDatabase = mongodbDatabase;
this.maxRetry = maxRetry;
this.retryMaxBackOff = retryMaxBackOff;
this.retryMinBackOff = retryMinBackOff;
this.telemetryClient = telemetryClient;
this.tableClient = tableClient;
this.notificationService = notificationService;
telemetryClient.getContext().getOperation().setName(OPERATION_NAME);
initOrderStream();
}
Expand All @@ -87,7 +72,7 @@ private void initOrderStream() {
if (Objects.nonNull(cdcStartAtEntity))
resumeToken = (String) cdcStartAtEntity.getProperty(CdcStartAtConstant.CDC_START_AT_PROPERTY);
} catch (TableServiceException e) {
log.warn("Table StarAt not found, it is starting from now ...");
log.warn("Table StartAt not found, it is starting from now ...");
}
}

Expand All @@ -100,7 +85,7 @@ private void initOrderStream() {

Bson match = Aggregates.match(Filters.and(
Filters.in("operationType", asList("update", "replace", "insert")),
Filters.in("fullDocument.status", Arrays.asList("COMPLETED", "DELETED"))));
Filters.in("fullDocument.status", Arrays.asList(COMPLETED.name(), DELETED.name()))));
Bson project = Aggregates.project(fields(include("_id", "ns", "documentKey", "fullDocument")));
List<Bson> pipeline = Arrays.asList(match, project);

Expand All @@ -123,16 +108,13 @@ private ReactiveMongoCollection<Onboarding> getCollection() {
}

protected void consumerOnboardingEvent(ChangeStreamDocument<Onboarding> document) {


assert document.getFullDocument() != null;
assert document.getDocumentKey() != null;

log.info("Starting consumerOnboardingEvent ... ");
log.info("Sending Onboarding notification having id {}", document.getFullDocument().getId());

notificationsApi.apiNotificationPost(onboardingMapper.toEntity(document.getFullDocument()))
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofHours(retryMaxBackOff)).atMost(maxRetry)
notificationService.invokeNotificationApi(document.getFullDocument())
.subscribe().with(
result -> {
log.info("Onboarding notification having id: {} successfully sent", document.getDocumentKey().toJson());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package it.pagopa.selfcare.onboarding.event.entity.util;

public enum QueueEvent {
ADD,
UPDATE
}
11 changes: 11 additions & 0 deletions apps/onboarding-cdc/src/main/openapi/onboarding_functions.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@
],
"summary": "",
"description": "",
"parameters": [
{
"name": "queueEvent",
"in": "query",
"description": "Query Event type",
"required": false,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ onboarding-cdc.storage.connection-string=${STORAGE_CONNECTION_STRING:UseDevelopm
onboarding-cdc.retry.min-backoff=${ONBOARDING-CDC-RETRY-MIN-BACKOFF:1}
onboarding-cdc.retry.max-backoff=${ONBOARDING-CDC-RETRY-MAX-BACKOFF:2}
onboarding-cdc.retry=${ONBOARDING-CDC-RETRY:3}
onboarding-cdc.minutes-threshold-for-update-notification=${ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION:5}

quarkus.openapi-generator.codegen.spec.onboarding_functions_json.mutiny=true
quarkus.openapi-generator.codegen.spec.onboarding_functions_json.type-mappings.DateTime=java.time.LocalDateTime
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package it.pagopa.selfcare.onboarding.event;

import io.quarkus.test.InjectMock;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import it.pagopa.selfcare.onboarding.common.OnboardingStatus;
import it.pagopa.selfcare.onboarding.event.entity.Onboarding;
import it.pagopa.selfcare.onboarding.event.entity.util.QueueEvent;
import it.pagopa.selfcare.onboarding.event.mapper.OnboardingMapper;
import jakarta.inject.Inject;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi;
import org.openapi.quarkus.onboarding_functions_json.model.OrchestrationResponse;

import java.time.LocalDateTime;

import static org.mockito.Mockito.*;

@QuarkusTest
public class NotificationServiceTest {
@Mock
private OnboardingMapper onboardingMapper;
@InjectMock
@RestClient
private NotificationsApi notificationsApi;
@Inject
private NotificationService notificationService;

@Test
@DisplayName("Should handle Invoke Notification API Success passing event ADD")
public void shouldHandleInvokeNotificationApiSuccessForQueueEventAdd() {
Onboarding onboarding = new Onboarding();
onboarding.setStatus(OnboardingStatus.COMPLETED);
onboarding.setUpdatedAt(LocalDateTime.now());
onboarding.setActivatedAt(LocalDateTime.now());

when(notificationsApi.apiNotificationPost(any(), any()))
.thenReturn(Uni.createFrom().item(new OrchestrationResponse()));

UniAssertSubscriber<OrchestrationResponse> subscriber = notificationService
.invokeNotificationApi(onboarding)
.subscribe().withSubscriber(UniAssertSubscriber.create());

subscriber.assertCompleted().awaitItem();

verify(notificationsApi, times(1)).apiNotificationPost(eq(QueueEvent.ADD.name()), any());
}

@Test
@DisplayName("Should handle Invoke Notification API Success passing event UPDATE")
public void shouldHandleInvokeNotificationApiSuccessForQueueEventUpdate() {
Onboarding onboarding = new Onboarding();
onboarding.setStatus(OnboardingStatus.COMPLETED);
onboarding.setUpdatedAt(LocalDateTime.now().plusMinutes(10)); // 5 minutes should be the threshold
onboarding.setActivatedAt(LocalDateTime.now());

when(notificationsApi.apiNotificationPost(any(), any()))
.thenReturn(Uni.createFrom().item(new OrchestrationResponse()));

UniAssertSubscriber<OrchestrationResponse> subscriber = notificationService
.invokeNotificationApi(onboarding)
.subscribe().withSubscriber(UniAssertSubscriber.create());

subscriber.assertCompleted().awaitItem();

verify(notificationsApi, times(1)).apiNotificationPost(eq(QueueEvent.UPDATE.name()), any());
}

@Test
@DisplayName("Should handle Invoke Notification API Success passing event UPDATE with status DELETED")
public void shouldHandleInvokeNotificationApiSuccessForQueueEventUpdateWithStatusDeleted() {
Onboarding onboarding = new Onboarding();
onboarding.setStatus(OnboardingStatus.DELETED);
onboarding.setUpdatedAt(LocalDateTime.now()); // 5 minutes should be the threshold
onboarding.setActivatedAt(LocalDateTime.now());

when(notificationsApi.apiNotificationPost(any(), any()))
.thenReturn(Uni.createFrom().item(new OrchestrationResponse()));

UniAssertSubscriber<OrchestrationResponse> subscriber = notificationService
.invokeNotificationApi(onboarding)
.subscribe().withSubscriber(UniAssertSubscriber.create());

subscriber.assertCompleted().awaitItem();

verify(notificationsApi, times(1)).apiNotificationPost(eq(QueueEvent.UPDATE.name()), any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,17 @@ public HttpResponseMessage resendNotification(
.build();
}

final String queueEventString = request.getQueryParameters().get("queueEvent");
final QueueEvent queueEvent = Objects.isNull(queueEventString) ? QueueEvent.UPDATE : QueueEvent.valueOf(queueEventString);


final Optional<Onboarding> onboarding = onboardingService.getOnboarding(onboardingId);
if(onboarding.isEmpty()) {
return request.createResponseBuilder(HttpStatus.NOT_FOUND)
.body("Onboarding with ID: " + onboardingId + " not found")
.build();
}
notificationEventService.send(context, onboarding.get(), QueueEvent.UPDATE);
notificationEventService.send(context, onboarding.get(), queueEvent);
return request.createResponseBuilder(HttpStatus.OK).build();
}
}
4 changes: 4 additions & 0 deletions infra/container_apps/onboarding-cdc/env/dev/terraform.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ app_settings = [
{
name = "ONBOARDING-CDC-MONGODB-WATCH-ENABLED"
value = "true"
},
{
name = "ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION"
value = "5"
}
]

Expand Down
4 changes: 4 additions & 0 deletions infra/container_apps/onboarding-cdc/env/prod/terraform.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ app_settings = [
{
name = "ONBOARDING_FUNCTIONS_URL"
value = "https://selc-d-onboarding-fn.azurewebsites.net"
},
{
name = "ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION"
value = "5"
}
]

Expand Down
4 changes: 4 additions & 0 deletions infra/container_apps/onboarding-cdc/env/uat/terraform.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ app_settings = [
{
name = "ONBOARDING_FUNCTIONS_URL"
value = "https://selc-d-onboarding-fn.azurewebsites.net"
},
{
name = "ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION"
value = "5"
}
]

Expand Down

0 comments on commit 41ff0d7

Please sign in to comment.