diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index e99929f..11350ea 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -98,6 +98,8 @@ public class ReactiveConsumer { @Value("${broadcastNotificationChunkSize}") private String broadcastNotificationChunkSize; + private long notificationProcessedCount; + @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { @@ -129,69 +131,77 @@ public void accept(JsonNode botNode) { public void accept(XMessage msg) { SenderReceiverInfo from = msg.getFrom(); // msg.setFrom(from); - getLastMessageID(msg) - .doOnNext(lastMessageID -> { - logTimeTaken(startTime, 4, null); - msg.setLastMessageID(lastMessageID); - - /* Switch From & To */ - switchFromTo(msg); - - if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { - try { - log.info("final msg.toXML(): " + msg.toXML().toString()); - - if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { - Integer chunkSize = null; + try { + if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) { + Integer chunkSize = null; + try { + chunkSize = Integer.parseInt(broadcastNotificationChunkSize); + } catch(NumberFormatException ex){ + chunkSize = null; + } + if(chunkSize != null) { + if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null + && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { + JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); + int totalFederatedUsers = federatedUsers.length(); + if (totalFederatedUsers <= chunkSize) { + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } else { + List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); + int count = 1; + for (JSONArray jsonArray : jsonArrayList) { + log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count); + msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + count++; + } + } + } else { + log.error("federatedUsers not found : " + msg.toString()); + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } + } else{ + kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + } + notificationProcessedCount++; + log.info("Notification processed : " + notificationProcessedCount); + } else { + getLastMessageID(msg) + .doOnNext(lastMessageID -> { + logTimeTaken(startTime, 4, null); + msg.setLastMessageID(lastMessageID); + + /* Switch From & To */ + switchFromTo(msg); + + if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { try { - chunkSize = Integer.parseInt(broadcastNotificationChunkSize); - } catch(NumberFormatException ex){ - chunkSize = null; - } - if(chunkSize != null) { - if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null - && msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) { - JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list"); - int totalFederatedUsers = federatedUsers.length(); - if (totalFederatedUsers <= chunkSize) { - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); - } else { - List jsonArrayList = chunkArrayList(federatedUsers, chunkSize); - int count = 1; - for (JSONArray jsonArray : jsonArrayList) { - log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count); - msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString()); - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); - count++; - } - } + log.info("final msg.toXML(): " + msg.toXML().toString()); + if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { + kafkaProducer.send(genericTransformerTopic, msg.toXML()); } else { - log.error("federatedUsers not found : " + msg.toString()); - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + kafkaProducer.send(odkTransformerTopic, msg.toXML()); } - } else{ - kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); + // reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML()); + } catch (JAXBException e) { + e.printStackTrace(); } - - } else if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) { - kafkaProducer.send(genericTransformerTopic, msg.toXML()); - } else { - kafkaProducer.send(odkTransformerTopic, msg.toXML()); + logTimeTaken(startTime, 0, "process-end: %d ms"); } - // reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML()); - } catch (JAXBException e) { - e.printStackTrace(); - } - logTimeTaken(startTime, 0, "process-end: %d ms"); - } - }) - .doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in getLastMessageID" + throwable.getMessage()); - } - }) - .subscribe(); + }) + .doOnError(new Consumer() { + @Override + public void accept(Throwable throwable) { + log.error("Error in getLastMessageID" + throwable.getMessage()); + } + }) + .subscribe(); + } + } catch (JAXBException ex) { + log.error("Error while converting toXML() : " + ex.getMessage()); + } catch (Exception ex){ + log.error("An Error Occurred : " + ex.getMessage()); + } } }) .doOnError(new Consumer() {