diff --git a/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java b/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java index 7403556..0214dea 100644 --- a/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java +++ b/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java @@ -21,6 +21,12 @@ import com.uci.transformer.telemetry.AssessmentTelemetryBuilder; import com.uci.utils.CampaignService; import com.uci.utils.kafka.SimpleProducer; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import messagerosa.core.model.SenderReceiverInfo; @@ -115,6 +121,9 @@ public class ODKConsumerReactive extends TransformerProvider { @Value("${assesment.character.go_to_start}") public String assesGoToStartChar; + @Autowired + public Tracer tracer; + @EventListener(ApplicationStartedEvent.class) public void onMessage() { reactiveKafkaReceiver @@ -122,7 +131,9 @@ public void onMessage() { @Override public void accept(ReceiverRecord stringMessage) { final long startTime = System.nanoTime(); - try { + Span rootSpan = tracer.spanBuilder("transformer-processMessage").startSpan(); + try(Scope scope = rootSpan.makeCurrent()) { + Context currentContext = Context.current(); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes())); logTimeTaken(startTime, 1); if (msg.getMessageType() == XMessage.MessageType.BROADCAST_TEXT) { @@ -132,25 +143,30 @@ public void accept(List messages) { messages = (ArrayList) messages; for (XMessage msg : messages) { try { + Span childSpan = createChildSpan("broadcastMessageToKafka", currentContext, rootSpan); kafkaProducer.send(outboundTopic, msg.toXML()); - } catch (JAXBException e) { + childSpan.end(); + rootSpan.end(); } catch (JAXBException e) { e.printStackTrace(); } } } }); } else { - transform(msg) + transform(msg) .subscribe(new Consumer() { @Override public void accept(XMessage transformedMessage) { logTimeTaken(startTime, 2); if (transformedMessage != null) { try { - kafkaProducer.send(outboundTopic, transformedMessage.toXML()); + Span childSpan = createChildSpan("sendMessageToKafka", currentContext, rootSpan); + kafkaProducer.send(outboundTopic, transformedMessage.toXML()); long endTime = System.nanoTime(); long duration = (endTime - startTime); log.error("Total time spent in processing form: " + duration / 1000000); + childSpan.end(); + rootSpan.end(); } catch (JAXBException e) { e.printStackTrace(); } @@ -160,8 +176,10 @@ public void accept(XMessage transformedMessage) { } } catch (JAXBException e) { e.printStackTrace(); - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); + } finally { +// rootSpan.end(); } } }) @@ -246,12 +264,17 @@ private Map getCampaignAndFormIdFromXMessage(XMessage xMessage) @Override public Mono transform(XMessage xMessage) throws Exception { XMessage[] finalXMsg = new XMessage[1]; + Span parentSpan = Span.current(); + Context currentContext = Context.current(); + Span childSpan1 = createChildSpan("getCampaignFromNameTransformer", currentContext, parentSpan); return campaignService .getCampaignFromNameTransformer(xMessage.getApp()) .map(new Function>>>() { @Override public Mono>> apply(JsonNode campaign) { + childSpan1.end(); if (campaign != null) { + Span ChildSpan2 = createChildSpan("getPreviousMetadata", currentContext, parentSpan); // Map data = getCampaignAndFormIdFromXMessage(xMessage); // // String formID = data.get("formID"); @@ -280,9 +303,11 @@ public Mono>> apply(JsonNode campaign) { .map(new Function>>() { @Override public Mono> apply(FormManagerParams previousMeta) { - final ServiceResponse[] response = new ServiceResponse[1]; + ChildSpan2.end(); + final ServiceResponse[] response = new ServiceResponse[1]; MenuManager mm; if (previousMeta.instanceXMlPrevious == null || previousMeta.currentAnswer.equals(assesGoToStartChar) || isStartingMessage) { + Span childSpan3 = createChildSpan("MenuManagerStartProcessForStartingMessage", currentContext, parentSpan); // if (!lastFormID.equals(formID) || previousMeta.instanceXMlPrevious == null || previousMeta.currentAnswer.equals(assesGoToStartChar) || isStartingMessage) { previousMeta.currentAnswer = assesGoToStartChar; ServiceResponse serviceResponse = new MenuManager(null, null, null, formPath, formID, false, questionRepo).start(); @@ -293,14 +318,19 @@ public Mono> apply(FormManagerParams previousMeta) { // ss.getXML(); String instanceXMlPrevious = ss.getXML(); log.debug("Instance value >> " + instanceXMlPrevious); + mm = new MenuManager(null, null, instanceXMlPrevious, formPath, formID, true, questionRepo); response[0] = mm.start(); + childSpan3.end(); } else { + Span childSpan3 = createChildSpan("MenuManagerStartProcess", currentContext, parentSpan); mm = new MenuManager(previousMeta.previousPath, previousMeta.currentAnswer, previousMeta.instanceXMlPrevious, formPath, formID, false, questionRepo); response[0] = mm.start(); + childSpan3.end(); } + Span childSpan4 = createChildSpan("updateQuestionAndAssessment", currentContext, parentSpan); // Save answerData => PreviousQuestion + CurrentAnswer Mono>> updateQuestionAndAssessment = updateQuestionAndAssessment( @@ -314,10 +344,11 @@ public Mono> apply(FormManagerParams previousMeta) { xMessage, response[0].question ); - + childSpan4.end(); if (mm.isGlobal() && response[0].currentIndex.contains("eof__")) { - String nextBotID = mm.getNextBotID(response[0].currentIndex); + Span childSpan5 = createChildSpan("getBotNameByBotID&getFirstFormByBotID", currentContext, parentSpan); + String nextBotID = mm.getNextBotID(response[0].currentIndex); return Mono.zip( campaignService.getBotNameByBotID(nextBotID), @@ -325,29 +356,35 @@ public Mono> apply(FormManagerParams previousMeta) { ).map(new Function, Mono>() { @Override public Mono apply(Tuple2 objects) { + childSpan5.end(); String nextFormID = objects.getT2(); String nextAppName = objects.getT1(); - ServiceResponse serviceResponse = new MenuManager( + Span childSpan6 = createChildSpan("MenuManagerStartProcessForNextForm", currentContext, parentSpan); + ServiceResponse serviceResponse = new MenuManager( null, null, null, getFormPath(nextFormID), nextFormID, false, questionRepo) .start(); + childSpan6.end(); FormUpdation ss = FormUpdation.builder().build(); ss.parse(serviceResponse.currentResponseState); ss.updateAdapterProperties(xMessage.getChannel(), xMessage.getProvider()); String instanceXMlPrevious = "" + ss.getXML(); log.debug("Instance value >> " + instanceXMlPrevious); + Span childSpan7 = createChildSpan("MenuManagerStartProcessForXMLNextForm", currentContext, parentSpan); MenuManager mm2 = new MenuManager(null, null, instanceXMlPrevious, getFormPath(nextFormID), nextFormID, true, questionRepo); ServiceResponse response = mm2.start(); + childSpan7.end(); xMessage.setApp(nextAppName); return decodeXMessage(xMessage, response, nextFormID, updateQuestionAndAssessment); } }); } else { + Span childSpan7 = createChildSpan("MenuManagerStartProcessForXMLNextForm", currentContext, parentSpan); return Mono.just(decodeXMessage(xMessage, response[0], formID, updateQuestionAndAssessment)); } } @@ -716,4 +753,48 @@ private void logTimeTaken(long startTime, int checkpointID) { long duration = (endTime - startTime) / 1000000; log.info(String.format("CP-%d: %d ms", checkpointID, duration)); } + + /** + * Create Child Span with current context & parent span + * @param spanName + * @param context + * @param parentSpan + * @return childSpan + */ + private Span createChildSpan(String spanName, Context context, Span parentSpan) { + String prefix = "transformer-"; + return tracer.spanBuilder(prefix + spanName).setParent(context.with(parentSpan)).startSpan(); + } + + /** + * Log Exceptions & if span exists, add error to span + * @param eMsg + * @param span + */ + private void genericException(String eMsg, Span span) { + eMsg = "Exception: " + eMsg; + log.error(eMsg); + if(span != null) { + span.setStatus(StatusCode.ERROR, "Exception: " + eMsg); + span.end(); + } + } + + /** + * Log Exception & if span exists, add error to span + * @param s + * @param span + * @return + */ + private Consumer genericError(String s, Span span) { + return c -> { + String msg = "Error in " + s + "::" + c.getMessage(); + log.error(msg); + if (span != null) { + log.info("generic message - span"); + span.setStatus(StatusCode.ERROR, msg); + span.end(); + } + }; + } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index ca1832c..6464508 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -80,3 +80,13 @@ spring.data.cassandra.port=${CASSANDRA_PORT} spring.data.cassandra.keyspace-name=${CASSANDRA_KEYSPACE} spring.data.cassandra.local-datacenter=datacenter1 +#Opentelemetry Lighstep Config +opentelemetry.lightstep.tracer=${OPENTELEMETERY_LIGHTSTEP_TRACER} +opentelemetry.lightstep.tracer.version=${OPENTELEMETERY_LIGHTSTEP_TRACER_VERSION} +opentelemetry.lightstep.service=${OPENTELEMETERY_LIGHTSTEP_SERVICE} +opentelemetry.lightstep.access.token=${OPENTELEMETERY_LIGHTSTEP_ACCESS_TOKEN} +opentelemetry.lightstep.end.point=${OPENTELEMETERY_LIGHTSTEP_END_POINT} + +#for log4j2 vulnerability +log4j2.formatMsgNoLookups=true +