From 78502cfbd9106b1d09bbc01c6b7c565e5f7ea8df Mon Sep 17 00:00:00 2001 From: Dmytro Tkachenko Date: Wed, 18 Dec 2024 16:41:47 +0200 Subject: [PATCH] fix building consumer properties --- .../config/kafka/KafkaConfiguration.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/folio/consortia/config/kafka/KafkaConfiguration.java b/src/main/java/org/folio/consortia/config/kafka/KafkaConfiguration.java index 87802f4..53fc83a 100644 --- a/src/main/java/org/folio/consortia/config/kafka/KafkaConfiguration.java +++ b/src/main/java/org/folio/consortia/config/kafka/KafkaConfiguration.java @@ -1,5 +1,7 @@ package org.folio.consortia.config.kafka; +import java.util.HashMap; +import java.util.Map; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -8,13 +10,13 @@ import org.folio.spring.FolioExecutionContext; import org.folio.spring.FolioModuleMetadata; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer; -import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.stereotype.Component; @@ -41,25 +43,22 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerConta } @Bean - public DefaultKafkaConsumerFactoryCustomizer consumerFactoryCustomizer(FolioModuleMetadata folioModuleMetadata) { - return consumerFactory -> { - var props = consumerFactory.getConfigurationProperties(); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put("folioModuleMetadata", folioModuleMetadata); - consumerFactory.updateConfigs(props); - }; + public ConsumerFactory consumerFactory(FolioModuleMetadata folioModuleMetadata) { + Map props = new HashMap<>(kafkaProperties.buildConsumerProperties(null)); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put("folioModuleMetadata", folioModuleMetadata); + return new DefaultKafkaConsumerFactory<>(props); } @Bean - public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer(FolioExecutionContext folioExecutionContext) { - return producerFactory -> { - var props = producerFactory.getConfigurationProperties(); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put("folioExecutionContext", folioExecutionContext); - producerFactory.updateConfigs(props); - }; + public ProducerFactory producerFactory( + FolioExecutionContext folioExecutionContext) { + Map props = new HashMap<>(kafkaProperties.buildProducerProperties(null)); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put("folioExecutionContext", folioExecutionContext); + return new DefaultKafkaProducerFactory<>(props); } @Bean