Skip to content

Commit

Permalink
[Feature] - Sending Notifications in Chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajjangid05 committed Apr 14, 2023
1 parent 04b85a3 commit 6561b1c
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 17 deletions.
101 changes: 85 additions & 16 deletions src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@
import io.fusionauth.domain.UserRegistration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import messagerosa.core.model.DeviceType;
import messagerosa.core.model.SenderReceiverInfo;
import messagerosa.core.model.Transformer;
import messagerosa.core.model.XMessage;
import messagerosa.core.model.*;
import messagerosa.xml.XMessageParser;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONArray;
Expand All @@ -43,11 +40,7 @@
import javax.xml.bind.JAXBException;
import java.io.ByteArrayInputStream;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -95,7 +88,9 @@ public class ReactiveConsumer {
private final String DEFAULT_APP_NAME = "Global Bot";
LocalDateTime yesterday = LocalDateTime.now().minusDays(1L);


@Value("${broadcastNotificationChunkSize}")
private String broadcastNotificationChunkSize;

@KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"})
public void onMessage(@Payload String stringMessage) {
try {
Expand Down Expand Up @@ -137,10 +132,41 @@ public void accept(XMessage msg) {

if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) {
try {
log.info("final msg.toXML(): "+msg.toXML().toString());
if(firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) {
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
} else if(firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) {
log.info("final msg.toXML(): " + msg.toXML().toString());

if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) {
Integer chunkSize = null;
try {
chunkSize = Integer.parseInt(broadcastNotificationChunkSize);
} catch(NumberFormatException ex){
chunkSize = null;
}
if(chunkSize != null) {
if (msg.getTransformers() != null && msg.getTransformers().get(0) != null && msg.getTransformers().size() > 0
&& msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) {
JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list");
int totalFederatedUsers = federatedUsers.length();
if (totalFederatedUsers <= chunkSize) {
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
} else {
List<JSONArray> jsonArrayList = chunkArrayList(federatedUsers, chunkSize);
int count = 0;
for (JSONArray jsonArray : jsonArrayList) {
log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count);
msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString());
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
count++;
}
}
} else {
log.error("federatedUsers not found : " + msg.toString());
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
}
} else{
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
}

} else if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) {
kafkaProducer.send(genericTransformerTopic, msg.toXML());
} else {
kafkaProducer.send(odkTransformerTopic, msg.toXML());
Expand Down Expand Up @@ -477,7 +503,7 @@ private void logTimeTaken(long startTime, int checkpointID) {
* @return
*/
private Mono<String> getLastMessageID(XMessage msg) {
if (msg != null && msg.getMessageType().toString().equalsIgnoreCase("text")) {
if (msg != null && msg.getFrom() != null && msg.getFrom().getUserID() != null && msg.getMessageType().toString().equalsIgnoreCase("text")) {
return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function<XMessageDAO, String>() {
@Override
public String apply(XMessageDAO msg1) {
Expand All @@ -489,13 +515,15 @@ public String apply(XMessageDAO msg1) {
}
});

} else if (msg != null && msg.getMessageType().toString().equalsIgnoreCase("button")) {
} else if (msg != null && msg.getFrom() != null && msg.getFrom().getUserID() != null && msg.getMessageType().toString().equalsIgnoreCase("button")) {
return getLatestXMessage(msg.getFrom().getUserID(), yesterday, "SENT").map(new Function<XMessageDAO, String>() {
@Override
public String apply(XMessageDAO lastMessage) {
return String.valueOf(lastMessage.getId());
}
});
} else {
log.error("UserId not found : "+msg.toString());
}
return Mono.empty();
}
Expand Down Expand Up @@ -550,4 +578,45 @@ private void switchFromTo(XMessage xMessage) {
xMessage.setFrom(to);
xMessage.setTo(from);
}

/**
* Convert Federated users into chunks
* @param users
* @param chunkSize
* @return
*/
private List<JSONArray> chunkArrayList(JSONArray users, int chunkSize) {
if (users != null && users.length() > 0) {
ArrayList<JSONObject> arrayToChunk = new ArrayList<>();

for (int i = 0; i < users.length(); i++) {
arrayToChunk.add(users.getJSONObject(i));
}
ArrayList<List<JSONObject>> chunkList = new ArrayList<>();
int guide = arrayToChunk.size();
int index = 0;
int tale = chunkSize;
while (tale < arrayToChunk.size()) {
chunkList.add(arrayToChunk.subList(index, tale));
guide = guide - chunkSize;
index = index + chunkSize;
tale = tale + chunkSize;
}
if (guide > 0) {
chunkList.add(arrayToChunk.subList(index, index + guide));
}
List<JSONArray> userChunksList = new ArrayList<>();
for (List<JSONObject> l : chunkList) {
JSONArray jsonArray = new JSONArray();
for (JSONObject jsonObject : l) {
jsonArray.put(jsonObject);
}
userChunksList.add(jsonArray);
}
return userChunksList;
} else{
log.error("Federated Users null found : "+users);
}
return null;
}
}
5 changes: 4 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,7 @@ spring.mail.host=${EMAIL_HOST:#{""}}
spring.mail.port=${EMAIL_PORT:#{"587"}}
spring.mail.username=${EMAIL_USERNAME:#{""}}
spring.mail.password=${EMAIL_PASSWORD:#{""}}
spring.mail.recipient=${RECIPIENT_EMAILS:#{""}}
spring.mail.recipient=${RECIPIENT_EMAILS:#{""}}

# Send notifications in chunks
broadcastNotificationChunkSize=${BROADCAST_NOTIFICATION_CHUNK_SIZE:#{""}}

0 comments on commit 6561b1c

Please sign in to comment.