From 493a5033cc9aef7ad2b015e520d0da3d6e7c9dd0 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Thu, 25 May 2023 14:12:34 +0530 Subject: [PATCH 01/28] Added logs for kafka topic consume and push count --- .../uci/orchestrator/Consumer/ReactiveConsumer.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index addf485..2af07f2 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -100,12 +100,17 @@ public class ReactiveConsumer { private long notificationProcessedCount; + private long consumeCount; + private long pushCount; + @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { final long startTime = System.nanoTime(); logTimeTaken(startTime, 0, null); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); + consumeCount++; + log.info("Consume topic count : " + consumeCount); SenderReceiverInfo from = msg.getFrom(); logTimeTaken(startTime, 1, null); botService.getBotNodeFromName(msg.getApp()).doOnNext(new Consumer() { @@ -165,9 +170,13 @@ public void accept(XMessage msg) { } else{ kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); } + pushCount++; notificationProcessedCount++; - logTimeTaken(startTime, 0, "Notification processed : " + notificationProcessedCount + " :: process-end: %d ms"); + logTimeTaken(startTime, 0, "Notification processed : " + notificationProcessedCount + " :: Push count : " + + pushCount + " :: process-end: %d ms"); + } else { + log.info("Calling ODK : " + msg.toString()); getLastMessageID(msg) .doOnNext(lastMessageID -> { logTimeTaken(startTime, 4, null); @@ -220,7 +229,7 @@ public void accept(Throwable throwable) { }).subscribe(); } catch (Exception e) { - e.printStackTrace(); + log.error("An Error ReactiveConsumer : " + e.getMessage()); } } From b41a31d33267bf7e8e68cf7430a72236837c0cd0 Mon Sep 17 00:00:00 2001 From: Chinmoy Chakraborty Date: Tue, 30 May 2023 13:44:03 +0530 Subject: [PATCH 02/28] Added memory cap flag to JVM --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index bcb91b3..3090fc1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,4 +35,4 @@ WORKDIR $HOME COPY --from=build $HOME/target/*.jar app.jar EXPOSE 8080 -ENTRYPOINT ["java","-jar","app.jar"] +ENTRYPOINT ["java","-Xmx250m","-jar","app.jar"] From c46b7cdc6fcf63840867780bec95fc1d4f038d35 Mon Sep 17 00:00:00 2001 From: Chinmoy Chakraborty Date: Tue, 30 May 2023 22:58:42 +0530 Subject: [PATCH 03/28] change jvm version and set optimization flags --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3090fc1..f5bd6db 100644 --- a/Dockerfile +++ b/Dockerfile @@ -28,11 +28,11 @@ ADD /src $HOME/src RUN mvn package -s $HOME/settings.xml -DskipTests=true # Package stage -FROM openjdk:11 +FROM ibm-semeru-runtimes:open-11.0.18_10-jre ENV HOME=/home/app ENV export $(cat .env | xargs) WORKDIR $HOME COPY --from=build $HOME/target/*.jar app.jar EXPOSE 8080 -ENTRYPOINT ["java","-Xmx250m","-jar","app.jar"] +ENTRYPOINT ["java","-Xmx250m","-Xshareclasses","-XX:+CMSClassUnloadingEnabled","-XX:+UseG1GC","-XX:+ExplicitGCInvokesConcurrent","-jar","app.jar"] From 60d352aa7f1e64a061a24dfe86aa1e10c8efbf91 Mon Sep 17 00:00:00 2001 From: Chinmoy Chakraborty Date: Wed, 31 May 2023 14:01:39 +0530 Subject: [PATCH 04/28] Version update --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d937f99..d4a4613 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.uci orchestrator - 2.2.0-SNAPSHOT + 2.2.6 orchestrator Demo project for Spring Boot From 02b28520a4036ebb9f356e3d7665ca8dda51808d Mon Sep 17 00:00:00 2001 From: Anvansh <32684077+RyanWalker277@users.noreply.github.com> Date: Wed, 31 May 2023 17:54:43 +0530 Subject: [PATCH 05/28] Build and push action (#57) --- .../docker-build-push-dev-master.yml | 32 +++++++++++++++++++ .github/workflows/docker-build-push-dev.yml | 32 +++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 .github/workflows/docker-build-push-dev-master.yml create mode 100644 .github/workflows/docker-build-push-dev.yml diff --git a/.github/workflows/docker-build-push-dev-master.yml b/.github/workflows/docker-build-push-dev-master.yml new file mode 100644 index 0000000..6b58d58 --- /dev/null +++ b/.github/workflows/docker-build-push-dev-master.yml @@ -0,0 +1,32 @@ +name: Docker Build +on: + push: + branches: + - master +jobs: + docker-build-push: + runs-on: ubuntu-20.04 + timeout-minutes: 40 + steps: + - name: Check out the repo + uses: actions/checkout@v3 + - name: Extract version from pom.xml + id: version + run: | + echo ::set-output name=VERSION::$(grep -A 1 'orchestrator' pom.xml | grep -oP '(?<=).*?(?=)') + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Log in to Docker Hub + uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9 + with: + username: ${{secrets.DOCKER_HUB_USERNAME}} + password: ${{secrets.DOCKER_HUB_ACCESS_TOKEN}} + - name: Build and push Docker image + uses: docker/build-push-action@v2 + with: + context: "." + push: true + tags: samagragovernance/orchestrator:v${{ steps.version.outputs.VERSION }} + build-args: | + username=${{ secrets.USERNAME }}, + token=${{ secrets.TOKEN }} \ No newline at end of file diff --git a/.github/workflows/docker-build-push-dev.yml b/.github/workflows/docker-build-push-dev.yml new file mode 100644 index 0000000..a954289 --- /dev/null +++ b/.github/workflows/docker-build-push-dev.yml @@ -0,0 +1,32 @@ +name: Docker Build +on: + push: + branches: + - development +jobs: + docker-build-push: + runs-on: ubuntu-20.04 + timeout-minutes: 40 + steps: + - name: Check out the repo + uses: actions/checkout@v3 + - name: Extract version from pom.xml + id: version + run: | + echo ::set-output name=VERSION::$(grep -A 1 'orchestrator' pom.xml | grep -oP '(?<=).*?(?=)') + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Log in to Docker Hub + uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9 + with: + username: ${{secrets.DOCKER_HUB_USERNAME}} + password: ${{secrets.DOCKER_HUB_ACCESS_TOKEN}} + - name: Build and push Docker image + uses: docker/build-push-action@v2 + with: + context: "." + push: true + tags: samagragovernance/orchestrator:v${{ steps.version.outputs.VERSION }}-SNAPSHOT + build-args: | + username=${{ secrets.USERNAME }}, + token=${{ secrets.TOKEN }} \ No newline at end of file From ca6651506461db8b3aa1b260910ed010345d7a3b Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Wed, 31 May 2023 19:19:38 +0530 Subject: [PATCH 06/28] Added logs for axiom --- .../Consumer/ReactiveConsumer.java | 341 ++++++++++-------- 1 file changed, 185 insertions(+), 156 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index 2af07f2..7482c8d 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -71,7 +71,7 @@ public class ReactiveConsumer { @Value("${odk-transformer}") public String odkTransformerTopic; - + @Value("${broadcast-transformer}") public String broadcastTransformerTopic; @@ -80,7 +80,7 @@ public class ReactiveConsumer { @Autowired public BotService botService; - + @Autowired private UserService userService; @@ -89,7 +89,7 @@ public class ReactiveConsumer { @Autowired private RedisCacheService redisCacheService; - + public AESWrapper encryptor; private final String DEFAULT_APP_NAME = "Global Bot"; @@ -109,19 +109,23 @@ public void onMessage(@Payload String stringMessage) { final long startTime = System.nanoTime(); logTimeTaken(startTime, 0, null); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); - consumeCount++; - log.info("Consume topic count : " + consumeCount); + + if (msg != null && msg.getProvider().equalsIgnoreCase("firebase")) { + consumeCount++; + log.info("Consume topic by Orchestrator count : " + consumeCount); + } + SenderReceiverInfo from = msg.getFrom(); logTimeTaken(startTime, 1, null); botService.getBotNodeFromName(msg.getApp()).doOnNext(new Consumer() { @Override public void accept(JsonNode botNode) { - if(botNode != null && !botNode.isEmpty()) { + if (botNode != null && !botNode.isEmpty()) { from.setCampaignID(msg.getApp()); - if(from.getDeviceType() == null) { + if (from.getDeviceType() == null) { from.setDeviceType(DeviceType.PHONE); } - if(msg.getAdapterId() == null || msg.getAdapterId().isEmpty()) { + if (msg.getAdapterId() == null || msg.getAdapterId().isEmpty()) { msg.setAdapterId(BotUtil.getBotNodeAdapterId(botNode)); } /* Set XMessage Transformers */ @@ -136,46 +140,57 @@ public void accept(JsonNode botNode) { public void accept(XMessage msg) { SenderReceiverInfo from = msg.getFrom(); // msg.setFrom(from); - try { - if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + try { /* Switch From & To */ switchFromTo(msg); Integer chunkSize = null; try { chunkSize = Integer.parseInt(broadcastNotificationChunkSize); - } catch(NumberFormatException ex){ + } catch (NumberFormatException ex) { chunkSize = null; } - if(chunkSize != null) { + if (chunkSize != null) { if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); int totalFederatedUsers = federatedUsers.length(); if (totalFederatedUsers <= chunkSize) { + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: "+totalFederatedUsers); kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); } else { List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); int count = 1; for (JSONArray jsonArray : jsonArrayList) { - log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count); + log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : " + count); msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: "+jsonArray.length()); kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); count++; } } } else { - log.error("federatedUsers not found : " + msg.toString()); + log.error("ReactiveConsumer:federatedUsers not found in xMessage: " + msg); + } + } else { + if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null + && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { + JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + federatedUsers.length()); kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } else { + log.error("ReactiveConsumer:federatedUsers not found in xMessage: " + msg); } - } else{ - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); } pushCount++; notificationProcessedCount++; - logTimeTaken(startTime, 0, "Notification processed : " + notificationProcessedCount + " :: Push count : " - + pushCount + " :: process-end: %d ms"); - - } else { + logTimeTaken(startTime, 0, "Notification processed by orchestrator: " + notificationProcessedCount + " :: Push count : " + + pushCount + " :: orchestrator-notification-process-end-time: %d ms"); + } catch (Exception ex) { + log.error("ReactiveConsumer:Notification Triggering Process:Error in pushing xMessage to kafka: "+ex.getMessage()); + } + } else { + try { log.info("Calling ODK : " + msg.toString()); getLastMessageID(msg) .doOnNext(lastMessageID -> { @@ -207,12 +222,11 @@ public void accept(Throwable throwable) { } }) .subscribe(); + } catch (Exception ex) { + log.error("ReactiveConsumer:ODK and Generic Bot Processing:Exception: "+ex.getMessage()); } - } catch (JAXBException ex) { - log.error("Error while converting toXML() : " + ex.getMessage()); - } catch (Exception ex){ - log.error("An Error Occurred : " + ex.getMessage()); } + } }) .doOnError(new Consumer() { @@ -222,7 +236,7 @@ public void accept(Throwable throwable) { } }).subscribe(); } else { - log.error("Bot node not found by name: "+msg.getApp()); + log.error("Bot node not found by name: " + msg.getApp()); } } @@ -235,99 +249,106 @@ public void accept(Throwable throwable) { /** * Set Transformer in XMessage with transformer required data in meta + * * @param xMessage * @param botNode * @return XMessage */ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) { - ArrayList transformers = new ArrayList(); - - ArrayList transformerList = (ArrayList) botNode.findValues("transformers"); - transformerList.forEach(transformerTmp -> { - JsonNode transformerNode = (JsonNode) transformerTmp; - int i = 0; - while (transformerNode.get(i) != null && transformerNode.get(i).path("meta") != null) { - JsonNode transformer = transformerNode.get(i); - JsonNode transformerMeta = transformer.path("meta") != null - ? transformer.path("meta") : null; - log.info("transformer:" + transformer); - - HashMap metaData = new HashMap(); - /* Bot Data */ - metaData.put("startingMessage", BotUtil.getBotNodeData(botNode, "startingMessage")); - metaData.put("botId", BotUtil.getBotNodeData(botNode, "id")); - metaData.put("botOwnerID", BotUtil.getBotNodeData(botNode, "ownerID")); - metaData.put("botOwnerOrgID", BotUtil.getBotNodeData(botNode, "ownerOrgID")); - - /* Transformer Data */ - metaData.put("id", transformer.get("id").asText()); - metaData.put("type", transformerMeta.get("type") != null - && !transformerMeta.get("type").asText().isEmpty() - ? transformerMeta.get("type").asText() - : ""); - metaData.put("formID", transformerMeta.findValue("formID") != null - && !transformerMeta.findValue("formID").asText().isEmpty() - ? transformerMeta.findValue("formID").asText() - : ""); - if (transformerMeta.get("type") != null && transformerMeta.get("type").asText().equals(BotUtil.transformerTypeBroadcast)) { - if(xMessage != null && xMessage.getFrom() != null && xMessage.getFrom().getMeta() != null && xMessage.getFrom().getMeta().containsKey("page")){ - log.info("page number orch : "+ xMessage.getFrom().getMeta().get("page")); - metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta().get("page"))); - } else{ - metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, null)); + try { + ArrayList transformers = new ArrayList(); + + ArrayList transformerList = (ArrayList) botNode.findValues("transformers"); + transformerList.forEach(transformerTmp -> { + JsonNode transformerNode = (JsonNode) transformerTmp; + int i = 0; + while (transformerNode.get(i) != null && transformerNode.get(i).path("meta") != null) { + JsonNode transformer = transformerNode.get(i); + JsonNode transformerMeta = transformer.path("meta") != null + ? transformer.path("meta") : null; + log.info("transformer:" + transformer); + + HashMap metaData = new HashMap(); + /* Bot Data */ + metaData.put("startingMessage", BotUtil.getBotNodeData(botNode, "startingMessage")); + metaData.put("botId", BotUtil.getBotNodeData(botNode, "id")); + metaData.put("botOwnerID", BotUtil.getBotNodeData(botNode, "ownerID")); + metaData.put("botOwnerOrgID", BotUtil.getBotNodeData(botNode, "ownerOrgID")); + + /* Transformer Data */ + metaData.put("id", transformer.get("id").asText()); + metaData.put("type", transformerMeta.get("type") != null + && !transformerMeta.get("type").asText().isEmpty() + ? transformerMeta.get("type").asText() + : ""); + metaData.put("formID", transformerMeta.findValue("formID") != null + && !transformerMeta.findValue("formID").asText().isEmpty() + ? transformerMeta.findValue("formID").asText() + : ""); + if (transformerMeta.get("type") != null && transformerMeta.get("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + if (xMessage != null && xMessage.getFrom() != null && xMessage.getFrom().getMeta() != null && xMessage.getFrom().getMeta().containsKey("page")) { + log.info("page number orch : " + xMessage.getFrom().getMeta().get("page")); + metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta().get("page"))); + } else { + metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, null)); + } } - } - if (transformerMeta.findValue("hiddenFields") != null && !transformerMeta.findValue("hiddenFields").isEmpty()) { - metaData.put("hiddenFields", transformerMeta.findValue("hiddenFields").toString()); - } + if (transformerMeta.findValue("hiddenFields") != null && !transformerMeta.findValue("hiddenFields").isEmpty()) { + metaData.put("hiddenFields", transformerMeta.findValue("hiddenFields").toString()); + } - if (transformer.findValue("serviceClass") != null && !transformer.findValue("serviceClass").asText().isEmpty()) { - String serviceClass = transformer.findValue("serviceClass").toString(); - if (serviceClass != null && !serviceClass.isEmpty() && serviceClass.contains("\"")) { - serviceClass = serviceClass.replaceAll("\"", ""); + if (transformer.findValue("serviceClass") != null && !transformer.findValue("serviceClass").asText().isEmpty()) { + String serviceClass = transformer.findValue("serviceClass").toString(); + if (serviceClass != null && !serviceClass.isEmpty() && serviceClass.contains("\"")) { + serviceClass = serviceClass.replaceAll("\"", ""); + } + metaData.put("serviceClass", serviceClass); } - metaData.put("serviceClass", serviceClass); - } - if (transformerMeta.get("templateId") != null && !transformerMeta.get("templateId").asText().isEmpty()) { - metaData.put("templateId", transformerMeta.get("templateId").asText()); - } + if (transformerMeta.get("templateId") != null && !transformerMeta.get("templateId").asText().isEmpty()) { + metaData.put("templateId", transformerMeta.get("templateId").asText()); + } - if (transformerMeta.get("title") != null && !transformerMeta.get("title").asText().isEmpty()) { - metaData.put("title", transformerMeta.get("title").asText()); - } + if (transformerMeta.get("title") != null && !transformerMeta.get("title").asText().isEmpty()) { + metaData.put("title", transformerMeta.get("title").asText()); + } - if (transformer.get("type") != null && transformer.get("type").asText().equals(BotUtil.transformerTypeGeneric)) { - metaData.put("url", transformer.findValue("url").asText()); - } + if (transformer.get("type") != null && transformer.get("type").asText().equals(BotUtil.transformerTypeGeneric)) { + metaData.put("url", transformer.findValue("url").asText()); + } - Transformer transf = new Transformer(); - transf.setId(transformer.get("id").asText()); - transf.setMetaData(metaData); + Transformer transf = new Transformer(); + transf.setId(transformer.get("id").asText()); + transf.setMetaData(metaData); - transformers.add(transf); - i++; - } - }); - xMessage.setTransformers(transformers); - return xMessage; + transformers.add(transf); + i++; + } + }); + xMessage.setTransformers(transformers); + return xMessage; + } catch (Exception ex) { + log.error("Error:setXMessageTransformers::Exception:" + ex.getMessage()); + return null; + } } /** * Get Federated Users Data for Broadcast transformer + * * @param botNode * @param transformer * @return Federated users as json string */ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, String page) { - String botId = botNode.get("id").asText(); + String botId = botNode.get("id").asText(); - /* Get federated users from federation services */ + /* Get federated users from federation services */ JSONArray users = userService.getUsersFromFederatedServers(botId, page); /* Check if users, & related meta data exists in transformer */ - if(users != null && transformer.get("meta") != null + if (users != null && transformer.get("meta") != null && transformer.get("meta").get("templateType") != null && transformer.get("meta").get("body") != null) { ObjectNode transformerMeta = (ObjectNode) transformer.get("meta"); @@ -338,58 +359,58 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str node.put("body", transformerMeta.get("body").asText()); node.put("type", transformerMeta.get("templateType").asText()); - ArrayNode sampleData = mapper.createArrayNode(); - for (int i = 0; i < users.length(); i++) { - ObjectNode userData = mapper.createObjectNode(); - if(transformerMeta.get("params") != null && !transformerMeta.get("params").toString().isEmpty()){ + ArrayNode sampleData = mapper.createArrayNode(); + for (int i = 0; i < users.length(); i++) { + ObjectNode userData = mapper.createObjectNode(); + if (transformerMeta.get("params") != null && !transformerMeta.get("params").toString().isEmpty()) { JSONArray paramArr = new JSONArray(transformerMeta.get("params").toString()); - for(int k=0; k usersMessage = userService.getUsersMessageByTemplate(node); + /* Fetch user messages by template from template service */ + ArrayList usersMessage = userService.getUsersMessageByTemplate(node); - log.info("usersMessage: "+usersMessage); + log.info("ReactiveConsumer:getUsersMessageByTemplate::Count: " + usersMessage.size()); - /* Set User messages against the user phone */ - ObjectNode federatedUsersMeta = mapper.createObjectNode(); - ArrayNode userMetaData = mapper.createArrayNode(); + /* Set User messages against the user phone */ + ObjectNode federatedUsersMeta = mapper.createObjectNode(); + ArrayNode userMetaData = mapper.createArrayNode(); usersMessage.forEach(userMsg -> { - int j = Integer.parseInt(userMsg.get("__index").toString()); + int j = Integer.parseInt(userMsg.get("__index").toString()); JSONObject userObj = ((JSONObject) users.get(j)); - String userPhone = userObj.getString("phoneNo"); + String userPhone = userObj.getString("phoneNo"); - ObjectNode map = mapper.createObjectNode(); - map.put("phone", userPhone); - map.put("message", userMsg.get("body").toString()); - try{ + ObjectNode map = mapper.createObjectNode(); + map.put("phone", userPhone); + map.put("message", userMsg.get("body").toString()); + try { /* FCM Token */ - if(userObj.get("fcmToken") != null) { + if (userObj.get("fcmToken") != null) { map.put("fcmToken", userObj.getString("fcmToken")); } /* FCM - If clickActionUrl found in userObj, use it, override previous one */ - if(userObj.get("fcmClickActionUrl") != null) { + if (userObj.get("fcmClickActionUrl") != null) { map.put("fcmClickActionUrl", userObj.getString("fcmClickActionUrl")); } - if(transformerMeta.get("data") != null){ + if (transformerMeta.get("data") != null) { map.put("data", transformerMeta.get("data")); } } catch (Exception e) { - // + log.error("ErrorParsingUserObj:getFederatedUsersMeta::Exception: " + e.getMessage()); } userMetaData.add(map); - log.info("index: "+j+", body: "+userMsg.get("body").toString()+", phone:"+userPhone); - }); - + log.info("index: " + j + ", body: " + userMsg.get("body").toString() + ", phone:" + userPhone); + }); + federatedUsersMeta.put("list", userMetaData); return federatedUsersMeta.toString(); @@ -399,6 +420,7 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str /** * Resolve User - Fetch user if exists or register it in Fusion Auth Client + * * @param xmsg * @param appId * @return @@ -408,17 +430,17 @@ private Mono resolveUser(XMessage xmsg, String appId) { SenderReceiverInfo from = xmsg.getFrom(); String appName = xmsg.getApp(); Boolean found = false; - + UUID appID = UUID.fromString(appId); - + String deviceString = from.getDeviceType().toString() + ":" + from.getUserID(); String encodedBase64Key = encodeKey(secret); String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key); - log.info("deviceString: "+deviceString+", encyprted deviceString: "+deviceID); + log.info("ReactiveConsumer:resolveUser::Calling:deviceString: " + deviceString + ", encyprted deviceString: " + deviceID); String userID = getFAUserIdForApp(deviceID, appID); - + if (userID != null && !userID.isEmpty()) { - log.info("Found FA user id"); + log.info("ReactiveConsumer:resolveUser::Found FA user id: " + userID); from.setDeviceID(userID); from.setEncryptedDeviceID(deviceID); xmsg.setFrom(from); @@ -428,14 +450,13 @@ private Mono resolveUser(XMessage xmsg, String appId) { .flatMap(new Function, Mono>() { @Override public Mono apply(Pair result) { - log.info("FA update user"); if (result.getLeft()) { from.setDeviceID(result.getRight()); from.setEncryptedDeviceID(deviceID); xmsg.setFrom(from); ClientResponse response = botService.fusionAuthClient.retrieveUserByUsername(deviceID); if (response.wasSuccessful() && isUserRegistered(response, appID)) { - redisCacheService.setFAUserIDForAppCache(getFACacheName(deviceID, appID), response.successResponse.user.id.toString()); + redisCacheService.setFAUserIDForAppCache(getFACacheName(deviceID, appID), response.successResponse.user.id.toString()); return Mono.just(xmsg); } else { return Mono.just(xmsg); @@ -448,86 +469,90 @@ public Mono apply(Pair result) { }).doOnError(new Consumer() { @Override public void accept(Throwable throwable) { - log.error("Error in updateUser" + throwable.getMessage()); + log.error("ReactiveConsumer:resolveUser::Calling update user: " + throwable.getMessage()); } }); } } catch (Exception e) { e.printStackTrace(); - log.error("Error in resolveUser" + e.getMessage()); + log.error("ReactiveConsumer:resolveUser::Error in resolving user: " + e.getMessage()); xmsg.setFrom(null); return Mono.just(xmsg); } } - + /** * Get Fusion Auth User's UUID for App + * * @param deviceID * @param appID * @return */ private String getFAUserIdForApp(String deviceID, UUID appID) { - String userID = null; + String userID = null; - Object result = redisCacheService.getFAUserIDForAppCache(getFACacheName(deviceID, appID)); - userID = result != null ? result.toString() : null; + Object result = redisCacheService.getFAUserIDForAppCache(getFACacheName(deviceID, appID)); + userID = result != null ? result.toString() : null; + + if (userID == null || userID.isEmpty()) { + ClientResponse response = botService.fusionAuthClient.retrieveUserByUsername(deviceID); - if(userID == null || userID.isEmpty()) { - ClientResponse response = botService.fusionAuthClient.retrieveUserByUsername(deviceID); - if (response.wasSuccessful() && isUserRegistered(response, appID)) { - userID = response.successResponse.user.id.toString(); - redisCacheService.setFAUserIDForAppCache(getFACacheName(deviceID, appID), userID); + userID = response.successResponse.user.id.toString(); + redisCacheService.setFAUserIDForAppCache(getFACacheName(deviceID, appID), userID); } - } + } return userID; } - + /** * Check if FA user is registered for appid + * * @param response * @param appID * @return */ private Boolean isUserRegistered(ClientResponse response, UUID appID) { - List registrations = response.successResponse.user.getRegistrations(); - for(int i=0; i registrations = response.successResponse.user.getRegistrations(); + for (int i = 0; i < registrations.size(); i++) { + if (registrations.get(i).applicationId.equals(appID)) { + return true; + } + } + return false; } - + private String getFACacheName(String deviceID, UUID appID) { - return deviceID+"-"+appID.toString(); + return deviceID + "-" + appID.toString(); } /** * Update fusion auth user data + * * @param user */ private void updateFAUser(User user) { System.out.println(user); UserRequest r = new UserRequest(user); - + ClientResponse response = botService.fusionAuthClient.updateUser(user.id, r); - if(response.wasSuccessful()) { + if (response.wasSuccessful()) { System.out.println("user update success"); } else { - System.out.println("error in user update"+response.errorResponse); + System.out.println("error in user update" + response.errorResponse); } } /** * Log time taken between two checkpoints + * * @param startTime * @param checkpointID */ private void logTimeTaken(long startTime, int checkpointID, String formatedMsg) { long endTime = System.nanoTime(); long duration = (endTime - startTime) / 1000000; - if(formatedMsg == null) { + if (formatedMsg == null) { log.info(String.format("CP-%d: %d ms", checkpointID, duration)); } else { log.info(String.format(formatedMsg, duration)); @@ -536,6 +561,7 @@ private void logTimeTaken(long startTime, int checkpointID, String formatedMsg) /** * Get Last XMessage ID of user + * * @param msg * @return */ @@ -560,13 +586,14 @@ public String apply(XMessageDAO lastMessage) { } }); } else { - log.error("UserId not found : "+msg.toString()); + log.error("UserId not found : " + msg.toString()); } return Mono.empty(); } /** * Get Latest XMessage of a user + * * @param userID * @param yesterday * @param messageState @@ -607,6 +634,7 @@ public void accept(Throwable throwable) { /** * Switch from & To in XMessage + * * @param xMessage */ private void switchFromTo(XMessage xMessage) { @@ -618,6 +646,7 @@ private void switchFromTo(XMessage xMessage) { /** * Convert Federated users into chunks + * * @param users * @param chunkSize * @return @@ -633,8 +662,8 @@ private List chunkArrayList(JSONArray users, int chunkSize) { chunksList.get(chunksList.size() - 1).put(user); } return chunksList; - } else{ - log.error("Federated Users null found : "+users); + } else { + log.error("Federated Users null found : " + users); return null; } } From d9b65d7b58b516b02e75cdc0317a0dbfda3a9d4f Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Fri, 2 Jun 2023 15:00:38 +0530 Subject: [PATCH 07/28] Change build version --- pom.xml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index d4a4613..c0e8317 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,9 @@ 11 + 2.2.2 + 2.2.2 + 2.2.2 @@ -109,7 +112,7 @@ com.uci dao - 2.2.0-SNAPSHOT + ${dao.version} org.springframework.boot @@ -122,12 +125,12 @@ com.uci message-rosa - 2.2.0-SNAPSHOT + ${messagerosa.version} com.uci utils - 2.2.0-SNAPSHOT + ${utils.version} org.springframework.boot From 4e8d8446a873e62748208b3940669508fd74e1d1 Mon Sep 17 00:00:00 2001 From: Chinmoy Chakraborty Date: Fri, 2 Jun 2023 16:41:42 +0530 Subject: [PATCH 08/28] Increase JVM heap space --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index f5bd6db..2acce99 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,4 +35,4 @@ WORKDIR $HOME COPY --from=build $HOME/target/*.jar app.jar EXPOSE 8080 -ENTRYPOINT ["java","-Xmx250m","-Xshareclasses","-XX:+CMSClassUnloadingEnabled","-XX:+UseG1GC","-XX:+ExplicitGCInvokesConcurrent","-jar","app.jar"] +ENTRYPOINT ["java","-Xmx350m","-Xshareclasses","-XX:+CMSClassUnloadingEnabled","-XX:+UseG1GC","-XX:+ExplicitGCInvokesConcurrent","-jar","app.jar"] From 4bd859ef0e9b4ba0e75980eaf5a08e59945cf2e5 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Fri, 2 Jun 2023 20:11:34 +0530 Subject: [PATCH 09/28] Upgrade version for prod --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c0e8317..d990a60 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.uci orchestrator - 2.2.6 + 2.2.7 orchestrator Demo project for Spring Boot From bedecb496afccd92092d7aa3ab6e6c2f0da9b7ba Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Tue, 6 Jun 2023 14:21:50 +0530 Subject: [PATCH 10/28] Update jvm size and Notification part separate --- Dockerfile | 2 +- .../Consumer/OuboundConsumer.java | 22 +++++++++++++++++-- src/main/resources/application.properties | 1 + 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2acce99..e49a792 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,4 +35,4 @@ WORKDIR $HOME COPY --from=build $HOME/target/*.jar app.jar EXPOSE 8080 -ENTRYPOINT ["java","-Xmx350m","-Xshareclasses","-XX:+CMSClassUnloadingEnabled","-XX:+UseG1GC","-XX:+ExplicitGCInvokesConcurrent","-jar","app.jar"] +ENTRYPOINT ["java","-Xmx1024m","-Xshareclasses","-XX:+CMSClassUnloadingEnabled","-XX:+UseG1GC","-XX:+ExplicitGCInvokesConcurrent","-jar","app.jar"] diff --git a/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java index cd079be..64a1430 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java @@ -22,15 +22,33 @@ public class OuboundConsumer { @Value("${outbound}") private String outboundTopic; + @Value("${notificationOutbound}") + public String notificationOutbound; + @Autowired public SimpleProducer kafkaProducer; + private long consumeCount, pushNotificationCount, pushOtherCount; + + @KafkaListener(id = "${processOutbound}", topics = "${processOutbound}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { + consumeCount++; + log.info("OutboundConsumer:onMessage:: Consume Topic Count: " + consumeCount); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); - kafkaProducer.send(outboundTopic, msg.toXML()); - } catch(JAXBException e) { + if (msg != null && msg.getProvider() != null && msg.getProvider().equalsIgnoreCase("firebase") + && msg.getChannel().equalsIgnoreCase("web")) { + pushNotificationCount++; + log.info("OutboundConsumer:onMessage:: Notification push to kafka topic count: " + pushNotificationCount); + kafkaProducer.send(notificationOutbound, msg.toXML()); + } else { + pushOtherCount++; + log.info("OutboundConsumer:onMessage:: Other push to kafka topic count: " + pushOtherCount); + kafkaProducer.send(outboundTopic, msg.toXML()); + } + + } catch (JAXBException e) { e.printStackTrace(); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index bf5557f..68451c8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -13,6 +13,7 @@ broadcast-transformer=${KAFKA_BROADCAST_TRANSFORMER_TOPIC:#{"broadcast-transform generic-transformer=${KAFKA_GENERIC_TRANSFORMER_TOPIC:#{"generic-transformer"}} processOutbound=${KAFKA_PROCESS_OUTBOUND} outbound=${KAFKA_OUTBOUND_TOPIC} +notificationOutbound=${KAFKA_NOTIFICATION_TOPIC} # Cassandra #spring.data.cassandra.contactpoints=${CASSANDRA_URL} From e15e02cf6465609042310d1eed95cb1131c0c211 Mon Sep 17 00:00:00 2001 From: Pankaj Jangid <103931276+pankajjangid05@users.noreply.github.com> Date: Mon, 12 Jun 2023 15:44:55 +0530 Subject: [PATCH 11/28] Update build version (#59) --- pom.xml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index d990a60..8694f1e 100644 --- a/pom.xml +++ b/pom.xml @@ -11,15 +11,15 @@ com.uci orchestrator - 2.2.7 + 2.2.8 orchestrator Demo project for Spring Boot 11 - 2.2.2 - 2.2.2 - 2.2.2 + 2.2.3 + 2.2.3 + 2.2.3 From bb331730d089dae4ff45e5872afa7363f92c9a89 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Wed, 14 Jun 2023 18:41:47 +0530 Subject: [PATCH 12/28] [Hot Fix] - Updated Logback File --- src/main/resources/logback-spring.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index 7fdc8dd..9fa196c 100644 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -7,7 +7,7 @@ class="ch.qos.logback.core.ConsoleAppender"> - %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}:%line): %msg%n%throwable + %green(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}:%line): %msg%n%throwable From fdd61dd70f5968303bba4a437581d108940d4c38 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Thu, 15 Jun 2023 13:56:07 +0530 Subject: [PATCH 13/28] [Hot Fix] - Kafka config change and change the buffer time --- .../com/uci/orchestrator/Application/AppConfigOrchestrator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java index 5900adb..75c54a0 100644 --- a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java +++ b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java @@ -88,6 +88,7 @@ Map kafkaConsumerConfiguration() { configuration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + configuration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); return configuration; } From f2f066f46dd9ef34e323647d09747c911d86a114 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Thu, 15 Jun 2023 14:01:44 +0530 Subject: [PATCH 14/28] [Hot Fix] - Kafka config change --- .../com/uci/orchestrator/Application/AppConfigOrchestrator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java index 75c54a0..fe5e7ba 100644 --- a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java +++ b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java @@ -88,7 +88,7 @@ Map kafkaConsumerConfiguration() { configuration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - configuration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + configuration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); return configuration; } From dec35c15d6780335e18561987b36b6d869ea1cd0 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Thu, 15 Jun 2023 15:12:55 +0530 Subject: [PATCH 15/28] [Hot Fix] - Changes in kafka configuration --- .../Application/AppConfigOrchestrator.java | 2 +- .../orchestrator/Consumer/ReactiveConsumer.java | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java index fe5e7ba..aeefacf 100644 --- a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java +++ b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java @@ -88,7 +88,7 @@ Map kafkaConsumerConfiguration() { configuration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - configuration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + configuration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); return configuration; } diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index 7482c8d..9a6bb21 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -33,6 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @@ -103,13 +104,13 @@ public class ReactiveConsumer { private long consumeCount; private long pushCount; - @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) - public void onMessage(@Payload String stringMessage) { + @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}, containerFactory = "kafkaManualAckListenerContainerFactory") + public void onMessage(@Payload String stringMessage, Acknowledgment acknowledgment) { try { final long startTime = System.nanoTime(); logTimeTaken(startTime, 0, null); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); - + acknowledgment.acknowledge(); if (msg != null && msg.getProvider().equalsIgnoreCase("firebase")) { consumeCount++; log.info("Consume topic by Orchestrator count : " + consumeCount); @@ -142,6 +143,7 @@ public void accept(XMessage msg) { // msg.setFrom(from); if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { try { + log.info("broadcastNotificationChunkSize : " + broadcastNotificationChunkSize); /* Switch From & To */ switchFromTo(msg); Integer chunkSize = null; @@ -156,7 +158,7 @@ public void accept(XMessage msg) { JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); int totalFederatedUsers = federatedUsers.length(); if (totalFederatedUsers <= chunkSize) { - log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: "+totalFederatedUsers); + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + totalFederatedUsers); kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); } else { List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); @@ -164,7 +166,7 @@ public void accept(XMessage msg) { for (JSONArray jsonArray : jsonArrayList) { log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : " + count); msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); - log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: "+jsonArray.length()); + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + jsonArray.length()); kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); count++; } @@ -187,7 +189,7 @@ public void accept(XMessage msg) { logTimeTaken(startTime, 0, "Notification processed by orchestrator: " + notificationProcessedCount + " :: Push count : " + pushCount + " :: orchestrator-notification-process-end-time: %d ms"); } catch (Exception ex) { - log.error("ReactiveConsumer:Notification Triggering Process:Error in pushing xMessage to kafka: "+ex.getMessage()); + log.error("ReactiveConsumer:Notification Triggering Process:Error in pushing xMessage to kafka: " + ex.getMessage()); } } else { try { @@ -223,7 +225,7 @@ public void accept(Throwable throwable) { }) .subscribe(); } catch (Exception ex) { - log.error("ReactiveConsumer:ODK and Generic Bot Processing:Exception: "+ex.getMessage()); + log.error("ReactiveConsumer:ODK and Generic Bot Processing:Exception: " + ex.getMessage()); } } From af473d240d81fd2d0a9bae2e9f8dadac1dd69b68 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Thu, 15 Jun 2023 15:32:28 +0530 Subject: [PATCH 16/28] [Hot Fix] - Changes in kafka configuration manual ack --- .../java/com/uci/orchestrator/Consumer/ReactiveConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index 9a6bb21..efea9ef 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -104,7 +104,7 @@ public class ReactiveConsumer { private long consumeCount; private long pushCount; - @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}, containerFactory = "kafkaManualAckListenerContainerFactory") + @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage, Acknowledgment acknowledgment) { try { final long startTime = System.nanoTime(); From 21479c85a4e2394da4d93cf1c03d43274f9b4070 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Thu, 15 Jun 2023 15:52:56 +0530 Subject: [PATCH 17/28] [Hot Fix] - Changes in kafka configuration --- .../orchestrator/Application/AppConfigOrchestrator.java | 7 +++---- .../com/uci/orchestrator/Consumer/ReactiveConsumer.java | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java index aeefacf..953951c 100644 --- a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java +++ b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java @@ -8,6 +8,7 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; import org.kie.api.io.Resource; import org.kie.api.runtime.KieSession; import org.kie.internal.io.ResourceFactory; @@ -16,9 +17,7 @@ import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.*; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; @@ -121,7 +120,7 @@ SenderOptions kafkaSenderOptions() { @Bean Flux> reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) { - return KafkaReceiver.create(kafkaReceiverOptions).receive(); + return KafkaReceiver.create(kafkaReceiverOptions).receive().doOnNext(r -> r.receiverOffset().acknowledge()); } @Bean diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index efea9ef..adbb8d6 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -105,12 +105,11 @@ public class ReactiveConsumer { private long pushCount; @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) - public void onMessage(@Payload String stringMessage, Acknowledgment acknowledgment) { + public void onMessage(@Payload String stringMessage) { try { final long startTime = System.nanoTime(); logTimeTaken(startTime, 0, null); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); - acknowledgment.acknowledge(); if (msg != null && msg.getProvider().equalsIgnoreCase("firebase")) { consumeCount++; log.info("Consume topic by Orchestrator count : " + consumeCount); From 366fe378c34bd8dc207f782bc2b69f3469d77183 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Thu, 15 Jun 2023 20:20:15 +0530 Subject: [PATCH 18/28] [Bug Fix] - Added logs to identify duplication of kafka topic --- .../Consumer/ReactiveConsumer.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index adbb8d6..bc85907 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -44,11 +44,7 @@ import javax.xml.bind.JAXBException; import java.io.ByteArrayInputStream; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; -import java.util.Comparator; +import java.util.*; import java.util.function.Consumer; import java.util.function.Function; @@ -104,6 +100,10 @@ public class ReactiveConsumer { private long consumeCount; private long pushCount; + private Set messageIdSet = new HashSet<>(); + + private long insertSetCount, existSetCount; + @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { @@ -113,6 +113,18 @@ public void onMessage(@Payload String stringMessage) { if (msg != null && msg.getProvider().equalsIgnoreCase("firebase")) { consumeCount++; log.info("Consume topic by Orchestrator count : " + consumeCount); + if (msg.getMessageId() != null && msg.getMessageId().getChannelMessageId() != null) { + String messageId = msg.getMessageId().getChannelMessageId(); + if (messageIdSet.contains(messageId)) { + existSetCount++; + log.info("ReactiveConsumer:Already Counsumed : " + existSetCount + " MessageId : " + messageId); + } else { + insertSetCount++; + log.info("ReactiveConsumer:Insert in set count : " + insertSetCount + " MessageId : " + messageId); + messageIdSet.add(messageId); + } + } + log.info("ReactiveConsumer: Total MessageId Set : " + messageIdSet.size()); } SenderReceiverInfo from = msg.getFrom(); @@ -142,7 +154,7 @@ public void accept(XMessage msg) { // msg.setFrom(from); if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { try { - log.info("broadcastNotificationChunkSize : " + broadcastNotificationChunkSize); + log.info("ReactiveConsumer:broadcastNotificationChunkSize : " + broadcastNotificationChunkSize); /* Switch From & To */ switchFromTo(msg); Integer chunkSize = null; From 2afccf0d16c25522dbd56edaa268da3a8cd475b7 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Thu, 15 Jun 2023 21:09:04 +0530 Subject: [PATCH 19/28] [Bug Fix] - Added logs to identify duplication of kafka topic --- .../java/com/uci/orchestrator/Consumer/ReactiveConsumer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index bc85907..b7652b0 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -118,6 +118,7 @@ public void onMessage(@Payload String stringMessage) { if (messageIdSet.contains(messageId)) { existSetCount++; log.info("ReactiveConsumer:Already Counsumed : " + existSetCount + " MessageId : " + messageId); + return; } else { insertSetCount++; log.info("ReactiveConsumer:Insert in set count : " + insertSetCount + " MessageId : " + messageId); From da8cdc2396a3f44fca73cb15f3622f3c675b7fae Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Fri, 16 Jun 2023 11:36:01 +0530 Subject: [PATCH 20/28] [Bug Fix] - Added logs to identify duplication of kafka topic --- .../Consumer/OuboundConsumer.java | 2 +- .../Consumer/ReactiveConsumer.java | 30 ++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java index 64a1430..a0392fc 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java @@ -40,7 +40,7 @@ public void onMessage(@Payload String stringMessage) { if (msg != null && msg.getProvider() != null && msg.getProvider().equalsIgnoreCase("firebase") && msg.getChannel().equalsIgnoreCase("web")) { pushNotificationCount++; - log.info("OutboundConsumer:onMessage:: Notification push to kafka topic count: " + pushNotificationCount); + log.info("OutboundConsumer:onMessage:: Notification push to kafka topic count: " + pushNotificationCount + " UserId : " + msg.getTo().getUserID()); kafkaProducer.send(notificationOutbound, msg.toXML()); } else { pushOtherCount++; diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index b7652b0..e7af226 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -101,8 +101,9 @@ public class ReactiveConsumer { private long pushCount; private Set messageIdSet = new HashSet<>(); + private HashSet federatedUsers = new HashSet<>(); - private long insertSetCount, existSetCount; + private long insertSetCount, existSetCount, existingFederatedUsers; @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { @@ -113,6 +114,7 @@ public void onMessage(@Payload String stringMessage) { if (msg != null && msg.getProvider().equalsIgnoreCase("firebase")) { consumeCount++; log.info("Consume topic by Orchestrator count : " + consumeCount); + // This code for kafka duplication problem if (msg.getMessageId() != null && msg.getMessageId().getChannelMessageId() != null) { String messageId = msg.getMessageId().getChannelMessageId(); if (messageIdSet.contains(messageId)) { @@ -360,6 +362,23 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str /* Get federated users from federation services */ JSONArray users = userService.getUsersFromFederatedServers(botId, page); + for (int i = 0; i < users.length(); i++) { + JSONObject jsonObject = (JSONObject) users.get(i); + if (jsonObject != null && !jsonObject.isNull("phoneNo")) { + String phoneNo = jsonObject.getString("phoneNo"); + if (federatedUsers.contains(phoneNo)) { + existingFederatedUsers++; + log.info("ReactiveConsumer:getFederatedUsersMeta:: Duplicate Phone Number found : count: " + existingFederatedUsers + " Phone No : " + phoneNo); + } else { + log.info("ReactiveConsumer:getFederatedUsersMeta::Inserting User in set : " + phoneNo); + federatedUsers.add(phoneNo); + } + } else { + log.error("ReactiveConsumer:getFederatedUsersMeta::No Federated Users Found: " + users.get(i).toString()); + } + } + + log.info("ReactiveConsumer:getFederatedUsersMeta::Count: " + (users == null ? "user not found" : users.length()) + " >> Set count: " + federatedUsers.size()); /* Check if users, & related meta data exists in transformer */ if (users != null && transformer.get("meta") != null @@ -392,6 +411,13 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str /* Fetch user messages by template from template service */ ArrayList usersMessage = userService.getUsersMessageByTemplate(node); +// for (int i = 0; i < usersMessage.size(); i++) { +// JSONObject jsonObject = usersMessage.get(i); +// if(jsonObject != null && !jsonObject.isNull("")){ +// +// } +// } + log.info("ReactiveConsumer:getUsersMessageByTemplate::Count: " + usersMessage.size()); /* Set User messages against the user phone */ @@ -428,6 +454,8 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str federatedUsersMeta.put("list", userMetaData); return federatedUsersMeta.toString(); + } else { + log.error("ReactiveConsumer:getFederatedUsersMetaElse::Users not found"); } return ""; } From 9ae68f0069b3103e40358140379d1c71dc993709 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Fri, 16 Jun 2023 17:20:27 +0530 Subject: [PATCH 21/28] [Bug Fix] - Config change for testing kafka duplication --- .../com/uci/orchestrator/Application/AppConfigOrchestrator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java index 953951c..1918615 100644 --- a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java +++ b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java @@ -88,6 +88,7 @@ Map kafkaConsumerConfiguration() { configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); configuration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + configuration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "120000"); return configuration; } From 3d90702a949f9adfa9a97c578e9c146cf1eb2ac2 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Fri, 16 Jun 2023 17:47:51 +0530 Subject: [PATCH 22/28] [Bug Fix] - Config change for testing kafka duplication --- .../com/uci/orchestrator/Application/AppConfigOrchestrator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java index 1918615..e765bca 100644 --- a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java +++ b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java @@ -88,7 +88,7 @@ Map kafkaConsumerConfiguration() { configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); configuration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - configuration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "120000"); + configuration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "1800000"); return configuration; } From a0543c1847832193ff29c1ab6e8bf01337e840ca Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Tue, 20 Jun 2023 15:06:18 +0530 Subject: [PATCH 23/28] Build Version Change --- pom.xml | 6 +- .../Consumer/ReactiveConsumer.java | 92 ++++++++++--------- 2 files changed, 50 insertions(+), 48 deletions(-) diff --git a/pom.xml b/pom.xml index 8694f1e..f4596e8 100644 --- a/pom.xml +++ b/pom.xml @@ -17,9 +17,9 @@ 11 - 2.2.3 - 2.2.3 - 2.2.3 + 2.2.3-SNAPSHOT + 2.2.3-SNAPSHOT + 2.2.3-SNAPSHOT diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index e7af226..a016d7d 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -208,36 +208,38 @@ public void accept(XMessage msg) { } else { try { log.info("Calling ODK : " + msg.toString()); - getLastMessageID(msg) - .doOnNext(lastMessageID -> { - logTimeTaken(startTime, 4, null); - msg.setLastMessageID(lastMessageID); - - /* Switch From & To */ - switchFromTo(msg); - - if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { - try { - log.info("final msg.toXML(): " + msg.toXML().toString()); - if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { - kafkaProducer.send(genericTransformerTopic, msg.toXML()); - } else { - kafkaProducer.send(odkTransformerTopic, msg.toXML()); - } - // reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML()); - } catch (JAXBException e) { - e.printStackTrace(); - } - logTimeTaken(startTime, 0, "process-end: %d ms"); - } - }) - .doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in getLastMessageID" + throwable.getMessage()); - } - }) - .subscribe(); +// getLastMessageID(msg) +// .doOnNext(lastMessageID -> { + logTimeTaken(startTime, 4, null); +// msg.setLastMessageID(lastMessageID); + + /* Switch From & To */ + switchFromTo(msg); + + if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { + try { + log.info("final msg.toXML(): " + msg.toXML().toString()); + if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { + kafkaProducer.send(genericTransformerTopic, msg.toXML()); + } else { + kafkaProducer.send(odkTransformerTopic, msg.toXML()); + } + // reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML()); + } catch (JAXBException e) { + e.printStackTrace(); + } + logTimeTaken(startTime, 0, "process-end: %d ms"); + } else { + log.error("ReactiveConsumer:onMessage:: MessageSate Invalid Found : " + msg.getMessageState()); + } +// }) +// .doOnError(new Consumer() { +// @Override +// public void accept(Throwable throwable) { +// log.error("Error in getLastMessageID" + throwable.getMessage()); +// } +// }) +// .subscribe(); } catch (Exception ex) { log.error("ReactiveConsumer:ODK and Generic Bot Processing:Exception: " + ex.getMessage()); } @@ -362,21 +364,21 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str /* Get federated users from federation services */ JSONArray users = userService.getUsersFromFederatedServers(botId, page); - for (int i = 0; i < users.length(); i++) { - JSONObject jsonObject = (JSONObject) users.get(i); - if (jsonObject != null && !jsonObject.isNull("phoneNo")) { - String phoneNo = jsonObject.getString("phoneNo"); - if (federatedUsers.contains(phoneNo)) { - existingFederatedUsers++; - log.info("ReactiveConsumer:getFederatedUsersMeta:: Duplicate Phone Number found : count: " + existingFederatedUsers + " Phone No : " + phoneNo); - } else { - log.info("ReactiveConsumer:getFederatedUsersMeta::Inserting User in set : " + phoneNo); - federatedUsers.add(phoneNo); - } - } else { - log.error("ReactiveConsumer:getFederatedUsersMeta::No Federated Users Found: " + users.get(i).toString()); - } - } +// for (int i = 0; i < users.length(); i++) { +// JSONObject jsonObject = (JSONObject) users.get(i); +// if (jsonObject != null && !jsonObject.isNull("phoneNo")) { +// String phoneNo = jsonObject.getString("phoneNo"); +// if (federatedUsers.contains(phoneNo)) { +// existingFederatedUsers++; +// log.info("ReactiveConsumer:getFederatedUsersMeta:: Duplicate Phone Number found : count: " + existingFederatedUsers + " Phone No : " + phoneNo); +// } else { +// log.info("ReactiveConsumer:getFederatedUsersMeta::Inserting User in set : " + phoneNo); +// federatedUsers.add(phoneNo); +// } +// } else { +// log.error("ReactiveConsumer:getFederatedUsersMeta::No Federated Users Found: " + users.get(i).toString()); +// } +// } log.info("ReactiveConsumer:getFederatedUsersMeta::Count: " + (users == null ? "user not found" : users.length()) + " >> Set count: " + federatedUsers.size()); From d86aaae48b769efb5ea7f4bad023e12feaea77c3 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Wed, 21 Jun 2023 16:00:47 +0530 Subject: [PATCH 24/28] [Bug Fix] - Added redis for kafka duplication problem --- .../Consumer/ReactiveConsumer.java | 209 +++++++++++------- src/main/resources/application.properties | 3 + 2 files changed, 127 insertions(+), 85 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index a016d7d..26b7a44 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -100,10 +100,11 @@ public class ReactiveConsumer { private long consumeCount; private long pushCount; - private Set messageIdSet = new HashSet<>(); private HashSet federatedUsers = new HashSet<>(); private long insertSetCount, existSetCount, existingFederatedUsers; + @Value("${notification-kafka-cache}") + private String notificationKafkaCache; @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { @@ -114,20 +115,52 @@ public void onMessage(@Payload String stringMessage) { if (msg != null && msg.getProvider().equalsIgnoreCase("firebase")) { consumeCount++; log.info("Consume topic by Orchestrator count : " + consumeCount); - // This code for kafka duplication problem - if (msg.getMessageId() != null && msg.getMessageId().getChannelMessageId() != null) { + + /** + * Kafka Duplication check + * If messageId found in cache we don't send notifications + * We need to resolve this issue from kafka side this is temporary solution + */ + if (msg.getMessageId() != null && msg.getMessageId().getChannelMessageId() != null && notificationKafkaCache != null) { String messageId = msg.getMessageId().getChannelMessageId(); - if (messageIdSet.contains(messageId)) { - existSetCount++; - log.info("ReactiveConsumer:Already Counsumed : " + existSetCount + " MessageId : " + messageId); - return; - } else { - insertSetCount++; - log.info("ReactiveConsumer:Insert in set count : " + insertSetCount + " MessageId : " + messageId); + + if (!redisCacheService.isKeyExists(notificationKafkaCache)) { + Set messageIdSet = new HashSet<>(); messageIdSet.add(messageId); + redisCacheService.setCache(notificationKafkaCache, messageIdSet); + insertSetCount++; + log.info("ReactiveConsumer:First MessageId Insert in Redis count : " + insertSetCount + " MessageId : " + messageId); + } else { + Set messageIdSet = (Set) redisCacheService.getCache(notificationKafkaCache); + if (messageIdSet.contains(messageId)) { + existSetCount++; + log.info("ReactiveConsumer:Already Consumed : " + existSetCount + " MessageId : " + messageId); + return; + } else { + messageIdSet.add(messageId); + redisCacheService.setCache(notificationKafkaCache, messageIdSet); + insertSetCount++; + log.info("ReactiveConsumer:MessageId Insert in Redis count : " + insertSetCount + " MessageId : " + messageId); + } } + } else { + log.error("ReactiveConsumer:MessageId Not Found : " + msg.getMessageId() + " or Notification Cache Name Empty : " + notificationKafkaCache); } - log.info("ReactiveConsumer: Total MessageId Set : " + messageIdSet.size()); + + // This code for kafka duplication problem +// if (msg.getMessageId() != null && msg.getMessageId().getChannelMessageId() != null) { +// String messageId = msg.getMessageId().getChannelMessageId(); +// if (messageIdSet.contains(messageId)) { +// existSetCount++; +// log.info("ReactiveConsumer:Already Counsumed : " + existSetCount + " MessageId : " + messageId); +// return; +// } else { +// insertSetCount++; +// log.info("ReactiveConsumer:Insert in set count : " + insertSetCount + " MessageId : " + messageId); +// messageIdSet.add(messageId); +// } +// } +// log.info("ReactiveConsumer: Total MessageId Set : " + messageIdSet.size()); } SenderReceiverInfo from = msg.getFrom(); @@ -149,65 +182,69 @@ public void accept(JsonNode botNode) { String appId = botNode.get("id").asText(); JsonNode firstTransformer = botNode.findValues("transformers").get(0); - resolveUser(message, appId) - .doOnNext(new Consumer() { - @Override - public void accept(XMessage msg) { - SenderReceiverInfo from = msg.getFrom(); - // msg.setFrom(from); - if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { - try { - log.info("ReactiveConsumer:broadcastNotificationChunkSize : " + broadcastNotificationChunkSize); - /* Switch From & To */ - switchFromTo(msg); - Integer chunkSize = null; - try { - chunkSize = Integer.parseInt(broadcastNotificationChunkSize); - } catch (NumberFormatException ex) { - chunkSize = null; - } - if (chunkSize != null) { - if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null - && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { - JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); - int totalFederatedUsers = federatedUsers.length(); - if (totalFederatedUsers <= chunkSize) { - log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + totalFederatedUsers); - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); - } else { - List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); - int count = 1; - for (JSONArray jsonArray : jsonArrayList) { - log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : " + count); - msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); - log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + jsonArray.length()); - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); - count++; - } - } - } else { - log.error("ReactiveConsumer:federatedUsers not found in xMessage: " + msg); - } - } else { - if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null - && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { - JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); - log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + federatedUsers.length()); - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); - } else { - log.error("ReactiveConsumer:federatedUsers not found in xMessage: " + msg); - } + if (message != null && message.getProviderURI().equals("firebase") && message.getChannelURI().equals("web")) { + if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + try { + log.info("ReactiveConsumer:broadcastNotificationChunkSize : " + broadcastNotificationChunkSize); + /* Switch From & To */ + switchFromTo(msg); + Integer chunkSize = null; + try { + chunkSize = Integer.parseInt(broadcastNotificationChunkSize); + } catch (NumberFormatException ex) { + chunkSize = null; + } + if (chunkSize != null) { + if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null + && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { + JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); + int totalFederatedUsers = federatedUsers.length(); + if (totalFederatedUsers <= chunkSize) { + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + totalFederatedUsers); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } else { + List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); + int count = 1; + for (JSONArray jsonArray : jsonArrayList) { + log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : " + count); + msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + jsonArray.length()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + count++; } - pushCount++; - notificationProcessedCount++; - logTimeTaken(startTime, 0, "Notification processed by orchestrator: " + notificationProcessedCount + " :: Push count : " - + pushCount + " :: orchestrator-notification-process-end-time: %d ms"); - } catch (Exception ex) { - log.error("ReactiveConsumer:Notification Triggering Process:Error in pushing xMessage to kafka: " + ex.getMessage()); } } else { + log.error("ReactiveConsumer:federatedUsers not found in xMessage: " + msg); + } + } else { + if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null + && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { + JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + federatedUsers.length()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } else { + log.error("ReactiveConsumer:federatedUsers not found in xMessage: " + msg); + } + } + pushCount++; + notificationProcessedCount++; + logTimeTaken(startTime, 0, "Notification processed by orchestrator: " + notificationProcessedCount + " :: Push count : " + + pushCount + " :: orchestrator-notification-process-end-time: %d ms"); + } catch (Exception ex) { + log.error("ReactiveConsumer:Notification Triggering Process:Error in pushing xMessage to kafka: " + ex.getMessage()); + } + } else { + log.error("ReactiveConsumer:onMessage:: Invalid Type Found : " + firstTransformer.findValue("type")); + } + } else { + log.info("ReactiveConsumer:ResolveUser Calling: " + message); + resolveUser(message, appId) + .doOnNext(new Consumer() { + @Override + public void accept(XMessage msg) { + SenderReceiverInfo from = msg.getFrom(); + // msg.setFrom(from); try { - log.info("Calling ODK : " + msg.toString()); // getLastMessageID(msg) // .doOnNext(lastMessageID -> { logTimeTaken(startTime, 4, null); @@ -243,16 +280,16 @@ public void accept(XMessage msg) { } catch (Exception ex) { log.error("ReactiveConsumer:ODK and Generic Bot Processing:Exception: " + ex.getMessage()); } - } - } - }) - .doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in resolveUser" + throwable.getMessage()); - } - }).subscribe(); + } + }) + .doOnError(new Consumer() { + @Override + public void accept(Throwable throwable) { + log.error("Error in resolveUser" + throwable.getMessage()); + } + }).subscribe(); + } } else { log.error("Bot node not found by name: " + msg.getApp()); } @@ -364,21 +401,23 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str /* Get federated users from federation services */ JSONArray users = userService.getUsersFromFederatedServers(botId, page); -// for (int i = 0; i < users.length(); i++) { -// JSONObject jsonObject = (JSONObject) users.get(i); -// if (jsonObject != null && !jsonObject.isNull("phoneNo")) { -// String phoneNo = jsonObject.getString("phoneNo"); -// if (federatedUsers.contains(phoneNo)) { -// existingFederatedUsers++; -// log.info("ReactiveConsumer:getFederatedUsersMeta:: Duplicate Phone Number found : count: " + existingFederatedUsers + " Phone No : " + phoneNo); -// } else { + for (int i = 0; i < users.length(); i++) { + JSONObject jsonObject = (JSONObject) users.get(i); + if (jsonObject != null && !jsonObject.isNull("phoneNo")) { + String phoneNo = jsonObject.getString("phoneNo"); + if (federatedUsers.contains(phoneNo)) { + existingFederatedUsers++; + log.info("ReactiveConsumer:getFederatedUsersMeta:: Duplicate Phone Number found : count: " + existingFederatedUsers + " Phone No : " + phoneNo); + } +// else { // log.info("ReactiveConsumer:getFederatedUsersMeta::Inserting User in set : " + phoneNo); // federatedUsers.add(phoneNo); // } -// } else { + } +// else { // log.error("ReactiveConsumer:getFederatedUsersMeta::No Federated Users Found: " + users.get(i).toString()); // } -// } + } log.info("ReactiveConsumer:getFederatedUsersMeta::Count: " + (users == null ? "user not found" : users.length()) + " >> Set count: " + federatedUsers.size()); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 68451c8..0c32926 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -102,3 +102,6 @@ spring.mail.recipient=${RECIPIENT_EMAILS:#{""}} # Send notifications in chunks broadcastNotificationChunkSize=${BROADCAST_NOTIFICATION_CHUNK_SIZE:#{""}} + +# Added Redis Cache for Kafka Duplication in Firebase(for NL) +notification-kafka-cache=${NOTIFICATION_KAFKA_CACHE:#{""}} From 97d652ae98275ec0e60ace5c86c04d472e049ebe Mon Sep 17 00:00:00 2001 From: Pankaj Jangid <103931276+pankajjangid05@users.noreply.github.com> Date: Fri, 23 Jun 2023 19:53:39 +0530 Subject: [PATCH 25/28] Added Conversation Authorization Headers for Federated Users Request (#63) --- .../uci/orchestrator/Consumer/ReactiveConsumer.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index 26b7a44..dd3a55f 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -121,7 +121,7 @@ public void onMessage(@Payload String stringMessage) { * If messageId found in cache we don't send notifications * We need to resolve this issue from kafka side this is temporary solution */ - if (msg.getMessageId() != null && msg.getMessageId().getChannelMessageId() != null && notificationKafkaCache != null) { + if (msg.getMessageId() != null && msg.getMessageId().getChannelMessageId() != null && notificationKafkaCache != null && !notificationKafkaCache.isEmpty()) { String messageId = msg.getMessageId().getChannelMessageId(); if (!redisCacheService.isKeyExists(notificationKafkaCache)) { @@ -343,9 +343,9 @@ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) { if (transformerMeta.get("type") != null && transformerMeta.get("type").asText().equals(BotUtil.transformerTypeBroadcast)) { if (xMessage != null && xMessage.getFrom() != null && xMessage.getFrom().getMeta() != null && xMessage.getFrom().getMeta().containsKey("page")) { log.info("page number orch : " + xMessage.getFrom().getMeta().get("page")); - metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta().get("page"))); + metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta())); } else { - metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, null)); + metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta())); } } @@ -396,11 +396,11 @@ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) { * @param transformer * @return Federated users as json string */ - private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, String page) { + private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Map meta) { String botId = botNode.get("id").asText(); /* Get federated users from federation services */ - JSONArray users = userService.getUsersFromFederatedServers(botId, page); + JSONArray users = userService.getUsersFromFederatedServers(botId, meta); for (int i = 0; i < users.length(); i++) { JSONObject jsonObject = (JSONObject) users.get(i); if (jsonObject != null && !jsonObject.isNull("phoneNo")) { From becf481e470198a00231996d4a3b7a20829252da Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Mon, 26 Jun 2023 12:33:36 +0530 Subject: [PATCH 26/28] Get configuration values from application file --- .../Application/AppConfigOrchestrator.java | 27 ++++++++++++------- src/main/resources/application.properties | 10 +++++++ 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java index e765bca..23f7ea5 100644 --- a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java +++ b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java @@ -3,6 +3,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.uci.orchestrator.Drools.DroolsBeanFactory; import com.uci.utils.BotService; +import com.uci.utils.dto.BotServiceParams; import com.uci.utils.kafka.ReactiveProducer; import io.fusionauth.client.FusionAuthClient; import org.apache.kafka.clients.admin.NewTopic; @@ -42,9 +43,9 @@ public class AppConfigOrchestrator { @Value("${campaign.url}") public String CAMPAIGN_URL; - + @Value("${campaign.admin.token}") - public String CAMPAIGN_ADMIN_TOKEN; + public String CAMPAIGN_ADMIN_TOKEN; @Value("${fusionauth.url}") public String FUSIONAUTH_URL; @@ -133,21 +134,22 @@ KafkaSender reactiveKafkaSender(SenderOptions ReactiveProducer kafkaReactiveProducer() { return new ReactiveProducer(); } - + @Bean - ProducerFactory producerFactory(){ - ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration()); - return producerFactory; + ProducerFactory producerFactory() { + ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration()); + return producerFactory; } - + @Bean KafkaTemplate kafkaTemplate() { - KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory()); - return (KafkaTemplate) kafkaTemplate; + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory()); + return (KafkaTemplate) kafkaTemplate; } /** * Create process outbound topic, if does not exists + * * @return */ @Bean @@ -157,6 +159,7 @@ public NewTopic createProcessOutboundTopic() { /** * Create broadcast transformer topic, if does not exists + * * @return */ @Bean @@ -166,10 +169,16 @@ public NewTopic createBroadcastTransformerTopic() { /** * Create generic transformer topic, if does not exists + * * @return */ @Bean public NewTopic createGenericTransformerTopic() { return new NewTopic(genericTransformerTopic, 1, (short) 1); } + + @Bean + public BotServiceParams getBotServiceParams() { + return new BotServiceParams(); + } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 0c32926..7352cd6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -39,6 +39,9 @@ spring.data.cassandra.pool.idle-timeout=20s spring.data.cassandra.pool.pool-timeout=20s logging.level.com.datastax.driver.core.QueryLogger.NORMAL=TRACE logging.level.com.datastax.driver.core.QueryLogger.SLOW=TRACE +spring.data.cassandra.advanced.control-connection.timeout=10s +spring.data.cassandra.advanced.metadata.schema.request-timeout=30s +spring.data.cassandra.basic.request.timeout=30s server.port=8686 campaign.url = ${CAMPAIGN_URL} @@ -72,6 +75,8 @@ spring.redis.number.port=${REDIS_NUMBER_PORT:#{6379}} spring.redis.sentinel.master= # Name of Redis server. spring.redis.sentinel.nodes= # Comma-separated list of host:port pairs. spring.redis.timeout=0 +# This time in seconds and we are default set 60*60 = 3600 -> 1 Hour +redis.key.timeout=${REDIS_KEY_TIMEOUT:#{3600}} #Env spring.profile.env=${ENV} @@ -105,3 +110,8 @@ broadcastNotificationChunkSize=${BROADCAST_NOTIFICATION_CHUNK_SIZE:#{""}} # Added Redis Cache for Kafka Duplication in Firebase(for NL) notification-kafka-cache=${NOTIFICATION_KAFKA_CACHE:#{""}} + +# BotService WebClient Configurations +webclient.interval=${WEBCLIENT_INTERVAL:#{5000}} +webclient.retryMaxAttempts=${WEBCLIENT_RETRY_MAX_ATTEMPTS:#{3}} +webclient.retryMinBackoff=${WEBCLIENT_RETRY_MIN_BACK_OFF:#{5}} From c8443a47fb1c5dfb69599af96593797f4985d51a Mon Sep 17 00:00:00 2001 From: Pankaj Jangid <103931276+pankajjangid05@users.noreply.github.com> Date: Mon, 26 Jun 2023 15:11:00 +0530 Subject: [PATCH 27/28] Added Conversation Authorization Headers for Federated Users Request (#64) From f633483fb43b7f28491969ac572e7dd2fa8cca5a Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Mon, 26 Jun 2023 16:40:53 +0530 Subject: [PATCH 28/28] Hot Fix --- src/main/resources/application.properties | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 7352cd6..1e4fd12 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -43,6 +43,7 @@ spring.data.cassandra.advanced.control-connection.timeout=10s spring.data.cassandra.advanced.metadata.schema.request-timeout=30s spring.data.cassandra.basic.request.timeout=30s + server.port=8686 campaign.url = ${CAMPAIGN_URL} campaign.admin.token = EXnYOvDx4KFqcQkdXqI38MHgFvnJcxMS