diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f588aa5..02040b8 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -3,10 +3,10 @@ name: Maven Build on: push: branches: - ["release-4.*.*", "release-5.*.*", "release-v2.*.*"] + ["release-4.*.*", "release-5.*.*", "release-v2.*.*", "master", "development"] pull_request: branches: - ["release-4.*.*", "release-5.*.*", "release-v2.*.*"] + ["release-4.*.*", "release-5.*.*", "release-v2.*.*", "master", "development"] jobs: build: diff --git a/.github/workflows/docker-build-push.yml b/.github/workflows/docker-build-push.yml index 91ce07a..7f0eb18 100644 --- a/.github/workflows/docker-build-push.yml +++ b/.github/workflows/docker-build-push.yml @@ -3,7 +3,7 @@ name: Docker Build on: push: tags: - - 'v*.*.*' + ["v*.*.*", "v*.*.*-*"] jobs: docker-build-push: diff --git a/pom.xml b/pom.xml index ac66d39..53ac13a 100644 --- a/pom.xml +++ b/pom.xml @@ -11,12 +11,15 @@ com.uci orchestrator - 2.1.0 + 2.2.1 orchestrator Demo project for Spring Boot 11 + 2.2.1 + 2.2.1 + 2.2.1 @@ -109,7 +112,7 @@ com.uci dao - 2.1.0 + ${dao.version} org.springframework.boot @@ -122,12 +125,12 @@ com.uci message-rosa - 2.1.0 + ${messagerosa.version} com.uci utils - 2.1.0 + ${utils.version} org.springframework.boot diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index caf0408..d339271 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -44,10 +44,10 @@ import java.io.ByteArrayInputStream; import java.time.LocalDateTime; import java.util.ArrayList; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.UUID; +import java.util.Comparator; import java.util.function.Consumer; import java.util.function.Function; @@ -95,7 +95,9 @@ public class ReactiveConsumer { private final String DEFAULT_APP_NAME = "Global Bot"; LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); - + @Value("${broadcastNotificationChunkSize}") + private String broadcastNotificationChunkSize; + @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { @@ -137,10 +139,41 @@ public void accept(XMessage msg) { if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { try { - log.info("final msg.toXML(): "+msg.toXML().toString()); - if(firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); - } else if(firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { + log.info("final msg.toXML(): " + msg.toXML().toString()); + + if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + Integer chunkSize = null; + try { + chunkSize = Integer.parseInt(broadcastNotificationChunkSize); + } catch(NumberFormatException ex){ + chunkSize = null; + } + if(chunkSize != null) { + if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null + && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { + JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); + int totalFederatedUsers = federatedUsers.length(); + if (totalFederatedUsers <= chunkSize) { + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } else { + List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); + int count = 1; + for (JSONArray jsonArray : jsonArrayList) { + log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count); + msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + count++; + } + } + } else { + log.error("federatedUsers not found : " + msg.toString()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } + } else{ + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } + + } else if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { kafkaProducer.send(genericTransformerTopic, msg.toXML()); } else { kafkaProducer.send(odkTransformerTopic, msg.toXML()); @@ -216,7 +249,11 @@ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) { ? transformerMeta.findValue("formID").asText() : ""); if (transformerMeta.get("type") != null && transformerMeta.get("type").asText().equals(BotUtil.transformerTypeBroadcast)) { - metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer)); + if(xMessage != null && xMessage.getFrom() != null && xMessage.getFrom().getMeta() != null && xMessage.getFrom().getMeta().containsKey("page")){ + metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta().get("page"))); + } else{ + metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, null)); + } } if (transformerMeta.findValue("hiddenFields") != null && !transformerMeta.findValue("hiddenFields").isEmpty()) { @@ -261,11 +298,11 @@ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) { * @param transformer * @return Federated users as json string */ - private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) { + private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, String page) { String botId = botNode.get("id").asText(); - + /* Get federated users from federation services */ - JSONArray users = userService.getUsersFromFederatedServers(botId); + JSONArray users = userService.getUsersFromFederatedServers(botId, page); /* Check if users, & related meta data exists in transformer */ if(users != null && transformer.get("meta") != null @@ -278,7 +315,7 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) { ObjectNode node = mapper.createObjectNode(); node.put("body", transformerMeta.get("body").asText()); node.put("type", transformerMeta.get("templateType").asText()); - + ArrayNode sampleData = mapper.createArrayNode(); for (int i = 0; i < users.length(); i++) { ObjectNode userData = mapper.createObjectNode(); @@ -294,12 +331,12 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) { sampleData.add(userData); } node.put("sampleData", sampleData); - + /* Fetch user messages by template from template service */ ArrayList usersMessage = userService.getUsersMessageByTemplate(node); - + log.info("usersMessage: "+usersMessage); - + /* Set User messages against the user phone */ ObjectNode federatedUsersMeta = mapper.createObjectNode(); ArrayNode userMetaData = mapper.createArrayNode(); @@ -409,10 +446,10 @@ public void accept(Throwable throwable) { */ private String getFAUserIdForApp(String deviceID, UUID appID) { String userID = null; - + Object result = redisCacheService.getFAUserIDForAppCache(getFACacheName(deviceID, appID)); userID = result != null ? result.toString() : null; - + if(userID == null || userID.isEmpty()) { ClientResponse response = botService.fusionAuthClient.retrieveUserByUsername(deviceID); @@ -477,7 +514,7 @@ private void logTimeTaken(long startTime, int checkpointID) { * @return */ private Mono getLastMessageID(XMessage msg) { - if (msg != null && msg.getMessageType().toString().equalsIgnoreCase("text")) { + if (msg != null && msg.getFrom() != null && msg.getFrom().getUserID() != null && msg.getMessageType().toString().equalsIgnoreCase("text")) { return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function() { @Override public String apply(XMessageDAO msg1) { @@ -489,13 +526,15 @@ public String apply(XMessageDAO msg1) { } }); - } else if (msg != null && msg.getMessageType().toString().equalsIgnoreCase("button")) { + } else if (msg != null && msg.getFrom() != null && msg.getFrom().getUserID() != null && msg.getMessageType().toString().equalsIgnoreCase("button")) { return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function() { @Override public String apply(XMessageDAO lastMessage) { return String.valueOf(lastMessage.getId()); } }); + } else { + log.error("UserId not found : "+msg.toString()); } return Mono.empty(); } @@ -550,4 +589,27 @@ private void switchFromTo(XMessage xMessage) { xMessage.setFrom(to); xMessage.setTo(from); } + + /** + * Convert Federated users into chunks + * @param users + * @param chunkSize + * @return + */ + private List chunkArrayList(JSONArray users, int chunkSize) { + if (users != null && users.length() > 0) { + ArrayList chunksList = new ArrayList<>(); + chunksList.add(new JSONArray()); + for (int x = 0; x < users.length(); x++) { + JSONObject user = users.getJSONObject(x); + if (chunksList.get(chunksList.size() - 1).length() == chunkSize) + chunksList.add(new JSONArray()); + chunksList.get(chunksList.size() - 1).put(user); + } + return chunksList; + } else{ + log.error("Federated Users null found : "+users); + return null; + } + } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 44709db..bf5557f 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -90,4 +90,14 @@ spring.azure.blob.store.account.key=${AZURE_BLOB_STORE_ACCOUNT_KEY:#{"key"}} spring.azure.blob.store.container.name=${AZURE_BLOB_STORE_CONTAINER:#{"container"}} # Template Service Base Url -template.service.base.url=${TEMPLATE_SERVICE_BASE_URL:#{"http://templater2.ngrok.samagra.io/"}} \ No newline at end of file +template.service.base.url=${TEMPLATE_SERVICE_BASE_URL:#{"http://templater2.ngrok.samagra.io/"}} + +# Email Config +spring.mail.host=${EMAIL_HOST:#{""}} +spring.mail.port=${EMAIL_PORT:#{"587"}} +spring.mail.username=${EMAIL_USERNAME:#{""}} +spring.mail.password=${EMAIL_PASSWORD:#{""}} +spring.mail.recipient=${RECIPIENT_EMAILS:#{""}} + +# Send notifications in chunks +broadcastNotificationChunkSize=${BROADCAST_NOTIFICATION_CHUNK_SIZE:#{""}}