diff --git a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java index 34c9922..fbb95e3 100644 --- a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java +++ b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java @@ -56,15 +56,18 @@ public class OutboundKafkaController { @EventListener(ApplicationStartedEvent.class) public void onMessage() { + reactiveKafkaReceiver .doOnNext(new Consumer<ReceiverRecord<String, String>>() { @Override public void accept(ReceiverRecord<String, String> msg) { log.info("kafka message receieved!"); + final long startTime = System.nanoTime(); + logTimeTaken(startTime, 0, "process-start: %d ms"); XMessage currentXmsg = null; try { currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes())); - sendOutboundMessage(currentXmsg); + sendOutboundMessage(currentXmsg, startTime); } catch (Exception e) { HashMap<String, String> attachments = new HashMap<>(); attachments.put("Exception", ExceptionUtils.getStackTrace(e)); @@ -92,7 +95,7 @@ public void accept(Throwable e) { * @param currentXmsg * @throws Exception */ - public void sendOutboundMessage(XMessage currentXmsg) throws Exception { + public void sendOutboundMessage(XMessage currentXmsg, long startTime) throws Exception { String channel = currentXmsg.getChannelURI(); String provider = currentXmsg.getProviderURI(); IProvider iprovider = factoryProvider.getProvider(provider, channel); @@ -128,7 +131,8 @@ public void accept(Throwable e) { public void accept(XMessageDAO xMessageDAO) { log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId()); count++; - log.info("Insert Record in Cass : "+count); +// log.info("Insert Record in Cass : "+count); + logTimeTaken(startTime, 0, "Insert Record in Cass : " + count +" ::: process-end: %d ms"); } }); } catch (Exception e) { @@ -170,4 +174,14 @@ private void sentEmail(XMessage xMessage, String subject, String body, String re // log.info("EmailDetails :" + emailDetails); emailService.sendMailWithAttachment(emailDetails); } + + private void logTimeTaken(long startTime, int checkpointID, String formatedMsg) { + long endTime = System.nanoTime(); + long duration = (endTime - startTime) / 1000000; + if(formatedMsg == null) { + log.info(String.format("CP-%d: %d ms", checkpointID, duration)); + } else { + log.info(String.format(formatedMsg, duration)); + } + } }