diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index addf485..2af07f2 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -100,12 +100,17 @@ public class ReactiveConsumer { private long notificationProcessedCount; + private long consumeCount; + private long pushCount; + @KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { final long startTime = System.nanoTime(); logTimeTaken(startTime, 0, null); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); + consumeCount++; + log.info("Consume topic count : " + consumeCount); SenderReceiverInfo from = msg.getFrom(); logTimeTaken(startTime, 1, null); botService.getBotNodeFromName(msg.getApp()).doOnNext(new Consumer() { @@ -165,9 +170,13 @@ public void accept(XMessage msg) { } else{ kafkaProducer.send(broadcastTransformerTopic, msg.toXML()); } + pushCount++; notificationProcessedCount++; - logTimeTaken(startTime, 0, "Notification processed : " + notificationProcessedCount + " :: process-end: %d ms"); + logTimeTaken(startTime, 0, "Notification processed : " + notificationProcessedCount + " :: Push count : " + + pushCount + " :: process-end: %d ms"); + } else { + log.info("Calling ODK : " + msg.toString()); getLastMessageID(msg) .doOnNext(lastMessageID -> { logTimeTaken(startTime, 4, null); @@ -220,7 +229,7 @@ public void accept(Throwable throwable) { }).subscribe(); } catch (Exception e) { - e.printStackTrace(); + log.error("An Error ReactiveConsumer : " + e.getMessage()); } }