Commit b94be37: Api update

Vitolo-Andrea committed Nov 26, 2024
1 parent e106913 commit b94be37
Showing 23 changed files with 943 additions and 124 deletions.
6 changes: 4 additions & 2 deletions pom.xml
@@ -51,8 +51,10 @@
 <artifactId>spring-cloud-stream-test-support</artifactId>
 <scope>test</scope>
 </dependency>

+<dependency>
+    <groupId>org.springframework.cloud</groupId>
+    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
+</dependency>

 <!--3rd party library-->
 <dependency>
15 changes: 10 additions & 5 deletions src/main/java/it/gov/pagopa/common/configuration/MongoConfig.java
@@ -2,10 +2,12 @@

 import com.mongodb.lang.NonNull;
 import it.gov.pagopa.common.utils.CommonConstants;
+import lombok.Getter;
 import lombok.Setter;
 import org.bson.types.Decimal128;
 import org.springframework.boot.autoconfigure.mongo.MongoClientSettingsBuilderCustomizer;
 import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.convert.converter.Converter;
@@ -20,16 +22,19 @@
 import java.util.concurrent.TimeUnit;

 @Configuration
+@EnableConfigurationProperties(MongoConfig.MongoDbCustomProperties.class)
 public class MongoConfig {

     @Configuration
     @ConfigurationProperties(prefix = "spring.data.mongodb.config")
+    @Getter
     @Setter
     public static class MongoDbCustomProperties {
-        @Setter
+
         ConnectionPoolSettings connectionPool;

+        @Getter
         @Setter
-        static class ConnectionPoolSettings {
+        public static class ConnectionPoolSettings {
             int maxSize;
             int minSize;
             long maxWaitTimeMS;
@@ -67,7 +72,7 @@ public MongoCustomConversions mongoCustomConversions() {
 }

 @WritingConverter
-private static class BigDecimalDecimal128Converter implements Converter<BigDecimal, Decimal128> {
+public static class BigDecimalDecimal128Converter implements Converter<BigDecimal, Decimal128> {

@Override
public Decimal128 convert(@NonNull BigDecimal source) {
@@ -76,7 +81,7 @@ public Decimal128 convert(@NonNull BigDecimal source) {
 }

 @ReadingConverter
-private static class Decimal128BigDecimalConverter implements Converter<Decimal128, BigDecimal> {
+public static class Decimal128BigDecimalConverter implements Converter<Decimal128, BigDecimal> {

@Override
public BigDecimal convert(@NonNull Decimal128 source) {
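For reference, the now-public MongoDbCustomProperties binds under the spring.data.mongodb.config prefix, so the connection pool can be tuned from configuration. A hypothetical example (keys derived from the visible fields above; the values are illustrative, not part of this commit):

spring.data.mongodb.config.connectionPool.maxSize=100
spring.data.mongodb.config.connectionPool.minSize=5
spring.data.mongodb.config.connectionPool.maxWaitTimeMS=120000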
17 changes: 17 additions & 0 deletions src/main/java/it/gov/pagopa/common/kafka/utils/KafkaConstants.java
@@ -0,0 +1,17 @@
package it.gov.pagopa.common.kafka.utils;

public class KafkaConstants {
private KafkaConstants(){}

//region error-topic headers
public static final String ERROR_MSG_HEADER_APPLICATION_NAME = "applicationName";
public static final String ERROR_MSG_HEADER_GROUP = "group";
public static final String ERROR_MSG_HEADER_SRC_TYPE = "srcType";
public static final String ERROR_MSG_HEADER_SRC_SERVER = "srcServer";
public static final String ERROR_MSG_HEADER_SRC_TOPIC = "srcTopic";
public static final String ERROR_MSG_HEADER_DESCRIPTION = "description";
public static final String ERROR_MSG_HEADER_RETRY = "retry";
public static final String ERROR_MSG_HEADER_RETRYABLE = "retryable";
public static final String ERROR_MSG_HEADER_STACKTRACE = "stacktrace";
//endregion
}
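The header names above suggest how an error publisher could annotate a failed message before routing it to an error topic. A minimal sketch of such a producer side (this helper class and every value in it are hypothetical, not part of this commit; only the constant names come from the file above):

package it.gov.pagopa.common.kafka.utils;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class ErrorMessageExample {
    // Copies the failed message and attaches the error-topic headers defined in KafkaConstants.
    public static Message<String> toErrorMessage(Message<String> original, Throwable e, boolean retryable) {
        return MessageBuilder.fromMessage(original)
                .setHeader(KafkaConstants.ERROR_MSG_HEADER_APPLICATION_NAME, "my-app")   // hypothetical application name
                .setHeader(KafkaConstants.ERROR_MSG_HEADER_GROUP, "my-consumer-group")   // hypothetical consumer group
                .setHeader(KafkaConstants.ERROR_MSG_HEADER_SRC_TYPE, "kafka")
                .setHeader(KafkaConstants.ERROR_MSG_HEADER_SRC_SERVER, "localhost:9092") // hypothetical broker
                .setHeader(KafkaConstants.ERROR_MSG_HEADER_SRC_TOPIC, "my-topic")        // hypothetical source topic
                .setHeader(KafkaConstants.ERROR_MSG_HEADER_DESCRIPTION, "Error processing message")
                .setHeader(KafkaConstants.ERROR_MSG_HEADER_RETRYABLE, retryable)
                .setHeader(KafkaConstants.ERROR_MSG_HEADER_STACKTRACE, ExceptionUtils.getStackTrace(e))
                .build();
    }
}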
178 changes: 178 additions & 0 deletions src/main/java/it/gov/pagopa/common/reactive/kafka/consumer/BaseKafkaConsumer.java
@@ -0,0 +1,178 @@
package it.gov.pagopa.common.reactive.kafka.consumer;

import com.fasterxml.jackson.databind.ObjectReader;
import it.gov.pagopa.common.kafka.utils.KafkaConstants;
import it.gov.pagopa.common.reactive.kafka.exception.UncommittableError;
import it.gov.pagopa.common.reactive.utils.PerformanceLogger;
import it.gov.pagopa.common.utils.CommonUtilities;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

/**
* Base class to extend in order to configure a timed commit behavior when using the Kafka binder.
* In addition to extending this class, you should:
* <ol>
* <li>Turn off auto-commit (spring.cloud.stream.kafka.bindings.BINDINGNAME.consumer.autoCommitOffset=false)</li>
* <li>Set the ackMode to MANUAL_IMMEDIATE (spring.cloud.stream.kafka.bindings.BINDINGNAME.consumer.ackMode=MANUAL_IMMEDIATE)</li>
* </ol>
* @param <T> The type of the message to read and deserialize
* @param <R> The type of the result produced
*/
@Slf4j
public abstract class BaseKafkaConsumer<T, R> {

/** Key used inside the {@link Context} to store the startTime */
protected static final String CONTEXT_KEY_START_TIME = "START_TIME";
/** Key used inside the {@link Context} to store a msg identifier used for logging purpose */
protected static final String CONTEXT_KEY_MSG_ID = "MSG_ID";

private final String applicationName;

private static final Collector<KafkaAcknowledgeResult<?>, ?, Map<Integer, Pair<Long, KafkaAcknowledgeResult<?>>>> kafkaAcknowledgeResultMapCollector =
Collectors.groupingBy(KafkaAcknowledgeResult::partition
, Collectors.teeing(
Collectors.minBy(Comparator.comparing(KafkaAcknowledgeResult::offset)),
Collectors.maxBy(Comparator.comparing(KafkaAcknowledgeResult::offset)),
(min, max) -> Pair.of(
min.map(KafkaAcknowledgeResult::offset).orElse(null),
max.orElse(null))
));

protected BaseKafkaConsumer(String applicationName) {
this.applicationName = applicationName;
}

record KafkaAcknowledgeResult<T> (Acknowledgment ack, Integer partition, Long offset, T result){
public KafkaAcknowledgeResult(Message<?> message, T result) {
this(
(Acknowledgment) CommonUtilities.getHeaderValue(message, KafkaHeaders.ACKNOWLEDGMENT),
getMessagePartitionId(message),
getMessageOffset(message),
result
);
}
}

private static Integer getMessagePartitionId(Message<?> message) {
return (Integer) CommonUtilities.getHeaderValue(message, KafkaHeaders.RECEIVED_PARTITION);
}

private static Long getMessageOffset(Message<?> message) {
return (Long) CommonUtilities.getHeaderValue(message, KafkaHeaders.OFFSET);
}

/** Asks the subclass to handle the messages, then acknowledges them in batches, committing the highest processed offset of each partition */
public final void execute(Flux<Message<String>> messagesFlux) {
Flux<List<R>> processUntilCommits =
messagesFlux
.flatMapSequential(this::executeAcknowledgeAware)

.buffer(getCommitDelay())
.map(p -> {
Map<Integer, Pair<Long, KafkaAcknowledgeResult<?>>> partition2Offsets = p.stream()
.collect(kafkaAcknowledgeResultMapCollector);

log.info("[KAFKA_COMMIT][{}] Committing {} messages: {}", getFlowName(), p.size(),
partition2Offsets.entrySet().stream()
.map(e->"partition %d: %d - %d".formatted(e.getKey(),e.getValue().getKey(), e.getValue().getValue().offset()))
.collect(Collectors.joining(";")));

partition2Offsets.forEach((partition, offsets) -> Optional.ofNullable(offsets.getValue().ack()).ifPresent(Acknowledgment::acknowledge));

return p.stream()
.map(KafkaAcknowledgeResult::result)
.filter(Objects::nonNull)
.toList();
}
);

subscribeAfterCommits(processUntilCommits);
}

/** The {@link Duration} to wait before committing processed messages */
protected abstract Duration getCommitDelay();

/** The {@link Flux} to subscribe to in order to start execution and, if needed, perform further logic on the results */
protected abstract void subscribeAfterCommits(Flux<List<R>> afterCommits2subscribe);

private Mono<KafkaAcknowledgeResult<R>> executeAcknowledgeAware(Message<String> message) {
KafkaAcknowledgeResult<R> defaultAck = new KafkaAcknowledgeResult<>(message, null);

byte[] retryingApplicationNameBytes = message.getHeaders().get(KafkaConstants.ERROR_MSG_HEADER_APPLICATION_NAME, byte[].class);
String retryingApplicationName = retryingApplicationNameBytes != null ? new String(retryingApplicationNameBytes, StandardCharsets.UTF_8) : null;
if(retryingApplicationName != null && !retryingApplicationName.equals(this.applicationName)){
// log the decoded header value (logging the raw byte[] would print only its hashCode)
log.info("[{}] Discarding message due to other application retry ({}): {}", getFlowName(), retryingApplicationName, CommonUtilities.readMessagePayload(message));
return Mono.just(defaultAck);
}

Map<String, Object> ctx=new HashMap<>();
ctx.put(CONTEXT_KEY_START_TIME, System.currentTimeMillis());
ctx.put(CONTEXT_KEY_MSG_ID, CommonUtilities.readMessagePayload(message));

return execute(message, ctx)
.map(r -> new KafkaAcknowledgeResult<>(message, r))
.defaultIfEmpty(defaultAck)

.onErrorResume(e -> {
if(e instanceof UncommittableError) {
return Mono.error(e);
} else {
return Mono.just(defaultAck);
}
})
.doOnNext(r -> doFinally(message, ctx))

.onErrorResume(e -> {
log.info("Retrying after reactive pipeline error: ", e);
return executeAcknowledgeAware(message);
});
}

/** Hook to perform operations at the end of the business logic execution, before waiting for the commit. By default, it logs the elapsed time at INFO level */
protected void doFinally(Message<String> message, Map<String, Object> ctx) {
Long startTime = (Long)ctx.get(CONTEXT_KEY_START_TIME);
String msgId = (String)ctx.get(CONTEXT_KEY_MSG_ID);
if(startTime != null){
PerformanceLogger.logTiming(getFlowName(), startTime,
"(partition: %s, offset: %s) %s".formatted(getMessagePartitionId(message), getMessageOffset(message), msgId));
}
}

/** Name used for logging purpose */
public String getFlowName() {
return getClass().getSimpleName();
}

/** It will deserialize the message and then call the {@link #execute(Object, Message, Map)} method */
protected Mono<R> execute(Message<String> message, Map<String, Object> ctx){
return Mono.just(message)
.mapNotNull(this::deserializeMessage)
.flatMap(payload->execute(payload, message, ctx));
}

/** The {@link ObjectReader} to use in order to deserialize the input message */
protected abstract ObjectReader getObjectReader();
/** The action to take if deserialization throws an error */
protected abstract Consumer<Throwable> onDeserializationError(Message<String> message);

/** The function invoked in order to process the current message */
protected abstract Mono<R> execute(T payload, Message<String> message, Map<String, Object> ctx);

/** It will read and deserialize {@link Message#getPayload()} using the given {@link #getObjectReader()} */
protected T deserializeMessage(Message<String> message) {
return CommonUtilities.deserializeMessage(message, getObjectReader(),null/*onDeserializationError(message)*/);
}

}
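As a hedged sketch of how this base class is meant to be extended (SampleConsumer and SampleEvent are invented names; the binding would still need the autoCommitOffset/ackMode properties quoted in the Javadoc above):

package it.gov.pagopa.common.reactive.kafka.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

@Slf4j
public class SampleConsumer extends BaseKafkaConsumer<SampleConsumer.SampleEvent, String> {

    public record SampleEvent(String id) {}

    private final ObjectReader objectReader;

    public SampleConsumer(String applicationName, ObjectMapper objectMapper) {
        super(applicationName);
        this.objectReader = objectMapper.readerFor(SampleEvent.class);
    }

    @Override
    protected Duration getCommitDelay() {
        // processed offsets are acknowledged in batches about once per second
        return Duration.ofSeconds(1);
    }

    @Override
    protected void subscribeAfterCommits(Flux<List<String>> afterCommits2subscribe) {
        // starts the pipeline; invoked once at startup
        afterCommits2subscribe.subscribe(results -> log.info("Committed {} results", results.size()));
    }

    @Override
    protected ObjectReader getObjectReader() {
        return objectReader;
    }

    @Override
    protected Consumer<Throwable> onDeserializationError(Message<String> message) {
        return e -> log.error("Cannot deserialize message", e);
    }

    @Override
    protected Mono<String> execute(SampleEvent payload, Message<String> message, Map<String, Object> ctx) {
        // business logic goes here
        return Mono.just("processed " + payload.id());
    }
}

In a spring-cloud-stream setup, the Flux<Message<String>> handed to execute(...) would typically come from a java.util.function.Consumer<Flux<Message<String>>> bean bound to the input topic and delegating to this class.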
11 changes: 11 additions & 0 deletions src/main/java/it/gov/pagopa/common/reactive/kafka/exception/UncommittableError.java
@@ -0,0 +1,11 @@
package it.gov.pagopa.common.reactive.kafka.exception;

public class UncommittableError extends RuntimeException {
public UncommittableError(String message){
super(message);
}

public UncommittableError(String message, Exception e){
super(message, e);
}
}
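Reading BaseKafkaConsumer above, throwing this exception is the one way a message escapes acknowledgment: the error propagates past the defaultAck fallback and the pipeline re-invokes executeAcknowledgeAware instead of committing. A hypothetical fragment (the callDownstream publisher is an invented placeholder for real business logic):

package it.gov.pagopa.common.reactive.kafka.exception;

import reactor.core.publisher.Mono;

import java.util.concurrent.TimeoutException;

public class UncommittableErrorExample {
    // Wraps a transient failure so BaseKafkaConsumer retries the message instead of committing it.
    static Mono<String> process(Mono<String> callDownstream) {
        return callDownstream
                .onErrorResume(TimeoutException.class,
                        e -> Mono.error(new UncommittableError("Downstream timeout, will reprocess", e)));
    }
}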
54 changes: 54 additions & 0 deletions src/main/java/it/gov/pagopa/common/reactive/utils/PerformanceLogger.java
@@ -0,0 +1,54 @@
package it.gov.pagopa.common.reactive.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.Function;

@Slf4j
public class PerformanceLogger {

private PerformanceLogger(){}

//region Mono ops
public static <T> Mono<T> logTimingOnNext(String flowName, Mono<T> publisher, Function<T, String> data2LogPayload){
return logTimingOnNext(flowName, System.currentTimeMillis(), publisher, data2LogPayload);
}
public static <T> Mono<T> logTimingOnNext(String flowName, long startTime, Mono<T> publisher, Function<T, String> data2LogPayload){
return publisher
.doOnNext(x -> logTiming(flowName, startTime, data2LogPayload!=null? data2LogPayload.apply(x) : ""));
}

public static <T> Mono<T> logTimingFinally(String flowName, Mono<T> publisher, String logPayload){
return logTimingFinally(flowName, System.currentTimeMillis(), publisher, logPayload);
}
public static <T> Mono<T> logTimingFinally(String flowName, long startTime, Mono<T> publisher, String logPayload){
return publisher
.doFinally(x -> logTiming(flowName, startTime, ObjectUtils.firstNonNull(logPayload, "")));
}
//endregion

//region Flux ops
public static <T> Flux<T> logTimingOnNext(String flowName, Flux<T> publisher, Function<T, String> data2LogPayload){
return logTimingOnNext(flowName, System.currentTimeMillis(), publisher, data2LogPayload);
}
public static <T> Flux<T> logTimingOnNext(String flowName, long startTime, Flux<T> publisher, Function<T, String> data2LogPayload){
return publisher
.doOnNext(x -> logTiming(flowName, startTime, data2LogPayload!=null? data2LogPayload.apply(x) : ""));
}

public static <T> Flux<T> logTimingFinally(String flowName, Flux<T> publisher, String logPayload){
return logTimingFinally(flowName, System.currentTimeMillis(), publisher, logPayload);
}
public static <T> Flux<T> logTimingFinally(String flowName, long startTime, Flux<T> publisher, String logPayload){
return publisher
.doFinally(x -> logTiming(flowName, startTime, ObjectUtils.firstNonNull(logPayload, "")));
}
//endregion

public static void logTiming(String flowName, long startTime, String logPayload){
log.info("[PERFORMANCE_LOG] [{}] Time occurred to perform business logic: {} ms {}", flowName, System.currentTimeMillis() - startTime, logPayload);
}
}
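A brief usage sketch (the flow name and payloads are illustrative):

package it.gov.pagopa.common.reactive.utils;

import reactor.core.publisher.Mono;

public class PerformanceLoggerExample {
    public static void main(String[] args) {
        // Logs "[PERFORMANCE_LOG] [SAMPLE_FLOW] Time occurred to perform business logic: ... ms result=42"
        PerformanceLogger.logTimingOnNext(
                        "SAMPLE_FLOW",
                        Mono.fromSupplier(() -> 42),
                        result -> "result=" + result)
                .block();

        // Direct timing of an imperative section
        long startTime = System.currentTimeMillis();
        // ... business logic ...
        PerformanceLogger.logTiming("SAMPLE_FLOW", startTime, "done");
    }
}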