Kafka-Consumer: Enabled batch processing of incoming metrics. The MaxPayloadSize parameter has to be added to the KAFKA_CONFIG.

Closes issue Open-IoT-Service-Platform#60.

Signed-off-by: Marcel Wagner <[email protected]>
wagmarcel committed Mar 8, 2021
1 parent dedab58 commit 0afcb9d
Showing 2 changed files with 52 additions and 46 deletions.
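
Note on the new parameter: the consumer reads oispConfig.getBackendConfig().getKafkaConfig().getMaxPayloadSize() (see the KafkaConsumer.java diff below) and passes it to Kafka's FETCH_MAX_BYTES_CONFIG, so the deployment's KAFKA_CONFIG must carry a matching entry. A minimal sketch of what that entry could look like; the maxPayloadSize key name is inferred from the getter, and the broker URI and byte value are placeholders, not taken from this commit:

{
    "uri": "localhost:9092",      // placeholder broker address, not from this commit
    "maxPayloadSize": 2097152     // assumed new key: upper bound, in bytes, for one fetch (~2 MiB here)
}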
build.gradle (2 changes: 1 addition & 1 deletion)
@@ -47,7 +47,7 @@ apply plugin: 'checkstyle'
 apply plugin: 'pmd'
 
 checkstyle {
-    toolVersion = "6.1.1"
+    toolVersion = "6.4"
     configFile = rootProject.file('checkstyle/checkstyle.xml')
     sourceSets = [sourceSets.main]
 }
com/oisp/databackend/handlers/kafkaconsumer/KafkaConsumer.java (96 changes: 51 additions & 45 deletions)
@@ -1,7 +1,6 @@
 package com.oisp.databackend.handlers.kafkaconsumer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.exc.MismatchedInputException;
 import com.oisp.databackend.config.oisp.OispConfig;
 import com.oisp.databackend.datasources.DataDao;
 import com.oisp.databackend.datastructures.Observation;
@@ -16,28 +15,26 @@
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.ErrorHandler;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 import org.springframework.kafka.listener.ListenerExecutionFailedException;
-import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
-import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy;
-import org.springframework.retry.policy.SimpleRetryPolicy;
-import org.springframework.retry.support.RetryTemplate;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 @EnableKafka
 @Configuration
 public class KafkaConsumer {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
-    private static final int MAX_ATTEMPTS = 3;
-    private static final int MAX_FAILURES = 3;
+    private final String maxpolls = "1000";
     @Autowired
     private KafkaConsumerProperties kafkaConsumerProperties;
     @Autowired
@@ -71,7 +68,7 @@ public void setDataDao(DataDao dataDao) {
     }
 
     @Bean
-    public ConsumerFactory<String, String> consumerFactory() {
+    public ConsumerFactory<Integer, String> consumerFactory() {
         Map<String, Object> props = new HashMap<>();
         props.put(
                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -91,52 +88,61 @@ public ConsumerFactory<String, String> consumerFactory() {
         props.put(
                 ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
                 oispConfig.getBackendConfig().getKafkaConfig().getMaxPayloadSize());
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxpolls);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         return new DefaultKafkaConsumerFactory<>(props);
     }
 
     @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory<String, String> factory =
-                new ConcurrentKafkaListenerContainerFactory<String, String>();
+    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,
+            String>> kafkaListenerContainerFactory() {
+
+        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactory());
-        factory.setStatefulRetry(true);
-        RetryTemplate retryTemplate = new RetryTemplate();
-        retryTemplate.setBackOffPolicy(new ExponentialRandomBackOffPolicy());
-        retryTemplate.setThrowLastExceptionOnExhausted(true);
-        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(MAX_ATTEMPTS));
-        factory.setRetryTemplate(retryTemplate);
-        return factory;
-    }
-
-    @Bean
-    public ErrorHandler seekToCurrentErrorHandler() {
-        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(MAX_FAILURES);
-        seekToCurrentErrorHandler.setCommitRecovered(true);
-        return seekToCurrentErrorHandler;
+        factory.setBatchListener(true);
+
+        return factory;
     }
 
     @KafkaListener(topics = "#{kafkaConsumerProperties.getTopic()}")
-    public void receive(String rawObservations) throws IOException, ServiceException {
+    public void receive(List<String> rawObservationList) throws IOException, ServiceException {
+        logger.debug("Start processing kafka samples batch " + rawObservationList.size());
         ObjectMapper mapper = new ObjectMapper();
-        Observation[] observations = null;
-        try {
-            Observation observation = mapper.readValue(rawObservations, Observation.class);
-            observations = new Observation[]{observation};
-            if ("ByteArray".equals(observation.getDataType())) {
-                observation.setbValue(Base64.getDecoder().decode(observation.getValue()));
-                observation.setValue("0");
-            }
-        } catch (IllegalArgumentException | ListenerExecutionFailedException | MismatchedInputException e) {
-            logger.debug("Tried to parse single observation. Now trying array: " + e);
-            observations = mapper.readValue(rawObservations, Observation[].class);
-        }
-        logger.info("Received Observations in topic " + kafkaConsumerProperties.getTopic()
-                + ". Message: " + observations.toString());
-        if (!dataDao.put(observations)) {
+        //String rawObservations = rawObservationList.get(0);
+        List<Observation> observationList = new ArrayList<>();
+
+        rawObservationList.forEach(rawObservation -> {
+            Observation[] observations = null;
+            if (rawObservation.trim().startsWith("[")) {
+                try {
+                    observations = mapper.readValue(rawObservation, Observation[].class);
+                } catch (IllegalArgumentException | ListenerExecutionFailedException
+                        | com.fasterxml.jackson.core.JsonProcessingException e) {
+                    logger.warn("Tried to parse array. Will ignore the sample: " + e);
+                }
+            } else {
+                try {
+                    Observation observation = mapper.readValue(rawObservation, Observation.class);
+                    observations = new Observation[]{observation};
+                    if ("ByteArray".equals(observation.getDataType())) {
+                        observation.setbValue(Base64.getDecoder().decode(observation.getValue()));
+                        observation.setValue("0");
+                    }
+                } catch (IllegalArgumentException | ListenerExecutionFailedException
+                        | com.fasterxml.jackson.core.JsonProcessingException e) {
+                    logger.warn("Tried to parse single observation. Will ignore the sample " + e);
+                }
+            }
+            if (observations != null) {
+                logger.debug("Received Observations in topic " + kafkaConsumerProperties.getTopic()
+                        + ". Message: " + observations.toString());
+                observationList.addAll(Arrays.asList(observations));
+            }
+        });
+        if (!dataDao.put(observationList.stream().toArray(Observation[]::new))) {
             throw new ServiceException("Data store error.");
         }
-
+        logger.debug("End processing kafka sample");
     }
 }
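
For readers unfamiliar with Spring Kafka's batch mode, the core of the change above is factory.setBatchListener(true): the container then delivers everything returned by a single poll() (bounded by MAX_POLL_RECORDS_CONFIG, set to 1000 here, and by the maxPayloadSize fetch limit) to the @KafkaListener method in one call, so the listener signature changes from String to List<String>. A self-contained sketch of the pattern; the topic name is a placeholder, and the body only mirrors the committed receive() logic rather than reproducing it:

import java.util.List;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;

@EnableKafka
@Configuration
public class BatchListenerSketch {

    // With setBatchListener(true) on the container factory, Spring Kafka
    // invokes the listener once per poll with the whole batch instead of
    // once per record.
    @KafkaListener(topics = "metrics")  // "metrics" is a placeholder topic name
    public void receive(List<String> batch) {
        for (String payload : batch) {
            // Each element is one raw record value. In this commit a value
            // holds either a single JSON observation or a JSON array of
            // observations, hence the startsWith("[") dispatch.
            if (payload.trim().startsWith("[")) {
                // parse as Observation[] and collect
            } else {
                // parse as a single Observation and collect
            }
        }
        // The commit then persists the whole collected batch with a single
        // dataDao.put(...) call, which is the throughput win over the
        // previous one-record-per-invocation path.
    }
}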
