Skip to content

Commit

Permalink
Merge branch 'development' of https://github.com/samagra-comms/outbound
Browse files Browse the repository at this point in the history
… into prod-development
  • Loading branch information
pankajjangid05 committed May 18, 2023
2 parents a05bcba + 9a439ed commit 9787884
Showing 1 changed file with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,22 @@ public class OutboundKafkaController {
@Value("${spring.mail.recipient}")
private String recipient;

private static long count = 0;

@EventListener(ApplicationStartedEvent.class)
public void onMessage() {

reactiveKafkaReceiver
.doOnNext(new Consumer<ReceiverRecord<String, String>>() {
@Override
public void accept(ReceiverRecord<String, String> msg) {
log.info("kafka message receieved!");
final long startTime = System.nanoTime();
logTimeTaken(startTime, 0, "process-start: %d ms");
XMessage currentXmsg = null;
try {
currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes()));
sendOutboundMessage(currentXmsg);
sendOutboundMessage(currentXmsg, startTime);
} catch (Exception e) {
HashMap<String, String> attachments = new HashMap<>();
attachments.put("Exception", ExceptionUtils.getStackTrace(e));
Expand Down Expand Up @@ -90,7 +95,7 @@ public void accept(Throwable e) {
* @param currentXmsg
* @throws Exception
*/
public void sendOutboundMessage(XMessage currentXmsg) throws Exception {
public void sendOutboundMessage(XMessage currentXmsg, long startTime) throws Exception {
String channel = currentXmsg.getChannelURI();
String provider = currentXmsg.getProviderURI();
IProvider iprovider = factoryProvider.getProvider(provider, channel);
Expand Down Expand Up @@ -125,6 +130,9 @@ public void accept(Throwable e) {
@Override
public void accept(XMessageDAO xMessageDAO) {
log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId());
count++;
// log.info("Insert Record in Cass : "+count);
logTimeTaken(startTime, 0, "Insert Record in Cass : " + count +" ::: process-end: %d ms");
}
});
} catch (Exception e) {
Expand Down Expand Up @@ -166,4 +174,14 @@ private void sentEmail(XMessage xMessage, String subject, String body, String re
// log.info("EmailDetails :" + emailDetails);
emailService.sendMailWithAttachment(emailDetails);
}

private void logTimeTaken(long startTime, int checkpointID, String formatedMsg) {
long endTime = System.nanoTime();
long duration = (endTime - startTime) / 1000000;
if(formatedMsg == null) {
log.info(String.format("CP-%d: %d ms", checkpointID, duration));
} else {
log.info(String.format(formatedMsg, duration));
}
}
}

0 comments on commit 9787884

Please sign in to comment.