Skip to content

Commit

Permalink
Merge pull request #54 from pankajjangid05/development
Browse files Browse the repository at this point in the history
Added logs for kafka topic consume and push count
  • Loading branch information
pankajjangid05 authored May 25, 2023
2 parents 006b580 + 493a503 commit 5328f9d
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,17 @@ public class ReactiveConsumer {

private long notificationProcessedCount;

private long consumeCount;
private long pushCount;

@KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"})
public void onMessage(@Payload String stringMessage) {
try {
final long startTime = System.nanoTime();
logTimeTaken(startTime, 0, null);
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes()));
consumeCount++;
log.info("Consume topic count : " + consumeCount);
SenderReceiverInfo from = msg.getFrom();
logTimeTaken(startTime, 1, null);
botService.getBotNodeFromName(msg.getApp()).doOnNext(new Consumer<JsonNode>() {
Expand Down Expand Up @@ -165,9 +170,13 @@ public void accept(XMessage msg) {
} else{
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
}
pushCount++;
notificationProcessedCount++;
logTimeTaken(startTime, 0, "Notification processed : " + notificationProcessedCount + " :: process-end: %d ms");
logTimeTaken(startTime, 0, "Notification processed : " + notificationProcessedCount + " :: Push count : "
+ pushCount + " :: process-end: %d ms");

} else {
log.info("Calling ODK : " + msg.toString());
getLastMessageID(msg)
.doOnNext(lastMessageID -> {
logTimeTaken(startTime, 4, null);
Expand Down Expand Up @@ -220,7 +229,7 @@ public void accept(Throwable throwable) {

}).subscribe();
} catch (Exception e) {
e.printStackTrace();
log.error("An Error ReactiveConsumer : " + e.getMessage());
}
}

Expand Down

0 comments on commit 5328f9d

Please sign in to comment.