Skip to content

Commit

Permalink
Merge pull request #1 from btison/async-jaxrs
Browse files Browse the repository at this point in the history
Using JaxRS 2.1 async request processing instead of vert.x for REST layer
  • Loading branch information
btison authored Jun 5, 2019
2 parents 2aafbf9 + 394bd21 commit d5808a6
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 192 deletions.
32 changes: 7 additions & 25 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<version>0.0.1-SNAPSHOT</version>
<properties>
<surefire-plugin.version>2.22.0</surefire-plugin.version>
<quarkus.version>0.15.0</quarkus.version>
<quarkus.version>0.16.0</quarkus.version>
<maven.compiler.source>1.8</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>1.8</maven.compiler.target>
Expand All @@ -24,6 +24,10 @@
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jsonb</artifactId>
Expand All @@ -47,32 +51,10 @@
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
<exclusions>
<exclusion>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<groupId>io.smallrye.reactive</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>0.0.9</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
<exclusions>
<exclusion>
<artifactId>smallrye-reactive-messaging-kafka</artifactId>
<groupId>io.smallrye.reactive</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka</artifactId>
<version>0.0.9</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand All @@ -83,8 +65,8 @@
<artifactId>quarkus-jsonp</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.redhat.cajun.navy.incident.rest;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.health.Health;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;

@Health
@ApplicationScoped
public class ApplicationHealthCheck implements HealthCheck {


@Override
public HealthCheckResponse call() {
return HealthCheckResponse.named("Health check").up().build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.redhat.cajun.navy.incident.rest;

import java.util.concurrent.CompletionStage;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import io.vertx.axle.core.eventbus.EventBus;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonObject;

@Path("/incidents")
public class IncidentsResource {

@Inject
EventBus bus;

@GET
@Path("/")
@Produces(MediaType.APPLICATION_JSON)
public CompletionStage<Response> incidents() {
DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidents");
return bus.<JsonObject>send("incident-service", new JsonObject(), options)
.thenApply(msg -> Response.ok(msg.body().getJsonArray("incidents").encode()).build());
}

@POST
@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
public CompletionStage<Response> createIncident(String incident) {
DeliveryOptions options = new DeliveryOptions().addHeader("action", "createIncident");
return bus.<JsonObject>send("incident-service", new JsonObject(incident), options)
.thenApply(msg -> Response.status(200).build());
}

@GET
@Path("/{status}")
@Produces(MediaType.APPLICATION_JSON)
public CompletionStage<Response> incidentsByStatus(@PathParam("status") String status) {
DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidentsByStatus");
return bus.<JsonObject>send("incident-service", new JsonObject().put("status", status), options)
.thenApply(msg -> Response.ok(msg.body().getJsonArray("incidents").encode()).build());
}

@GET
@Path("/incident/{id}")
@Produces(MediaType.APPLICATION_JSON)
public CompletionStage<Response> incidentById(@PathParam("id") String incidentId) {
DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidentById");
return bus.<JsonObject>send("incident-service", new JsonObject().put("incidentId", incidentId), options)
.thenApply(msg -> {
JsonObject incident = msg.body().getJsonObject("incident");
if (incident == null) {
return Response.status(404).build();
} else {
return Response.ok(incident.encode()).build();
}
});
}

@GET
@Path("/byname/{name}")
@Produces(MediaType.APPLICATION_JSON)
public CompletionStage<Response> incidentsByName(@PathParam("name") String name) {
DeliveryOptions options = new DeliveryOptions().addHeader("action", "incidentsByName");
return bus.<JsonObject>send("incident-service", new JsonObject().put("name", name), options)
.thenApply(msg -> Response.ok(msg.body().getJsonArray("incidents").encode()).build());
}

@POST
@Path("/reset")
public CompletionStage<Response> reset() {
DeliveryOptions options = new DeliveryOptions().addHeader("action", "reset");
return bus.<JsonObject>send("incident-service", new JsonObject(), options)
.thenApply(msg -> Response.ok().build());
}

}
112 changes: 0 additions & 112 deletions src/main/java/com/redhat/cajun/navy/incident/rest/RestApi.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
import com.redhat.cajun.navy.incident.message.IncidentReportedEvent;
import com.redhat.cajun.navy.incident.model.Incident;
import io.quarkus.vertx.ConsumeEvent;
import io.reactivex.processors.BehaviorProcessor;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
Expand All @@ -32,32 +33,33 @@ public class EventBusConsumer {
@Inject
IncidentService service;

private BehaviorProcessor<Incident> processor = BehaviorProcessor.create();
private FlowableProcessor<Incident> processor = UnicastProcessor.<Incident>create().toSerialized();

private Object lock = new Object();

private IncidentCodec codec = new IncidentCodec();

@SuppressWarnings("unchecked")
@ConsumeEvent(value = "incident-service", blocking = true)
public void consume(Message<?> msg) {
public void consume(Message<JsonObject> msg) {
String action = msg.headers().get("action");
switch (action) {
case "incidents" :
incidents((Message<JsonObject>) msg);
incidents(msg);
break;
case "incidentById" :
incidentById((Message<JsonObject>) msg);
incidentById(msg);
break;
case "incidentsByStatus":
incidentsByStatus((Message<JsonObject>) msg);
incidentsByStatus(msg);
break;
case "incidentsByName":
incidentsByName((Message<JsonObject>) msg);
incidentsByName(msg);
break;
case "reset" :
reset((Message<JsonObject>) msg);
reset(msg);
break;
case "createIncident":
createIncident((Message<Incident>) msg);
createIncident(msg);
break;
default:
msg.fail(-1, "Unsupported operation");
Expand Down Expand Up @@ -102,12 +104,9 @@ private void reset(Message<JsonObject> msg) {
msg.reply(new JsonObject());
}

private void createIncident(Message<Incident> msg) {
Incident created = service.create(msg.body());
boolean success = false;
while (!success) {
success = processor.offer(created);
}
private void createIncident(Message<JsonObject> msg) {
Incident created = service.create(codec.fromJsonObject(msg.body()));
processor.onNext(created);
msg.reply(new JsonObject());
}

Expand All @@ -133,7 +132,7 @@ private CompletionStage<org.eclipse.microprofile.reactive.messaging.Message<Stri
.build();
Jsonb jsonb = JsonbBuilder.create();
String json = jsonb.toJson(message);
log.info("Message: " + json);
log.debug("Message: " + json);
CompletableFuture<org.eclipse.microprofile.reactive.messaging.Message<String>> future = new CompletableFuture<>();
KafkaMessage<String, String> kafkaMessage = KafkaMessage.of(incident.getId(), json);
future.complete(kafkaMessage);
Expand Down
Loading

0 comments on commit d5808a6

Please sign in to comment.