Skip to content

Commit

Permalink
Implemented kafka transaction for multiple partitions (#70)
Browse files Browse the repository at this point in the history
* Implemented kafka transaction for multiple partitions

* Added new producer file for transformer

* [Hot Fix] - Topic partition count read from env file
  • Loading branch information
pankajjangid05 authored Sep 15, 2023
1 parent 4b26409 commit fd1ddc9
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 3 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>5.3.10</version> <!-- NOTE(review): prefer omitting this version so the Spring Boot BOM manages it; a hard-coded 5.3.10 can drift from the Spring version Boot pulls in -->
</dependency>
<dependency>
<groupId>com.uci</groupId>
<artifactId>message-rosa</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
Expand Down Expand Up @@ -142,11 +143,13 @@ ProducerFactory<String, String> producerFactory() {
}

@Bean
@Primary
KafkaTemplate<String, String> kafkaTemplate() {
    // Primary, non-transactional template backed by producerFactory().
    // The previous version cast the new KafkaTemplate<String, String> to its own
    // type — a redundant self-cast with no effect, removed here.
    return new KafkaTemplate<>(producerFactory());
}


/**
* Create process outbound topic, if does not exists
*
Expand Down Expand Up @@ -181,4 +184,43 @@ public NewTopic createGenericTransformerTopic() {
public BotServiceParams getBotServiceParams() {
return new BotServiceParams();
}


/**
* This is for com.odk.transformer topic
*/

@Bean
Map<String, Object> kafkaTransactionalProducerConfiguration() {
    // Producer settings for the transactional transformer producer
    // (topic "com.odk.transformer").
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer-txn");
    // acks=all is required for idempotent/transactional producers.
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    // Serialize keys and values as JSON, but do not emit type-info headers.
    props.put(org.springframework.kafka.support.serializer.JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // NOTE(review): if the producer factory also calls setTransactionIdPrefix,
    // that prefix overrides this transactional.id — confirm which one is intended.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transformer-id");
    return props;
}


@Bean
ProducerFactory<String, String> transactionalProducerFactory() {
    // Factory for transactional producers, configured from
    // kafkaTransactionalProducerConfiguration().
    DefaultKafkaProducerFactory<String, String> factory =
            new DefaultKafkaProducerFactory<>(kafkaTransactionalProducerConfiguration());
    // Setting a transaction-id prefix is what makes the factory transaction
    // capable. The previous bare call to transactionCapable() was a no-op:
    // that method only *queries* whether a prefix is set and its boolean
    // result was discarded, so it has been removed.
    factory.setTransactionIdPrefix("transformer-");
    return factory;
}

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> transactionalProducerFactory) {
    // Bridges Spring's @Transactional support onto the transactional
    // producer factory so Kafka sends join Spring-managed transactions.
    KafkaTransactionManager<String, String> transactionManager =
            new KafkaTransactionManager<>(transactionalProducerFactory);
    return transactionManager;
}

@Bean(name = "transactionalKafkaTemplate")
KafkaTemplate<String, String> transactionalKafkaTemplate() {
    // Template bound to the transactional producer factory; inject by the
    // explicit bean name for transactional sends.
    return new KafkaTemplate<>(transactionalProducerFactory());
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package com.uci.orchestrator.Application;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.cassandra.CassandraDataAutoConfiguration;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@EnableKafka
@EnableAsync
Expand All @@ -17,10 +23,25 @@
@EntityScan(basePackages = {"com.uci.dao.models", "com.uci.orchestrator"})
@PropertySource("application-messagerosa.properties")
@PropertySource("application.properties")
@SpringBootApplication(exclude = { CassandraDataAutoConfiguration.class })
@SpringBootApplication(exclude = {CassandraDataAutoConfiguration.class})
@EnableTransactionManagement
@Slf4j
public class OrchestratorApplication {

    // Partition count for the "com.odk.transformer" topic, read from
    // transformer.topic.partition.count (env: TRANSFORMER_TOPIC_PARTITIONS).
    @Value("${transformer.topic.partition.count}")
    private Integer partitionCount;

    public static void main(String[] args) {
        SpringApplication.run(OrchestratorApplication.class, args);
    }

    /**
     * Declares the "com.odk.transformer" topic so it is created at startup
     * with the configured partition count and a replication factor of 1.
     */
    @Bean
    public NewTopic transactionsTopic() {
        // SLF4J parameterized logging instead of string concatenation;
        // the rendered message is identical.
        log.info("Transformer Topic Partitions Count : {}", partitionCount);
        return TopicBuilder.name("com.odk.transformer")
                .partitions(partitionCount)
                .replicas(1)
                .build();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.uci.orchestrator.Service.CommonService;
import com.uci.orchestrator.Service.TransformerProducer;
import com.uci.utils.BotService;
import com.uci.utils.bot.util.BotUtil;
import com.uci.utils.cache.service.RedisCacheService;
Expand All @@ -19,6 +20,7 @@
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverRecord;

Expand All @@ -38,6 +40,8 @@ public class ReactiveConsumer {

@Autowired
public SimpleProducer kafkaProducer;
@Autowired
public TransformerProducer transactionalKafkaProducer;

@Value("${odk-transformer}")
public String odkTransformerTopic;
Expand Down Expand Up @@ -69,6 +73,7 @@ public class ReactiveConsumer {
private CommonService commonService;

@KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"})
@Transactional
public void onMessage(@Payload String stringMessage) {
try {
final long startTime = System.nanoTime();
Expand Down Expand Up @@ -221,7 +226,8 @@ public void accept(XMessage msg) {
if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) {
kafkaProducer.send(genericTransformerTopic, msg.toXML());
} else {
kafkaProducer.send(odkTransformerTopic, msg.toXML());
transactionalKafkaProducer.sendTransaction(odkTransformerTopic, msg.toXML());
// kafkaProducer.send(odkTransformerTopic, msg.toXML());
}
// reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML());
} catch (JAXBException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.uci.orchestrator.Service;


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.validation.constraints.NotNull;

@Service
@Slf4j
public class TransformerProducer {

    // Transactional template (bean "transactionalKafkaTemplate") used to
    // publish transformer messages inside a Kafka transaction.
    private final KafkaTemplate<String, String> transactionalKafkaTemplate;

    /**
     * Fix: the original placed @Qualifier on the constructor-injected field,
     * where Spring ignores it — the qualifier must be on the constructor
     * parameter to select the intended "transactionalKafkaTemplate" bean.
     *
     * @param transactionalKafkaTemplate the transactional KafkaTemplate bean
     */
    public TransformerProducer(
            @Qualifier("transactionalKafkaTemplate") KafkaTemplate<String, String> transactionalKafkaTemplate) {
        this.transactionalKafkaTemplate = transactionalKafkaTemplate;
    }

    /**
     * Sends {@code message} to {@code topic} via the transactional template.
     *
     * The send is asynchronous: success/failure is only logged in the
     * callback. NOTE(review): @Transactional here requires a transaction
     * manager wired to this template's producer factory — confirm the
     * KafkaTransactionManager bean uses the same factory.
     *
     * @param topic   destination Kafka topic
     * @param message payload to publish
     */
    @Transactional
    public void sendTransaction(String topic, String message) {
        transactionalKafkaTemplate
                .send(topic, message)
                .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                    @Override
                    public void onFailure(@NotNull Throwable throwable) {
                        log.error("Kafka::Send:Exception: Unable to push topic {} message {} due to {}", topic, message, throwable.getMessage());
                    }

                    @Override
                    public void onSuccess(SendResult<String, String> stringStringSendResult) {
                        log.info("Kafka::Send: Pushed to topic {}", topic);
                    }
                });
    }
}
3 changes: 3 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,6 @@ notification-kafka-cache=${NOTIFICATION_KAFKA_CACHE:#{""}}
webclient.interval=${WEBCLIENT_INTERVAL:#{5000}}
webclient.retryMaxAttempts=${WEBCLIENT_RETRY_MAX_ATTEMPTS:#{3}}
webclient.retryMinBackoff=${WEBCLIENT_RETRY_MIN_BACK_OFF:#{5}}

# Transformer Topic Partitions
transformer.topic.partition.count=${TRANSFORMER_TOPIC_PARTITIONS:#{1}}

0 comments on commit fd1ddc9

Please sign in to comment.