Skip to content

Commit

Permalink
Merge branch 'master' into MODTLR-5
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-barannyk authored Jan 17, 2024
2 parents c67b1aa + 0951dd3 commit f21e78b
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 1 deletion.
31 changes: 30 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@

<!-- test dependencies -->
<wiremock.version>3.2.0</wiremock.version>
<testcontainers.version>1.17.6</testcontainers.version>
<testcontainers.version>1.19.3</testcontainers.version>
<awaitility.version>4.2.0</awaitility.version>
</properties>

<dependencies>
Expand All @@ -58,6 +59,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
Expand Down Expand Up @@ -116,6 +121,13 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
Expand All @@ -128,12 +140,29 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wiremock</groupId>
<artifactId>wiremock-standalone</artifactId>
<version>${wiremock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
43 changes: 43 additions & 0 deletions src/main/java/org/folio/config/KafkaConsumerConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.folio.config;

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import lombok.extern.log4j.Log4j2;

@Configuration
@Log4j2
public class KafkaConsumerConfig {

@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
log.debug("consumerFactory:: {}", kafkaProperties);

Map<String, Object> consumerConfig = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset(),
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return new DefaultKafkaConsumerFactory<>(consumerConfig);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
KafkaProperties kafkaProperties) {

var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory(kafkaProperties));

return factory;
}

}
22 changes: 22 additions & 0 deletions src/main/java/org/folio/config/KafkaProperties.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.folio.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import lombok.Data;

@Data
@Configuration
@ConfigurationProperties("kafka")
public class KafkaProperties {
private String bootstrapServers;
private KafkaConsumerProperties consumer;

@Data
public static class KafkaConsumerProperties {
private String groupId;
private String autoOffsetReset;
}

}

28 changes: 28 additions & 0 deletions src/main/java/org/folio/controller/KafkaEventListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.folio.controller;

import org.folio.service.KafkaEventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.extern.log4j.Log4j2;

@Component
@Log4j2
public class KafkaEventListener {
private final KafkaEventHandler eventHandler;

public KafkaEventListener(@Autowired KafkaEventHandler eventHandler) {
this.eventHandler = eventHandler;
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.request",
groupId = "${kafka.consumer.group-id}"
)
public void handleRequestEvent(String event) {
log.info("handleRequestEvent:: message received: {}", event);
eventHandler.handle(event);
}

}
5 changes: 5 additions & 0 deletions src/main/java/org/folio/service/KafkaEventHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.folio.service;

public interface KafkaEventHandler {
void handle(String event);
}
16 changes: 16 additions & 0 deletions src/main/java/org/folio/service/impl/KafkaEventHandlerImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.folio.service.impl;

import org.folio.service.KafkaEventHandler;
import org.springframework.stereotype.Service;

import lombok.extern.log4j.Log4j2;

@Service
@Log4j2
public class KafkaEventHandlerImpl implements KafkaEventHandler {

@Override
public void handle(String event) {
log.info("handle:: event consumed: {}", event);
}
}
5 changes: 5 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ spring:
openfeign:
okhttp:
enabled: true
kafka:
bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}
consumer:
auto-offset-reset: earliest
group-id: ${ENV:folio}-mod-tlr-group
folio:
tenant:
validation:
Expand Down
108 changes: 108 additions & 0 deletions src/test/java/org/folio/controller/KafkaEventListenerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.folio.controller;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import lombok.SneakyThrows;

@SpringBootTest
@Testcontainers
class KafkaEventListenerTest {
private static final String ENV = "folio";
private static final String TENANT = "test_tenant";
private static final String REQUEST_TOPIC_NAME = buildTopicName("circulation", "request");
private static final String CONSUMER_GROUP_ID = "folio-mod-tlr-group";

private static KafkaProducer<String, String> kafkaProducer;
private static AdminClient kafkaAdminClient;

@Container
private static final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.3"));

@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("kafka.bootstrap-servers", kafka::getBootstrapServers);
}

@BeforeAll
public static void beforeClass() {
kafkaProducer = new KafkaProducer<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
));

kafkaAdminClient = KafkaAdminClient.create(Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()));
}

@AfterAll
public static void afterClass() {
kafkaProducer.close();
kafkaAdminClient.close();
}

@Test
void requestEventIsConsumed() {
publishEventAndWait(REQUEST_TOPIC_NAME, CONSUMER_GROUP_ID, "test message");
}

@SneakyThrows
private static int getOffset(String topic, String consumerGroup) {
return kafkaAdminClient.listConsumerGroupOffsets(consumerGroup)
.partitionsToOffsetAndMetadata()
.thenApply(partitions -> Optional.ofNullable(partitions.get(new TopicPartition(topic, 0)))
.map(OffsetAndMetadata::offset)
.map(Long::intValue)
.orElse(0))
.get(10, TimeUnit.SECONDS);
}

private static void publishEventAndWait(String topic, String consumerGroupId, String payload) {
final int initialOffset = getOffset(topic, consumerGroupId);
publishEvent(topic, payload);
waitForOffset(topic, consumerGroupId, initialOffset + 1);
}

private static void publishEvent(String topic, String payload) {
kafkaProducer.send(new ProducerRecord<>(topic, payload));
}

private static void waitForOffset(String topic, String consumerGroupId, int expectedOffset) {
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.until(() -> getOffset(topic, consumerGroupId), offset -> offset.equals(expectedOffset));
}

private static String buildTopicName(String module, String objectType) {
return buildTopicName(ENV, TENANT, module, objectType);
}

private static String buildTopicName(String env, String tenant, String module, String objectType) {
return String.format("%s.%s.%s.%s", env, tenant, module, objectType);
}

}

0 comments on commit f21e78b

Please sign in to comment.