Skip to content

Commit

Permalink
Merge pull request #37 from samagra-comms/release-v2.0.0
Browse files Browse the repository at this point in the history
Pull Request for Merge release-v2.0.0 in Development
  • Loading branch information
pankajjangid05 authored Apr 27, 2023
2 parents d61cdd2 + 6096a17 commit 2433c25
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 19 deletions.
98 changes: 80 additions & 18 deletions src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
import java.io.ByteArrayInputStream;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.Comparator;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -95,7 +95,9 @@ public class ReactiveConsumer {
private final String DEFAULT_APP_NAME = "Global Bot";
LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);


@Value("${broadcastNotificationChunkSize}")
private String broadcastNotificationChunkSize;

@KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"})
public void onMessage(@Payload String stringMessage) {
try {
Expand Down Expand Up @@ -137,10 +139,41 @@ public void accept(XMessage msg) {

if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) {
try {
log.info("final msg.toXML(): "+msg.toXML().toString());
if(firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) {
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
} else if(firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) {
log.info("final msg.toXML(): " + msg.toXML().toString());

if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) {
Integer chunkSize = null;
try {
chunkSize = Integer.parseInt(broadcastNotificationChunkSize);
} catch(NumberFormatException ex){
chunkSize = null;
}
if(chunkSize != null) {
if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null
&& msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) {
JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list");
int totalFederatedUsers = federatedUsers.length();
if (totalFederatedUsers <= chunkSize) {
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
} else {
List<JSONArray> jsonArrayList = chunkArrayList(federatedUsers, chunkSize);
int count = 1;
for (JSONArray jsonArray : jsonArrayList) {
log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count);
msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString());
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
count++;
}
}
} else {
log.error("federatedUsers not found : " + msg.toString());
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
}
} else{
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
}

} else if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) {
kafkaProducer.send(genericTransformerTopic, msg.toXML());
} else {
kafkaProducer.send(odkTransformerTopic, msg.toXML());
Expand Down Expand Up @@ -216,7 +249,11 @@ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) {
? transformerMeta.findValue("formID").asText()
: "");
if (transformerMeta.get("type") != null && transformerMeta.get("type").asText().equals(BotUtil.transformerTypeBroadcast)) {
metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer));
if(xMessage != null && xMessage.getFrom() != null && xMessage.getFrom().getMeta() != null && xMessage.getFrom().getMeta().containsKey("page")){
metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, xMessage.getFrom().getMeta().get("page")));
} else{
metaData.put("federatedUsers", getFederatedUsersMeta(botNode, transformer, null));
}
}

if (transformerMeta.findValue("hiddenFields") != null && !transformerMeta.findValue("hiddenFields").isEmpty()) {
Expand Down Expand Up @@ -261,11 +298,11 @@ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) {
* @param transformer
* @return Federated users as json string
*/
private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) {
private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer, String page) {
String botId = botNode.get("id").asText();

/* Get federated users from federation services */
JSONArray users = userService.getUsersFromFederatedServers(botId);
JSONArray users = userService.getUsersFromFederatedServers(botId, page);

/* Check if users, & related meta data exists in transformer */
if(users != null && transformer.get("meta") != null
Expand All @@ -278,7 +315,7 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) {
ObjectNode node = mapper.createObjectNode();
node.put("body", transformerMeta.get("body").asText());
node.put("type", transformerMeta.get("templateType").asText());

ArrayNode sampleData = mapper.createArrayNode();
for (int i = 0; i < users.length(); i++) {
ObjectNode userData = mapper.createObjectNode();
Expand All @@ -294,12 +331,12 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) {
sampleData.add(userData);
}
node.put("sampleData", sampleData);

/* Fetch user messages by template from template service */
ArrayList<JSONObject> usersMessage = userService.getUsersMessageByTemplate(node);

log.info("usersMessage: "+usersMessage);

/* Set User messages against the user phone */
ObjectNode federatedUsersMeta = mapper.createObjectNode();
ArrayNode userMetaData = mapper.createArrayNode();
Expand Down Expand Up @@ -409,10 +446,10 @@ public void accept(Throwable throwable) {
*/
private String getFAUserIdForApp(String deviceID, UUID appID) {
String userID = null;

Object result = redisCacheService.getFAUserIDForAppCache(getFACacheName(deviceID, appID));
userID = result != null ? result.toString() : null;

if(userID == null || userID.isEmpty()) {
ClientResponse<UserResponse, Errors> response = botService.fusionAuthClient.retrieveUserByUsername(deviceID);

Expand Down Expand Up @@ -477,7 +514,7 @@ private void logTimeTaken(long startTime, int checkpointID) {
* @return
*/
private Mono<String> getLastMessageID(XMessage msg) {
if (msg != null && msg.getMessageType().toString().equalsIgnoreCase("text")) {
if (msg != null && msg.getFrom() != null && msg.getFrom().getUserID() != null && msg.getMessageType().toString().equalsIgnoreCase("text")) {
return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function<XMessageDAO, String>() {
@Override
public String apply(XMessageDAO msg1) {
Expand All @@ -489,13 +526,15 @@ public String apply(XMessageDAO msg1) {
}
});

} else if (msg != null && msg.getMessageType().toString().equalsIgnoreCase("button")) {
} else if (msg != null && msg.getFrom() != null && msg.getFrom().getUserID() != null && msg.getMessageType().toString().equalsIgnoreCase("button")) {
return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function<XMessageDAO, String>() {
@Override
public String apply(XMessageDAO lastMessage) {
return String.valueOf(lastMessage.getId());
}
});
} else {
log.error("UserId not found : "+msg.toString());
}
return Mono.empty();
}
Expand Down Expand Up @@ -550,4 +589,27 @@ private void switchFromTo(XMessage xMessage) {
xMessage.setFrom(to);
xMessage.setTo(from);
}

/**
* Convert Federated users into chunks
* @param users
* @param chunkSize
* @return
*/
private List<JSONArray> chunkArrayList(JSONArray users, int chunkSize) {
if (users != null && users.length() > 0) {
ArrayList<JSONArray> chunksList = new ArrayList<>();
chunksList.add(new JSONArray());
for (int x = 0; x < users.length(); x++) {
JSONObject user = users.getJSONObject(x);
if (chunksList.get(chunksList.size() - 1).length() == chunkSize)
chunksList.add(new JSONArray());
chunksList.get(chunksList.size() - 1).put(user);
}
return chunksList;
} else{
log.error("Federated Users null found : "+users);
return null;
}
}
}
12 changes: 11 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,14 @@ spring.azure.blob.store.account.key=${AZURE_BLOB_STORE_ACCOUNT_KEY:#{"key"}}
spring.azure.blob.store.container.name=${AZURE_BLOB_STORE_CONTAINER:#{"container"}}

# Template Service Base Url
template.service.base.url=${TEMPLATE_SERVICE_BASE_URL:#{"http://templater2.ngrok.samagra.io/"}}
template.service.base.url=${TEMPLATE_SERVICE_BASE_URL:#{"http://templater2.ngrok.samagra.io/"}}

# Email Config
spring.mail.host=${EMAIL_HOST:#{""}}
spring.mail.port=${EMAIL_PORT:#{"587"}}
spring.mail.username=${EMAIL_USERNAME:#{""}}
spring.mail.password=${EMAIL_PASSWORD:#{""}}
spring.mail.recipient=${RECIPIENT_EMAILS:#{""}}

# Send notifications in chunks
broadcastNotificationChunkSize=${BROADCAST_NOTIFICATION_CHUNK_SIZE:#{""}}

0 comments on commit 2433c25

Please sign in to comment.