From 3a83da88e54f537b7161b067c4b117eb08fe91ee Mon Sep 17 00:00:00 2001 From: Surabhi Date: Fri, 3 Dec 2021 10:06:28 +0530 Subject: [PATCH 1/8] lightstep opentelemetry - tracing on bot messages --- pom.xml | 32 +- .../Consumer/ReactiveConsumer.java | 1224 +++++++++-------- src/main/resources/application.properties | 7 + src/main/resources/log4j2.xml | 14 + 4 files changed, 685 insertions(+), 592 deletions(-) create mode 100644 src/main/resources/log4j2.xml diff --git a/pom.xml b/pom.xml index bd1e5d5..81aa3c8 100644 --- a/pom.xml +++ b/pom.xml @@ -28,10 +28,6 @@ org.springframework.boot spring-boot-starter-validation - - org.springframework.boot - spring-boot-devtools - org.springframework.boot spring-boot-starter-web-services @@ -129,6 +125,34 @@ org.springframework.data spring-data-cassandra + + + + com.google.protobuf + protobuf-java + 3.19.1 + + + com.google.guava + guava + 31.0.1-jre + + + com.lightstep.opentelemetry + opentelemetry-launcher + 1.5.0 + + + io.opentelemetry + opentelemetry-api + 1.7.1 + + + io.opentelemetry + opentelemetry-extension-annotations + 1.7.1 + + diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index a69a1ad..24e8d88 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -20,6 +20,12 @@ import io.fusionauth.domain.api.UserConsentResponse; import io.fusionauth.domain.api.UserRequest; import io.fusionauth.domain.api.UserResponse; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.fusionauth.domain.User; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -75,605 +81,647 @@ @Slf4j public class ReactiveConsumer { - private final Flux> reactiveKafkaReceiver; + private final Flux> reactiveKafkaReceiver; // @Autowired // public KieSession kSession; - @Autowired - public XMessageRepository xMessageRepository; - - @Autowired - public SimpleProducer kafkaProducer; - - @Autowired - public ReactiveProducer reactiveProducer; - - @Value("${odk-transformer}") - public String odkTransformerTopic; - - @Autowired - public BotService botService; - - @Autowired - public CampaignService campaignService; - - @Value("${encryptionKeyString}") - private String secret; - - public AESWrapper encryptor; - - private final String DEFAULT_APP_NAME = "Global Bot"; - LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); - - - @EventListener(ApplicationStartedEvent.class) - public void onMessage() { - reactiveKafkaReceiver - .doOnNext(new Consumer>() { - @Override - public void accept(ReceiverRecord stringMessage) { - try { - final long startTime = System.nanoTime(); - logTimeTaken(startTime, 0); - XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes())); - SenderReceiverInfo from = msg.getFrom(); - logTimeTaken(startTime, 1); - getAppName(msg.getPayload().getText(), msg.getFrom()) - .doOnNext(new Consumer() { - @Override - public void accept(String appName) { - logTimeTaken(startTime, 2); - msg.setApp(appName); - fetchAdapterID(appName) - .doOnNext(new Consumer() { - @Override - public void accept(String adapterID) { - logTimeTaken(startTime, 3); - - from.setCampaignID(appName); - from.setDeviceType(DeviceType.PHONE); - resolveUserNew(msg) - .doOnNext(new Consumer() { - @Override - public void accept(XMessage msg) { - SenderReceiverInfo from = msg.getFrom(); - // msg.setFrom(from); - msg.setApp(appName); - getLastMessageID(msg) - .doOnNext(lastMessageID -> { - logTimeTaken(startTime, 4); - msg.setLastMessageID(lastMessageID); - msg.setAdapterId(adapterID); - if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { - try { - kafkaProducer.send(odkTransformerTopic, msg.toXML()); - // reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML()); - } catch (JAXBException e) { - e.printStackTrace(); - } - logTimeTaken(startTime, 15); - } - }) - .doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in getLastMessageID" + throwable.getMessage()); - } - }) - .subscribe(); - } - }) - .doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in resolveUser" + throwable.getMessage()); - } - }) - .subscribe(); - - } - }) - .doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in fetchAdapterID" + throwable.getMessage()); - } - }) - .subscribe(); - } - }) - .doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in getAppName" + throwable.getMessage()); - } - }) - .subscribe(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }) - .doOnError(new Consumer() { - @Override - public void accept(Throwable e) { - System.out.println(e.getMessage()); - log.error("KafkaFlux exception", e); - } - }) - .subscribe(); - } - - private Mono resolveUserNew(XMessage xmsg) { - try { - SenderReceiverInfo from = xmsg.getFrom(); - String appName = xmsg.getApp(); - - String deviceString = from.getDeviceType().toString() + ":" + from.getUserID(); - String encodedBase64Key = encodeKey(secret); - String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key); - ClientResponse response = campaignService.fusionAuthClient.retrieveUserByUsername(deviceID); - if (response.wasSuccessful()) { - from.setDeviceID(response.successResponse.user.id.toString()); - xmsg.setFrom(from); - return xmsgCampaignForm(xmsg, response.successResponse.user); - } else { - return botService.updateUser(deviceString, appName) - .flatMap(new Function, Mono>() { - @Override - public Mono apply(Pair result) { - if (result.getLeft()) { - from.setDeviceID(result.getRight()); - xmsg.setFrom(from); - ClientResponse response = campaignService.fusionAuthClient.retrieveUserByUsername(deviceID); - if (response.wasSuccessful()) { - return xmsgCampaignForm(xmsg, response.successResponse.user); - } else { - return Mono.just(xmsg); - } - } else { - xmsg.setFrom(null); - return Mono.just(xmsg); - } - } - }).doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in updateUser" + throwable.getMessage()); - } - }); - } - } catch (Exception e) { - e.printStackTrace(); - log.error("Error in resolveUser" + e.getMessage()); - xmsg.setFrom(null); - return Mono.just(xmsg); - } - } - - private Mono xmsgCampaignForm(XMessage xmsg, User user) { - return campaignService.getCampaignFromNameTransformer(xmsg.getCampaign()) - .map(new Function() { - @Override - public XMessage apply(JsonNode campaign) { - String campaignID = campaign.findValue("id").asText(); - Map formIDs = getCampaignFormIds(campaign); - String currentFormID = getCurrentFormId(xmsg, campaignID, formIDs, user); - - HashMap metaData = new HashMap(); - metaData.put("campaignId", campaignID); - metaData.put("currentFormID", currentFormID); - - saveCurrentFormID(xmsg.getFrom().getUserID(), campaignID, - currentFormID); - - Transformer transf = new Transformer(); - transf.setId("test"); - transf.setMetaData(metaData); - - ArrayList transformers = new ArrayList(); - transformers.add(transf); - - xmsg.setTransformers(transformers); - - try { - System.out.println("XML:"+xmsg.toXML()); - } catch (JAXBException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + @Autowired + public XMessageRepository xMessageRepository; + + @Autowired + public SimpleProducer kafkaProducer; + + @Autowired + public ReactiveProducer reactiveProducer; + + @Value("${odk-transformer}") + public String odkTransformerTopic; + + @Autowired + public BotService botService; + + @Autowired + public CampaignService campaignService; + + @Value("${encryptionKeyString}") + private String secret; + + @Autowired + private Tracer tracer; + + public AESWrapper encryptor; + + private final String DEFAULT_APP_NAME = "Global Bot"; + LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); + + @EventListener(ApplicationStartedEvent.class) + public void onMessage() { + reactiveKafkaReceiver.doOnNext(new Consumer>() { + @Override + public void accept(ReceiverRecord stringMessage) { + try { + final long startTime = System.nanoTime(); + logTimeTaken(startTime, 0); + XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes())); + +// Context extracted = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), msg, null); +// extracted.getCurrentSpan(); + Span rootSpan = tracer.spanBuilder("orchestrator-processMessage").startSpan(); + try (Scope scope = rootSpan.makeCurrent()) { + Context currentContext = Context.current(); + Span childSpan1 = createChildSpan("getAppName", currentContext, rootSpan); + SenderReceiverInfo from = msg.getFrom(); + logTimeTaken(startTime, 1); + getAppName(msg.getPayload().getText(), msg.getFrom()).doOnNext(new Consumer() { + @Override + public void accept(String appName) { + childSpan1.end(); + Span childSpan2 = createChildSpan("fetchAdapterID", currentContext, rootSpan); + logTimeTaken(startTime, 2); + msg.setApp(appName); + fetchAdapterID(appName).doOnNext(new Consumer() { + @Override + public void accept(String adapterID) { + childSpan2.end(); + Span childSpan3 = createChildSpan("resolveUserNew", currentContext, rootSpan); + logTimeTaken(startTime, 3); + from.setCampaignID(appName); + from.setDeviceType(DeviceType.PHONE); + resolveUserNew(msg).doOnNext(new Consumer() { + @Override + public void accept(XMessage msg) { + childSpan3.end(); + Span childSpan4 = createChildSpan("getLastMessageID", currentContext, rootSpan); + SenderReceiverInfo from = msg.getFrom(); + // msg.setFrom(from); + msg.setApp(appName); + getLastMessageID(msg).doOnNext(lastMessageID -> { + childSpan4.end(); + Span childSpan5 = createChildSpan("sendMessageToKafka", currentContext, rootSpan); + logTimeTaken(startTime, 4); + msg.setLastMessageID(lastMessageID); + msg.setAdapterId(adapterID); + if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) + || msg.getMessageState() + .equals(XMessage.MessageState.OPTED_IN)) { + try { + kafkaProducer.send(odkTransformerTopic, msg.toXML()); + childSpan5.end(); + rootSpan.end(); + // reactiveProducer.sendMessages(odkTransformerTopic, + // msg.toXML()); + } catch (JAXBException e) { + e.printStackTrace(); + } + logTimeTaken(startTime, 15); + } + }) + .doOnError(genericError("getLastMessageID", childSpan4)) + .subscribe(); + } + }) + .doOnError(genericError("resolveUserNew", childSpan3)) + .subscribe(); + + } + }) + .doOnError(genericError("fetchAdapterID", childSpan2)) + .subscribe(); + } + }) + .doOnError(genericError("getAppName", childSpan1)) + .subscribe(); + + } catch(Throwable e) { + genericException(e.getMessage(), rootSpan); + } finally { +// rootSpan.end(); } - - return xmsg; - } - }); - } - /** - * Get all form ids from the campaign node - * - * @param campaign - * @return - */ - private Map getCampaignFormIds(JsonNode campaign) { - ArrayList formIDs = new ArrayList(); - Map formIDs2 = new HashMap(); - try { - campaign.findValue("logicIDs").forEach(t -> { - formIDs2.put(t.asText(), ""); - }); - - campaign.findValue("logic").forEach(t -> { - formIDs2.put(t.findValue("id").asText(), t.findValue("formID").asText()); - }); - - System.out.println("formIDs:"+formIDs2); - } catch (Exception e) { - e.printStackTrace(); - } - return formIDs2; - } - - /** - * Get the current from id - * - * @param xmsg - * @param campaignID - * @param formIDs - * @return - */ - private String getCurrentFormId(XMessage xmsg, String campaignID, Map formIDs, User user) { - String currentFormID = ""; - - Boolean consent = checkUserCampaignConsent(campaignID, user); - - /* Fetch current form id from file for user & campaign */ - currentFormID = getCurrentFormIDFromFile(xmsg.getFrom().getUserID(), campaignID); - - /* if current form id is empty, then set the first form id as current form id - * else if current form id is equal to consent form id - * - */ - System.out.println("currentFormID:"+currentFormID); - if(currentFormID == null || currentFormID.isEmpty()) { - if(!formIDs.isEmpty() && formIDs.size() > 0) { + } catch (Exception e) { + e.printStackTrace(); + } + } + }) + .doOnError(genericError("KafkaFlux", null)) + .subscribe(); + + } + + /** + * Create Child Span with current context & parent span + * @param spanName + * @param context + * @param parentSpan + * @return childSpan + */ + private Span createChildSpan(String spanName, Context context, Span parentSpan) { + String prefix = "orchestrator-"; + return tracer.spanBuilder(prefix + spanName).setParent(context.with(parentSpan)).startSpan(); + } + + /** + * Log Exceptions & if span exists, add error to span + * @param eMsg + * @param span + */ + private void genericException(String eMsg, Span span) { + eMsg = "Exception: " + eMsg; + log.error(eMsg); + if(span != null) { + span.setStatus(StatusCode.ERROR, "Exception: " + eMsg); + span.end(); + } + } + + /** + * Log Exception & if span exists, add error to span + * @param s + * @param span + * @return + */ + private Consumer genericError(String s, Span span) { + return c -> { + String msg = "Error in " + s + "::" + c.getMessage(); + log.error(msg); + if (span != null) { + log.info("generic message - span"); + span.setStatus(StatusCode.ERROR, msg); + span.end(); + } + }; + } + + private Mono resolveUserNew(XMessage xmsg) { + try { + SenderReceiverInfo from = xmsg.getFrom(); + String appName = xmsg.getApp(); + + String deviceString = from.getDeviceType().toString() + ":" + from.getUserID(); + String encodedBase64Key = encodeKey(secret); + String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key); + ClientResponse response = campaignService.fusionAuthClient + .retrieveUserByUsername(deviceID); + if (response.wasSuccessful()) { + from.setDeviceID(response.successResponse.user.id.toString()); + xmsg.setFrom(from); + return xmsgCampaignForm(xmsg, response.successResponse.user); + } else { + return botService.updateUser(deviceString, appName) + .flatMap(new Function, Mono>() { + @Override + public Mono apply(Pair result) { + if (result.getLeft()) { + from.setDeviceID(result.getRight()); + xmsg.setFrom(from); + ClientResponse response = campaignService.fusionAuthClient + .retrieveUserByUsername(deviceID); + if (response.wasSuccessful()) { + return xmsgCampaignForm(xmsg, response.successResponse.user); + } else { + return Mono.just(xmsg); + } + } else { + xmsg.setFrom(null); + return Mono.just(xmsg); + } + } + }) + .doOnError(genericError("updateUser", null)); + } + } catch (Exception e) { + e.printStackTrace(); + log.error("Exception in resolveUser" + e.getMessage()); + xmsg.setFrom(null); + return Mono.just(xmsg); + } + } + + private Mono xmsgCampaignForm(XMessage xmsg, User user) { + return campaignService.getCampaignFromNameTransformer(xmsg.getCampaign()) + .map(new Function() { + @Override + public XMessage apply(JsonNode campaign) { + String campaignID = campaign.findValue("id").asText(); + Map formIDs = getCampaignFormIds(campaign); + String currentFormID = getCurrentFormId(xmsg, campaignID, formIDs, user); + + HashMap metaData = new HashMap(); + metaData.put("campaignId", campaignID); + metaData.put("currentFormID", currentFormID); + + saveCurrentFormID(xmsg.getFrom().getUserID(), campaignID, currentFormID); + + Transformer transf = new Transformer(); + transf.setId("test"); + transf.setMetaData(metaData); + + ArrayList transformers = new ArrayList(); + transformers.add(transf); + + xmsg.setTransformers(transformers); + + try { + System.out.println("XML:" + xmsg.toXML()); + } catch (JAXBException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return xmsg; + } + }) + .doOnError(genericError("getCampaignFromNameTransformer", null)); + } + + /** + * Get all form ids from the campaign node + * + * @param campaign + * @return + */ + private Map getCampaignFormIds(JsonNode campaign) { + ArrayList formIDs = new ArrayList(); + Map formIDs2 = new HashMap(); + try { + campaign.findValue("logicIDs").forEach(t -> { + formIDs2.put(t.asText(), ""); + }); + + campaign.findValue("logic").forEach(t -> { + formIDs2.put(t.findValue("id").asText(), t.findValue("formID").asText()); + }); + + System.out.println("formIDs:" + formIDs2); + } catch (Exception e) { + e.printStackTrace(); + } + return formIDs2; + } + + /** + * Get the current from id + * + * @param xmsg + * @param campaignID + * @param formIDs + * @return + */ + private String getCurrentFormId(XMessage xmsg, String campaignID, Map formIDs, User user) { + String currentFormID = ""; + + Boolean consent = checkUserCampaignConsent(campaignID, user); + + /* Fetch current form id from file for user & campaign */ + currentFormID = getCurrentFormIDFromFile(xmsg.getFrom().getUserID(), campaignID); + + /* + * if current form id is empty, then set the first form id as current form id + * else if current form id is equal to consent form id + * + */ + System.out.println("currentFormID:" + currentFormID); + if (currentFormID == null || currentFormID.isEmpty()) { + if (!formIDs.isEmpty() && formIDs.size() > 0) { currentFormID = formIDs.values().toArray()[0].toString(); } - } - - /* if current form is consent form */ - if(currentFormID.equals(getConsentFormID())) { - /* if consent already exists, set next form as current one, - * else check the response for further details */ - if(consent) { - currentFormID = formIDs.values().toArray()[1].toString(); - } else { - String response = xmsg.getPayload().getText(); - if(response.equals("1")) { - //update fusion auth client for consent & set the next form id as current form id - addUserCampaignConsent(campaignID, user); - currentFormID = formIDs.values().toArray()[1].toString(); - } else if(response.equals("2")) { - //drop conversation - currentFormID = ""; - System.out.println("drop conversation."); - } else { - // invalid response, leave the consent form id as current - } - } - } - - System.out.println(currentFormID); - - return currentFormID; - } - - /** - * Get current form id set in file for user & campaign - * - * @param userID - * @param campaignID - * @return - */ - private String getCurrentFormIDFromFile(String userID, String campaignID) { - String currentFormID = ""; - Resource resource = new ClassPathResource(getJsonFilePath()); - try { - ObjectMapper mapper = new ObjectMapper(); - - InputStream inputStream = resource.getInputStream(); - - byte[] bdata = FileCopyUtils.copyToByteArray(inputStream); - - JsonNode rootNode = mapper.readTree(bdata); - - if(!rootNode.isEmpty() && rootNode.get(userID) != null - && rootNode.path(userID).get(campaignID) != null) { - currentFormID = rootNode.path(userID).get(campaignID).asText(); - } - } catch (IOException e) { - e.printStackTrace(); - } - return currentFormID; - } - - /** - * Save current form id in file for user & campaign - * - * @param userID - * @param campaignID - * @param currentFormID - */ + } + + /* if current form is consent form */ + if (currentFormID.equals(getConsentFormID())) { + /* + * if consent already exists, set next form as current one, else check the + * response for further details + */ + if (consent) { + currentFormID = formIDs.values().toArray()[1].toString(); + } else { + String response = xmsg.getPayload().getText(); + if (response.equals("1")) { + // update fusion auth client for consent & set the next form id as current form + // id + addUserCampaignConsent(campaignID, user); + currentFormID = formIDs.values().toArray()[1].toString(); + } else if (response.equals("2")) { + // drop conversation + currentFormID = ""; + System.out.println("drop conversation."); + } else { + // invalid response, leave the consent form id as current + } + } + } + + System.out.println(currentFormID); + + return currentFormID; + } + + /** + * Get current form id set in file for user & campaign + * + * @param userID + * @param campaignID + * @return + */ + private String getCurrentFormIDFromFile(String userID, String campaignID) { + String currentFormID = ""; + Resource resource = new ClassPathResource(getJsonFilePath()); + try { + ObjectMapper mapper = new ObjectMapper(); + + InputStream inputStream = resource.getInputStream(); + + byte[] bdata = FileCopyUtils.copyToByteArray(inputStream); + + JsonNode rootNode = mapper.readTree(bdata); + + if (!rootNode.isEmpty() && rootNode.get(userID) != null && rootNode.path(userID).get(campaignID) != null) { + currentFormID = rootNode.path(userID).get(campaignID).asText(); + } + } catch (IOException e) { + e.printStackTrace(); + } + return currentFormID; + } + + /** + * Save current form id in file for user & campaign + * + * @param userID + * @param campaignID + * @param currentFormID + */ private void saveCurrentFormID(String userID, String campaignID, String currentFormID) { - Resource resource = new ClassPathResource(getJsonFilePath()); - try { - ObjectMapper mapper = new ObjectMapper(); - - File file = resource.getFile(); - InputStream inputStream = resource.getInputStream(); - - byte[] bdata = FileCopyUtils.copyToByteArray(inputStream); - - JsonNode rootNode = mapper.readTree(bdata); - if(rootNode.isEmpty()) { - rootNode = mapper.createObjectNode(); - } - - if(!rootNode.isEmpty() && rootNode.get(userID) != null) { - ((ObjectNode) rootNode.path(userID)).put(campaignID, currentFormID); - } else { - JsonNode campaignNode = mapper.createObjectNode(); - ((ObjectNode) campaignNode).put(campaignID, currentFormID); - - ((ObjectNode) rootNode).put(userID, campaignNode); - } - - System.out.println("Saved File String:"+rootNode.toString()); - - FileWriter fileWriter = new FileWriter(file); - fileWriter.write(rootNode.toString()); - fileWriter.flush(); - fileWriter.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - private String getJsonFilePath() { - return "userCurrentForm.json"; - } - + Resource resource = new ClassPathResource(getJsonFilePath()); + try { + ObjectMapper mapper = new ObjectMapper(); + + File file = resource.getFile(); + InputStream inputStream = resource.getInputStream(); + + byte[] bdata = FileCopyUtils.copyToByteArray(inputStream); + + JsonNode rootNode = mapper.readTree(bdata); + if (rootNode.isEmpty()) { + rootNode = mapper.createObjectNode(); + } + + if (!rootNode.isEmpty() && rootNode.get(userID) != null) { + ((ObjectNode) rootNode.path(userID)).put(campaignID, currentFormID); + } else { + JsonNode campaignNode = mapper.createObjectNode(); + ((ObjectNode) campaignNode).put(campaignID, currentFormID); + + ((ObjectNode) rootNode).put(userID, campaignNode); + } + + System.out.println("Saved File String:" + rootNode.toString()); + + FileWriter fileWriter = new FileWriter(file); + fileWriter.write(rootNode.toString()); + fileWriter.flush(); + fileWriter.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private String getJsonFilePath() { + return "userCurrentForm.json"; + } + // private - - private String getConsentFormID() { - return "mandatory-consent-v1"; - } - - private Mono resolveUser(SenderReceiverInfo from, String appName) { - try { - String deviceString = from.getDeviceType().toString() + ":" + from.getUserID(); - String encodedBase64Key = encodeKey(secret); - String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key); - ClientResponse response = campaignService.fusionAuthClient.retrieveUserByUsername(deviceID); - if (response.wasSuccessful()) { - from.setDeviceID(response.successResponse.user.id.toString()); + + private String getConsentFormID() { + return "mandatory-consent-v1"; + } + + private Mono resolveUser(SenderReceiverInfo from, String appName) { + try { + String deviceString = from.getDeviceType().toString() + ":" + from.getUserID(); + String encodedBase64Key = encodeKey(secret); + String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key); + ClientResponse response = campaignService.fusionAuthClient + .retrieveUserByUsername(deviceID); + if (response.wasSuccessful()) { + from.setDeviceID(response.successResponse.user.id.toString()); // checkConsent(from.getCampaignID(), response.successResponse.user); - return Mono.just(from); - } else { - return botService.updateUser(deviceString, appName) - .flatMap(new Function, Mono>() { - @Override - public Mono apply(Pair result) { - if (result.getLeft()) { - from.setDeviceID(result.getRight()); - return Mono.just(from); - } else { - return Mono.just(null); - } - } - }).doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in updateUser" + throwable.getMessage()); - } - }); - } - } catch (Exception e) { - e.printStackTrace(); - log.error("Error in resolveUser" + e.getMessage()); - return Mono.just(null); - } - } - - private Boolean checkUserCampaignConsent(String campaignID, User user) { - Boolean consent = false; - try { - Object consentData = user.data.get("consent"); - ArrayList consentArray = (ArrayList) consentData; - if(consentArray != null && consentArray.contains(campaignID)) { - consent = true; - } - } catch (Exception e) { - e.printStackTrace(); - } - return consent; - } - - private void addUserCampaignConsent(String campaignID, User user) - { - try { - Object consentData = user.data.get("consent"); + return Mono.just(from); + } else { + return botService.updateUser(deviceString, appName) + .flatMap(new Function, Mono>() { + @Override + public Mono apply(Pair result) { + if (result.getLeft()) { + from.setDeviceID(result.getRight()); + return Mono.just(from); + } else { + return Mono.just(null); + } + } + }).doOnError(new Consumer() { + @Override + public void accept(Throwable throwable) { + log.error("Error in updateUser" + throwable.getMessage()); + } + }); + } + } catch (Exception e) { + e.printStackTrace(); + log.error("Error in resolveUser" + e.getMessage()); + return Mono.just(null); + } + } + + private Boolean checkUserCampaignConsent(String campaignID, User user) { + Boolean consent = false; + try { + Object consentData = user.data.get("consent"); ArrayList consentArray = (ArrayList) consentData; - - if(consentArray == null || ( - consentArray != null && !consentArray.contains(campaignID)) - ){ - consentArray.add(campaignID); - - user.data.put("consent", consentArray); - - updateFAUser(user); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - private void updateFAUser(User user) { - System.out.println(user); - UserRequest r = new UserRequest(user); - - ClientResponse response = campaignService.fusionAuthClient.updateUser(user.id, r); - if(response.wasSuccessful()) { - System.out.println("user update success"); + if (consentArray != null && consentArray.contains(campaignID)) { + consent = true; + } + } catch (Exception e) { + e.printStackTrace(); + } + return consent; + } + + private void addUserCampaignConsent(String campaignID, User user) { + try { + Object consentData = user.data.get("consent"); + ArrayList consentArray = (ArrayList) consentData; + + if (consentArray == null || (consentArray != null && !consentArray.contains(campaignID))) { + consentArray.add(campaignID); + + user.data.put("consent", consentArray); + + updateFAUser(user); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void updateFAUser(User user) { + log.error("User: "+user); + UserRequest r = new UserRequest(user); + + ClientResponse response = campaignService.fusionAuthClient.updateUser(user.id, r); + if (response.wasSuccessful()) { + log.error("user update success"); } else { - System.out.println("error in user update"+response.errorResponse); + log.error("error in user update" + response.errorResponse); + } + } + + private void logTimeTaken(long startTime, int checkpointID) { + long endTime = System.nanoTime(); + long duration = (endTime - startTime) / 1000000; + log.info(String.format("CP-%d: %d ms", checkpointID, duration)); + } + + private Mono getLastMessageID(XMessage msg) { + if (msg.getMessageType().toString().equalsIgnoreCase("text")) { + return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT") + .map(new Function() { + @Override + public String apply(XMessageDAO msg1) { + if (msg1.getId() == null) { + System.out.println("cError"); + return ""; + } + return String.valueOf(msg1.getId()); + } + }); + + } else if (msg.getMessageType().toString().equalsIgnoreCase("button")) { + return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT") + .map(new Function() { + @Override + public String apply(XMessageDAO lastMessage) { + return String.valueOf(lastMessage.getId()); + } + }); + } + return Mono.empty(); + } + + private Mono getLatestXMessage(String userID, LocalDateTime yesterday, String messageState) { + return xMessageRepository.findAllByUserIdAndTimestampAfter(userID, yesterday).collectList() + .map(new Function, XMessageDAO>() { + @Override + public XMessageDAO apply(List xMessageDAOS) { + if (xMessageDAOS.size() > 0) { + List filteredList = new ArrayList<>(); + for (XMessageDAO xMessageDAO : xMessageDAOS) { + if (xMessageDAO.getMessageState().equals(XMessage.MessageState.SENT.name()) + || xMessageDAO.getMessageState().equals(XMessage.MessageState.REPLIED.name())) + filteredList.add(xMessageDAO); + } + if (filteredList.size() > 0) { + filteredList.sort(new Comparator() { + @Override + public int compare(XMessageDAO o1, XMessageDAO o2) { + return o1.getTimestamp().compareTo(o2.getTimestamp()); + } + }); + } + return xMessageDAOS.get(0); + } + return new XMessageDAO(); + } + }) + .doOnError(genericError("getLatestXMessage", null)); + } + + private Mono fetchAdapterID(String appName) { + return botService.getCurrentAdapter(appName); + } + + private Mono getAppName(String text, SenderReceiverInfo from) { + LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); + log.info("Inside getAppName " + text + "::" + from.getUserID()); + if (text.equals("")) { + try { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } catch (Exception e2) { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } + } else { + try { + log.info("Inside getAppName " + text + "::" + from.getUserID()); + return botService.getCampaignFromStartingMessage(text) + .flatMap(new Function>() { + @Override + public Mono apply(String appName1) { + log.info("Inside getCampaignFromStartingMessage => " + appName1); + if (appName1 == null || appName1.equals("")) { + try { + return getLatestXMessage(from.getUserID(), yesterday, + XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return (xMessageLast.getApp() == null + || xMessageLast.getApp().isEmpty()) + ? "finalAppName" + : xMessageLast.getApp(); + } + }); + } catch (Exception e2) { + return getLatestXMessage(from.getUserID(), yesterday, + XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return (xMessageLast.getApp() == null + || xMessageLast.getApp().isEmpty()) + ? "finalAppName" + : xMessageLast.getApp(); + } + }); + } + } + return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName") + : Mono.just(appName1); + } + }) + .doOnError(genericError("getCampaignFromStartingMessage" , null)); + } catch (Exception e) { + log.info("Inside getAppName - exception => " + e.getMessage()); + try { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } catch (Exception e2) { + return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()) + .map(new Function() { + @Override + public String apply(XMessageDAO xMessageLast) { + return xMessageLast.getApp(); + } + }); + } + } } - } - - private void logTimeTaken(long startTime, int checkpointID) { - long endTime = System.nanoTime(); - long duration = (endTime - startTime) / 1000000; - log.info(String.format("CP-%d: %d ms", checkpointID, duration)); - } - - private Mono getLastMessageID(XMessage msg) { - if (msg.getMessageType().toString().equalsIgnoreCase("text")) { - return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function() { - @Override - public String apply(XMessageDAO msg1) { - if (msg1.getId() == null) { - System.out.println("cError"); - return ""; - } - return String.valueOf(msg1.getId()); - } - }); - - } else if (msg.getMessageType().toString().equalsIgnoreCase("button")) { - return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function() { - @Override - public String apply(XMessageDAO lastMessage) { - return String.valueOf(lastMessage.getId()); - } - }); -// -// map(new Function() { -// @Override -// public String apply(XMessageDAO lastMessage) { -// return String.valueOf(lastMessage.getId()); -// } -// }); - } - return Mono.empty(); - } - - private Mono getLatestXMessage(String userID, LocalDateTime yesterday, String messageState) { - return xMessageRepository - .findAllByUserIdAndTimestampAfter(userID, yesterday).collectList() - .map(new Function, XMessageDAO>() { - @Override - public XMessageDAO apply(List xMessageDAOS) { - if (xMessageDAOS.size() > 0) { - List filteredList = new ArrayList<>(); - for (XMessageDAO xMessageDAO : xMessageDAOS) { - if (xMessageDAO.getMessageState().equals(XMessage.MessageState.SENT.name()) || - xMessageDAO.getMessageState().equals(XMessage.MessageState.REPLIED.name())) - filteredList.add(xMessageDAO); - } - if (filteredList.size() > 0) { - filteredList.sort(new Comparator() { - @Override - public int compare(XMessageDAO o1, XMessageDAO o2) { - return o1.getTimestamp().compareTo(o2.getTimestamp()); - } - }); - } - return xMessageDAOS.get(0); - } - return new XMessageDAO(); - } - }).doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in getLatestXMessage" + throwable.getMessage()); - } - }); - } - - private Mono fetchAdapterID(String appName) { - return botService.getCurrentAdapter(appName); - } - - private Mono getAppName(String text, SenderReceiverInfo from) { - LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); - log.info("Inside getAppName " + text + "::" + from.getUserID()); - if (text.equals("")) { - try { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } catch (Exception e2) { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } - } else { - try { - log.info("Inside getAppName " + text + "::" + from.getUserID()); - return botService.getCampaignFromStartingMessage(text) - .flatMap(new Function>() { - @Override - public Mono apply(String appName1) { - log.info("Inside getCampaignFromStartingMessage => " + appName1); - if (appName1 == null || appName1.equals("")) { - try { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return (xMessageLast.getApp() == null || xMessageLast.getApp().isEmpty()) ? "finalAppName" : xMessageLast.getApp(); - } - }); - } catch (Exception e2) { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return (xMessageLast.getApp() == null || xMessageLast.getApp().isEmpty()) ? "finalAppName" : xMessageLast.getApp(); - } - }); - } - } - return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName") : Mono.just(appName1); - } - }).doOnError(new Consumer() { - @Override - public void accept(Throwable throwable) { - log.error("Error in getCampaignFromStartingMessage" + throwable.getMessage()); - } - }); - } catch (Exception e) { - log.info("Inside getAppName - exception => " + e.getMessage()); - try { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } catch (Exception e2) { - return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function() { - @Override - public String apply(XMessageDAO xMessageLast) { - return xMessageLast.getApp(); - } - }); - } - } - } - } + } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index eb1f681..848383c 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -51,3 +51,10 @@ spring.r2dbc.password=${FORMS_DB_PASSWORD} caffeine.cache.max.size=${CAFFEINE_CACHE_MAX_SIZE:#{1000}} caffeine.cache.exprie.duration.seconds=${CAFFEINE_CACHE_EXPIRE_DURATION:#{300}} +#Opentelemetry Lighstep Config +opentelemetry.lightstep.tracer=${OPENTELEMETERY_LIGHTSTEP_TRACER} +opentelemetry.lightstep.tracer.version=${OPENTELEMETERY_LIGHTSTEP_TRACER_VERSION} +opentelemetry.lightstep.service=${OPENTELEMETERY_LIGHTSTEP_SERVICE} +opentelemetry.lightstep.access.token=${OPENTELEMETERY_LIGHTSTEP_ACCESS_TOKEN} +opentelemetry.lightstep.end.point=${OPENTELEMETERY_LIGHTSTEP_END_POINT} + diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..73bfb43 --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + From 51307f1588b8194d3388f0f2f3db49f45b01c6cf Mon Sep 17 00:00:00 2001 From: Surabhi Date: Fri, 3 Dec 2021 11:29:00 +0530 Subject: [PATCH 2/8] remove getAppName request - get app name from xmsg --- .../Consumer/ReactiveConsumer.java | 115 +++++++----------- 1 file changed, 47 insertions(+), 68 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index 24e8d88..fcb4f31 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -130,70 +130,52 @@ public void accept(ReceiverRecord stringMessage) { Span rootSpan = tracer.spanBuilder("orchestrator-processMessage").startSpan(); try (Scope scope = rootSpan.makeCurrent()) { Context currentContext = Context.current(); - Span childSpan1 = createChildSpan("getAppName", currentContext, rootSpan); SenderReceiverInfo from = msg.getFrom(); + Span childSpan1 = createChildSpan("fetchAdapterID", currentContext, rootSpan); logTimeTaken(startTime, 1); - getAppName(msg.getPayload().getText(), msg.getFrom()).doOnNext(new Consumer() { + fetchAdapterID(msg.getApp()).doOnNext(new Consumer() { @Override - public void accept(String appName) { + public void accept(String adapterID) { childSpan1.end(); - Span childSpan2 = createChildSpan("fetchAdapterID", currentContext, rootSpan); + Span childSpan2 = createChildSpan("resolveUserNew", currentContext, rootSpan); logTimeTaken(startTime, 2); - msg.setApp(appName); - fetchAdapterID(appName).doOnNext(new Consumer() { + from.setCampaignID(msg.getApp()); + from.setDeviceType(DeviceType.PHONE); + resolveUserNew(msg).doOnNext(new Consumer() { @Override - public void accept(String adapterID) { + public void accept(XMessage msg) { childSpan2.end(); - Span childSpan3 = createChildSpan("resolveUserNew", currentContext, rootSpan); - logTimeTaken(startTime, 3); - from.setCampaignID(appName); - from.setDeviceType(DeviceType.PHONE); - resolveUserNew(msg).doOnNext(new Consumer() { - @Override - public void accept(XMessage msg) { - childSpan3.end(); - Span childSpan4 = createChildSpan("getLastMessageID", currentContext, rootSpan); - SenderReceiverInfo from = msg.getFrom(); - // msg.setFrom(from); - msg.setApp(appName); - getLastMessageID(msg).doOnNext(lastMessageID -> { + Span childSpan3 = createChildSpan("getLastMessageID", currentContext, rootSpan); + SenderReceiverInfo from = msg.getFrom(); + // msg.setFrom(from); + getLastMessageID(msg).doOnNext(lastMessageID -> { + childSpan3.end(); + Span childSpan4 = createChildSpan("sendMessageToKafka", currentContext, + rootSpan); + logTimeTaken(startTime, 3); + msg.setLastMessageID(lastMessageID); + msg.setAdapterId(adapterID); + if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) + || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { + try { + kafkaProducer.send(odkTransformerTopic, msg.toXML()); childSpan4.end(); - Span childSpan5 = createChildSpan("sendMessageToKafka", currentContext, rootSpan); - logTimeTaken(startTime, 4); - msg.setLastMessageID(lastMessageID); - msg.setAdapterId(adapterID); - if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) - || msg.getMessageState() - .equals(XMessage.MessageState.OPTED_IN)) { - try { - kafkaProducer.send(odkTransformerTopic, msg.toXML()); - childSpan5.end(); - rootSpan.end(); - // reactiveProducer.sendMessages(odkTransformerTopic, - // msg.toXML()); - } catch (JAXBException e) { - e.printStackTrace(); - } - logTimeTaken(startTime, 15); - } - }) - .doOnError(genericError("getLastMessageID", childSpan4)) - .subscribe(); + rootSpan.end(); + // reactiveProducer.sendMessages(odkTransformerTopic, + // msg.toXML()); + } catch (JAXBException e) { + e.printStackTrace(); + } + logTimeTaken(startTime, 15); } - }) - .doOnError(genericError("resolveUserNew", childSpan3)) - .subscribe(); - + }).doOnError(genericError("getLastMessageID", childSpan3)).subscribe(); } - }) - .doOnError(genericError("fetchAdapterID", childSpan2)) - .subscribe(); + }).doOnError(genericError("resolveUserNew", childSpan2)).subscribe(); + } - }) - .doOnError(genericError("getAppName", childSpan1)) - .subscribe(); + }).doOnError(genericError("fetchAdapterID", childSpan1)).subscribe(); - } catch(Throwable e) { + } catch (Throwable e) { genericException(e.getMessage(), rootSpan); } finally { // rootSpan.end(); @@ -202,14 +184,13 @@ public void accept(XMessage msg) { e.printStackTrace(); } } - }) - .doOnError(genericError("KafkaFlux", null)) - .subscribe(); + }).doOnError(genericError("KafkaFlux", null)).subscribe(); } - + /** * Create Child Span with current context & parent span + * * @param spanName * @param context * @param parentSpan @@ -219,23 +200,25 @@ private Span createChildSpan(String spanName, Context context, Span parentSpan) String prefix = "orchestrator-"; return tracer.spanBuilder(prefix + spanName).setParent(context.with(parentSpan)).startSpan(); } - + /** * Log Exceptions & if span exists, add error to span + * * @param eMsg * @param span */ private void genericException(String eMsg, Span span) { eMsg = "Exception: " + eMsg; log.error(eMsg); - if(span != null) { + if (span != null) { span.setStatus(StatusCode.ERROR, "Exception: " + eMsg); span.end(); } } /** - * Log Exception & if span exists, add error to span + * Log Exception & if span exists, add error to span + * * @param s * @param span * @return @@ -286,8 +269,7 @@ public Mono apply(Pair result) { return Mono.just(xmsg); } } - }) - .doOnError(genericError("updateUser", null)); + }).doOnError(genericError("updateUser", null)); } } catch (Exception e) { e.printStackTrace(); @@ -330,8 +312,7 @@ public XMessage apply(JsonNode campaign) { return xmsg; } - }) - .doOnError(genericError("getCampaignFromNameTransformer", null)); + }).doOnError(genericError("getCampaignFromNameTransformer", null)); } /** @@ -566,7 +547,7 @@ private void addUserCampaignConsent(String campaignID, User user) { } private void updateFAUser(User user) { - log.error("User: "+user); + log.error("User: " + user); UserRequest r = new UserRequest(user); ClientResponse response = campaignService.fusionAuthClient.updateUser(user.id, r); @@ -633,8 +614,7 @@ public int compare(XMessageDAO o1, XMessageDAO o2) { } return new XMessageDAO(); } - }) - .doOnError(genericError("getLatestXMessage", null)); + }).doOnError(genericError("getLatestXMessage", null)); } private Mono fetchAdapterID(String appName) { @@ -700,8 +680,7 @@ public String apply(XMessageDAO xMessageLast) { return (appName1 == null || appName1.isEmpty()) ? Mono.just("finalAppName") : Mono.just(appName1); } - }) - .doOnError(genericError("getCampaignFromStartingMessage" , null)); + }).doOnError(genericError("getCampaignFromStartingMessage", null)); } catch (Exception e) { log.info("Inside getAppName - exception => " + e.getMessage()); try { From bfceff244d8f77470e2692ef54ad73dec38f0c3f Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 10 Jan 2022 16:00:56 +0530 Subject: [PATCH 3/8] changes --- pom.xml | 4 ++++ .../Application/AppConfigOrchestrator.java | 20 ++++++++++--------- .../Consumer/ReactiveConsumer.java | 18 +++++++++++------ src/main/resources/application.properties | 2 +- 4 files changed, 28 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index 81aa3c8..97d32fd 100644 --- a/pom.xml +++ b/pom.xml @@ -28,6 +28,10 @@ org.springframework.boot spring-boot-starter-validation + org.springframework.boot spring-boot-starter-web-services diff --git a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java index a712432..109efc0 100644 --- a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java +++ b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java @@ -6,7 +6,9 @@ import com.uci.utils.kafka.ReactiveProducer; import io.fusionauth.client.FusionAuthClient; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; import org.kie.api.io.Resource; import org.kie.api.runtime.KieSession; import org.kie.internal.io.ResourceFactory; @@ -67,12 +69,12 @@ public CampaignService getCampaignService() { return new CampaignService(webClient, fusionAuthClient, cache); } - @Bean - public KieSession DroolSession() { - Resource resource = ResourceFactory.newClassPathResource("OrchestratorRules.xlsx", getClass()); - KieSession kSession = new DroolsBeanFactory().getKieSession(resource); - return kSession; - } +// @Bean +// public KieSession DroolSession() { +// Resource resource = ResourceFactory.newClassPathResource("OrchestratorRules.xlsx", getClass()); +// KieSession kSession = new DroolsBeanFactory().getKieSession(resource); +// return kSession; +// } @Bean Map kafkaConsumerConfiguration() { @@ -100,7 +102,7 @@ Map kafkaProducerConfiguration() { ReceiverOptions kafkaReceiverOptions(@Value("${inboundProcessed}") String[] inTopicName) { ReceiverOptions options = ReceiverOptions.create(kafkaConsumerConfiguration()); return options.subscription(Arrays.asList(inTopicName)) - .withKeyDeserializer(new JsonDeserializer<>()) + .withKeyDeserializer(new StringDeserializer()) .withValueDeserializer(new JsonDeserializer()); } @@ -110,8 +112,8 @@ SenderOptions kafkaSenderOptions() { } @Bean - Flux> reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) { - return KafkaReceiver.create(kafkaReceiverOptions).receive(); + Flux> reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) { + return KafkaReceiver.create(kafkaReceiverOptions).receiveAtmostOnce(); } @Bean diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index fcb4f31..b7019be 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -16,6 +16,7 @@ import com.uci.utils.encryption.AESWrapper; import com.uci.utils.kafka.ReactiveProducer; import com.uci.utils.kafka.SimpleProducer; +import com.uci.utils.kafka.SimpleProducer1; import io.fusionauth.domain.api.UserConsentResponse; import io.fusionauth.domain.api.UserRequest; @@ -35,6 +36,8 @@ import messagerosa.core.model.XMessage; import messagerosa.xml.XMessageParser; import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.protocol.types.Field; import org.apache.tomcat.util.json.JSONParser; import org.json.JSONArray; @@ -81,7 +84,7 @@ @Slf4j public class ReactiveConsumer { - private final Flux> reactiveKafkaReceiver; + private final Flux> reactiveKafkaReceiver; // @Autowired // public KieSession kSession; @@ -90,7 +93,7 @@ public class ReactiveConsumer { public XMessageRepository xMessageRepository; @Autowired - public SimpleProducer kafkaProducer; + public SimpleProducer1 kafkaProducer; @Autowired public ReactiveProducer reactiveProducer; @@ -117,16 +120,19 @@ public class ReactiveConsumer { @EventListener(ApplicationStartedEvent.class) public void onMessage() { - reactiveKafkaReceiver.doOnNext(new Consumer>() { + reactiveKafkaReceiver.doOnNext(new Consumer>() { @Override - public void accept(ReceiverRecord stringMessage) { + public void accept(ConsumerRecord stringMessage) { try { + + log.info("headers:"+stringMessage.headers()); final long startTime = System.nanoTime(); logTimeTaken(startTime, 0); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes())); -// Context extracted = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), msg, null); -// extracted.getCurrentSpan(); + Context extracted = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), stringMessage.headers(), null); + log.info("extracted: "+extracted); + // extracted.getCurrentSpan(); Span rootSpan = tracer.spanBuilder("orchestrator-processMessage").startSpan(); try (Scope scope = rootSpan.makeCurrent()) { Context currentContext = Context.current(); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 6ccd8d6..848383c 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -40,7 +40,7 @@ fusionauth.key = ${FUSIONAUTH_KEY} encryptionKeyString=A%C*F-JaNdRgUkXp -spring.r2dbc.url=r2dbc:${FORMS_DB_URL} +spring.r2dbc.url=r2dbc:postgresql://${FORMS_DB_HOST}:${FORMS_DB_PORT}/${FORMS_DB_NAME} postgresql.db.host=${FORMS_DB_HOST} postgresql.db.port=${FORMS_DB_PORT} spring.r2dbc.name=${FORMS_DB_NAME} From 97877eb3209dd88e83a7e5d884c21350cb3c861c Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 10 Jan 2022 16:10:32 +0530 Subject: [PATCH 4/8] changes --- .../Application/AppConfigOrchestrator.java | 12 ++++++------ src/main/resources/application.properties | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java index 109efc0..98131f3 100644 --- a/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java +++ b/src/main/java/com/uci/orchestrator/Application/AppConfigOrchestrator.java @@ -69,12 +69,12 @@ public CampaignService getCampaignService() { return new CampaignService(webClient, fusionAuthClient, cache); } -// @Bean -// public KieSession DroolSession() { -// Resource resource = ResourceFactory.newClassPathResource("OrchestratorRules.xlsx", getClass()); -// KieSession kSession = new DroolsBeanFactory().getKieSession(resource); -// return kSession; -// } + @Bean + public KieSession DroolSession() { + Resource resource = ResourceFactory.newClassPathResource("OrchestratorRules.xlsx", getClass()); + KieSession kSession = new DroolsBeanFactory().getKieSession(resource); + return kSession; + } @Bean Map kafkaConsumerConfiguration() { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 848383c..6ccd8d6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -40,7 +40,7 @@ fusionauth.key = ${FUSIONAUTH_KEY} encryptionKeyString=A%C*F-JaNdRgUkXp -spring.r2dbc.url=r2dbc:postgresql://${FORMS_DB_HOST}:${FORMS_DB_PORT}/${FORMS_DB_NAME} +spring.r2dbc.url=r2dbc:${FORMS_DB_URL} postgresql.db.host=${FORMS_DB_HOST} postgresql.db.port=${FORMS_DB_PORT} spring.r2dbc.name=${FORMS_DB_NAME} From d800e91a1d6f222c1df11c5dbe1e4f06b41a18f1 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 17 Jan 2022 18:39:48 +0530 Subject: [PATCH 5/8] opentelemetry spans removed --- .../Consumer/ReactiveConsumer.java | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index b7019be..46a4a24 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -93,7 +93,7 @@ public class ReactiveConsumer { public XMessageRepository xMessageRepository; @Autowired - public SimpleProducer1 kafkaProducer; + public SimpleProducer kafkaProducer; @Autowired public ReactiveProducer reactiveProducer; @@ -130,34 +130,21 @@ public void accept(ConsumerRecord stringMessage) { logTimeTaken(startTime, 0); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes())); - Context extracted = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), stringMessage.headers(), null); - log.info("extracted: "+extracted); - // extracted.getCurrentSpan(); - Span rootSpan = tracer.spanBuilder("orchestrator-processMessage").startSpan(); - try (Scope scope = rootSpan.makeCurrent()) { - Context currentContext = Context.current(); + try { SenderReceiverInfo from = msg.getFrom(); - Span childSpan1 = createChildSpan("fetchAdapterID", currentContext, rootSpan); logTimeTaken(startTime, 1); fetchAdapterID(msg.getApp()).doOnNext(new Consumer() { @Override public void accept(String adapterID) { - childSpan1.end(); - Span childSpan2 = createChildSpan("resolveUserNew", currentContext, rootSpan); logTimeTaken(startTime, 2); from.setCampaignID(msg.getApp()); from.setDeviceType(DeviceType.PHONE); resolveUserNew(msg).doOnNext(new Consumer() { @Override public void accept(XMessage msg) { - childSpan2.end(); - Span childSpan3 = createChildSpan("getLastMessageID", currentContext, rootSpan); SenderReceiverInfo from = msg.getFrom(); // msg.setFrom(from); getLastMessageID(msg).doOnNext(lastMessageID -> { - childSpan3.end(); - Span childSpan4 = createChildSpan("sendMessageToKafka", currentContext, - rootSpan); logTimeTaken(startTime, 3); msg.setLastMessageID(lastMessageID); msg.setAdapterId(adapterID); @@ -165,8 +152,6 @@ public void accept(XMessage msg) { || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { try { kafkaProducer.send(odkTransformerTopic, msg.toXML()); - childSpan4.end(); - rootSpan.end(); // reactiveProducer.sendMessages(odkTransformerTopic, // msg.toXML()); } catch (JAXBException e) { @@ -174,17 +159,15 @@ public void accept(XMessage msg) { } logTimeTaken(startTime, 15); } - }).doOnError(genericError("getLastMessageID", childSpan3)).subscribe(); + }).doOnError(genericError("getLastMessageID")).subscribe(); } - }).doOnError(genericError("resolveUserNew", childSpan2)).subscribe(); + }).doOnError(genericError("resolveUserNew")).subscribe(); } - }).doOnError(genericError("fetchAdapterID", childSpan1)).subscribe(); + }).doOnError(genericError("fetchAdapterID")).subscribe(); } catch (Throwable e) { - genericException(e.getMessage(), rootSpan); - } finally { -// rootSpan.end(); + genericException(e.getMessage()); } } catch (Exception e) { e.printStackTrace(); @@ -221,6 +204,16 @@ private void genericException(String eMsg, Span span) { span.end(); } } + + /** + * Log Exceptions + * + * @param eMsg + */ + private void genericException(String eMsg) { + eMsg = "Exception: " + eMsg; + log.error(eMsg); + } /** * Log Exception & if span exists, add error to span @@ -240,6 +233,19 @@ private Consumer genericError(String s, Span span) { } }; } + + /** + * Log Exception + * + * @param s + * @return + */ + private Consumer genericError(String s) { + return c -> { + String msg = "Error in " + s + "::" + c.getMessage(); + log.error(msg); + }; + } private Mono resolveUserNew(XMessage xmsg) { try { From 2276bafd9ed10d9a7c0a3b52f61a24254a165ee2 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Tue, 18 Jan 2022 12:49:59 +0530 Subject: [PATCH 6/8] change --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 97d32fd..693a97f 100644 --- a/pom.xml +++ b/pom.xml @@ -28,10 +28,10 @@ org.springframework.boot spring-boot-starter-validation - + org.springframework.boot spring-boot-starter-web-services From 32b386fc0b2fe78a5195b316310132c87929ef65 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Wed, 19 Jan 2022 09:59:11 +0530 Subject: [PATCH 7/8] context propagation & extract --- .../Consumer/ReactiveConsumer.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index 46a4a24..0ec36d5 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -16,7 +16,8 @@ import com.uci.utils.encryption.AESWrapper; import com.uci.utils.kafka.ReactiveProducer; import com.uci.utils.kafka.SimpleProducer; -import com.uci.utils.kafka.SimpleProducer1; +import com.uci.utils.kafka.adapter.TextMapGetterAdapter; +import com.uci.utils.kafka.RecordProducer; import io.fusionauth.domain.api.UserConsentResponse; import io.fusionauth.domain.api.UserRequest; @@ -27,6 +28,7 @@ import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; import io.fusionauth.domain.User; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -38,6 +40,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.protocol.types.Field; import org.apache.tomcat.util.json.JSONParser; import org.json.JSONArray; @@ -93,7 +96,7 @@ public class ReactiveConsumer { public XMessageRepository xMessageRepository; @Autowired - public SimpleProducer kafkaProducer; + public RecordProducer kafkaProducer; @Autowired public ReactiveProducer reactiveProducer; @@ -123,10 +126,11 @@ public void onMessage() { reactiveKafkaReceiver.doOnNext(new Consumer>() { @Override public void accept(ConsumerRecord stringMessage) { - try { - - log.info("headers:"+stringMessage.headers()); - final long startTime = System.nanoTime(); + Context extractedContext = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), stringMessage.headers(), TextMapGetterAdapter.getter); + log.info("Opentelemetry extracted context : "+extractedContext); + + try (Scope scope = extractedContext.makeCurrent()) { + final long startTime = System.nanoTime(); logTimeTaken(startTime, 0); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes())); @@ -151,7 +155,7 @@ public void accept(XMessage msg) { if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { try { - kafkaProducer.send(odkTransformerTopic, msg.toXML()); + kafkaProducer.send(odkTransformerTopic, msg.toXML(), Context.current()); // reactiveProducer.sendMessages(odkTransformerTopic, // msg.toXML()); } catch (JAXBException e) { From 799adf0c2a2a3d5ed1419ca9eb5f11484424b269 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Thu, 20 Jan 2022 11:34:57 +0530 Subject: [PATCH 8/8] spans --- .../Consumer/ReactiveConsumer.java | 25 +++++++++++++++++++ src/main/resources/application.properties | 10 ++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index 0ec36d5..78ecae5 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -137,9 +137,11 @@ public void accept(ConsumerRecord stringMessage) { try { SenderReceiverInfo from = msg.getFrom(); logTimeTaken(startTime, 1); + Span childSpan1 = createChildSpan("fetchAdapterID"); fetchAdapterID(msg.getApp()).doOnNext(new Consumer() { @Override public void accept(String adapterID) { + childSpan1.end(); logTimeTaken(startTime, 2); from.setCampaignID(msg.getApp()); from.setDeviceType(DeviceType.PHONE); @@ -148,16 +150,20 @@ public void accept(String adapterID) { public void accept(XMessage msg) { SenderReceiverInfo from = msg.getFrom(); // msg.setFrom(from); + Span childSpan2 = createChildSpan("getLastMessageID"); getLastMessageID(msg).doOnNext(lastMessageID -> { + childSpan2.end(); logTimeTaken(startTime, 3); msg.setLastMessageID(lastMessageID); msg.setAdapterId(adapterID); if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { try { + Span childSpan3 = createChildSpan("sendEventToKafka"); kafkaProducer.send(odkTransformerTopic, msg.toXML(), Context.current()); // reactiveProducer.sendMessages(odkTransformerTopic, // msg.toXML()); + childSpan3.end(); } catch (JAXBException e) { e.printStackTrace(); } @@ -193,6 +199,17 @@ private Span createChildSpan(String spanName, Context context, Span parentSpan) String prefix = "orchestrator-"; return tracer.spanBuilder(prefix + spanName).setParent(context.with(parentSpan)).startSpan(); } + + /** + * Create Child Span + * + * @param spanName + * @return childSpan + */ + private Span createChildSpan(String spanName) { + String prefix = "orchestratorSpan-"; + return tracer.spanBuilder(prefix + spanName).startSpan(); + } /** * Log Exceptions & if span exists, add error to span @@ -259,22 +276,28 @@ private Mono resolveUserNew(XMessage xmsg) { String deviceString = from.getDeviceType().toString() + ":" + from.getUserID(); String encodedBase64Key = encodeKey(secret); String deviceID = AESWrapper.encrypt(deviceString, encodedBase64Key); + Span childSpan1 = createChildSpan("FA-retrieveUserByUsername"); ClientResponse response = campaignService.fusionAuthClient .retrieveUserByUsername(deviceID); + childSpan1.end(); if (response.wasSuccessful()) { from.setDeviceID(response.successResponse.user.id.toString()); xmsg.setFrom(from); return xmsgCampaignForm(xmsg, response.successResponse.user); } else { + Span childSpan2 = createChildSpan("updateUser"); return botService.updateUser(deviceString, appName) .flatMap(new Function, Mono>() { @Override public Mono apply(Pair result) { + childSpan2.end(); if (result.getLeft()) { from.setDeviceID(result.getRight()); xmsg.setFrom(from); + Span childSpan3 = createChildSpan("FA-retrieveUserByUsername"); ClientResponse response = campaignService.fusionAuthClient .retrieveUserByUsername(deviceID); + childSpan3.end(); if (response.wasSuccessful()) { return xmsgCampaignForm(xmsg, response.successResponse.user); } else { @@ -296,10 +319,12 @@ public Mono apply(Pair result) { } private Mono xmsgCampaignForm(XMessage xmsg, User user) { + Span childSpan1 = createChildSpan("getCampaignFromNameTransformer"); return campaignService.getCampaignFromNameTransformer(xmsg.getCampaign()) .map(new Function() { @Override public XMessage apply(JsonNode campaign) { + childSpan1.end(); String campaignID = campaign.findValue("id").asText(); Map formIDs = getCampaignFormIds(campaign); String currentFormID = getCurrentFormId(xmsg, campaignID, formIDs, user); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index f12b3a3..db7a818 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -53,9 +53,9 @@ caffeine.cache.max.size=0 caffeine.cache.exprie.duration.seconds=${CAFFEINE_CACHE_EXPIRE_DURATION:#{300}} #Opentelemetry Lighstep Config -opentelemetry.lightstep.tracer=${OPENTELEMETERY_LIGHTSTEP_TRACER} -opentelemetry.lightstep.tracer.version=${OPENTELEMETERY_LIGHTSTEP_TRACER_VERSION} -opentelemetry.lightstep.service=${OPENTELEMETERY_LIGHTSTEP_SERVICE} -opentelemetry.lightstep.access.token=${OPENTELEMETERY_LIGHTSTEP_ACCESS_TOKEN} -opentelemetry.lightstep.end.point=${OPENTELEMETERY_LIGHTSTEP_END_POINT} +opentelemetry.lightstep.tracer=${LS_TRACER_NAME} +opentelemetry.lightstep.tracer.version=${LS_TRACER_VERSION} +opentelemetry.lightstep.service=${LS_SERVICE_NAME} +opentelemetry.lightstep.access.token=${LS_ACCESS_TOKEN} +opentelemetry.lightstep.end.point=${LS_END_POINT}