Skip to content

Commit

Permalink
Merge pull request #61 from samagra-comms/development
Browse files Browse the repository at this point in the history
Development Branch Merge
  • Loading branch information
pankajjangid05 authored Jun 16, 2023
2 parents 8e3d7c8 + da8cdc2 commit cdb3ceb
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.kie.api.io.Resource;
import org.kie.api.runtime.KieSession;
import org.kie.internal.io.ResourceFactory;
Expand All @@ -16,9 +17,7 @@
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -88,7 +87,7 @@ Map<String, Object> kafkaConsumerConfiguration() {
configuration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class);
configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class);
configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
configuration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configuration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return configuration;
}

Expand Down Expand Up @@ -121,7 +120,7 @@ SenderOptions<Integer, String> kafkaSenderOptions() {

@Bean
Flux<ReceiverRecord<String, String>> reactiveKafkaReceiver(ReceiverOptions<String, String> kafkaReceiverOptions) {
    // Stream records from Kafka reactively. Each record is acknowledged as soon
    // as it is emitted downstream: the consumer configuration disables auto-commit
    // (ENABLE_AUTO_COMMIT_CONFIG = false), so offsets must be committed manually
    // through the receiver offset, otherwise messages would be redelivered on
    // every consumer restart.
    return KafkaReceiver.create(kafkaReceiverOptions)
            .receive()
            .doOnNext(record -> record.receiverOffset().acknowledge());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void onMessage(@Payload String stringMessage) {
if (msg != null && msg.getProvider() != null && msg.getProvider().equalsIgnoreCase("firebase")
&& msg.getChannel().equalsIgnoreCase("web")) {
pushNotificationCount++;
log.info("OutboundConsumer:onMessage:: Notification push to kafka topic count: " + pushNotificationCount);
log.info("OutboundConsumer:onMessage:: Notification push to kafka topic count: " + pushNotificationCount + " UserId : " + msg.getTo().getUserID());
kafkaProducer.send(notificationOutbound, msg.toXML());
} else {
pushOtherCount++;
Expand Down
62 changes: 52 additions & 10 deletions src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

Expand All @@ -43,11 +44,7 @@
import javax.xml.bind.JAXBException;
import java.io.ByteArrayInputStream;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.Comparator;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -103,16 +100,34 @@ public class ReactiveConsumer {
private long consumeCount;
private long pushCount;

private Set<String> messageIdSet = new HashSet<>();
private HashSet<String> federatedUsers = new HashSet<>();

private long insertSetCount, existSetCount, existingFederatedUsers;

@KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"})
public void onMessage(@Payload String stringMessage) {
try {
final long startTime = System.nanoTime();
logTimeTaken(startTime, 0, null);
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes()));

if (msg != null && msg.getProvider().equalsIgnoreCase("firebase")) {
consumeCount++;
log.info("Consume topic by Orchestrator count : " + consumeCount);
// This code for kafka duplication problem
if (msg.getMessageId() != null && msg.getMessageId().getChannelMessageId() != null) {
String messageId = msg.getMessageId().getChannelMessageId();
if (messageIdSet.contains(messageId)) {
existSetCount++;
log.info("ReactiveConsumer:Already Counsumed : " + existSetCount + " MessageId : " + messageId);
return;
} else {
insertSetCount++;
log.info("ReactiveConsumer:Insert in set count : " + insertSetCount + " MessageId : " + messageId);
messageIdSet.add(messageId);
}
}
log.info("ReactiveConsumer: Total MessageId Set : " + messageIdSet.size());
}

SenderReceiverInfo from = msg.getFrom();
Expand Down Expand Up @@ -142,6 +157,7 @@ public void accept(XMessage msg) {
// msg.setFrom(from);
if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) {
try {
log.info("ReactiveConsumer:broadcastNotificationChunkSize : " + broadcastNotificationChunkSize);
/* Switch From & To */
switchFromTo(msg);
Integer chunkSize = null;
Expand All @@ -156,15 +172,15 @@ public void accept(XMessage msg) {
JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list");
int totalFederatedUsers = federatedUsers.length();
if (totalFederatedUsers <= chunkSize) {
log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: "+totalFederatedUsers);
log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + totalFederatedUsers);
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
} else {
List<JSONArray> jsonArrayList = chunkArrayList(federatedUsers, chunkSize);
int count = 1;
for (JSONArray jsonArray : jsonArrayList) {
log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : " + count);
msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString());
log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: "+jsonArray.length());
log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + jsonArray.length());
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
count++;
}
Expand All @@ -187,7 +203,7 @@ public void accept(XMessage msg) {
logTimeTaken(startTime, 0, "Notification processed by orchestrator: " + notificationProcessedCount + " :: Push count : "
+ pushCount + " :: orchestrator-notification-process-end-time: %d ms");
} catch (Exception ex) {
log.error("ReactiveConsumer:Notification Triggering Process:Error in pushing xMessage to kafka: "+ex.getMessage());
log.error("ReactiveConsumer:Notification Triggering Process:Error in pushing xMessage to kafka: " + ex.getMessage());
}
} else {
try {
Expand Down Expand Up @@ -223,7 +239,7 @@ public void accept(Throwable throwable) {
})
.subscribe();
} catch (Exception ex) {
log.error("ReactiveConsumer:ODK and Generic Bot Processing:Exception: "+ex.getMessage());
log.error("ReactiveConsumer:ODK and Generic Bot Processing:Exception: " + ex.getMessage());
}
}

Expand Down Expand Up @@ -346,6 +362,23 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str

/* Get federated users from federation services */
JSONArray users = userService.getUsersFromFederatedServers(botId, page);
for (int i = 0; i < users.length(); i++) {
JSONObject jsonObject = (JSONObject) users.get(i);
if (jsonObject != null && !jsonObject.isNull("phoneNo")) {
String phoneNo = jsonObject.getString("phoneNo");
if (federatedUsers.contains(phoneNo)) {
existingFederatedUsers++;
log.info("ReactiveConsumer:getFederatedUsersMeta:: Duplicate Phone Number found : count: " + existingFederatedUsers + " Phone No : " + phoneNo);
} else {
log.info("ReactiveConsumer:getFederatedUsersMeta::Inserting User in set : " + phoneNo);
federatedUsers.add(phoneNo);
}
} else {
log.error("ReactiveConsumer:getFederatedUsersMeta::No Federated Users Found: " + users.get(i).toString());
}
}

log.info("ReactiveConsumer:getFederatedUsersMeta::Count: " + (users == null ? "user not found" : users.length()) + " >> Set count: " + federatedUsers.size());

/* Check if users, & related meta data exists in transformer */
if (users != null && transformer.get("meta") != null
Expand Down Expand Up @@ -378,6 +411,13 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str
/* Fetch user messages by template from template service */
ArrayList<JSONObject> usersMessage = userService.getUsersMessageByTemplate(node);

// for (int i = 0; i < usersMessage.size(); i++) {
// JSONObject jsonObject = usersMessage.get(i);
// if(jsonObject != null && !jsonObject.isNull("")){
//
// }
// }

log.info("ReactiveConsumer:getUsersMessageByTemplate::Count: " + usersMessage.size());

/* Set User messages against the user phone */
Expand Down Expand Up @@ -414,6 +454,8 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str
federatedUsersMeta.put("list", userMetaData);

return federatedUsersMeta.toString();
} else {
log.error("ReactiveConsumer:getFederatedUsersMetaElse::Users not found");
}
return "";
}
Expand Down

0 comments on commit cdb3ceb

Please sign in to comment.