Skip to content

Commit

Permalink
use factory customizer
Browse files Browse the repository at this point in the history
  • Loading branch information
dmtkachenko committed Dec 18, 2024
1 parent c2b4274 commit 0d5c326
Showing 1 changed file with 16 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
package org.folio.consortia.config.kafka;

import java.util.HashMap;
import java.util.Map;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.FolioModuleMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.RequiredArgsConstructor;

@Component
@Configuration
@RequiredArgsConstructor
Expand All @@ -47,22 +41,25 @@ public <V> ConcurrentKafkaListenerContainerFactory<String, V> kafkaListenerConta
}

@Bean
public <V> ConsumerFactory<String, V> consumerFactory(ObjectMapper objectMapper, FolioModuleMetadata folioModuleMetadata) {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
public DefaultKafkaConsumerFactoryCustomizer consumerFactoryCustomizer(FolioModuleMetadata folioModuleMetadata) {
return consumerFactory -> {
var props = consumerFactory.getConfigurationProperties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put("folioModuleMetadata", folioModuleMetadata);
return new DefaultKafkaConsumerFactory<>(props);
consumerFactory.updateConfigs(props);
};
}

@Bean
public <V> ProducerFactory<String, V> producerFactory(
FolioExecutionContext folioExecutionContext) {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("folioExecutionContext", folioExecutionContext);
return new DefaultKafkaProducerFactory<>(props);
public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer(FolioExecutionContext folioExecutionContext) {
return producerFactory -> {
var props = producerFactory.getConfigurationProperties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("folioExecutionContext", folioExecutionContext);
producerFactory.updateConfigs(props);
};
}

@Bean
Expand Down

0 comments on commit 0d5c326

Please sign in to comment.