diff --git a/.github/workflows/docker-build-push-dev-master.yml b/.github/workflows/docker-build-push-dev-master.yml new file mode 100644 index 0000000..6b58d58 --- /dev/null +++ b/.github/workflows/docker-build-push-dev-master.yml @@ -0,0 +1,32 @@ +name: Docker Build +on: + push: + branches: + - master +jobs: + docker-build-push: + runs-on: ubuntu-20.04 + timeout-minutes: 40 + steps: + - name: Check out the repo + uses: actions/checkout@v3 + - name: Extract version from pom.xml + id: version + run: | + echo ::set-output name=VERSION::$(grep -A 1 'orchestrator' pom.xml | grep -oP '(?<=).*?(?=)') + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Log in to Docker Hub + uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9 + with: + username: ${{secrets.DOCKER_HUB_USERNAME}} + password: ${{secrets.DOCKER_HUB_ACCESS_TOKEN}} + - name: Build and push Docker image + uses: docker/build-push-action@v2 + with: + context: "." + push: true + tags: samagragovernance/orchestrator:v${{ steps.version.outputs.VERSION }} + build-args: | + username=${{ secrets.USERNAME }}, + token=${{ secrets.TOKEN }} \ No newline at end of file diff --git a/.github/workflows/docker-build-push-dev.yml b/.github/workflows/docker-build-push-dev.yml new file mode 100644 index 0000000..a954289 --- /dev/null +++ b/.github/workflows/docker-build-push-dev.yml @@ -0,0 +1,32 @@ +name: Docker Build +on: + push: + branches: + - development +jobs: + docker-build-push: + runs-on: ubuntu-20.04 + timeout-minutes: 40 + steps: + - name: Check out the repo + uses: actions/checkout@v3 + - name: Extract version from pom.xml + id: version + run: | + echo ::set-output name=VERSION::$(grep -A 1 'orchestrator' pom.xml | grep -oP '(?<=).*?(?=)') + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Log in to Docker Hub + uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9 + with: + username: ${{secrets.DOCKER_HUB_USERNAME}} + password: ${{secrets.DOCKER_HUB_ACCESS_TOKEN}} + - name: Build and push Docker image + uses: docker/build-push-action@v2 + with: + context: "." + push: true + tags: samagragovernance/orchestrator:v${{ steps.version.outputs.VERSION }}-SNAPSHOT + build-args: | + username=${{ secrets.USERNAME }}, + token=${{ secrets.TOKEN }} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index bcb91b3..e49a792 100644 --- a/Dockerfile +++ b/Dockerfile @@ -28,11 +28,11 @@ ADD /src $HOME/src RUN mvn package -s $HOME/settings.xml -DskipTests=true # Package stage -FROM openjdk:11 +FROM ibm-semeru-runtimes:open-11.0.18_10-jre ENV HOME=/home/app ENV export $(cat .env | xargs) WORKDIR $HOME COPY --from=build $HOME/target/*.jar app.jar EXPOSE 8080 -ENTRYPOINT ["java","-jar","app.jar"] +ENTRYPOINT ["java","-Xmx1024m","-Xshareclasses","-XX:+CMSClassUnloadingEnabled","-XX:+UseG1GC","-XX:+ExplicitGCInvokesConcurrent","-jar","app.jar"] diff --git a/pom.xml b/pom.xml index 53ac13a..7433841 100644 --- a/pom.xml +++ b/pom.xml @@ -11,15 +11,15 @@ com.uci orchestrator - 2.2.1 + 2.2.9 orchestrator Demo project for Spring Boot 11 - 2.2.1 - 2.2.1 - 2.2.1 + 2.2.4 + 2.2.4 + 2.2.4 diff --git a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java index 5900adb..23f7ea5 100644 --- a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java +++ b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java @@ -3,11 +3,13 @@ import com.github.benmanes.caffeine.cache.Cache; import com.uci.orchestrator.Drools.DroolsBeanFactory; import com.uci.utils.BotService; +import com.uci.utils.dto.BotServiceParams; import com.uci.utils.kafka.ReactiveProducer; import io.fusionauth.client.FusionAuthClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; import org.kie.api.io.Resource; import org.kie.api.runtime.KieSession; import org.kie.internal.io.ResourceFactory; @@ -16,9 +18,7 @@ import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.*; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; @@ -43,9 +43,9 @@ public class AppConfigOrchestrator { @Value("${campaign.url}") public String CAMPAIGN_URL; - + @Value("${campaign.admin.token}") - public String CAMPAIGN_ADMIN_TOKEN; + public String CAMPAIGN_ADMIN_TOKEN; @Value("${fusionauth.url}") public String FUSIONAUTH_URL; @@ -88,6 +88,8 @@ Map kafkaConsumerConfiguration() { configuration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + configuration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + configuration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "1800000"); return configuration; } @@ -120,7 +122,7 @@ SenderOptions kafkaSenderOptions() { @Bean Flux> reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) { - return KafkaReceiver.create(kafkaReceiverOptions).receive(); + return KafkaReceiver.create(kafkaReceiverOptions).receive().doOnNext(r -> r.receiverOffset().acknowledge()); } @Bean @@ -132,21 +134,22 @@ KafkaSender reactiveKafkaSender(SenderOptions ReactiveProducer kafkaReactiveProducer() { return new ReactiveProducer(); } - + @Bean - ProducerFactory producerFactory(){ - ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration()); - return producerFactory; + ProducerFactory producerFactory() { + ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration()); + return producerFactory; } - + @Bean KafkaTemplate kafkaTemplate() { - KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory()); - return (KafkaTemplate) kafkaTemplate; + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory()); + return (KafkaTemplate) kafkaTemplate; } /** * Create process outbound topic, if does not exists + * * @return */ @Bean @@ -156,6 +159,7 @@ public NewTopic createProcessOutboundTopic() { /** * Create broadcast transformer topic, if does not exists + * * @return */ @Bean @@ -165,10 +169,16 @@ public NewTopic createBroadcastTransformerTopic() { /** * Create generic transformer topic, if does not exists + * * @return */ @Bean public NewTopic createGenericTransformerTopic() { return new NewTopic(genericTransformerTopic, 1, (short) 1); } + + @Bean + public BotServiceParams getBotServiceParams() { + return new BotServiceParams(); + } } diff --git a/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java index cd079be..a0392fc 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java @@ -22,15 +22,33 @@ public class OuboundConsumer { @Value("${outbound}") private String outboundTopic; + @Value("${notificationOutbound}") + public String notificationOutbound; + @Autowired public SimpleProducer kafkaProducer; + private long consumeCount, pushNotificationCount, pushOtherCount; + + @KafkaListener(id = "${processOutbound}", topics = "${processOutbound}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { + consumeCount++; + log.info("OutboundConsumer:onMessage:: Consume Topic Count: " + consumeCount); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); - kafkaProducer.send(outboundTopic, msg.toXML()); - } catch(JAXBException e) { + if (msg != null && msg.getProvider() != null && msg.getProvider().equalsIgnoreCase("firebase") + && msg.getChannel().equalsIgnoreCase("web")) { + pushNotificationCount++; + log.info("OutboundConsumer:onMessage:: Notification push to kafka topic count: " + pushNotificationCount + " UserId : " + msg.getTo().getUserID()); + kafkaProducer.send(notificationOutbound, msg.toXML()); + } else { + pushOtherCount++; + log.info("OutboundConsumer:onMessage:: Other push to kafka topic count: " + pushOtherCount); + kafkaProducer.send(outboundTopic, msg.toXML()); + } + + } catch (JAXBException e) { e.printStackTrace(); } } diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index addf485..dd3a55f 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -33,6 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @@ -43,11 +44,7 @@ import javax.xml.bind.JAXBException; import java.io.ByteArrayInputStream; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; -import java.util.Comparator; +import java.util.*; import java.util.function.Consumer; import java.util.function.Function; @@ -71,7 +68,7 @@ public class ReactiveConsumer { @Value("${odk-transformer}") public String odkTransformerTopic; - + @Value("${broadcast-transformer}") public String broadcastTransformerTopic; @@ -80,7 +77,7 @@ public class ReactiveConsumer { @Autowired public BotService botService; - + @Autowired private UserService userService; @@ -89,7 +86,7 @@ public class ReactiveConsumer { @Autowired private RedisCacheService redisCacheService; - + public AESWrapper encryptor; private final String DEFAULT_APP_NAME = "Global Bot"; @@ -100,23 +97,83 @@ public class ReactiveConsumer { private long notificationProcessedCount; + private long consumeCount; + private long pushCount; + + private HashSet federatedUsers = new HashSet<>(); + + private long insertSetCount, existSetCount, existingFederatedUsers; + @Value("${notification-kafka-cache}") + private String notificationKafkaCache; + @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { final long startTime = System.nanoTime(); logTimeTaken(startTime, 0, null); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); + if (msg != null && msg.getProvider().equalsIgnoreCase("firebase")) { + consumeCount++; + log.info("Consume topic by Orchestrator count : " + consumeCount); + + /** + * Kafka Duplication check + * If messageId found in cache we don't send notifications + * We need to resolve this issue from kafka side this is temporary solution + */ + if (msg.getMessageId() != null && msg.getMessageId().getChannelMessageId() != null && notificationKafkaCache != null && !notificationKafkaCache.isEmpty()) { + String messageId = msg.getMessageId().getChannelMessageId(); + + if (!redisCacheService.isKeyExists(notificationKafkaCache)) { + Set messageIdSet = new HashSet<>(); + messageIdSet.add(messageId); + redisCacheService.setCache(notificationKafkaCache, messageIdSet); + insertSetCount++; + log.info("ReactiveConsumer:First MessageId Insert in Redis count : " + insertSetCount + " MessageId : " + messageId); + } else { + Set messageIdSet = (Set) redisCacheService.getCache(notificationKafkaCache); + if (messageIdSet.contains(messageId)) { + existSetCount++; + log.info("ReactiveConsumer:Already Consumed : " + existSetCount + " MessageId : " + messageId); + return; + } else { + messageIdSet.add(messageId); + redisCacheService.setCache(notificationKafkaCache, messageIdSet); + insertSetCount++; + log.info("ReactiveConsumer:MessageId Insert in Redis count : " + insertSetCount + " MessageId : " + messageId); + } + } + } else { + log.error("ReactiveConsumer:MessageId Not Found : " + msg.getMessageId() + " or Notification Cache Name Empty : " + notificationKafkaCache); + } + + // This code for kafka duplication problem +// if (msg.getMessageId() != null && msg.getMessageId().getChannelMessageId() != null) { +// String messageId = msg.getMessageId().getChannelMessageId(); +// if (messageIdSet.contains(messageId)) { +// existSetCount++; +// log.info("ReactiveConsumer:Already Counsumed : " + existSetCount + " MessageId : " + messageId); +// return; +// } else { +// insertSetCount++; +// log.info("ReactiveConsumer:Insert in set count : " + insertSetCount + " MessageId : " + messageId); +// messageIdSet.add(messageId); +// } +// } +// log.info("ReactiveConsumer: Total MessageId Set : " + messageIdSet.size()); + } + SenderReceiverInfo from = msg.getFrom(); logTimeTaken(startTime, 1, null); botService.getBotNodeFromName(msg.getApp()).doOnNext(new Consumer() { @Override public void accept(JsonNode botNode) { - if(botNode != null && !botNode.isEmpty()) { + if (botNode != null && !botNode.isEmpty()) { from.setCampaignID(msg.getApp()); - if(from.getDeviceType() == null) { + if (from.getDeviceType() == null) { from.setDeviceType(DeviceType.PHONE); } - if(msg.getAdapterId() == null || msg.getAdapterId().isEmpty()) { + if (msg.getAdapterId() == null || msg.getAdapterId().isEmpty()) { msg.setAdapterId(BotUtil.getBotNodeAdapterId(botNode)); } /* Set XMessage Transformers */ @@ -125,200 +182,247 @@ public void accept(JsonNode botNode) { String appId = botNode.get("id").asText(); JsonNode firstTransformer = botNode.findValues("transformers").get(0); - resolveUser(message, appId) - .doOnNext(new Consumer() { - @Override - public void accept(XMessage msg) { - SenderReceiverInfo from = msg.getFrom(); - // msg.setFrom(from); - try { - if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + if (message != null && message.getProviderURI().equals("firebase") && message.getChannelURI().equals("web")) { + if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + try { + log.info("ReactiveConsumer:broadcastNotificationChunkSize : " + broadcastNotificationChunkSize); + /* Switch From & To */ + switchFromTo(msg); + Integer chunkSize = null; + try { + chunkSize = Integer.parseInt(broadcastNotificationChunkSize); + } catch (NumberFormatException ex) { + chunkSize = null; + } + if (chunkSize != null) { + if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null + && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { + JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); + int totalFederatedUsers = federatedUsers.length(); + if (totalFederatedUsers <= chunkSize) { + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + totalFederatedUsers); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } else { + List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); + int count = 1; + for (JSONArray jsonArray : jsonArrayList) { + log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : " + count); + msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + jsonArray.length()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + count++; + } + } + } else { + log.error("ReactiveConsumer:federatedUsers not found in xMessage: " + msg); + } + } else { + if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null + && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { + JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); + log.info("ReactiveConsumer:Pushed Federated Users to Kafka Topic: " + federatedUsers.length()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } else { + log.error("ReactiveConsumer:federatedUsers not found in xMessage: " + msg); + } + } + pushCount++; + notificationProcessedCount++; + logTimeTaken(startTime, 0, "Notification processed by orchestrator: " + notificationProcessedCount + " :: Push count : " + + pushCount + " :: orchestrator-notification-process-end-time: %d ms"); + } catch (Exception ex) { + log.error("ReactiveConsumer:Notification Triggering Process:Error in pushing xMessage to kafka: " + ex.getMessage()); + } + } else { + log.error("ReactiveConsumer:onMessage:: Invalid Type Found : " + firstTransformer.findValue("type")); + } + } else { + log.info("ReactiveConsumer:ResolveUser Calling: " + message); + resolveUser(message, appId) + .doOnNext(new Consumer() { + @Override + public void accept(XMessage msg) { + SenderReceiverInfo from = msg.getFrom(); + // msg.setFrom(from); + try { +// getLastMessageID(msg) +// .doOnNext(lastMessageID -> { + logTimeTaken(startTime, 4, null); +// msg.setLastMessageID(lastMessageID); + /* Switch From & To */ switchFromTo(msg); - Integer chunkSize = null; - try { - chunkSize = Integer.parseInt(broadcastNotificationChunkSize); - } catch(NumberFormatException ex){ - chunkSize = null; - } - if(chunkSize != null) { - if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null - && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { - JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); - int totalFederatedUsers = federatedUsers.length(); - if (totalFederatedUsers <= chunkSize) { - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + + if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { + try { + log.info("final msg.toXML(): " + msg.toXML().toString()); + if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { + kafkaProducer.send(genericTransformerTopic, msg.toXML()); } else { - List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); - int count = 1; - for (JSONArray jsonArray : jsonArrayList) { - log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count); - msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); - count++; - } + kafkaProducer.send(odkTransformerTopic, msg.toXML()); } - } else { - log.error("federatedUsers not found : " + msg.toString()); - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + // reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML()); + } catch (JAXBException e) { + e.printStackTrace(); } - } else{ - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + logTimeTaken(startTime, 0, "process-end: %d ms"); + } else { + log.error("ReactiveConsumer:onMessage:: MessageSate Invalid Found : " + msg.getMessageState()); } - notificationProcessedCount++; - logTimeTaken(startTime, 0, "Notification processed : " + notificationProcessedCount + " :: process-end: %d ms"); - } else { - getLastMessageID(msg) - .doOnNext(lastMessageID -> { - logTimeTaken(startTime, 4, null); - msg.setLastMessageID(lastMessageID); - - /* Switch From & To */ - switchFromTo(msg); - - if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { - try { - log.info("final msg.toXML(): " + msg.toXML().toString()); - if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { - kafkaProducer.send(genericTransformerTopic, msg.toXML()); - } else { - kafkaProducer.send(odkTransformerTopic, msg.toXML()); - } - // reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML()); - } catch (JAXBException e) { - e.printStackTrace(); - } - logTimeTaken(startTime, 0, "process-end: %d ms"); - } - }) - .doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in getLastMessageID" + throwable.getMessage()); - } - }) - .subscribe(); +// }) +// .doOnError(new Consumer() { +// @Override +// public void accept(Throwable throwable) { +// log.error("Error in getLastMessageID" + throwable.getMessage()); +// } +// }) +// .subscribe(); + } catch (Exception ex) { + log.error("ReactiveConsumer:ODK and Generic Bot Processing:Exception: " + ex.getMessage()); } - } catch (JAXBException ex) { - log.error("Error while converting toXML() : " + ex.getMessage()); - } catch (Exception ex){ - log.error("An Error Occurred : " + ex.getMessage()); + } - } - }) - .doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in resolveUser" + throwable.getMessage()); - } - }).subscribe(); + }) + .doOnError(new Consumer() { + @Override + public void accept(Throwable throwable) { + log.error("Error in resolveUser" + throwable.getMessage()); + } + }).subscribe(); + } } else { - log.error("Bot node not found by name: "+msg.getApp()); + log.error("Bot node not found by name: " + msg.getApp()); } } }).subscribe(); } catch (Exception e) { - e.printStackTrace(); + log.error("An Error ReactiveConsumer : " + e.getMessage()); } } /** * Set Transformer in XMessage with transformer required data in meta + * * @param xMessage * @param botNode * @return XMessage */ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) { - ArrayList transformers = new ArrayList(); - - ArrayList transformerList = (ArrayList) botNode.findValues("transformers"); - transformerList.forEach(transformerTmp -> { - JsonNode transformerNode = (JsonNode) transformerTmp; - int i = 0; - while (transformerNode.get(i) != null && transformerNode.get(i).path("meta") != null) { - JsonNode transformer = transformerNode.get(i); - JsonNode transformerMeta = transformer.path("meta") != null - ? transformer.path("meta") : null; - log.info("transformer:" + transformer); - - HashMap metaData = new HashMap(); - /* Bot Data */ - metaData.put("startingMessage", BotUtil.getBotNodeData(botNode, "startingMessage")); - metaData.put("botId", BotUtil.getBotNodeData(botNode, "id")); - metaData.put("botOwnerID", BotUtil.getBotNodeData(botNode, "ownerID")); - metaData.put("botOwnerOrgID", BotUtil.getBotNodeData(botNode, "ownerOrgID")); - - /* Transformer Data */ - metaData.put("id", transformer.get("id").asText()); - metaData.put("type", transformerMeta.get("type") != null - && !transformerMeta.get("type").asText().isEmpty() - ? transformerMeta.get("type").asText() - : ""); - metaData.put("formID", transformerMeta.findValue("formID") != null - && !transformerMeta.findValue("formID").asText().isEmpty() - ? transformerMeta.findValue("formID").asText() - : ""); - if (transformerMeta.get("type") != null && transformerMeta.get("type").asText().equals(BotUtil.transformerTypeBroadcast)) { - if(xMessage != null && xMessage.getFrom() != null && xMessage.getFrom().getMeta() != null && xMessage.getFrom().getMeta().containsKey("page")){ - log.info("page number orch : "+ xMessage.getFrom().getMeta().get("page")); - metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta().get("page"))); - } else{ - metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, null)); + try { + ArrayList transformers = new ArrayList(); + + ArrayList transformerList = (ArrayList) botNode.findValues("transformers"); + transformerList.forEach(transformerTmp -> { + JsonNode transformerNode = (JsonNode) transformerTmp; + int i = 0; + while (transformerNode.get(i) != null && transformerNode.get(i).path("meta") != null) { + JsonNode transformer = transformerNode.get(i); + JsonNode transformerMeta = transformer.path("meta") != null + ? transformer.path("meta") : null; + log.info("transformer:" + transformer); + + HashMap metaData = new HashMap(); + /* Bot Data */ + metaData.put("startingMessage", BotUtil.getBotNodeData(botNode, "startingMessage")); + metaData.put("botId", BotUtil.getBotNodeData(botNode, "id")); + metaData.put("botOwnerID", BotUtil.getBotNodeData(botNode, "ownerID")); + metaData.put("botOwnerOrgID", BotUtil.getBotNodeData(botNode, "ownerOrgID")); + + /* Transformer Data */ + metaData.put("id", transformer.get("id").asText()); + metaData.put("type", transformerMeta.get("type") != null + && !transformerMeta.get("type").asText().isEmpty() + ? transformerMeta.get("type").asText() + : ""); + metaData.put("formID", transformerMeta.findValue("formID") != null + && !transformerMeta.findValue("formID").asText().isEmpty() + ? transformerMeta.findValue("formID").asText() + : ""); + if (transformerMeta.get("type") != null && transformerMeta.get("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + if (xMessage != null && xMessage.getFrom() != null && xMessage.getFrom().getMeta() != null && xMessage.getFrom().getMeta().containsKey("page")) { + log.info("page number orch : " + xMessage.getFrom().getMeta().get("page")); + metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta())); + } else { + metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta())); + } } - } - if (transformerMeta.findValue("hiddenFields") != null && !transformerMeta.findValue("hiddenFields").isEmpty()) { - metaData.put("hiddenFields", transformerMeta.findValue("hiddenFields").toString()); - } + if (transformerMeta.findValue("hiddenFields") != null && !transformerMeta.findValue("hiddenFields").isEmpty()) { + metaData.put("hiddenFields", transformerMeta.findValue("hiddenFields").toString()); + } - if (transformer.findValue("serviceClass") != null && !transformer.findValue("serviceClass").asText().isEmpty()) { - String serviceClass = transformer.findValue("serviceClass").toString(); - if (serviceClass != null && !serviceClass.isEmpty() && serviceClass.contains("\"")) { - serviceClass = serviceClass.replaceAll("\"", ""); + if (transformer.findValue("serviceClass") != null && !transformer.findValue("serviceClass").asText().isEmpty()) { + String serviceClass = transformer.findValue("serviceClass").toString(); + if (serviceClass != null && !serviceClass.isEmpty() && serviceClass.contains("\"")) { + serviceClass = serviceClass.replaceAll("\"", ""); + } + metaData.put("serviceClass", serviceClass); } - metaData.put("serviceClass", serviceClass); - } - if (transformerMeta.get("templateId") != null && !transformerMeta.get("templateId").asText().isEmpty()) { - metaData.put("templateId", transformerMeta.get("templateId").asText()); - } + if (transformerMeta.get("templateId") != null && !transformerMeta.get("templateId").asText().isEmpty()) { + metaData.put("templateId", transformerMeta.get("templateId").asText()); + } - if (transformerMeta.get("title") != null && !transformerMeta.get("title").asText().isEmpty()) { - metaData.put("title", transformerMeta.get("title").asText()); - } + if (transformerMeta.get("title") != null && !transformerMeta.get("title").asText().isEmpty()) { + metaData.put("title", transformerMeta.get("title").asText()); + } - if (transformer.get("type") != null && transformer.get("type").asText().equals(BotUtil.transformerTypeGeneric)) { - metaData.put("url", transformer.findValue("url").asText()); - } + if (transformer.get("type") != null && transformer.get("type").asText().equals(BotUtil.transformerTypeGeneric)) { + metaData.put("url", transformer.findValue("url").asText()); + } - Transformer transf = new Transformer(); - transf.setId(transformer.get("id").asText()); - transf.setMetaData(metaData); + Transformer transf = new Transformer(); + transf.setId(transformer.get("id").asText()); + transf.setMetaData(metaData); - transformers.add(transf); - i++; - } - }); - xMessage.setTransformers(transformers); - return xMessage; + transformers.add(transf); + i++; + } + }); + xMessage.setTransformers(transformers); + return xMessage; + } catch (Exception ex) { + log.error("Error:setXMessageTransformers::Exception:" + ex.getMessage()); + return null; + } } /** * Get Federated Users Data for Broadcast transformer + * * @param botNode * @param transformer * @return Federated users as json string */ - private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, String page) { - String botId = botNode.get("id").asText(); + private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Map meta) { + String botId = botNode.get("id").asText(); + + /* Get federated users from federation services */ + JSONArray users = userService.getUsersFromFederatedServers(botId, meta); + for (int i = 0; i < users.length(); i++) { + JSONObject jsonObject = (JSONObject) users.get(i); + if (jsonObject != null && !jsonObject.isNull("phoneNo")) { + String phoneNo = jsonObject.getString("phoneNo"); + if (federatedUsers.contains(phoneNo)) { + existingFederatedUsers++; + log.info("ReactiveConsumer:getFederatedUsersMeta:: Duplicate Phone Number found : count: " + existingFederatedUsers + " Phone No : " + phoneNo); + } +// else { +// log.info("ReactiveConsumer:getFederatedUsersMeta::Inserting User in set : " + phoneNo); +// federatedUsers.add(phoneNo); +// } + } +// else { +// log.error("ReactiveConsumer:getFederatedUsersMeta::No Federated Users Found: " + users.get(i).toString()); +// } + } - /* Get federated users from federation services */ - JSONArray users = userService.getUsersFromFederatedServers(botId, page); + log.info("ReactiveConsumer:getFederatedUsersMeta::Count: " + (users == null ? "user not found" : users.length()) + " >> Set count: " + federatedUsers.size()); /* Check if users, & related meta data exists in transformer */ - if(users != null && transformer.get("meta") != null + if (users != null && transformer.get("meta") != null && transformer.get("meta").get("templateType") != null && transformer.get("meta").get("body") != null) { ObjectNode transformerMeta = (ObjectNode) transformer.get("meta"); @@ -329,67 +433,77 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, Str node.put("body", transformerMeta.get("body").asText()); node.put("type", transformerMeta.get("templateType").asText()); - ArrayNode sampleData = mapper.createArrayNode(); - for (int i = 0; i < users.length(); i++) { - ObjectNode userData = mapper.createObjectNode(); - if(transformerMeta.get("params") != null && !transformerMeta.get("params").toString().isEmpty()){ + ArrayNode sampleData = mapper.createArrayNode(); + for (int i = 0; i < users.length(); i++) { + ObjectNode userData = mapper.createObjectNode(); + if (transformerMeta.get("params") != null && !transformerMeta.get("params").toString().isEmpty()) { JSONArray paramArr = new JSONArray(transformerMeta.get("params").toString()); - for(int k=0; k usersMessage = userService.getUsersMessageByTemplate(node); - /* Fetch user messages by template from template service */ - ArrayList usersMessage = userService.getUsersMessageByTemplate(node); +// for (int i = 0; i < usersMessage.size(); i++) { +// JSONObject jsonObject = usersMessage.get(i); +// if(jsonObject != null && !jsonObject.isNull("")){ +// +// } +// } - log.info("usersMessage: "+usersMessage); + log.info("ReactiveConsumer:getUsersMessageByTemplate::Count: " + usersMessage.size()); - /* Set User messages against the user phone */ - ObjectNode federatedUsersMeta = mapper.createObjectNode(); - ArrayNode userMetaData = mapper.createArrayNode(); + /* Set User messages against the user phone */ + ObjectNode federatedUsersMeta = mapper.createObjectNode(); + ArrayNode userMetaData = mapper.createArrayNode(); usersMessage.forEach(userMsg -> { - int j = Integer.parseInt(userMsg.get("__index").toString()); + int j = Integer.parseInt(userMsg.get("__index").toString()); JSONObject userObj = ((JSONObject) users.get(j)); - String userPhone = userObj.getString("phoneNo"); + String userPhone = userObj.getString("phoneNo"); - ObjectNode map = mapper.createObjectNode(); - map.put("phone", userPhone); - map.put("message", userMsg.get("body").toString()); - try{ + ObjectNode map = mapper.createObjectNode(); + map.put("phone", userPhone); + map.put("message", userMsg.get("body").toString()); + try { /* FCM Token */ - if(userObj.get("fcmToken") != null) { + if (userObj.get("fcmToken") != null) { map.put("fcmToken", userObj.getString("fcmToken")); } /* FCM - If clickActionUrl found in userObj, use it, override previous one */ - if(userObj.get("fcmClickActionUrl") != null) { + if (userObj.get("fcmClickActionUrl") != null) { map.put("fcmClickActionUrl", userObj.getString("fcmClickActionUrl")); } - if(transformerMeta.get("data") != null){ + if (transformerMeta.get("data") != null) { map.put("data", transformerMeta.get("data")); } } catch (Exception e) { - // + log.error("ErrorParsingUserObj:getFederatedUsersMeta::Exception: " + e.getMessage()); } userMetaData.add(map); - log.info("index: "+j+", body: "+userMsg.get("body").toString()+", phone:"+userPhone); - }); - + log.info("index: " + j + ", body: " + userMsg.get("body").toString() + ", phone:" + userPhone); + }); + federatedUsersMeta.put("list", userMetaData); return federatedUsersMeta.toString(); + } else { + log.error("ReactiveConsumer:getFederatedUsersMetaElse::Users not found"); } return ""; } /** * Resolve User - Fetch user if exists or register it in Fusion Auth Client + * * @param xmsg * @param appId * @return @@ -399,17 +513,17 @@ private Mono resolveUser(XMessage xmsg, String appId) { SenderReceiverInfo from = xmsg.getFrom(); String appName = xmsg.getApp(); Boolean found = false; - + UUID appID = UUID.fromString(appId); - + String deviceString = from.getDeviceType().toString() + ":" + from.getUserID(); String encodedBase64Key = encodeKey(secret); String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key); - log.info("deviceString: "+deviceString+", encyprted deviceString: "+deviceID); + log.info("ReactiveConsumer:resolveUser::Calling:deviceString: " + deviceString + ", encyprted deviceString: " + deviceID); String userID = getFAUserIdForApp(deviceID, appID); - + if (userID != null && !userID.isEmpty()) { - log.info("Found FA user id"); + log.info("ReactiveConsumer:resolveUser::Found FA user id: " + userID); from.setDeviceID(userID); from.setEncryptedDeviceID(deviceID); xmsg.setFrom(from); @@ -419,14 +533,13 @@ private Mono resolveUser(XMessage xmsg, String appId) { .flatMap(new Function, Mono>() { @Override public Mono apply(Pair result) { - log.info("FA update user"); if (result.getLeft()) { from.setDeviceID(result.getRight()); from.setEncryptedDeviceID(deviceID); xmsg.setFrom(from); ClientResponse response = botService.fusionAuthClient.retrieveUserByUsername(deviceID); if (response.wasSuccessful() && isUserRegistered(response, appID)) { - redisCacheService.setFAUserIDForAppCache(getFACacheName(deviceID, appID), response.successResponse.user.id.toString()); + redisCacheService.setFAUserIDForAppCache(getFACacheName(deviceID, appID), response.successResponse.user.id.toString()); return Mono.just(xmsg); } else { return Mono.just(xmsg); @@ -439,86 +552,90 @@ public Mono apply(Pair result) { }).doOnError(new Consumer() { @Override public void accept(Throwable throwable) { - log.error("Error in updateUser" + throwable.getMessage()); + log.error("ReactiveConsumer:resolveUser::Calling update user: " + throwable.getMessage()); } }); } } catch (Exception e) { e.printStackTrace(); - log.error("Error in resolveUser" + e.getMessage()); + log.error("ReactiveConsumer:resolveUser::Error in resolving user: " + e.getMessage()); xmsg.setFrom(null); return Mono.just(xmsg); } } - + /** * Get Fusion Auth User's UUID for App + * * @param deviceID * @param appID * @return */ private String getFAUserIdForApp(String deviceID, UUID appID) { - String userID = null; + String userID = null; + + Object result = redisCacheService.getFAUserIDForAppCache(getFACacheName(deviceID, appID)); + userID = result != null ? result.toString() : null; - Object result = redisCacheService.getFAUserIDForAppCache(getFACacheName(deviceID, appID)); - userID = result != null ? result.toString() : null; + if (userID == null || userID.isEmpty()) { + ClientResponse response = botService.fusionAuthClient.retrieveUserByUsername(deviceID); - if(userID == null || userID.isEmpty()) { - ClientResponse response = botService.fusionAuthClient.retrieveUserByUsername(deviceID); - if (response.wasSuccessful() && isUserRegistered(response, appID)) { - userID = response.successResponse.user.id.toString(); - redisCacheService.setFAUserIDForAppCache(getFACacheName(deviceID, appID), userID); + userID = response.successResponse.user.id.toString(); + redisCacheService.setFAUserIDForAppCache(getFACacheName(deviceID, appID), userID); } - } + } return userID; } - + /** * Check if FA user is registered for appid + * * @param response * @param appID * @return */ private Boolean isUserRegistered(ClientResponse response, UUID appID) { - List registrations = response.successResponse.user.getRegistrations(); - for(int i=0; i registrations = response.successResponse.user.getRegistrations(); + for (int i = 0; i < registrations.size(); i++) { + if (registrations.get(i).applicationId.equals(appID)) { + return true; + } + } + return false; } - + private String getFACacheName(String deviceID, UUID appID) { - return deviceID+"-"+appID.toString(); + return deviceID + "-" + appID.toString(); } /** * Update fusion auth user data + * * @param user */ private void updateFAUser(User user) { System.out.println(user); UserRequest r = new UserRequest(user); - + ClientResponse response = botService.fusionAuthClient.updateUser(user.id, r); - if(response.wasSuccessful()) { + if (response.wasSuccessful()) { System.out.println("user update success"); } else { - System.out.println("error in user update"+response.errorResponse); + System.out.println("error in user update" + response.errorResponse); } } /** * Log time taken between two checkpoints + * * @param startTime * @param checkpointID */ private void logTimeTaken(long startTime, int checkpointID, String formatedMsg) { long endTime = System.nanoTime(); long duration = (endTime - startTime) / 1000000; - if(formatedMsg == null) { + if (formatedMsg == null) { log.info(String.format("CP-%d: %d ms", checkpointID, duration)); } else { log.info(String.format(formatedMsg, duration)); @@ -527,6 +644,7 @@ private void logTimeTaken(long startTime, int checkpointID, String formatedMsg) /** * Get Last XMessage ID of user + * * @param msg * @return */ @@ -551,13 +669,14 @@ public String apply(XMessageDAO lastMessage) { } }); } else { - log.error("UserId not found : "+msg.toString()); + log.error("UserId not found : " + msg.toString()); } return Mono.empty(); } /** * Get Latest XMessage of a user + * * @param userID * @param yesterday * @param messageState @@ -598,6 +717,7 @@ public void accept(Throwable throwable) { /** * Switch from & To in XMessage + * * @param xMessage */ private void switchFromTo(XMessage xMessage) { @@ -609,6 +729,7 @@ private void switchFromTo(XMessage xMessage) { /** * Convert Federated users into chunks + * * @param users * @param chunkSize * @return @@ -624,8 +745,8 @@ private List chunkArrayList(JSONArray users, int chunkSize) { chunksList.get(chunksList.size() - 1).put(user); } return chunksList; - } else{ - log.error("Federated Users null found : "+users); + } else { + log.error("Federated Users null found : " + users); return null; } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index bf5557f..1e4fd12 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -13,6 +13,7 @@ broadcast-transformer=${KAFKA_BROADCAST_TRANSFORMER_TOPIC:#{"broadcast-transform generic-transformer=${KAFKA_GENERIC_TRANSFORMER_TOPIC:#{"generic-transformer"}} processOutbound=${KAFKA_PROCESS_OUTBOUND} outbound=${KAFKA_OUTBOUND_TOPIC} +notificationOutbound=${KAFKA_NOTIFICATION_TOPIC} # Cassandra #spring.data.cassandra.contactpoints=${CASSANDRA_URL} @@ -38,6 +39,10 @@ spring.data.cassandra.pool.idle-timeout=20s spring.data.cassandra.pool.pool-timeout=20s logging.level.com.datastax.driver.core.QueryLogger.NORMAL=TRACE logging.level.com.datastax.driver.core.QueryLogger.SLOW=TRACE +spring.data.cassandra.advanced.control-connection.timeout=10s +spring.data.cassandra.advanced.metadata.schema.request-timeout=30s +spring.data.cassandra.basic.request.timeout=30s + server.port=8686 campaign.url = ${CAMPAIGN_URL} @@ -71,6 +76,8 @@ spring.redis.number.port=${REDIS_NUMBER_PORT:#{6379}} spring.redis.sentinel.master= # Name of Redis server. spring.redis.sentinel.nodes= # Comma-separated list of host:port pairs. spring.redis.timeout=0 +# This time in seconds and we are default set 60*60 = 3600 -> 1 Hour +redis.key.timeout=${REDIS_KEY_TIMEOUT:#{3600}} #Env spring.profile.env=${ENV} @@ -101,3 +108,11 @@ spring.mail.recipient=${RECIPIENT_EMAILS:#{""}} # Send notifications in chunks broadcastNotificationChunkSize=${BROADCAST_NOTIFICATION_CHUNK_SIZE:#{""}} + +# Added Redis Cache for Kafka Duplication in Firebase(for NL) +notification-kafka-cache=${NOTIFICATION_KAFKA_CACHE:#{""}} + +# BotService WebClient Configurations +webclient.interval=${WEBCLIENT_INTERVAL:#{5000}} +webclient.retryMaxAttempts=${WEBCLIENT_RETRY_MAX_ATTEMPTS:#{3}} +webclient.retryMinBackoff=${WEBCLIENT_RETRY_MIN_BACK_OFF:#{5}} diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index 7fdc8dd..9fa196c 100644 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -7,7 +7,7 @@ class="ch.qos.logback.core.ConsoleAppender"> - %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}:%line): %msg%n%throwable + %green(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}:%line): %msg%n%throwable