Skip to content

Commit

Permalink
Merge pull request #36 from pankajjangid05/development
Browse files Browse the repository at this point in the history
Add logs to check process complete time
  • Loading branch information
pankajjangid05 authored May 18, 2023
2 parents 154deed + c73426e commit 9a439ed
Showing 1 changed file with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,18 @@ public class OutboundKafkaController {

@EventListener(ApplicationStartedEvent.class)
public void onMessage() {

reactiveKafkaReceiver
.doOnNext(new Consumer<ReceiverRecord<String, String>>() {
@Override
public void accept(ReceiverRecord<String, String> msg) {
log.info("kafka message receieved!");
final long startTime = System.nanoTime();
logTimeTaken(startTime, 0, "process-start: %d ms");
XMessage currentXmsg = null;
try {
currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes()));
sendOutboundMessage(currentXmsg);
sendOutboundMessage(currentXmsg, startTime);
} catch (Exception e) {
HashMap<String, String> attachments = new HashMap<>();
attachments.put("Exception", ExceptionUtils.getStackTrace(e));
Expand Down Expand Up @@ -92,7 +95,7 @@ public void accept(Throwable e) {
* @param currentXmsg
* @throws Exception
*/
public void sendOutboundMessage(XMessage currentXmsg) throws Exception {
public void sendOutboundMessage(XMessage currentXmsg, long startTime) throws Exception {
String channel = currentXmsg.getChannelURI();
String provider = currentXmsg.getProviderURI();
IProvider iprovider = factoryProvider.getProvider(provider, channel);
Expand Down Expand Up @@ -128,7 +131,8 @@ public void accept(Throwable e) {
public void accept(XMessageDAO xMessageDAO) {
log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId());
count++;
log.info("Insert Record in Cass : "+count);
// log.info("Insert Record in Cass : "+count);
logTimeTaken(startTime, 0, "Insert Record in Cass : " + count +" ::: process-end: %d ms");
}
});
} catch (Exception e) {
Expand Down Expand Up @@ -170,4 +174,14 @@ private void sentEmail(XMessage xMessage, String subject, String body, String re
// log.info("EmailDetails :" + emailDetails);
emailService.sendMailWithAttachment(emailDetails);
}

private void logTimeTaken(long startTime, int checkpointID, String formatedMsg) {
long endTime = System.nanoTime();
long duration = (endTime - startTime) / 1000000;
if(formatedMsg == null) {
log.info(String.format("CP-%d: %d ms", checkpointID, duration));
} else {
log.info(String.format(formatedMsg, duration));
}
}
}

0 comments on commit 9a439ed

Please sign in to comment.