From 6561b1c2c13dbaf612a9aa3984e65b2e4a06fd63 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Fri, 14 Apr 2023 17:11:34 +0530 Subject: [PATCH] [Feature] - Sending Notifications in Chunks --- .../Consumer/ReactiveConsumer.java | 101 +++++++++++++++--- src/main/resources/application.properties | 5 +- 2 files changed, 89 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index caf0408..3e55c81 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -22,10 +22,7 @@ import io.fusionauth.domain.UserRegistration; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import messagerosa.core.model.DeviceType; -import messagerosa.core.model.SenderReceiverInfo; -import messagerosa.core.model.Transformer; -import messagerosa.core.model.XMessage; +import messagerosa.core.model.*; import messagerosa.xml.XMessageParser; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONArray; @@ -43,11 +40,7 @@ import javax.xml.bind.JAXBException; import java.io.ByteArrayInputStream; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.function.Consumer; import java.util.function.Function; @@ -95,7 +88,9 @@ public class ReactiveConsumer { private final String DEFAULT_APP_NAME = "Global Bot"; LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); - + @Value("${broadcastNotificationChunkSize}") + private String broadcastNotificationChunkSize; + @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { @@ -137,10 +132,41 @@ public void accept(XMessage msg) { if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { try { - log.info("final msg.toXML(): "+msg.toXML().toString()); - if(firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); - } else if(firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { + log.info("final msg.toXML(): " + msg.toXML().toString()); + + if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + Integer chunkSize = null; + try { + chunkSize = Integer.parseInt(broadcastNotificationChunkSize); + } catch(NumberFormatException ex){ + chunkSize = null; + } + if(chunkSize != null) { + if (msg.getTransformers() != null && msg.getTransformers().get(0) != null && msg.getTransformers().size() > 0 + && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { + JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); + int totalFederatedUsers = federatedUsers.length(); + if (totalFederatedUsers <= chunkSize) { + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } else { + List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); + int count = 0; + for (JSONArray jsonArray : jsonArrayList) { + log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count); + msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + count++; + } + } + } else { + log.error("federatedUsers not found : " + msg.toString()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } + } else{ + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } + + } else if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { kafkaProducer.send(genericTransformerTopic, msg.toXML()); } else { kafkaProducer.send(odkTransformerTopic, msg.toXML()); @@ -477,7 +503,7 @@ private void logTimeTaken(long startTime, int checkpointID) { * @return */ private Mono getLastMessageID(XMessage msg) { - if (msg != null && msg.getMessageType().toString().equalsIgnoreCase("text")) { + if (msg != null && msg.getFrom() != null && msg.getFrom().getUserID() != null && msg.getMessageType().toString().equalsIgnoreCase("text")) { return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function() { @Override public String apply(XMessageDAO msg1) { @@ -489,13 +515,15 @@ public String apply(XMessageDAO msg1) { } }); - } else if (msg != null && msg.getMessageType().toString().equalsIgnoreCase("button")) { + } else if (msg != null && msg.getFrom() != null && msg.getFrom().getUserID() != null && msg.getMessageType().toString().equalsIgnoreCase("button")) { return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function() { @Override public String apply(XMessageDAO lastMessage) { return String.valueOf(lastMessage.getId()); } }); + } else { + log.error("UserId not found : "+msg.toString()); } return Mono.empty(); } @@ -550,4 +578,45 @@ private void switchFromTo(XMessage xMessage) { xMessage.setFrom(to); xMessage.setTo(from); } + + /** + * Convert Federated users into chunks + * @param users + * @param chunkSize + * @return + */ + private List chunkArrayList(JSONArray users, int chunkSize) { + if (users != null && users.length() > 0) { + ArrayList arrayToChunk = new ArrayList<>(); + + for (int i = 0; i < users.length(); i++) { + arrayToChunk.add(users.getJSONObject(i)); + } + ArrayList> chunkList = new ArrayList<>(); + int guide = arrayToChunk.size(); + int index = 0; + int tale = chunkSize; + while (tale < arrayToChunk.size()) { + chunkList.add(arrayToChunk.subList(index, tale)); + guide = guide - chunkSize; + index = index + chunkSize; + tale = tale + chunkSize; + } + if (guide > 0) { + chunkList.add(arrayToChunk.subList(index, index + guide)); + } + List userChunksList = new ArrayList<>(); + for (List l : chunkList) { + JSONArray jsonArray = new JSONArray(); + for (JSONObject jsonObject : l) { + jsonArray.put(jsonObject); + } + userChunksList.add(jsonArray); + } + return userChunksList; + } else{ + log.error("Federated Users null found : "+users); + } + return null; + } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 2aced5a..14ad224 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -100,4 +100,7 @@ spring.mail.host=${EMAIL_HOST:#{""}} spring.mail.port=${EMAIL_PORT:#{"587"}} spring.mail.username=${EMAIL_USERNAME:#{""}} spring.mail.password=${EMAIL_PASSWORD:#{""}} -spring.mail.recipient=${RECIPIENT_EMAILS:#{""}} \ No newline at end of file +spring.mail.recipient=${RECIPIENT_EMAILS:#{""}} + +# Send notifications in chunks +broadcastNotificationChunkSize=${BROADCAST_NOTIFICATION_CHUNK_SIZE:#{""}}