Skip to content

Commit

Permalink
[Hot Fix] - Sending Notifications in Chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajjangid05 committed Apr 14, 2023
1 parent 6561b1c commit d7c778d
Showing 1 changed file with 18 additions and 36 deletions.
54 changes: 18 additions & 36 deletions src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,15 @@ public void accept(XMessage msg) {
chunkSize = null;
}
if(chunkSize != null) {
if (msg.getTransformers() != null && msg.getTransformers().get(0) != null && msg.getTransformers().size() > 0
if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null
&& msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) {
JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list");
int totalFederatedUsers = federatedUsers.length();
if (totalFederatedUsers <= chunkSize) {
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
} else {
List<JSONArray> jsonArrayList = chunkArrayList(federatedUsers, chunkSize);
int count = 0;
int count = 1;
for (JSONArray jsonArray : jsonArrayList) {
log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count);
msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString());
Expand Down Expand Up @@ -289,7 +289,7 @@ private XMessage setXMessageTransformers(XMessage xMessage, JsonNode botNode) {
*/
private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) {
String botId = botNode.get("id").asText();

/* Get federated users from federation services */
JSONArray users = userService.getUsersFromFederatedServers(botId);

Expand All @@ -304,7 +304,7 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) {
ObjectNode node = mapper.createObjectNode();
node.put("body", transformerMeta.get("body").asText());
node.put("type", transformerMeta.get("templateType").asText());

ArrayNode sampleData = mapper.createArrayNode();
for (int i = 0; i < users.length(); i++) {
ObjectNode userData = mapper.createObjectNode();
Expand All @@ -320,12 +320,12 @@ private String getFederatedUsersMeta(JsonNode botNode, JsonNode transformer) {
sampleData.add(userData);
}
node.put("sampleData", sampleData);

/* Fetch user messages by template from template service */
ArrayList<JSONObject> usersMessage = userService.getUsersMessageByTemplate(node);

log.info("usersMessage: "+usersMessage);

/* Set User messages against the user phone */
ObjectNode federatedUsersMeta = mapper.createObjectNode();
ArrayNode userMetaData = mapper.createArrayNode();
Expand Down Expand Up @@ -435,10 +435,10 @@ public void accept(Throwable throwable) {
*/
private String getFAUserIdForApp(String deviceID, UUID appID) {
String userID = null;

Object result = redisCacheService.getFAUserIDForAppCache(getFACacheName(deviceID, appID));
userID = result != null ? result.toString() : null;

if(userID == null || userID.isEmpty()) {
ClientResponse<UserResponse, Errors> response = botService.fusionAuthClient.retrieveUserByUsername(deviceID);

Expand Down Expand Up @@ -587,36 +587,18 @@ private void switchFromTo(XMessage xMessage) {
*/
private List<JSONArray> chunkArrayList(JSONArray users, int chunkSize) {
if (users != null && users.length() > 0) {
ArrayList<JSONObject> arrayToChunk = new ArrayList<>();

for (int i = 0; i < users.length(); i++) {
arrayToChunk.add(users.getJSONObject(i));
}
ArrayList<List<JSONObject>> chunkList = new ArrayList<>();
int guide = arrayToChunk.size();
int index = 0;
int tale = chunkSize;
while (tale < arrayToChunk.size()) {
chunkList.add(arrayToChunk.subList(index, tale));
guide = guide - chunkSize;
index = index + chunkSize;
tale = tale + chunkSize;
}
if (guide > 0) {
chunkList.add(arrayToChunk.subList(index, index + guide));
}
List<JSONArray> userChunksList = new ArrayList<>();
for (List<JSONObject> l : chunkList) {
JSONArray jsonArray = new JSONArray();
for (JSONObject jsonObject : l) {
jsonArray.put(jsonObject);
}
userChunksList.add(jsonArray);
ArrayList<JSONArray> chunksList = new ArrayList<>();
chunksList.add(new JSONArray());
for (int x = 0; x < users.length(); x++) {
JSONObject user = users.getJSONObject(x);
if (chunksList.get(chunksList.size() - 1).length() == chunkSize)
chunksList.add(new JSONArray());
chunksList.get(chunksList.size() - 1).put(user);
}
return userChunksList;
return chunksList;
} else{
log.error("Federated Users null found : "+users);
return null;
}
return null;
}
}

0 comments on commit d7c778d

Please sign in to comment.