diff --git a/Dockerfile b/Dockerfile index 2acce99..e49a792 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,4 +35,4 @@ WORKDIR $HOME COPY --from=build $HOME/target/*.jar app.jar EXPOSE 8080 -ENTRYPOINT ["java","-Xmx350m","-Xshareclasses","-XX:+CMSClassUnloadingEnabled","-XX:+UseG1GC","-XX:+ExplicitGCInvokesConcurrent","-jar","app.jar"] +ENTRYPOINT ["java","-Xmx1024m","-Xshareclasses","-XX:+CMSClassUnloadingEnabled","-XX:+UseG1GC","-XX:+ExplicitGCInvokesConcurrent","-jar","app.jar"] diff --git a/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java index cd079be..64a1430 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/OuboundConsumer.java @@ -22,15 +22,33 @@ public class OuboundConsumer { @Value("${outbound}") private String outboundTopic; + @Value("${notificationOutbound}") + public String notificationOutbound; + @Autowired public SimpleProducer kafkaProducer; + private long consumeCount, pushNotificationCount, pushOtherCount; + + @KafkaListener(id = "${processOutbound}", topics = "${processOutbound}", properties = {"spring.json.value.default.type=java.lang.String"}) public void onMessage(@Payload String stringMessage) { try { + consumeCount++; + log.info("OutboundConsumer:onMessage:: Consume Topic Count: " + consumeCount); XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); - kafkaProducer.send(outboundTopic, msg.toXML()); - } catch(JAXBException e) { + if (msg != null && msg.getProvider() != null && msg.getProvider().equalsIgnoreCase("firebase") + && msg.getChannel().equalsIgnoreCase("web")) { + pushNotificationCount++; + log.info("OutboundConsumer:onMessage:: Notification push to kafka topic count: " + pushNotificationCount); + kafkaProducer.send(notificationOutbound, msg.toXML()); + } else { + pushOtherCount++; + log.info("OutboundConsumer:onMessage:: Other push to kafka topic count: " + pushOtherCount); + kafkaProducer.send(outboundTopic, msg.toXML()); + } + + } catch (JAXBException e) { e.printStackTrace(); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index bf5557f..68451c8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -13,6 +13,7 @@ broadcast-transformer=${KAFKA_BROADCAST_TRANSFORMER_TOPIC:#{"broadcast-transform generic-transformer=${KAFKA_GENERIC_TRANSFORMER_TOPIC:#{"generic-transformer"}} processOutbound=${KAFKA_PROCESS_OUTBOUND} outbound=${KAFKA_OUTBOUND_TOPIC} +notificationOutbound=${KAFKA_NOTIFICATION_TOPIC} # Cassandra #spring.data.cassandra.contactpoints=${CASSANDRA_URL}