From 04b85a3906e17b9459626b417966ee3c170447ea Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Tue, 4 Apr 2023 14:47:51 +0530 Subject: [PATCH 1/5] Email config added in properties file for utils --- src/main/resources/application.properties | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 1014e9c..2aced5a 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -93,4 +93,11 @@ spring.azure.blob.store.account.key=${AZURE_BLOB_STORE_ACCOUNT_KEY:#{"key"}} spring.azure.blob.store.container.name=${AZURE_BLOB_STORE_CONTAINER:#{"container"}} # Template Service Base Url -template.service.base.url=${TEMPLATE_SERVICE_BASE_URL:#{"http://templater2.ngrok.samagra.io/"}} \ No newline at end of file +template.service.base.url=${TEMPLATE_SERVICE_BASE_URL:#{"http://templater2.ngrok.samagra.io/"}} + +# Email Config +spring.mail.host=${EMAIL_HOST:#{""}} +spring.mail.port=${EMAIL_PORT:#{"587"}} +spring.mail.username=${EMAIL_USERNAME:#{""}} +spring.mail.password=${EMAIL_PASSWORD:#{""}} +spring.mail.recipient=${RECIPIENT_EMAILS:#{""}} \ No newline at end of file From 6561b1c2c13dbaf612a9aa3984e65b2e4a06fd63 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Fri, 14 Apr 2023 17:11:34 +0530 Subject: [PATCH 2/5] [Feature] - Sending Notifications in Chunks --- .../Consumer/ReactiveConsumer.java | 101 +++++++++++++++--- src/main/resources/application.properties | 5 +- 2 files changed, 89 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index caf0408..3e55c81 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -22,10 +22,7 @@ import io.fusionauth.domain.UserRegistration; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import messagerosa.core.model.DeviceType; -import messagerosa.core.model.SenderReceiverInfo; -import messagerosa.core.model.Transformer; -import messagerosa.core.model.XMessage; +import messagerosa.core.model.*; import messagerosa.xml.XMessageParser; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONArray; @@ -43,11 +40,7 @@ import javax.xml.bind.JAXBException; import java.io.ByteArrayInputStream; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.function.Consumer; import java.util.function.Function; @@ -95,7 +88,9 @@ public class ReactiveConsumer { private final String DEFAULT_APP_NAME = "Global Bot"; LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); - + @Value("${broadcastNotificationChunkSize}") + private String broadcastNotificationChunkSize; + @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { @@ -137,10 +132,41 @@ public void accept(XMessage msg) { if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { try { - log.info("final msg.toXML(): "+msg.toXML().toString()); - if(firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); - } else if(firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { + log.info("final msg.toXML(): " + msg.toXML().toString()); + + if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + Integer chunkSize = null; + try { + chunkSize = Integer.parseInt(broadcastNotificationChunkSize); + } catch(NumberFormatException ex){ + chunkSize = null; + } + if(chunkSize != null) { + if (msg.getTransformers() != null && msg.getTransformers().get(0) != null && msg.getTransformers().size() > 0 + && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { + JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); + int totalFederatedUsers = federatedUsers.length(); + if (totalFederatedUsers <= chunkSize) { + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } else { + List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); + int count = 0; + for (JSONArray jsonArray : jsonArrayList) { + log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count); + msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + count++; + } + } + } else { + log.error("federatedUsers not found : " + msg.toString()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } + } else{ + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } + + } else if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { kafkaProducer.send(genericTransformerTopic, msg.toXML()); } else { kafkaProducer.send(odkTransformerTopic, msg.toXML()); @@ -477,7 +503,7 @@ private void logTimeTaken(long startTime, int checkpointID) { * @return */ private Mono getLastMessageID(XMessage msg) { - if (msg != null && msg.getMessageType().toString().equalsIgnoreCase("text")) { + if (msg != null && msg.getFrom() != null && msg.getFrom().getUserID() != null && msg.getMessageType().toString().equalsIgnoreCase("text")) { return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function() { @Override public String apply(XMessageDAO msg1) { @@ -489,13 +515,15 @@ public String apply(XMessageDAO msg1) { } }); - } else if (msg != null && msg.getMessageType().toString().equalsIgnoreCase("button")) { + } else if (msg != null && msg.getFrom() != null && msg.getFrom().getUserID() != null && msg.getMessageType().toString().equalsIgnoreCase("button")) { return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function() { @Override public String apply(XMessageDAO lastMessage) { return String.valueOf(lastMessage.getId()); } }); + } else { + log.error("UserId not found : "+msg.toString()); } return Mono.empty(); } @@ -550,4 +578,45 @@ private void switchFromTo(XMessage xMessage) { xMessage.setFrom(to); xMessage.setTo(from); } + + /** + * Convert Federated users into chunks + * @param users + * @param chunkSize + * @return + */ + private List chunkArrayList(JSONArray users, int chunkSize) { + if (users != null && users.length() > 0) { + ArrayList arrayToChunk = new ArrayList<>(); + + for (int i = 0; i < users.length(); i++) { + arrayToChunk.add(users.getJSONObject(i)); + } + ArrayList> chunkList = new ArrayList<>(); + int guide = arrayToChunk.size(); + int index = 0; + int tale = chunkSize; + while (tale < arrayToChunk.size()) { + chunkList.add(arrayToChunk.subList(index, tale)); + guide = guide - chunkSize; + index = index + chunkSize; + tale = tale + chunkSize; + } + if (guide > 0) { + chunkList.add(arrayToChunk.subList(index, index + guide)); + } + List userChunksList = new ArrayList<>(); + for (List l : chunkList) { + JSONArray jsonArray = new JSONArray(); + for (JSONObject jsonObject : l) { + jsonArray.put(jsonObject); + } + userChunksList.add(jsonArray); + } + return userChunksList; + } else{ + log.error("Federated Users null found : "+users); + } + return null; + } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 2aced5a..14ad224 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -100,4 +100,7 @@ spring.mail.host=${EMAIL_HOST:#{""}} spring.mail.port=${EMAIL_PORT:#{"587"}} spring.mail.username=${EMAIL_USERNAME:#{""}} spring.mail.password=${EMAIL_PASSWORD:#{""}} -spring.mail.recipient=${RECIPIENT_EMAILS:#{""}} \ No newline at end of file +spring.mail.recipient=${RECIPIENT_EMAILS:#{""}} + +# Send notifications in chunks +broadcastNotificationChunkSize=${BROADCAST_NOTIFICATION_CHUNK_SIZE:#{""}} From d7c778d44e065d05dc1ca1ee7f748e167fbdf0a5 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Sat, 15 Apr 2023 01:33:17 +0530 Subject: [PATCH 3/5] [Hot Fix] - Sending Notifications in Chunks --- .../Consumer/ReactiveConsumer.java | 54 +++++++------------ 1 file changed, 18 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index 3e55c81..a3545ed 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -142,7 +142,7 @@ public void accept(XMessage msg) { chunkSize = null; } if(chunkSize != null) { - if (msg.getTransformers() != null && msg.getTransformers().get(0) != null && msg.getTransformers().size() > 0 + if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); int totalFederatedUsers = federatedUsers.length(); @@ -150,7 +150,7 @@ public void accept(XMessage msg) { kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); } else { List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); - int count = 0; + int count = 1; for (JSONArray jsonArray : jsonArrayList) { log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count); msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); @@ -289,7 +289,7 @@ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) { */ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) { String botId = botNode.get("id").asText(); - + /* Get federated users from federation services */ JSONArray users = userService.getUsersFromFederatedServers(botId); @@ -304,7 +304,7 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) { ObjectNode node = mapper.createObjectNode(); node.put("body", transformerMeta.get("body").asText()); node.put("type", transformerMeta.get("templateType").asText()); - + ArrayNode sampleData = mapper.createArrayNode(); for (int i = 0; i < users.length(); i++) { ObjectNode userData = mapper.createObjectNode(); @@ -320,12 +320,12 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) { sampleData.add(userData); } node.put("sampleData", sampleData); - + /* Fetch user messages by template from template service */ ArrayList usersMessage = userService.getUsersMessageByTemplate(node); - + log.info("usersMessage: "+usersMessage); - + /* Set User messages against the user phone */ ObjectNode federatedUsersMeta = mapper.createObjectNode(); ArrayNode userMetaData = mapper.createArrayNode(); @@ -435,10 +435,10 @@ public void accept(Throwable throwable) { */ private String getFAUserIdForApp(String deviceID, UUID appID) { String userID = null; - + Object result = redisCacheService.getFAUserIDForAppCache(getFACacheName(deviceID, appID)); userID = result != null ? result.toString() : null; - + if(userID == null || userID.isEmpty()) { ClientResponse response = botService.fusionAuthClient.retrieveUserByUsername(deviceID); @@ -587,36 +587,18 @@ private void switchFromTo(XMessage xMessage) { */ private List chunkArrayList(JSONArray users, int chunkSize) { if (users != null && users.length() > 0) { - ArrayList arrayToChunk = new ArrayList<>(); - - for (int i = 0; i < users.length(); i++) { - arrayToChunk.add(users.getJSONObject(i)); - } - ArrayList> chunkList = new ArrayList<>(); - int guide = arrayToChunk.size(); - int index = 0; - int tale = chunkSize; - while (tale < arrayToChunk.size()) { - chunkList.add(arrayToChunk.subList(index, tale)); - guide = guide - chunkSize; - index = index + chunkSize; - tale = tale + chunkSize; - } - if (guide > 0) { - chunkList.add(arrayToChunk.subList(index, index + guide)); - } - List userChunksList = new ArrayList<>(); - for (List l : chunkList) { - JSONArray jsonArray = new JSONArray(); - for (JSONObject jsonObject : l) { - jsonArray.put(jsonObject); - } - userChunksList.add(jsonArray); + ArrayList chunksList = new ArrayList<>(); + chunksList.add(new JSONArray()); + for (int x = 0; x < users.length(); x++) { + JSONObject user = users.getJSONObject(x); + if (chunksList.get(chunksList.size() - 1).length() == chunkSize) + chunksList.add(new JSONArray()); + chunksList.get(chunksList.size() - 1).put(user); } - return userChunksList; + return chunksList; } else{ log.error("Federated Users null found : "+users); + return null; } - return null; } } From f114c270c6889fe32fc4cace4eee8aab01262350 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Mon, 17 Apr 2023 13:03:38 +0530 Subject: [PATCH 4/5] [Hot Fix] - Sending Notifications in Chunks --- .../uci/orchestrator/Consumer/ReactiveConsumer.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index a3545ed..b07748f 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -22,7 +22,10 @@ import io.fusionauth.domain.UserRegistration; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import messagerosa.core.model.*; +import messagerosa.core.model.DeviceType; +import messagerosa.core.model.SenderReceiverInfo; +import messagerosa.core.model.Transformer; +import messagerosa.core.model.XMessage; import messagerosa.xml.XMessageParser; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONArray; @@ -40,7 +43,11 @@ import javax.xml.bind.JAXBException; import java.io.ByteArrayInputStream; import java.time.LocalDateTime; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.Comparator; import java.util.function.Consumer; import java.util.function.Function; From 1839c52d1fb4f3b4c8af3b11054114569d3db782 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Fri, 21 Apr 2023 18:47:53 +0530 Subject: [PATCH 5/5] [Notification] - Making query URL paginated for user segment --- .../uci/orchestrator/Consumer/ReactiveConsumer.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index b07748f..d339271 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -249,7 +249,11 @@ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) { ? transformerMeta.findValue("formID").asText() : ""); if (transformerMeta.get("type") != null && transformerMeta.get("type").asText().equals(BotUtil.transformerTypeBroadcast)) { - metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer)); + if(xMessage != null && xMessage.getFrom() != null && xMessage.getFrom().getMeta() != null && xMessage.getFrom().getMeta().containsKey("page")){ + metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta().get("page"))); + } else{ + metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, null)); + } } if (transformerMeta.findValue("hiddenFields") != null && !transformerMeta.findValue("hiddenFields").isEmpty()) { @@ -294,11 +298,11 @@ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) { * @param transformer * @return Federated users as json string */ - private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) { + private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, String page) { String botId = botNode.get("id").asText(); /* Get federated users from federation services */ - JSONArray users = userService.getUsersFromFederatedServers(botId); + JSONArray users = userService.getUsersFromFederatedServers(botId, page); /* Check if users, & related meta data exists in transformer */ if(users != null && transformer.get("meta") != null