diff --git a/pom.xml b/pom.xml index d2b59dde..2f422aa0 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,8 @@ 3.2.0 - 1.17.6 + 1.19.3 + 4.2.0 @@ -58,6 +59,10 @@ + + org.springframework.kafka + spring-kafka + com.fasterxml.jackson.module @@ -116,6 +121,13 @@ + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + org.testcontainers postgresql @@ -128,12 +140,29 @@ ${testcontainers.version} test + + org.testcontainers + kafka + ${testcontainers.version} + test + + + org.springframework.kafka + spring-kafka-test + test + org.wiremock wiremock-standalone ${wiremock.version} test + + org.awaitility + awaitility + ${awaitility.version} + test + diff --git a/src/main/java/org/folio/config/KafkaConsumerConfig.java b/src/main/java/org/folio/config/KafkaConsumerConfig.java new file mode 100644 index 00000000..4c1616b0 --- /dev/null +++ b/src/main/java/org/folio/config/KafkaConsumerConfig.java @@ -0,0 +1,43 @@ +package org.folio.config; + +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import lombok.extern.log4j.Log4j2; + +@Configuration +@Log4j2 +public class KafkaConsumerConfig { + + @Bean + public ConsumerFactory consumerFactory(KafkaProperties kafkaProperties) { + log.debug("consumerFactory:: {}", kafkaProperties); + + Map consumerConfig = Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset(), + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return new DefaultKafkaConsumerFactory<>(consumerConfig); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + KafkaProperties kafkaProperties) { + + var factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory(kafkaProperties)); + + return factory; + } + +} \ No newline at end of file diff --git a/src/main/java/org/folio/config/KafkaProperties.java b/src/main/java/org/folio/config/KafkaProperties.java new file mode 100644 index 00000000..75ddfa9f --- /dev/null +++ b/src/main/java/org/folio/config/KafkaProperties.java @@ -0,0 +1,22 @@ +package org.folio.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import lombok.Data; + +@Data +@Configuration +@ConfigurationProperties("kafka") +public class KafkaProperties { + private String bootstrapServers; + private KafkaConsumerProperties consumer; + + @Data + public static class KafkaConsumerProperties { + private String groupId; + private String autoOffsetReset; + } + +} + diff --git a/src/main/java/org/folio/controller/KafkaEventListener.java b/src/main/java/org/folio/controller/KafkaEventListener.java new file mode 100644 index 00000000..bbae044d --- /dev/null +++ b/src/main/java/org/folio/controller/KafkaEventListener.java @@ -0,0 +1,28 @@ +package org.folio.controller; + +import org.folio.service.KafkaEventHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import lombok.extern.log4j.Log4j2; + +@Component +@Log4j2 +public class KafkaEventListener { + private final KafkaEventHandler eventHandler; + + public KafkaEventListener(@Autowired KafkaEventHandler eventHandler) { + this.eventHandler = eventHandler; + } + + @KafkaListener( + topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.request", + groupId = "${kafka.consumer.group-id}" + ) + public void handleRequestEvent(String event) { + log.info("handleRequestEvent:: message received: {}", event); + eventHandler.handle(event); + } + +} \ No newline at end of file diff --git a/src/main/java/org/folio/service/KafkaEventHandler.java b/src/main/java/org/folio/service/KafkaEventHandler.java new file mode 100644 index 00000000..52b281cf --- /dev/null +++ b/src/main/java/org/folio/service/KafkaEventHandler.java @@ -0,0 +1,5 @@ +package org.folio.service; + +public interface KafkaEventHandler { + void handle(String event); +} diff --git a/src/main/java/org/folio/service/impl/KafkaEventHandlerImpl.java b/src/main/java/org/folio/service/impl/KafkaEventHandlerImpl.java new file mode 100644 index 00000000..7311ff6a --- /dev/null +++ b/src/main/java/org/folio/service/impl/KafkaEventHandlerImpl.java @@ -0,0 +1,16 @@ +package org.folio.service.impl; + +import org.folio.service.KafkaEventHandler; +import org.springframework.stereotype.Service; + +import lombok.extern.log4j.Log4j2; + +@Service +@Log4j2 +public class KafkaEventHandlerImpl implements KafkaEventHandler { + + @Override + public void handle(String event) { + log.info("handle:: event consumed: {}", event); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8f959e27..5fa89cd8 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -32,6 +32,11 @@ spring: openfeign: okhttp: enabled: true +kafka: + bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} + consumer: + auto-offset-reset: earliest + group-id: ${ENV:folio}-mod-tlr-group folio: tenant: validation: diff --git a/src/test/java/org/folio/controller/KafkaEventListenerTest.java b/src/test/java/org/folio/controller/KafkaEventListenerTest.java new file mode 100644 index 00000000..a1df4c60 --- /dev/null +++ b/src/test/java/org/folio/controller/KafkaEventListenerTest.java @@ -0,0 +1,108 @@ +package org.folio.controller; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import lombok.SneakyThrows; + +@SpringBootTest +@Testcontainers +class KafkaEventListenerTest { + private static final String ENV = "folio"; + private static final String TENANT = "test_tenant"; + private static final String REQUEST_TOPIC_NAME = buildTopicName("circulation", "request"); + private static final String CONSUMER_GROUP_ID = "folio-mod-tlr-group"; + + private static KafkaProducer kafkaProducer; + private static AdminClient kafkaAdminClient; + + @Container + private static final KafkaContainer kafka = new KafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka:7.5.3")); + + @DynamicPropertySource + static void overrideProperties(DynamicPropertyRegistry registry) { + registry.add("kafka.bootstrap-servers", kafka::getBootstrapServers); + } + + @BeforeAll + public static void beforeClass() { + kafkaProducer = new KafkaProducer<>(Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class + )); + + kafkaAdminClient = KafkaAdminClient.create(Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers())); + } + + @AfterAll + public static void afterClass() { + kafkaProducer.close(); + kafkaAdminClient.close(); + } + + @Test + void requestEventIsConsumed() { + publishEventAndWait(REQUEST_TOPIC_NAME, CONSUMER_GROUP_ID, "test message"); + } + + @SneakyThrows + private static int getOffset(String topic, String consumerGroup) { + return kafkaAdminClient.listConsumerGroupOffsets(consumerGroup) + .partitionsToOffsetAndMetadata() + .thenApply(partitions -> Optional.ofNullable(partitions.get(new TopicPartition(topic, 0))) + .map(OffsetAndMetadata::offset) + .map(Long::intValue) + .orElse(0)) + .get(10, TimeUnit.SECONDS); + } + + private static void publishEventAndWait(String topic, String consumerGroupId, String payload) { + final int initialOffset = getOffset(topic, consumerGroupId); + publishEvent(topic, payload); + waitForOffset(topic, consumerGroupId, initialOffset + 1); + } + + private static void publishEvent(String topic, String payload) { + kafkaProducer.send(new ProducerRecord<>(topic, payload)); + } + + private static void waitForOffset(String topic, String consumerGroupId, int expectedOffset) { + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .until(() -> getOffset(topic, consumerGroupId), offset -> offset.equals(expectedOffset)); + } + + private static String buildTopicName(String module, String objectType) { + return buildTopicName(ENV, TENANT, module, objectType); + } + + private static String buildTopicName(String env, String tenant, String module, String objectType) { + return String.format("%s.%s.%s.%s", env, tenant, module, objectType); + } + +} \ No newline at end of file