diff --git a/pom.xml b/pom.xml
index bd1e5d5..81aa3c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,10 +28,6 @@
org.springframework.boot
spring-boot-starter-validation
-
- org.springframework.boot
- spring-boot-devtools
-
org.springframework.boot
spring-boot-starter-web-services
@@ -129,6 +125,34 @@
org.springframework.data
spring-data-cassandra
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.19.1
+
+
+ com.google.guava
+ guava
+ 31.0.1-jre
+
+
+ com.lightstep.opentelemetry
+ opentelemetry-launcher
+ 1.5.0
+
+
+ io.opentelemetry
+ opentelemetry-api
+ 1.7.1
+
+
+ io.opentelemetry
+ opentelemetry-extension-annotations
+ 1.7.1
+
+
diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java
index a69a1ad..fcb4f31 100644
--- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java
+++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java
@@ -20,6 +20,12 @@
import io.fusionauth.domain.api.UserConsentResponse;
import io.fusionauth.domain.api.UserRequest;
import io.fusionauth.domain.api.UserResponse;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
import io.fusionauth.domain.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -75,605 +81,626 @@
@Slf4j
public class ReactiveConsumer {
- private final Flux> reactiveKafkaReceiver;
+ private final Flux> reactiveKafkaReceiver;
// @Autowired
// public KieSession kSession;
- @Autowired
- public XMessageRepository xMessageRepository;
-
- @Autowired
- public SimpleProducer kafkaProducer;
-
- @Autowired
- public ReactiveProducer reactiveProducer;
-
- @Value("${odk-transformer}")
- public String odkTransformerTopic;
-
- @Autowired
- public BotService botService;
-
- @Autowired
- public CampaignService campaignService;
-
- @Value("${encryptionKeyString}")
- private String secret;
-
- public AESWrapper encryptor;
-
- private final String DEFAULT_APP_NAME = "Global Bot";
- LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);
-
-
- @EventListener(ApplicationStartedEvent.class)
- public void onMessage() {
- reactiveKafkaReceiver
- .doOnNext(new Consumer>() {
- @Override
- public void accept(ReceiverRecord stringMessage) {
- try {
- final long startTime = System.nanoTime();
- logTimeTaken(startTime, 0);
- XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes()));
- SenderReceiverInfo from = msg.getFrom();
- logTimeTaken(startTime, 1);
- getAppName(msg.getPayload().getText(), msg.getFrom())
- .doOnNext(new Consumer() {
- @Override
- public void accept(String appName) {
- logTimeTaken(startTime, 2);
- msg.setApp(appName);
- fetchAdapterID(appName)
- .doOnNext(new Consumer() {
- @Override
- public void accept(String adapterID) {
- logTimeTaken(startTime, 3);
-
- from.setCampaignID(appName);
- from.setDeviceType(DeviceType.PHONE);
- resolveUserNew(msg)
- .doOnNext(new Consumer() {
- @Override
- public void accept(XMessage msg) {
- SenderReceiverInfo from = msg.getFrom();
- // msg.setFrom(from);
- msg.setApp(appName);
- getLastMessageID(msg)
- .doOnNext(lastMessageID -> {
- logTimeTaken(startTime, 4);
- msg.setLastMessageID(lastMessageID);
- msg.setAdapterId(adapterID);
- if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) {
- try {
- kafkaProducer.send(odkTransformerTopic, msg.toXML());
- // reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML());
- } catch (JAXBException e) {
- e.printStackTrace();
- }
- logTimeTaken(startTime, 15);
- }
- })
- .doOnError(new Consumer() {
- @Override
- public void accept(Throwable throwable) {
- log.error("Error in getLastMessageID" + throwable.getMessage());
- }
- })
- .subscribe();
- }
- })
- .doOnError(new Consumer() {
- @Override
- public void accept(Throwable throwable) {
- log.error("Error in resolveUser" + throwable.getMessage());
- }
- })
- .subscribe();
-
- }
- })
- .doOnError(new Consumer() {
- @Override
- public void accept(Throwable throwable) {
- log.error("Error in fetchAdapterID" + throwable.getMessage());
- }
- })
- .subscribe();
- }
- })
- .doOnError(new Consumer() {
- @Override
- public void accept(Throwable throwable) {
- log.error("Error in getAppName" + throwable.getMessage());
- }
- })
- .subscribe();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- })
- .doOnError(new Consumer() {
- @Override
- public void accept(Throwable e) {
- System.out.println(e.getMessage());
- log.error("KafkaFlux exception", e);
- }
- })
- .subscribe();
- }
-
- private Mono resolveUserNew(XMessage xmsg) {
- try {
- SenderReceiverInfo from = xmsg.getFrom();
- String appName = xmsg.getApp();
-
- String deviceString = from.getDeviceType().toString() + ":" + from.getUserID();
- String encodedBase64Key = encodeKey(secret);
- String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key);
- ClientResponse response = campaignService.fusionAuthClient.retrieveUserByUsername(deviceID);
- if (response.wasSuccessful()) {
- from.setDeviceID(response.successResponse.user.id.toString());
- xmsg.setFrom(from);
- return xmsgCampaignForm(xmsg, response.successResponse.user);
- } else {
- return botService.updateUser(deviceString, appName)
- .flatMap(new Function, Mono>() {
- @Override
- public Mono apply(Pair result) {
- if (result.getLeft()) {
- from.setDeviceID(result.getRight());
- xmsg.setFrom(from);
- ClientResponse response = campaignService.fusionAuthClient.retrieveUserByUsername(deviceID);
- if (response.wasSuccessful()) {
- return xmsgCampaignForm(xmsg, response.successResponse.user);
- } else {
- return Mono.just(xmsg);
- }
- } else {
- xmsg.setFrom(null);
- return Mono.just(xmsg);
- }
- }
- }).doOnError(new Consumer() {
- @Override
- public void accept(Throwable throwable) {
- log.error("Error in updateUser" + throwable.getMessage());
- }
- });
- }
- } catch (Exception e) {
- e.printStackTrace();
- log.error("Error in resolveUser" + e.getMessage());
- xmsg.setFrom(null);
- return Mono.just(xmsg);
- }
- }
-
- private Mono xmsgCampaignForm(XMessage xmsg, User user) {
- return campaignService.getCampaignFromNameTransformer(xmsg.getCampaign())
- .map(new Function() {
- @Override
- public XMessage apply(JsonNode campaign) {
- String campaignID = campaign.findValue("id").asText();
- Map formIDs = getCampaignFormIds(campaign);
- String currentFormID = getCurrentFormId(xmsg, campaignID, formIDs, user);
-
- HashMap metaData = new HashMap();
- metaData.put("campaignId", campaignID);
- metaData.put("currentFormID", currentFormID);
-
- saveCurrentFormID(xmsg.getFrom().getUserID(), campaignID,
- currentFormID);
-
- Transformer transf = new Transformer();
- transf.setId("test");
- transf.setMetaData(metaData);
-
- ArrayList transformers = new ArrayList();
- transformers.add(transf);
-
- xmsg.setTransformers(transformers);
-
- try {
- System.out.println("XML:"+xmsg.toXML());
- } catch (JAXBException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ @Autowired
+ public XMessageRepository xMessageRepository;
+
+ @Autowired
+ public SimpleProducer kafkaProducer;
+
+ @Autowired
+ public ReactiveProducer reactiveProducer;
+
+ @Value("${odk-transformer}")
+ public String odkTransformerTopic;
+
+ @Autowired
+ public BotService botService;
+
+ @Autowired
+ public CampaignService campaignService;
+
+ @Value("${encryptionKeyString}")
+ private String secret;
+
+ @Autowired
+ private Tracer tracer;
+
+ public AESWrapper encryptor;
+
+ private final String DEFAULT_APP_NAME = "Global Bot";
+ LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);
+
+ @EventListener(ApplicationStartedEvent.class)
+ public void onMessage() {
+ reactiveKafkaReceiver.doOnNext(new Consumer>() {
+ @Override
+ public void accept(ReceiverRecord stringMessage) {
+ try {
+ final long startTime = System.nanoTime();
+ logTimeTaken(startTime, 0);
+ XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes()));
+
+// Context extracted = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), msg, null);
+// extracted.getCurrentSpan();
+ Span rootSpan = tracer.spanBuilder("orchestrator-processMessage").startSpan();
+ try (Scope scope = rootSpan.makeCurrent()) {
+ Context currentContext = Context.current();
+ SenderReceiverInfo from = msg.getFrom();
+ Span childSpan1 = createChildSpan("fetchAdapterID", currentContext, rootSpan);
+ logTimeTaken(startTime, 1);
+ fetchAdapterID(msg.getApp()).doOnNext(new Consumer() {
+ @Override
+ public void accept(String adapterID) {
+ childSpan1.end();
+ Span childSpan2 = createChildSpan("resolveUserNew", currentContext, rootSpan);
+ logTimeTaken(startTime, 2);
+ from.setCampaignID(msg.getApp());
+ from.setDeviceType(DeviceType.PHONE);
+ resolveUserNew(msg).doOnNext(new Consumer() {
+ @Override
+ public void accept(XMessage msg) {
+ childSpan2.end();
+ Span childSpan3 = createChildSpan("getLastMessageID", currentContext, rootSpan);
+ SenderReceiverInfo from = msg.getFrom();
+ // msg.setFrom(from);
+ getLastMessageID(msg).doOnNext(lastMessageID -> {
+ childSpan3.end();
+ Span childSpan4 = createChildSpan("sendMessageToKafka", currentContext,
+ rootSpan);
+ logTimeTaken(startTime, 3);
+ msg.setLastMessageID(lastMessageID);
+ msg.setAdapterId(adapterID);
+ if (msg.getMessageState().equals(XMessage.MessageState.REPLIED)
+ || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) {
+ try {
+ kafkaProducer.send(odkTransformerTopic, msg.toXML());
+ childSpan4.end();
+ rootSpan.end();
+ // reactiveProducer.sendMessages(odkTransformerTopic,
+ // msg.toXML());
+ } catch (JAXBException e) {
+ e.printStackTrace();
+ }
+ logTimeTaken(startTime, 15);
+ }
+ }).doOnError(genericError("getLastMessageID", childSpan3)).subscribe();
+ }
+ }).doOnError(genericError("resolveUserNew", childSpan2)).subscribe();
+
+ }
+ }).doOnError(genericError("fetchAdapterID", childSpan1)).subscribe();
+
+ } catch (Throwable e) {
+ genericException(e.getMessage(), rootSpan);
+ } finally {
+// rootSpan.end();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }).doOnError(genericError("KafkaFlux", null)).subscribe();
+
+ }
+
+ /**
+ * Create Child Span with current context & parent span
+ *
+ * @param spanName
+ * @param context
+ * @param parentSpan
+ * @return childSpan
+ */
+ private Span createChildSpan(String spanName, Context context, Span parentSpan) {
+ String prefix = "orchestrator-";
+ return tracer.spanBuilder(prefix + spanName).setParent(context.with(parentSpan)).startSpan();
+ }
+
+ /**
+ * Log Exceptions & if span exists, add error to span
+ *
+ * @param eMsg
+ * @param span
+ */
+ private void genericException(String eMsg, Span span) {
+ eMsg = "Exception: " + eMsg;
+ log.error(eMsg);
+ if (span != null) {
+ span.setStatus(StatusCode.ERROR, "Exception: " + eMsg);
+ span.end();
+ }
+ }
+
+ /**
+ * Log Exception & if span exists, add error to span
+ *
+ * @param s
+ * @param span
+ * @return
+ */
+ private Consumer genericError(String s, Span span) {
+ return c -> {
+ String msg = "Error in " + s + "::" + c.getMessage();
+ log.error(msg);
+ if (span != null) {
+ log.info("generic message - span");
+ span.setStatus(StatusCode.ERROR, msg);
+ span.end();
+ }
+ };
+ }
+
+ private Mono resolveUserNew(XMessage xmsg) {
+ try {
+ SenderReceiverInfo from = xmsg.getFrom();
+ String appName = xmsg.getApp();
+
+ String deviceString = from.getDeviceType().toString() + ":" + from.getUserID();
+ String encodedBase64Key = encodeKey(secret);
+ String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key);
+ ClientResponse response = campaignService.fusionAuthClient
+ .retrieveUserByUsername(deviceID);
+ if (response.wasSuccessful()) {
+ from.setDeviceID(response.successResponse.user.id.toString());
+ xmsg.setFrom(from);
+ return xmsgCampaignForm(xmsg, response.successResponse.user);
+ } else {
+ return botService.updateUser(deviceString, appName)
+ .flatMap(new Function, Mono>() {
+ @Override
+ public Mono apply(Pair result) {
+ if (result.getLeft()) {
+ from.setDeviceID(result.getRight());
+ xmsg.setFrom(from);
+ ClientResponse response = campaignService.fusionAuthClient
+ .retrieveUserByUsername(deviceID);
+ if (response.wasSuccessful()) {
+ return xmsgCampaignForm(xmsg, response.successResponse.user);
+ } else {
+ return Mono.just(xmsg);
+ }
+ } else {
+ xmsg.setFrom(null);
+ return Mono.just(xmsg);
+ }
+ }
+ }).doOnError(genericError("updateUser", null));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("Exception in resolveUser" + e.getMessage());
+ xmsg.setFrom(null);
+ return Mono.just(xmsg);
+ }
+ }
+
+ private Mono xmsgCampaignForm(XMessage xmsg, User user) {
+ return campaignService.getCampaignFromNameTransformer(xmsg.getCampaign())
+ .map(new Function() {
+ @Override
+ public XMessage apply(JsonNode campaign) {
+ String campaignID = campaign.findValue("id").asText();
+ Map formIDs = getCampaignFormIds(campaign);
+ String currentFormID = getCurrentFormId(xmsg, campaignID, formIDs, user);
+
+ HashMap metaData = new HashMap();
+ metaData.put("campaignId", campaignID);
+ metaData.put("currentFormID", currentFormID);
+
+ saveCurrentFormID(xmsg.getFrom().getUserID(), campaignID, currentFormID);
+
+ Transformer transf = new Transformer();
+ transf.setId("test");
+ transf.setMetaData(metaData);
+
+ ArrayList transformers = new ArrayList();
+ transformers.add(transf);
+
+ xmsg.setTransformers(transformers);
+
+ try {
+ System.out.println("XML:" + xmsg.toXML());
+ } catch (JAXBException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ return xmsg;
}
-
- return xmsg;
- }
- });
- }
- /**
- * Get all form ids from the campaign node
- *
- * @param campaign
- * @return
- */
- private Map getCampaignFormIds(JsonNode campaign) {
- ArrayList formIDs = new ArrayList();
- Map formIDs2 = new HashMap();
- try {
- campaign.findValue("logicIDs").forEach(t -> {
- formIDs2.put(t.asText(), "");
- });
-
- campaign.findValue("logic").forEach(t -> {
- formIDs2.put(t.findValue("id").asText(), t.findValue("formID").asText());
- });
-
- System.out.println("formIDs:"+formIDs2);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return formIDs2;
- }
-
- /**
- * Get the current from id
- *
- * @param xmsg
- * @param campaignID
- * @param formIDs
- * @return
- */
- private String getCurrentFormId(XMessage xmsg, String campaignID, Map formIDs, User user) {
- String currentFormID = "";
-
- Boolean consent = checkUserCampaignConsent(campaignID, user);
-
- /* Fetch current form id from file for user & campaign */
- currentFormID = getCurrentFormIDFromFile(xmsg.getFrom().getUserID(), campaignID);
-
- /* if current form id is empty, then set the first form id as current form id
- * else if current form id is equal to consent form id
- *
- */
- System.out.println("currentFormID:"+currentFormID);
- if(currentFormID == null || currentFormID.isEmpty()) {
- if(!formIDs.isEmpty() && formIDs.size() > 0) {
+ }).doOnError(genericError("getCampaignFromNameTransformer", null));
+ }
+
+ /**
+ * Get all form ids from the campaign node
+ *
+ * @param campaign
+ * @return
+ */
+ private Map getCampaignFormIds(JsonNode campaign) {
+ ArrayList formIDs = new ArrayList();
+ Map formIDs2 = new HashMap();
+ try {
+ campaign.findValue("logicIDs").forEach(t -> {
+ formIDs2.put(t.asText(), "");
+ });
+
+ campaign.findValue("logic").forEach(t -> {
+ formIDs2.put(t.findValue("id").asText(), t.findValue("formID").asText());
+ });
+
+ System.out.println("formIDs:" + formIDs2);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return formIDs2;
+ }
+
+ /**
+ * Get the current from id
+ *
+ * @param xmsg
+ * @param campaignID
+ * @param formIDs
+ * @return
+ */
+ private String getCurrentFormId(XMessage xmsg, String campaignID, Map formIDs, User user) {
+ String currentFormID = "";
+
+ Boolean consent = checkUserCampaignConsent(campaignID, user);
+
+ /* Fetch current form id from file for user & campaign */
+ currentFormID = getCurrentFormIDFromFile(xmsg.getFrom().getUserID(), campaignID);
+
+ /*
+ * if current form id is empty, then set the first form id as current form id
+ * else if current form id is equal to consent form id
+ *
+ */
+ System.out.println("currentFormID:" + currentFormID);
+ if (currentFormID == null || currentFormID.isEmpty()) {
+ if (!formIDs.isEmpty() && formIDs.size() > 0) {
currentFormID = formIDs.values().toArray()[0].toString();
}
- }
-
- /* if current form is consent form */
- if(currentFormID.equals(getConsentFormID())) {
- /* if consent already exists, set next form as current one,
- * else check the response for further details */
- if(consent) {
- currentFormID = formIDs.values().toArray()[1].toString();
- } else {
- String response = xmsg.getPayload().getText();
- if(response.equals("1")) {
- //update fusion auth client for consent & set the next form id as current form id
- addUserCampaignConsent(campaignID, user);
- currentFormID = formIDs.values().toArray()[1].toString();
- } else if(response.equals("2")) {
- //drop conversation
- currentFormID = "";
- System.out.println("drop conversation.");
- } else {
- // invalid response, leave the consent form id as current
- }
- }
- }
-
- System.out.println(currentFormID);
-
- return currentFormID;
- }
-
- /**
- * Get current form id set in file for user & campaign
- *
- * @param userID
- * @param campaignID
- * @return
- */
- private String getCurrentFormIDFromFile(String userID, String campaignID) {
- String currentFormID = "";
- Resource resource = new ClassPathResource(getJsonFilePath());
- try {
- ObjectMapper mapper = new ObjectMapper();
-
- InputStream inputStream = resource.getInputStream();
-
- byte[] bdata = FileCopyUtils.copyToByteArray(inputStream);
-
- JsonNode rootNode = mapper.readTree(bdata);
-
- if(!rootNode.isEmpty() && rootNode.get(userID) != null
- && rootNode.path(userID).get(campaignID) != null) {
- currentFormID = rootNode.path(userID).get(campaignID).asText();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- return currentFormID;
- }
-
- /**
- * Save current form id in file for user & campaign
- *
- * @param userID
- * @param campaignID
- * @param currentFormID
- */
+ }
+
+ /* if current form is consent form */
+ if (currentFormID.equals(getConsentFormID())) {
+ /*
+ * if consent already exists, set next form as current one, else check the
+ * response for further details
+ */
+ if (consent) {
+ currentFormID = formIDs.values().toArray()[1].toString();
+ } else {
+ String response = xmsg.getPayload().getText();
+ if (response.equals("1")) {
+ // update fusion auth client for consent & set the next form id as current form
+ // id
+ addUserCampaignConsent(campaignID, user);
+ currentFormID = formIDs.values().toArray()[1].toString();
+ } else if (response.equals("2")) {
+ // drop conversation
+ currentFormID = "";
+ System.out.println("drop conversation.");
+ } else {
+ // invalid response, leave the consent form id as current
+ }
+ }
+ }
+
+ System.out.println(currentFormID);
+
+ return currentFormID;
+ }
+
+ /**
+ * Get current form id set in file for user & campaign
+ *
+ * @param userID
+ * @param campaignID
+ * @return
+ */
+ private String getCurrentFormIDFromFile(String userID, String campaignID) {
+ String currentFormID = "";
+ Resource resource = new ClassPathResource(getJsonFilePath());
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+
+ InputStream inputStream = resource.getInputStream();
+
+ byte[] bdata = FileCopyUtils.copyToByteArray(inputStream);
+
+ JsonNode rootNode = mapper.readTree(bdata);
+
+ if (!rootNode.isEmpty() && rootNode.get(userID) != null && rootNode.path(userID).get(campaignID) != null) {
+ currentFormID = rootNode.path(userID).get(campaignID).asText();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return currentFormID;
+ }
+
+ /**
+ * Save current form id in file for user & campaign
+ *
+ * @param userID
+ * @param campaignID
+ * @param currentFormID
+ */
private void saveCurrentFormID(String userID, String campaignID, String currentFormID) {
- Resource resource = new ClassPathResource(getJsonFilePath());
- try {
- ObjectMapper mapper = new ObjectMapper();
-
- File file = resource.getFile();
- InputStream inputStream = resource.getInputStream();
-
- byte[] bdata = FileCopyUtils.copyToByteArray(inputStream);
-
- JsonNode rootNode = mapper.readTree(bdata);
- if(rootNode.isEmpty()) {
- rootNode = mapper.createObjectNode();
- }
-
- if(!rootNode.isEmpty() && rootNode.get(userID) != null) {
- ((ObjectNode) rootNode.path(userID)).put(campaignID, currentFormID);
- } else {
- JsonNode campaignNode = mapper.createObjectNode();
- ((ObjectNode) campaignNode).put(campaignID, currentFormID);
-
- ((ObjectNode) rootNode).put(userID, campaignNode);
- }
-
- System.out.println("Saved File String:"+rootNode.toString());
-
- FileWriter fileWriter = new FileWriter(file);
- fileWriter.write(rootNode.toString());
- fileWriter.flush();
- fileWriter.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- private String getJsonFilePath() {
- return "userCurrentForm.json";
- }
-
+ Resource resource = new ClassPathResource(getJsonFilePath());
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+
+ File file = resource.getFile();
+ InputStream inputStream = resource.getInputStream();
+
+ byte[] bdata = FileCopyUtils.copyToByteArray(inputStream);
+
+ JsonNode rootNode = mapper.readTree(bdata);
+ if (rootNode.isEmpty()) {
+ rootNode = mapper.createObjectNode();
+ }
+
+ if (!rootNode.isEmpty() && rootNode.get(userID) != null) {
+ ((ObjectNode) rootNode.path(userID)).put(campaignID, currentFormID);
+ } else {
+ JsonNode campaignNode = mapper.createObjectNode();
+ ((ObjectNode) campaignNode).put(campaignID, currentFormID);
+
+ ((ObjectNode) rootNode).put(userID, campaignNode);
+ }
+
+ System.out.println("Saved File String:" + rootNode.toString());
+
+ FileWriter fileWriter = new FileWriter(file);
+ fileWriter.write(rootNode.toString());
+ fileWriter.flush();
+ fileWriter.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private String getJsonFilePath() {
+ return "userCurrentForm.json";
+ }
+
// private
-
- private String getConsentFormID() {
- return "mandatory-consent-v1";
- }
-
- private Mono resolveUser(SenderReceiverInfo from, String appName) {
- try {
- String deviceString = from.getDeviceType().toString() + ":" + from.getUserID();
- String encodedBase64Key = encodeKey(secret);
- String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key);
- ClientResponse response = campaignService.fusionAuthClient.retrieveUserByUsername(deviceID);
- if (response.wasSuccessful()) {
- from.setDeviceID(response.successResponse.user.id.toString());
+
+ private String getConsentFormID() {
+ return "mandatory-consent-v1";
+ }
+
+ private Mono resolveUser(SenderReceiverInfo from, String appName) {
+ try {
+ String deviceString = from.getDeviceType().toString() + ":" + from.getUserID();
+ String encodedBase64Key = encodeKey(secret);
+ String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key);
+ ClientResponse response = campaignService.fusionAuthClient
+ .retrieveUserByUsername(deviceID);
+ if (response.wasSuccessful()) {
+ from.setDeviceID(response.successResponse.user.id.toString());
// checkConsent(from.getCampaignID(), response.successResponse.user);
- return Mono.just(from);
- } else {
- return botService.updateUser(deviceString, appName)
- .flatMap(new Function, Mono>() {
- @Override
- public Mono apply(Pair result) {
- if (result.getLeft()) {
- from.setDeviceID(result.getRight());
- return Mono.just(from);
- } else {
- return Mono.just(null);
- }
- }
- }).doOnError(new Consumer() {
- @Override
- public void accept(Throwable throwable) {
- log.error("Error in updateUser" + throwable.getMessage());
- }
- });
- }
- } catch (Exception e) {
- e.printStackTrace();
- log.error("Error in resolveUser" + e.getMessage());
- return Mono.just(null);
- }
- }
-
- private Boolean checkUserCampaignConsent(String campaignID, User user) {
- Boolean consent = false;
- try {
- Object consentData = user.data.get("consent");
- ArrayList consentArray = (ArrayList) consentData;
- if(consentArray != null && consentArray.contains(campaignID)) {
- consent = true;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return consent;
- }
-
- private void addUserCampaignConsent(String campaignID, User user)
- {
- try {
- Object consentData = user.data.get("consent");
+ return Mono.just(from);
+ } else {
+ return botService.updateUser(deviceString, appName)
+ .flatMap(new Function, Mono>() {
+ @Override
+ public Mono apply(Pair result) {
+ if (result.getLeft()) {
+ from.setDeviceID(result.getRight());
+ return Mono.just(from);
+ } else {
+ return Mono.just(null);
+ }
+ }
+ }).doOnError(new Consumer() {
+ @Override
+ public void accept(Throwable throwable) {
+ log.error("Error in updateUser" + throwable.getMessage());
+ }
+ });
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("Error in resolveUser" + e.getMessage());
+ return Mono.just(null);
+ }
+ }
+
+ private Boolean checkUserCampaignConsent(String campaignID, User user) {
+ Boolean consent = false;
+ try {
+ Object consentData = user.data.get("consent");
ArrayList consentArray = (ArrayList) consentData;
-
- if(consentArray == null || (
- consentArray != null && !consentArray.contains(campaignID))
- ){
- consentArray.add(campaignID);
-
- user.data.put("consent", consentArray);
-
- updateFAUser(user);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void updateFAUser(User user) {
- System.out.println(user);
- UserRequest r = new UserRequest(user);
-
- ClientResponse response = campaignService.fusionAuthClient.updateUser(user.id, r);
- if(response.wasSuccessful()) {
- System.out.println("user update success");
+ if (consentArray != null && consentArray.contains(campaignID)) {
+ consent = true;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return consent;
+ }
+
+ private void addUserCampaignConsent(String campaignID, User user) {
+ try {
+ Object consentData = user.data.get("consent");
+ ArrayList consentArray = (ArrayList) consentData;
+
+ if (consentArray == null || (consentArray != null && !consentArray.contains(campaignID))) {
+ consentArray.add(campaignID);
+
+ user.data.put("consent", consentArray);
+
+ updateFAUser(user);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void updateFAUser(User user) {
+ log.error("User: " + user);
+ UserRequest r = new UserRequest(user);
+
+ ClientResponse response = campaignService.fusionAuthClient.updateUser(user.id, r);
+ if (response.wasSuccessful()) {
+ log.error("user update success");
} else {
- System.out.println("error in user update"+response.errorResponse);
+ log.error("error in user update" + response.errorResponse);
+ }
+ }
+
+ private void logTimeTaken(long startTime, int checkpointID) {
+ long endTime = System.nanoTime();
+ long duration = (endTime - startTime) / 1000000;
+ log.info(String.format("CP-%d: %d ms", checkpointID, duration));
+ }
+
+ private Mono getLastMessageID(XMessage msg) {
+ if (msg.getMessageType().toString().equalsIgnoreCase("text")) {
+ return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT")
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO msg1) {
+ if (msg1.getId() == null) {
+ System.out.println("cError");
+ return "";
+ }
+ return String.valueOf(msg1.getId());
+ }
+ });
+
+ } else if (msg.getMessageType().toString().equalsIgnoreCase("button")) {
+ return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT")
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO lastMessage) {
+ return String.valueOf(lastMessage.getId());
+ }
+ });
+ }
+ return Mono.empty();
+ }
+
+ private Mono getLatestXMessage(String userID, LocalDateTime yesterday, String messageState) {
+ return xMessageRepository.findAllByUserIdAndTimestampAfter(userID, yesterday).collectList()
+ .map(new Function, XMessageDAO>() {
+ @Override
+ public XMessageDAO apply(List xMessageDAOS) {
+ if (xMessageDAOS.size() > 0) {
+ List filteredList = new ArrayList<>();
+ for (XMessageDAO xMessageDAO : xMessageDAOS) {
+ if (xMessageDAO.getMessageState().equals(XMessage.MessageState.SENT.name())
+ || xMessageDAO.getMessageState().equals(XMessage.MessageState.REPLIED.name()))
+ filteredList.add(xMessageDAO);
+ }
+ if (filteredList.size() > 0) {
+ filteredList.sort(new Comparator() {
+ @Override
+ public int compare(XMessageDAO o1, XMessageDAO o2) {
+ return o1.getTimestamp().compareTo(o2.getTimestamp());
+ }
+ });
+ }
+ return xMessageDAOS.get(0);
+ }
+ return new XMessageDAO();
+ }
+ }).doOnError(genericError("getLatestXMessage", null));
+ }
+
+ private Mono fetchAdapterID(String appName) {
+ return botService.getCurrentAdapter(appName);
+ }
+
+ private Mono getAppName(String text, SenderReceiverInfo from) {
+ LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);
+ log.info("Inside getAppName " + text + "::" + from.getUserID());
+ if (text.equals("")) {
+ try {
+ return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return xMessageLast.getApp();
+ }
+ });
+ } catch (Exception e2) {
+ return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return xMessageLast.getApp();
+ }
+ });
+ }
+ } else {
+ try {
+ log.info("Inside getAppName " + text + "::" + from.getUserID());
+ return botService.getCampaignFromStartingMessage(text)
+ .flatMap(new Function>() {
+ @Override
+ public Mono apply(String appName1) {
+ log.info("Inside getCampaignFromStartingMessage => " + appName1);
+ if (appName1 == null || appName1.equals("")) {
+ try {
+ return getLatestXMessage(from.getUserID(), yesterday,
+ XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return (xMessageLast.getApp() == null
+ || xMessageLast.getApp().isEmpty())
+ ? "finalAppName"
+ : xMessageLast.getApp();
+ }
+ });
+ } catch (Exception e2) {
+ return getLatestXMessage(from.getUserID(), yesterday,
+ XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return (xMessageLast.getApp() == null
+ || xMessageLast.getApp().isEmpty())
+ ? "finalAppName"
+ : xMessageLast.getApp();
+ }
+ });
+ }
+ }
+ return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName")
+ : Mono.just(appName1);
+ }
+ }).doOnError(genericError("getCampaignFromStartingMessage", null));
+ } catch (Exception e) {
+ log.info("Inside getAppName - exception => " + e.getMessage());
+ try {
+ return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return xMessageLast.getApp();
+ }
+ });
+ } catch (Exception e2) {
+ return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name())
+ .map(new Function() {
+ @Override
+ public String apply(XMessageDAO xMessageLast) {
+ return xMessageLast.getApp();
+ }
+ });
+ }
+ }
}
- }
-
- private void logTimeTaken(long startTime, int checkpointID) {
- long endTime = System.nanoTime();
- long duration = (endTime - startTime) / 1000000;
- log.info(String.format("CP-%d: %d ms", checkpointID, duration));
- }
-
- private Mono getLastMessageID(XMessage msg) {
- if (msg.getMessageType().toString().equalsIgnoreCase("text")) {
- return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function() {
- @Override
- public String apply(XMessageDAO msg1) {
- if (msg1.getId() == null) {
- System.out.println("cError");
- return "";
- }
- return String.valueOf(msg1.getId());
- }
- });
-
- } else if (msg.getMessageType().toString().equalsIgnoreCase("button")) {
- return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function() {
- @Override
- public String apply(XMessageDAO lastMessage) {
- return String.valueOf(lastMessage.getId());
- }
- });
-//
-// map(new Function() {
-// @Override
-// public String apply(XMessageDAO lastMessage) {
-// return String.valueOf(lastMessage.getId());
-// }
-// });
- }
- return Mono.empty();
- }
-
- private Mono getLatestXMessage(String userID, LocalDateTime yesterday, String messageState) {
- return xMessageRepository
- .findAllByUserIdAndTimestampAfter(userID, yesterday).collectList()
- .map(new Function, XMessageDAO>() {
- @Override
- public XMessageDAO apply(List xMessageDAOS) {
- if (xMessageDAOS.size() > 0) {
- List filteredList = new ArrayList<>();
- for (XMessageDAO xMessageDAO : xMessageDAOS) {
- if (xMessageDAO.getMessageState().equals(XMessage.MessageState.SENT.name()) ||
- xMessageDAO.getMessageState().equals(XMessage.MessageState.REPLIED.name()))
- filteredList.add(xMessageDAO);
- }
- if (filteredList.size() > 0) {
- filteredList.sort(new Comparator() {
- @Override
- public int compare(XMessageDAO o1, XMessageDAO o2) {
- return o1.getTimestamp().compareTo(o2.getTimestamp());
- }
- });
- }
- return xMessageDAOS.get(0);
- }
- return new XMessageDAO();
- }
- }).doOnError(new Consumer() {
- @Override
- public void accept(Throwable throwable) {
- log.error("Error in getLatestXMessage" + throwable.getMessage());
- }
- });
- }
-
- private Mono fetchAdapterID(String appName) {
- return botService.getCurrentAdapter(appName);
- }
-
- private Mono getAppName(String text, SenderReceiverInfo from) {
- LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);
- log.info("Inside getAppName " + text + "::" + from.getUserID());
- if (text.equals("")) {
- try {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return xMessageLast.getApp();
- }
- });
- } catch (Exception e2) {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return xMessageLast.getApp();
- }
- });
- }
- } else {
- try {
- log.info("Inside getAppName " + text + "::" + from.getUserID());
- return botService.getCampaignFromStartingMessage(text)
- .flatMap(new Function>() {
- @Override
- public Mono apply(String appName1) {
- log.info("Inside getCampaignFromStartingMessage => " + appName1);
- if (appName1 == null || appName1.equals("")) {
- try {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return (xMessageLast.getApp() == null || xMessageLast.getApp().isEmpty()) ? "finalAppName" : xMessageLast.getApp();
- }
- });
- } catch (Exception e2) {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return (xMessageLast.getApp() == null || xMessageLast.getApp().isEmpty()) ? "finalAppName" : xMessageLast.getApp();
- }
- });
- }
- }
- return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName") : Mono.just(appName1);
- }
- }).doOnError(new Consumer() {
- @Override
- public void accept(Throwable throwable) {
- log.error("Error in getCampaignFromStartingMessage" + throwable.getMessage());
- }
- });
- } catch (Exception e) {
- log.info("Inside getAppName - exception => " + e.getMessage());
- try {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return xMessageLast.getApp();
- }
- });
- } catch (Exception e2) {
- return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() {
- @Override
- public String apply(XMessageDAO xMessageLast) {
- return xMessageLast.getApp();
- }
- });
- }
- }
- }
- }
+ }
}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index eb1f681..8f9c3f0 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -51,3 +51,12 @@ spring.r2dbc.password=${FORMS_DB_PASSWORD}
caffeine.cache.max.size=${CAFFEINE_CACHE_MAX_SIZE:#{1000}}
caffeine.cache.exprie.duration.seconds=${CAFFEINE_CACHE_EXPIRE_DURATION:#{300}}
+#Opentelemetry Lighstep Config
+opentelemetry.lightstep.tracer=${OPENTELEMETERY_LIGHTSTEP_TRACER}
+opentelemetry.lightstep.tracer.version=${OPENTELEMETERY_LIGHTSTEP_TRACER_VERSION}
+opentelemetry.lightstep.service=${OPENTELEMETERY_LIGHTSTEP_SERVICE}
+opentelemetry.lightstep.access.token=${OPENTELEMETERY_LIGHTSTEP_ACCESS_TOKEN}
+opentelemetry.lightstep.end.point=${OPENTELEMETERY_LIGHTSTEP_END_POINT}
+
+log4j2.formatMsgNoLookups=true
+
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..73bfb43
--- /dev/null
+++ b/src/main/resources/log4j2.xml
@@ -0,0 +1,14 @@
+
+
+
+
+
+
+
+
+
+
+
+
+