Skip to content

Commit

Permalink
fix building consumer properties
Browse files Browse the repository at this point in the history
  • Loading branch information
dmtkachenko committed Dec 18, 2024
1 parent 0d5c326 commit 78502cf
Showing 1 changed file with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.folio.consortia.config.kafka;

import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -8,13 +10,13 @@
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.FolioModuleMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;
Expand All @@ -41,25 +43,22 @@ public <V> ConcurrentKafkaListenerContainerFactory<String, V> kafkaListenerConta
}

@Bean
public DefaultKafkaConsumerFactoryCustomizer consumerFactoryCustomizer(FolioModuleMetadata folioModuleMetadata) {
return consumerFactory -> {
var props = consumerFactory.getConfigurationProperties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put("folioModuleMetadata", folioModuleMetadata);
consumerFactory.updateConfigs(props);
};
public <V> ConsumerFactory<String, V> consumerFactory(FolioModuleMetadata folioModuleMetadata) {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties(null));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put("folioModuleMetadata", folioModuleMetadata);
return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer(FolioExecutionContext folioExecutionContext) {
return producerFactory -> {
var props = producerFactory.getConfigurationProperties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("folioExecutionContext", folioExecutionContext);
producerFactory.updateConfigs(props);
};
public <V> ProducerFactory<String, V> producerFactory(
FolioExecutionContext folioExecutionContext) {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties(null));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("folioExecutionContext", folioExecutionContext);
return new DefaultKafkaProducerFactory<>(props);
}

@Bean
Expand Down

0 comments on commit 78502cf

Please sign in to comment.