Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log4j2 vulnaribility changes #27

Open
wants to merge 2 commits into
base: release-4.6.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 90 additions & 9 deletions src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
import com.uci.transformer.telemetry.AssessmentTelemetryBuilder;
import com.uci.utils.CampaignService;
import com.uci.utils.kafka.SimpleProducer;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import messagerosa.core.model.SenderReceiverInfo;
Expand Down Expand Up @@ -115,14 +121,19 @@ public class ODKConsumerReactive extends TransformerProvider {
@Value("${assesment.character.go_to_start}")
public String assesGoToStartChar;

@Autowired
public Tracer tracer;

@EventListener(ApplicationStartedEvent.class)
public void onMessage() {
reactiveKafkaReceiver
.doOnNext(new Consumer<ReceiverRecord<String, String>>() {
@Override
public void accept(ReceiverRecord<String, String> stringMessage) {
final long startTime = System.nanoTime();
try {
Span rootSpan = tracer.spanBuilder("transformer-processMessage").startSpan();
try(Scope scope = rootSpan.makeCurrent()) {
Context currentContext = Context.current();
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes()));
logTimeTaken(startTime, 1);
if (msg.getMessageType() == XMessage.MessageType.BROADCAST_TEXT) {
Expand All @@ -132,25 +143,30 @@ public void accept(List<XMessage> messages) {
messages = (ArrayList<XMessage>) messages;
for (XMessage msg : messages) {
try {
Span childSpan = createChildSpan("broadcastMessageToKafka", currentContext, rootSpan);
kafkaProducer.send(outboundTopic, msg.toXML());
} catch (JAXBException e) {
childSpan.end();
rootSpan.end(); } catch (JAXBException e) {
e.printStackTrace();
}
}
}
});
} else {
transform(msg)
transform(msg)
.subscribe(new Consumer<XMessage>() {
@Override
public void accept(XMessage transformedMessage) {
logTimeTaken(startTime, 2);
if (transformedMessage != null) {
try {
kafkaProducer.send(outboundTopic, transformedMessage.toXML());
Span childSpan = createChildSpan("sendMessageToKafka", currentContext, rootSpan);
kafkaProducer.send(outboundTopic, transformedMessage.toXML());
long endTime = System.nanoTime();
long duration = (endTime - startTime);
log.error("Total time spent in processing form: " + duration / 1000000);
childSpan.end();
rootSpan.end();
} catch (JAXBException e) {
e.printStackTrace();
}
Expand All @@ -160,8 +176,10 @@ public void accept(XMessage transformedMessage) {
}
} catch (JAXBException e) {
e.printStackTrace();
} catch (Exception e) {
} catch (Throwable e) {
e.printStackTrace();
} finally {
// rootSpan.end();
}
}
})
Expand Down Expand Up @@ -246,12 +264,17 @@ private Map<String, String> getCampaignAndFormIdFromXMessage(XMessage xMessage)
@Override
public Mono<XMessage> transform(XMessage xMessage) throws Exception {
XMessage[] finalXMsg = new XMessage[1];
Span parentSpan = Span.current();
Context currentContext = Context.current();
Span childSpan1 = createChildSpan("getCampaignFromNameTransformer", currentContext, parentSpan);
return campaignService
.getCampaignFromNameTransformer(xMessage.getApp())
.map(new Function<JsonNode, Mono<Mono<Mono<XMessage>>>>() {
@Override
public Mono<Mono<Mono<XMessage>>> apply(JsonNode campaign) {
childSpan1.end();
if (campaign != null) {
Span ChildSpan2 = createChildSpan("getPreviousMetadata", currentContext, parentSpan);
// Map<String, String> data = getCampaignAndFormIdFromXMessage(xMessage);
//
// String formID = data.get("formID");
Expand Down Expand Up @@ -280,9 +303,11 @@ public Mono<Mono<Mono<XMessage>>> apply(JsonNode campaign) {
.map(new Function<FormManagerParams, Mono<Mono<XMessage>>>() {
@Override
public Mono<Mono<XMessage>> apply(FormManagerParams previousMeta) {
final ServiceResponse[] response = new ServiceResponse[1];
ChildSpan2.end();
final ServiceResponse[] response = new ServiceResponse[1];
MenuManager mm;
if (previousMeta.instanceXMlPrevious == null || previousMeta.currentAnswer.equals(assesGoToStartChar) || isStartingMessage) {
Span childSpan3 = createChildSpan("MenuManagerStartProcessForStartingMessage", currentContext, parentSpan);
// if (!lastFormID.equals(formID) || previousMeta.instanceXMlPrevious == null || previousMeta.currentAnswer.equals(assesGoToStartChar) || isStartingMessage) {
previousMeta.currentAnswer = assesGoToStartChar;
ServiceResponse serviceResponse = new MenuManager(null, null, null, formPath, formID, false, questionRepo).start();
Expand All @@ -293,14 +318,19 @@ public Mono<Mono<XMessage>> apply(FormManagerParams previousMeta) {
// ss.getXML();
String instanceXMlPrevious = ss.getXML();
log.debug("Instance value >> " + instanceXMlPrevious);

mm = new MenuManager(null, null, instanceXMlPrevious, formPath, formID, true, questionRepo);
response[0] = mm.start();
childSpan3.end();
} else {
Span childSpan3 = createChildSpan("MenuManagerStartProcess", currentContext, parentSpan);
mm = new MenuManager(previousMeta.previousPath, previousMeta.currentAnswer,
previousMeta.instanceXMlPrevious, formPath, formID, false, questionRepo);
response[0] = mm.start();
childSpan3.end();
}

Span childSpan4 = createChildSpan("updateQuestionAndAssessment", currentContext, parentSpan);
// Save answerData => PreviousQuestion + CurrentAnswer
Mono<Pair<Boolean, List<Question>>> updateQuestionAndAssessment =
updateQuestionAndAssessment(
Expand All @@ -314,40 +344,47 @@ public Mono<Mono<XMessage>> apply(FormManagerParams previousMeta) {
xMessage,
response[0].question
);

childSpan4.end();

if (mm.isGlobal() && response[0].currentIndex.contains("eof__")) {
String nextBotID = mm.getNextBotID(response[0].currentIndex);
Span childSpan5 = createChildSpan("getBotNameByBotID&getFirstFormByBotID", currentContext, parentSpan);
String nextBotID = mm.getNextBotID(response[0].currentIndex);

return Mono.zip(
campaignService.getBotNameByBotID(nextBotID),
campaignService.getFirstFormByBotID(nextBotID)
).map(new Function<Tuple2<String, String>, Mono<XMessage>>() {
@Override
public Mono<XMessage> apply(Tuple2<String, String> objects) {
childSpan5.end();
String nextFormID = objects.getT2();
String nextAppName = objects.getT1();

ServiceResponse serviceResponse = new MenuManager(
Span childSpan6 = createChildSpan("MenuManagerStartProcessForNextForm", currentContext, parentSpan);
ServiceResponse serviceResponse = new MenuManager(
null, null, null,
getFormPath(nextFormID), nextFormID,
false, questionRepo)
.start();
childSpan6.end();
FormUpdation ss = FormUpdation.builder().build();
ss.parse(serviceResponse.currentResponseState);
ss.updateAdapterProperties(xMessage.getChannel(), xMessage.getProvider());
String instanceXMlPrevious = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
ss.getXML();
log.debug("Instance value >> " + instanceXMlPrevious);
Span childSpan7 = createChildSpan("MenuManagerStartProcessForXMLNextForm", currentContext, parentSpan);
MenuManager mm2 = new MenuManager(null, null,
instanceXMlPrevious, getFormPath(nextFormID), nextFormID, true,
questionRepo);
ServiceResponse response = mm2.start();
childSpan7.end();
xMessage.setApp(nextAppName);
return decodeXMessage(xMessage, response, nextFormID, updateQuestionAndAssessment);
}
});
} else {
Span childSpan7 = createChildSpan("MenuManagerStartProcessForXMLNextForm", currentContext, parentSpan);
return Mono.just(decodeXMessage(xMessage, response[0], formID, updateQuestionAndAssessment));
}
}
Expand Down Expand Up @@ -716,4 +753,48 @@ private void logTimeTaken(long startTime, int checkpointID) {
long duration = (endTime - startTime) / 1000000;
log.info(String.format("CP-%d: %d ms", checkpointID, duration));
}

/**
* Create Child Span with current context & parent span
* @param spanName
* @param context
* @param parentSpan
* @return childSpan
*/
private Span createChildSpan(String spanName, Context context, Span parentSpan) {
String prefix = "transformer-";
return tracer.spanBuilder(prefix + spanName).setParent(context.with(parentSpan)).startSpan();
}

/**
* Log Exceptions & if span exists, add error to span
* @param eMsg
* @param span
*/
private void genericException(String eMsg, Span span) {
eMsg = "Exception: " + eMsg;
log.error(eMsg);
if(span != null) {
span.setStatus(StatusCode.ERROR, "Exception: " + eMsg);
span.end();
}
}

/**
* Log Exception & if span exists, add error to span
* @param s
* @param span
* @return
*/
private Consumer<Throwable> genericError(String s, Span span) {
return c -> {
String msg = "Error in " + s + "::" + c.getMessage();
log.error(msg);
if (span != null) {
log.info("generic message - span");
span.setStatus(StatusCode.ERROR, msg);
span.end();
}
};
}
}
10 changes: 10 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,13 @@ spring.data.cassandra.port=${CASSANDRA_PORT}
spring.data.cassandra.keyspace-name=${CASSANDRA_KEYSPACE}
spring.data.cassandra.local-datacenter=datacenter1

#Opentelemetry Lighstep Config
opentelemetry.lightstep.tracer=${OPENTELEMETERY_LIGHTSTEP_TRACER}
opentelemetry.lightstep.tracer.version=${OPENTELEMETERY_LIGHTSTEP_TRACER_VERSION}
opentelemetry.lightstep.service=${OPENTELEMETERY_LIGHTSTEP_SERVICE}
opentelemetry.lightstep.access.token=${OPENTELEMETERY_LIGHTSTEP_ACCESS_TOKEN}
opentelemetry.lightstep.end.point=${OPENTELEMETERY_LIGHTSTEP_END_POINT}

#for log4j2 vulnerability
log4j2.formatMsgNoLookups=true