Skip to content

Commit

Permalink
Merge branch 'release-4.6.0' of https://github.com/samagra-comms/orchestrator into hotflix-kafka-v-1
Browse files Browse the repository at this point in the history
  • Loading branch information
surabhi-mahawar committed Feb 11, 2022
2 parents 2c403c2 + b866ac4 commit 66af60e
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 38 deletions.
12 changes: 9 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.7.RELEASE</version>
<version>2.5.7</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.uci</groupId>
Expand All @@ -24,6 +24,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web-services</artifactId>
Expand Down Expand Up @@ -67,13 +75,11 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.11.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.10.4</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.uci.orchestrator.Application;

import com.uci.dao.service.HealthService;
import com.github.benmanes.caffeine.cache.Cache;
import com.uci.orchestrator.Drools.DroolsBeanFactory;
import com.uci.utils.CampaignService;
import com.uci.utils.kafka.ReactiveProducer;
Expand All @@ -10,8 +10,9 @@
import org.kie.api.io.Resource;
import org.kie.api.runtime.KieSession;
import org.kie.internal.io.ResourceFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
Expand All @@ -31,6 +32,7 @@
import java.util.Map;

@Configuration
@EnableCaching
public class AppConfigOrchestrator {

@Value("${spring.kafka.bootstrap-servers}")
Expand All @@ -49,6 +51,9 @@ public class AppConfigOrchestrator {

@Value("${fusionauth.key}")
public String FUSIONAUTH_KEY;

@Autowired
public Cache<Object, Object> cache;

@Bean
public FusionAuthClient getFAClient() {
Expand All @@ -62,7 +67,7 @@ public CampaignService getCampaignService() {
.defaultHeader("admin-token", CAMPAIGN_ADMIN_TOKEN)
.build();
FusionAuthClient fusionAuthClient = getFAClient();
return new CampaignService(webClient, fusionAuthClient);
return new CampaignService(webClient, fusionAuthClient, cache);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,35 +106,32 @@ public class ReactiveConsumer {
private final String DEFAULT_APP_NAME = "Global Bot";
LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);


@EventListener(ApplicationStartedEvent.class)
public void onMessage() {

reactiveKafkaReceiver
.doOnNext(new Consumer<ReceiverRecord<String, String>>() {
@Override
public void accept(ReceiverRecord<String, String> stringMessage) {
try {
final long startTime = System.nanoTime();
logTimeTaken(startTime, 0);
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes()));
System.out.println(msg.getTransformers());
// if(msg.getTransformers() != null) {
// msg.getTransformers().forEach((t)->{
// System.out.println(t.getId());
// System.out.println(t.getMetaData());
// });
// }
SenderReceiverInfo from = msg.getFrom();
logTimeTaken(startTime, 1);
getAppName(msg.getPayload().getText(), msg.getFrom())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String appName) {
logTimeTaken(startTime, 2);
msg.setApp(appName);
msg.setApp(appName);
fetchAdapterID(appName)
.doOnNext(new Consumer<String>() {
@Override
public void accept(String adapterID) {
logTimeTaken(startTime, 3);

from.setCampaignID(appName);
from.setDeviceType(DeviceType.PHONE);
resolveUserNew(msg)
Expand Down Expand Up @@ -610,7 +607,7 @@ private Mono<String> fetchAdapterID(String appName) {

private Mono<String> getAppName(String text, SenderReceiverInfo from) {
LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);
log.info("Inside getAppName" + text + "::" + from.getUserID());
log.info("Inside getAppName " + text + "::" + from.getUserID());
if (text.equals("")) {
try {
return getLatestXMessage(from.getUserID(), yesterday, XMessage.MessageState.SENT.name()).map(new Function<XMessageDAO, String>() {
Expand All @@ -629,6 +626,7 @@ public String apply(XMessageDAO xMessageLast) {
}
} else {
try {
log.info("Inside getAppName " + text + "::" + from.getUserID());
return botService.getCampaignFromStartingMessage(text)
.flatMap(new Function<String, Mono<? extends String>>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public ResponseEntity<JsonNode> cassandraStatusCheck() throws IOException, JsonP
@RequestMapping(value = "/health/kafka", method = RequestMethod.GET, produces = { "application/json", "text/json" })
public ResponseEntity<JsonNode> kafkaStatusCheck() throws IOException, JsonProcessingException {
JsonNode jsonNode = getResponseJsonNode();
((ObjectNode) jsonNode).put("result", healthService.getKafkaHealthNode());
// ((ObjectNode) jsonNode).put("result", healthService.getKafkaHealthNode());

return ResponseEntity.ok(jsonNode);
}
Expand All @@ -60,23 +60,23 @@ public ResponseEntity<JsonNode> campaignUrlStatusCheck() throws JsonProcessingEx
return ResponseEntity.ok(jsonNode);
}

private static final Logger logger = LogManager.getLogger();

/*
* Test with default kafka appender
* telemetry object build internally via custom layout mentioned in xml by sent message
*/
@RequestMapping(value = "/test/logs", method = RequestMethod.GET, produces = { "application/json", "text/json" })
public ResponseEntity<JsonNode> testKafkaLogAppender() throws JsonProcessingException, IOException {
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree("{\"responseCode\":\"OK\"}");

logger.info("Info Test Message");

logger.error("Error Test Message");

return ResponseEntity.ok(jsonNode);
}
// private static final Logger logger = LogManager.getLogger();
//
// /*
// * Test with default kafka appender
// * telemetry object build internally via custom layout mentioned in xml by sent message
// */
// @RequestMapping(value = "/test/logs", method = RequestMethod.GET, produces = { "application/json", "text/json" })
// public ResponseEntity<JsonNode> testKafkaLogAppender() throws JsonProcessingException, IOException {
// ObjectMapper mapper = new ObjectMapper();
// JsonNode jsonNode = mapper.readTree("{\"responseCode\":\"OK\"}");
//
// logger.info("Info Test Message");
//
// logger.error("Error Test Message");
//
// return ResponseEntity.ok(jsonNode);
// }

/**
* Returns json node for service response
Expand Down
23 changes: 18 additions & 5 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
#max.incremental.fetch.session.cache.slots

# Kafka
spring.kafka.bootstrap-servers=${BOOTSTRAP_SERVERS}
Expand All @@ -15,12 +14,13 @@ odk-transformer=${KAFKA_ODK_TRANSFORMER_TOPIC}
kafka.logs.topic = logs

# Cassandra
# spring.data.cassandra.contactpoints=${CASSANDRA_URL}
# spring.data.cassandra.port=${CASSANDRA_PORT}
# spring.data.cassandra.keyspace-name=${CASSANDRA_KEYSPACE}
#spring.data.cassandra.contactpoints=${CASSANDRA_URL}
#spring.data.cassandra.port=${CASSANDRA_PORT}
#spring.data.cassandra.keyspace-name=${CASSANDRA_KEYSPACE}
cassandra.contactpoints=${CASSANDRA_URL}
cassandra.port=${CASSANDRA_PORT}
keyspace-name=${CASSANDRA_KEYSPACE}
spring.data.cassandra.local-datacenter=datacenter1
logging.level.root=INFO
spring.data.cassandra.connect-timeout=10000ms
spring.data.cassandra.read-timeout=10000ms
Expand All @@ -33,9 +33,22 @@ logging.level.com.datastax.driver.core.QueryLogger.SLOW=TRACE

server.port=8686
campaign.url = ${CAMPAIGN_URL}
campaign.admin.token = ${CAMPAIGN_ADMIN_TOKEN}
campaign.admin.token = EXnYOvDx4KFqcQkdXqI38MHgFvnJcxMS

fusionauth.url = ${FUSIONAUTH_URL}
fusionauth.key = ${FUSIONAUTH_KEY}

encryptionKeyString=A%C*F-JaNdRgUkXp

spring.r2dbc.url=r2dbc:${FORMS_DB_URL}
postgresql.db.host=${FORMS_DB_HOST}
postgresql.db.port=${FORMS_DB_PORT}
spring.r2dbc.name=${FORMS_DB_NAME}
spring.r2dbc.username=${FORMS_DB_USERNAME}
spring.r2dbc.password=${FORMS_DB_PASSWORD}

#Caffeine Cache
#caffeine.cache.max.size=${CAFFEINE_CACHE_MAX_SIZE:#{1000}}
caffeine.cache.max.size=0
caffeine.cache.exprie.duration.seconds=${CAFFEINE_CACHE_EXPIRE_DURATION:#{300}}

0 comments on commit 66af60e

Please sign in to comment.