Skip to content

Commit

Permalink
Merge pull request #50 from pankajjangid05/prod-development
Browse files Browse the repository at this point in the history
Prod development
  • Loading branch information
pankajjangid05 authored May 22, 2023
2 parents 5cd1f23 + bcb0b1b commit 59eff69
Showing 1 changed file with 68 additions and 58 deletions.
126 changes: 68 additions & 58 deletions src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public class ReactiveConsumer {
@Value("${broadcastNotificationChunkSize}")
private String broadcastNotificationChunkSize;

private long notificationProcessedCount;

@KafkaListener(id = "${inboundProcessed}", topics = "${inboundProcessed}", properties = {"spring.json.value.default.type=java.lang.String"})
public void onMessage(@Payload String stringMessage) {
try {
Expand Down Expand Up @@ -129,69 +131,77 @@ public void accept(JsonNode botNode) {
public void accept(XMessage msg) {
SenderReceiverInfo from = msg.getFrom();
// msg.setFrom(from);
getLastMessageID(msg)
.doOnNext(lastMessageID -> {
logTimeTaken(startTime, 4, null);
msg.setLastMessageID(lastMessageID);

/* Switch From & To */
switchFromTo(msg);

if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) {
try {
log.info("final msg.toXML(): " + msg.toXML().toString());

if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) {
Integer chunkSize = null;
try {
if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals(BotUtil.transformerTypeBroadcast)) {
Integer chunkSize = null;
try {
chunkSize = Integer.parseInt(broadcastNotificationChunkSize);
} catch(NumberFormatException ex){
chunkSize = null;
}
if(chunkSize != null) {
if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null
&& msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) {
JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list");
int totalFederatedUsers = federatedUsers.length();
if (totalFederatedUsers <= chunkSize) {
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
} else {
List<JSONArray> jsonArrayList = chunkArrayList(federatedUsers, chunkSize);
int count = 1;
for (JSONArray jsonArray : jsonArrayList) {
log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count);
msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString());
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
count++;
}
}
} else {
log.error("federatedUsers not found : " + msg.toString());
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
}
} else{
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
}
notificationProcessedCount++;
log.info("Notification processed : " + notificationProcessedCount);
} else {
getLastMessageID(msg)
.doOnNext(lastMessageID -> {
logTimeTaken(startTime, 4, null);
msg.setLastMessageID(lastMessageID);

/* Switch From & To */
switchFromTo(msg);

if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) {
try {
chunkSize = Integer.parseInt(broadcastNotificationChunkSize);
} catch(NumberFormatException ex){
chunkSize = null;
}
if(chunkSize != null) {
if (msg.getTransformers() != null && msg.getTransformers().size() > 0 && msg.getTransformers().get(0) != null
&& msg.getTransformers().get(0).getMetaData() != null && msg.getTransformers().get(0).getMetaData().get("federatedUsers") != null) {
JSONArray federatedUsers = new JSONObject(msg.getTransformers().get(0).getMetaData().get("federatedUsers")).getJSONArray("list");
int totalFederatedUsers = federatedUsers.length();
if (totalFederatedUsers <= chunkSize) {
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
} else {
List<JSONArray> jsonArrayList = chunkArrayList(federatedUsers, chunkSize);
int count = 1;
for (JSONArray jsonArray : jsonArrayList) {
log.info("Total Federated Users : " + federatedUsers.length() + " Chunk size : " + jsonArray.length() + " Sent to kafka : "+count);
msg.getTransformers().get(0).getMetaData().put("federatedUsers", new JSONObject().put("list", jsonArray).toString());
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
count++;
}
}
log.info("final msg.toXML(): " + msg.toXML().toString());
if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) {
kafkaProducer.send(genericTransformerTopic, msg.toXML());
} else {
log.error("federatedUsers not found : " + msg.toString());
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
kafkaProducer.send(odkTransformerTopic, msg.toXML());
}
} else{
kafkaProducer.send(broadcastTransformerTopic, msg.toXML());
// reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML());
} catch (JAXBException e) {
e.printStackTrace();
}

} else if (firstTransformer.findValue("type") != null && firstTransformer.findValue("type").asText().equals("generic")) {
kafkaProducer.send(genericTransformerTopic, msg.toXML());
} else {
kafkaProducer.send(odkTransformerTopic, msg.toXML());
logTimeTaken(startTime, 0, "process-end: %d ms");
}
// reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML());
} catch (JAXBException e) {
e.printStackTrace();
}
logTimeTaken(startTime, 0, "process-end: %d ms");
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
log.error("Error in getLastMessageID" + throwable.getMessage());
}
})
.subscribe();
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
log.error("Error in getLastMessageID" + throwable.getMessage());
}
})
.subscribe();
}
} catch (JAXBException ex) {
log.error("Error while converting toXML() : " + ex.getMessage());
} catch (Exception ex){
log.error("An Error Occurred : " + ex.getMessage());
}
}
})
.doOnError(new Consumer<Throwable>() {
Expand Down

0 comments on commit 59eff69

Please sign in to comment.