diff --git a/pom.xml b/pom.xml
index 001a310..c6bbaf1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
0.0.1-SNAPSHOT
2.22.0
- 0.15.0
+ 0.16.0
1.8
UTF-8
1.8
@@ -24,6 +24,10 @@
+
+ io.quarkus
+ quarkus-resteasy
+
io.quarkus
quarkus-jsonb
@@ -47,32 +51,10 @@
io.quarkus
quarkus-smallrye-reactive-messaging
-
-
- smallrye-reactive-messaging-provider
- io.smallrye.reactive
-
-
-
-
- io.smallrye.reactive
- smallrye-reactive-messaging-provider
- 0.0.9
io.quarkus
quarkus-smallrye-reactive-messaging-kafka
-
-
- smallrye-reactive-messaging-kafka
- io.smallrye.reactive
-
-
-
-
- io.smallrye.reactive
- smallrye-reactive-messaging-kafka
- 0.0.9
io.quarkus
@@ -83,8 +65,8 @@
quarkus-jsonp
- io.vertx
- vertx-web
+ io.quarkus
+ quarkus-smallrye-health
io.quarkus
diff --git a/src/main/java/com/redhat/cajun/navy/incident/rest/ApplicationHealthCheck.java b/src/main/java/com/redhat/cajun/navy/incident/rest/ApplicationHealthCheck.java
new file mode 100644
index 0000000..a8da7a4
--- /dev/null
+++ b/src/main/java/com/redhat/cajun/navy/incident/rest/ApplicationHealthCheck.java
@@ -0,0 +1,18 @@
+package com.redhat.cajun.navy.incident.rest;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.health.Health;
+import org.eclipse.microprofile.health.HealthCheck;
+import org.eclipse.microprofile.health.HealthCheckResponse;
+
+@Health
+@ApplicationScoped
+public class ApplicationHealthCheck implements HealthCheck {
+
+
+ @Override
+ public HealthCheckResponse call() {
+ return HealthCheckResponse.named("Health check").up().build();
+ }
+}
diff --git a/src/main/java/com/redhat/cajun/navy/incident/rest/IncidentsResource.java b/src/main/java/com/redhat/cajun/navy/incident/rest/IncidentsResource.java
new file mode 100644
index 0000000..77aadae
--- /dev/null
+++ b/src/main/java/com/redhat/cajun/navy/incident/rest/IncidentsResource.java
@@ -0,0 +1,84 @@
+package com.redhat.cajun.navy.incident.rest;
+
+import java.util.concurrent.CompletionStage;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import io.vertx.axle.core.eventbus.EventBus;
+import io.vertx.core.eventbus.DeliveryOptions;
+import io.vertx.core.json.JsonObject;
+
+@Path("/incidents")
+public class IncidentsResource {
+
+ @Inject
+ EventBus bus;
+
+ @GET
+ @Path("/")
+ @Produces(MediaType.APPLICATION_JSON)
+ public CompletionStage incidents() {
+ DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidents");
+ return bus.send("incident-service", new JsonObject(), options)
+ .thenApply(msg -> Response.ok(msg.body().getJsonArray("incidents").encode()).build());
+ }
+
+ @POST
+ @Path("/")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public CompletionStage createIncident(String incident) {
+ DeliveryOptions options = new DeliveryOptions().addHeader("action", "createIncident");
+ return bus.send("incident-service", new JsonObject(incident), options)
+ .thenApply(msg -> Response.status(200).build());
+ }
+
+ @GET
+ @Path("/{status}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public CompletionStage incidentsByStatus(@PathParam("status") String status) {
+ DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidentsByStatus");
+ return bus.send("incident-service", new JsonObject().put("status", status), options)
+ .thenApply(msg -> Response.ok(msg.body().getJsonArray("incidents").encode()).build());
+ }
+
+ @GET
+ @Path("/incident/{id}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public CompletionStage incidentById(@PathParam("id") String incidentId) {
+ DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidentById");
+ return bus.send("incident-service", new JsonObject().put("incidentId", incidentId), options)
+ .thenApply(msg -> {
+ JsonObject incident = msg.body().getJsonObject("incident");
+ if (incident == null) {
+ return Response.status(404).build();
+ } else {
+ return Response.ok(incident.encode()).build();
+ }
+ });
+ }
+
+ @GET
+ @Path("/byname/{name}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public CompletionStage incidentsByName(@PathParam("name") String name) {
+ DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidentsByName");
+ return bus.send("incident-service", new JsonObject().put("name", name), options)
+ .thenApply(msg -> Response.ok(msg.body().getJsonArray("incidents").encode()).build());
+ }
+
+ @POST
+ @Path("/reset")
+ public CompletionStage reset() {
+ DeliveryOptions options = new DeliveryOptions().addHeader("action", "reset");
+ return bus.send("incident-service", new JsonObject(), options)
+ .thenApply(msg -> Response.ok().build());
+ }
+
+}
diff --git a/src/main/java/com/redhat/cajun/navy/incident/rest/RestApi.java b/src/main/java/com/redhat/cajun/navy/incident/rest/RestApi.java
deleted file mode 100644
index 0ab878c..0000000
--- a/src/main/java/com/redhat/cajun/navy/incident/rest/RestApi.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.redhat.cajun.navy.incident.rest;
-
-import javax.enterprise.context.ApplicationScoped;
-import javax.enterprise.event.Observes;
-import javax.inject.Inject;
-
-import com.redhat.cajun.navy.incident.service.IncidentCodec;
-import io.quarkus.runtime.StartupEvent;
-import io.vertx.core.eventbus.DeliveryOptions;
-import io.vertx.core.json.JsonObject;
-import io.vertx.reactivex.core.Vertx;
-import io.vertx.reactivex.core.buffer.Buffer;
-import io.vertx.reactivex.ext.web.Router;
-import io.vertx.reactivex.ext.web.RoutingContext;
-import io.vertx.reactivex.ext.web.handler.BodyHandler;
-import org.eclipse.microprofile.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ApplicationScoped
-public class RestApi {
-
- private static Logger log = LoggerFactory.getLogger(RestApi.class);
-
- @Inject
- Vertx vertx;
-
- @Inject
- Config config;
-
- void onStart(@Observes StartupEvent ev) {
-
- Router router = Router.router(vertx);
-
- //health check
- router.get("/health").handler(this::healthCheck);
-
- router.route("/incidents").handler(BodyHandler.create());
- router.get("/incidents").handler(this::allIncidents);
- router.get("/incidents/incident/:id").handler(this::incidentById);
- router.get("/incidents/:status").handler(this::incidentsByStatus);
- router.get("/incidents/byname/:name").handler(this::incidentsByName);
- router.post("/incidents/reset").handler(this::reset);
- router.post("/incidents").handler(this::create);
-
- vertx.createHttpServer().requestHandler(router)
- .rxListen( config.getOptionalValue("http.server.port", Integer.class).orElse(8080))
- .subscribe(h -> log.info("Http Server started successfully"),
- t -> log.error("Error when starting Http server", t));
-
- vertx.eventBus().registerCodec(new IncidentCodec());
-
- }
-
- private void healthCheck(RoutingContext rc) {
- rc.response().setStatusCode(200).end(new JsonObject().put("status", "ok").encode());
- }
-
- private void allIncidents(RoutingContext rc) {
- DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidents");
- vertx.eventBus().rxSend("incident-service", new JsonObject(), options)
- .subscribe((json) -> rc.response().setStatusCode(200).putHeader("Content-Type", "application/json")
- .end(json.body().getJsonArray("incidents").encode()), rc::fail);
- }
-
- private void incidentById(RoutingContext rc) {
- String incidentId = rc.request().getParam("id");
- DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidentById");
- vertx.eventBus().rxSend("incident-service", new JsonObject().put("incidentId", incidentId), options)
- .subscribe((msg) -> {
- JsonObject incident = msg.body().getJsonObject("incident");
- if (incident == null) {
- rc.response().setStatusCode(404).end();
- } else {
- rc.response().setStatusCode(200).putHeader("Content-Type", "application/json")
- .end(incident.encode());
- }
- }, rc::fail);
-
- }
-
- private void incidentsByStatus(RoutingContext rc) {
- String status = rc.request().getParam("status");
- DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidentsByStatus");
- vertx.eventBus().rxSend("incident-service", new JsonObject().put("status", status), options)
- .subscribe((msg) -> rc.response().setStatusCode(200).putHeader("Content-Type", "application/json")
- .end(msg.body().getJsonArray("incidents").encode()), rc::fail);
- }
-
- private void incidentsByName(RoutingContext rc) {
- String name = rc.request().getParam("name");
- DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidentsByName");
- vertx.eventBus().rxSend("incident-service", new JsonObject().put("name", name), options)
- .subscribe((msg) -> rc.response().setStatusCode(200).putHeader("Content-Type", "application/json")
- .end(msg.body().getJsonArray("incidents").encode()), rc::fail);
- }
-
- private void reset(RoutingContext rc) {
- DeliveryOptions options = new DeliveryOptions().addHeader("action", "reset");
- vertx.eventBus().rxSend("incident-service", new JsonObject(), options)
- .subscribe((msg) -> rc.response().setStatusCode(200).end(), rc::fail);
- }
-
- private void create(RoutingContext rc) {
- Buffer buffer = rc.getBody();
- DeliveryOptions options = new DeliveryOptions().setCodecName(new IncidentCodec().name()).addHeader("action", "createIncident");
- vertx.eventBus().rxSend("incident-service", buffer.getDelegate(), options)
- .subscribe((msg) -> rc.response().setStatusCode(200).end(), rc::fail);
-
- }
-
-}
diff --git a/src/main/java/com/redhat/cajun/navy/incident/service/EventBusConsumer.java b/src/main/java/com/redhat/cajun/navy/incident/service/EventBusConsumer.java
index 24ee61b..1e2b2f8 100644
--- a/src/main/java/com/redhat/cajun/navy/incident/service/EventBusConsumer.java
+++ b/src/main/java/com/redhat/cajun/navy/incident/service/EventBusConsumer.java
@@ -13,7 +13,8 @@
import com.redhat.cajun.navy.incident.message.IncidentReportedEvent;
import com.redhat.cajun.navy.incident.model.Incident;
import io.quarkus.vertx.ConsumeEvent;
-import io.reactivex.processors.BehaviorProcessor;
+import io.reactivex.processors.FlowableProcessor;
+import io.reactivex.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
@@ -32,32 +33,33 @@ public class EventBusConsumer {
@Inject
IncidentService service;
- private BehaviorProcessor processor = BehaviorProcessor.create();
+ private FlowableProcessor processor = UnicastProcessor.create().toSerialized();
+
+ private Object lock = new Object();
private IncidentCodec codec = new IncidentCodec();
- @SuppressWarnings("unchecked")
@ConsumeEvent(value = "incident-service", blocking = true)
- public void consume(Message> msg) {
+ public void consume(Message msg) {
String action = msg.headers().get("action");
switch (action) {
case "incidents" :
- incidents((Message) msg);
+ incidents(msg);
break;
case "incidentById" :
- incidentById((Message) msg);
+ incidentById(msg);
break;
case "incidentsByStatus":
- incidentsByStatus((Message) msg);
+ incidentsByStatus(msg);
break;
case "incidentsByName":
- incidentsByName((Message) msg);
+ incidentsByName(msg);
break;
case "reset" :
- reset((Message) msg);
+ reset(msg);
break;
case "createIncident":
- createIncident((Message) msg);
+ createIncident(msg);
break;
default:
msg.fail(-1, "Unsupported operation");
@@ -102,12 +104,9 @@ private void reset(Message msg) {
msg.reply(new JsonObject());
}
- private void createIncident(Message msg) {
- Incident created = service.create(msg.body());
- boolean success = false;
- while (!success) {
- success = processor.offer(created);
- }
+ private void createIncident(Message msg) {
+ Incident created = service.create(codec.fromJsonObject(msg.body()));
+ processor.onNext(created);
msg.reply(new JsonObject());
}
@@ -133,7 +132,7 @@ private CompletionStage> future = new CompletableFuture<>();
KafkaMessage kafkaMessage = KafkaMessage.of(incident.getId(), json);
future.complete(kafkaMessage);
diff --git a/src/main/java/com/redhat/cajun/navy/incident/service/IncidentCodec.java b/src/main/java/com/redhat/cajun/navy/incident/service/IncidentCodec.java
index 08ef6ee..cd5d313 100644
--- a/src/main/java/com/redhat/cajun/navy/incident/service/IncidentCodec.java
+++ b/src/main/java/com/redhat/cajun/navy/incident/service/IncidentCodec.java
@@ -1,43 +1,9 @@
package com.redhat.cajun.navy.incident.service;
import com.redhat.cajun.navy.incident.model.Incident;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.json.JsonObject;
-public class IncidentCodec implements MessageCodec {
-
-
- @Override
- public void encodeToWire(Buffer buffer, Buffer incident) {
- buffer.appendInt(incident.length());
- buffer.appendBuffer(incident);
- }
-
- @Override
- public Incident decodeFromWire(int position, Buffer buffer) {
- int _pos = position;
- int length = buffer.getInt(_pos);
- String jsonStr = buffer.getString(_pos+=4, _pos+=length);
- JsonObject jsonObject = new JsonObject(jsonStr);
- return fromJsonObject(jsonObject);
- }
-
- @Override
- public Incident transform(Buffer incident) {
- JsonObject jsonObject = new JsonObject(incident);
- return fromJsonObject(jsonObject);
- }
-
- @Override
- public String name() {
- return this.getClass().getSimpleName();
- }
-
- @Override
- public byte systemCodecID() {
- return -1;
- }
+public class IncidentCodec {
public JsonObject toJsonObject(Incident incident) {
return new JsonObject().put("id", incident.getId())
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 417cb32..350c3ee 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,12 +1,10 @@
-# Configuration file
-http.server.port=8080
-
quarkus.datasource.driver=org.postgresql.Driver
quarkus.hibernate-orm.database.generation=none
quarkus.log.category."com.redhat.cajun.navy".level=DEBUG
quarkus.log.category."org.eclipse.yasson".level=ERROR
quarkus.log.console.enable=true
+quarkus.log.console.level=DEBUG
quarkus.log.level=INFO
# Configure the Kafka source