diff --git a/lib/pom.xml b/lib/pom.xml index 3ffb75f..6bef2e8 100644 --- a/lib/pom.xml +++ b/lib/pom.xml @@ -10,10 +10,10 @@ kafka-hook - - org.jboss.spec.javax.ws.rs - jboss-jaxrs-api_2.1_spec - ${jaxrs.version} + + io.vertx + vertx-core + ${vertx.version} com.fasterxml.jackson.core diff --git a/lib/src/main/java/se/yolean/kafka/hook/ProduceFailed.java b/lib/src/main/java/se/yolean/kafka/hook/ProduceFailed.java new file mode 100644 index 0000000..3b19248 --- /dev/null +++ b/lib/src/main/java/se/yolean/kafka/hook/ProduceFailed.java @@ -0,0 +1,23 @@ +package se.yolean.kafka.hook; + +import se.yolean.kafka.hook.types.v1.HookError; + + +/** + * Means we failed to fulfil the primary responsibility of kafka-hook. + */ +public class ProduceFailed extends Exception { + + private static final long serialVersionUID = 1L; + + private final HookError error; + + public ProduceFailed(HookError error) { + this.error = error; + } + + public HookError getError() { + return error; + } + +} diff --git a/pom.xml b/pom.xml index 5c42085..9af08ba 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ 3.8.1 io.quarkus - 2.0.1.Final + 3.9.4 2.11.3 1.5.6 5.7.0 diff --git a/rest/pom.xml b/rest/pom.xml index 81575bb..758e76d 100644 --- a/rest/pom.xml +++ b/rest/pom.xml @@ -34,11 +34,11 @@ io.quarkus - quarkus-resteasy + quarkus-vertx-web io.quarkus - quarkus-resteasy-jackson + quarkus-jackson io.quarkus diff --git a/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRestResource.java b/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRestResource.java deleted file mode 100644 index 95ade98..0000000 --- a/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRestResource.java +++ /dev/null @@ -1,38 +0,0 @@ -package se.yolean.kafka.hook.rest; - -import java.io.IOException; -import java.io.InputStream; - -import javax.inject.Inject; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriInfo; - -import se.yolean.kafka.hook.http.KafkaHookResource; - -@Produces(MediaType.APPLICATION_JSON) -@Path("/hook/v1") -public class KafkaHookRestResource { - - @Inject KafkaHookResource hook; - - @POST - public Response produce(@Context HttpHeaders headers, @Context UriInfo uri, InputStream payload) throws IOException { - return hook.produce(headers, uri, "", payload); - } - - @POST - @Path("/{type}") - public Response produce(@Context HttpHeaders headers, @Context UriInfo uri, @PathParam("type") String type, InputStream payload) - // if we fail to read the payload, which would be very strange - throws IOException { - return hook.produce(headers, uri, type, payload); - } - -} diff --git a/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRoutes.java b/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRoutes.java new file mode 100644 index 0000000..a677605 --- /dev/null +++ b/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRoutes.java @@ -0,0 +1,38 @@ +package se.yolean.kafka.hook.rest; + +import java.io.IOException; +import java.io.InputStream; + +import javax.enterprise.context.ApplicationScoped; + +import io.quarkus.vertx.web.Route; +import io.quarkus.vertx.web.Route.HandlerType; +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.ext.web.RoutingContext; +import se.yolean.kafka.hook.ProduceFailed; +import se.yolean.kafka.hook.types.v1.HookReceipt; + +@ApplicationScoped +public class KafkaHookRoutes { + + @Route(methods = HttpMethod.POST, regex = ".*/v1/?(.*)", type = Route.HandlerType.BLOCKING, produces = "application/json") + public HookReceipt produce(RoutingContext ctx) throws IOException { + String type = ctx.pathParam("param0"); + MultiMap headers = ctx.request().headers(); + @Nullable + Buffer payload = ctx.getBody(); + return hook.produce(headers, uri, "", payload); + } + + @Route(type = HandlerType.FAILURE, produces = "application/json") + void unsupported(ProduceFailed e, HttpServerResponse response) { + response.setStatusCode(500).end(e.getError()); + } + + // TODO pixy drop-in, stuff like: "/{topic: ^[^/]+$}/messages" + +} diff --git a/rest/src/main/java/se/yolean/kafka/hook/rest/PixyDropInReplacementResource.java b/rest/src/main/java/se/yolean/kafka/hook/rest/PixyDropInReplacementResource.java deleted file mode 100644 index 2527432..0000000 --- a/rest/src/main/java/se/yolean/kafka/hook/rest/PixyDropInReplacementResource.java +++ /dev/null @@ -1,19 +0,0 @@ -package se.yolean.kafka.hook.rest; - -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.core.Response; - -// TODO can we make a drop-in replacement for https://github.com/mailgun/kafka-pixy#produce ? -// Excluding /clusters endpoints - -@Path("/topics") -public class PixyDropInReplacementResource { - - @POST - @Path("/{topic: ^[^/]+$}/messages") - public Response pixy() { - throw new UnsupportedOperationException("Not implemented"); - } - -} diff --git a/rest/src/test/java/se/yolean/kafka/hook/rest/KafkaHookResourceIntegrationTest.java b/rest/src/test/java/se/yolean/kafka/hook/rest/KafkaHookResourceIntegrationTest.java index 0b70b1d..a52bff8 100644 --- a/rest/src/test/java/se/yolean/kafka/hook/rest/KafkaHookResourceIntegrationTest.java +++ b/rest/src/test/java/se/yolean/kafka/hook/rest/KafkaHookResourceIntegrationTest.java @@ -29,6 +29,7 @@ import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.not; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import java.io.UnsupportedEncodingException; import java.time.Duration; @@ -160,6 +161,44 @@ public void testProduceString() throws UnsupportedEncodingException { assertEquals("github.com/Yolean/kafka-hook/mytype", headers(record2).get("ce_type")); } + @Test + public void testProduceAlternativeUrls() throws UnsupportedEncodingException { + given() + .contentType(ContentType.TEXT) + .accept(ContentType.JSON) + .body("test1".getBytes()) + .when().post("/hook/v1/mytype/with/slashes") + .then() + .body(containsString("\"offset\":" + (startOffset))) + .statusCode(200); + given() + .contentType(ContentType.TEXT) + .accept(ContentType.JSON) + .body("test2".getBytes()) + .when().post("/some-prefix/v1/hook") + .then() + .body(containsString("\"offset\":" + (startOffset + 1))) + .statusCode(200); + given() + .contentType(ContentType.TEXT) + .accept(ContentType.JSON) + .body("test3".getBytes()) + .when().post("/some-prefix/v1/hook/sub/type/") + .then() + .body(containsString("\"offset\":" + (startOffset + 2))) + .statusCode(200); + waitBetweenPolls(); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + Iterator> it = records.iterator(); + ConsumerRecord record1 = it.next(); + assertEquals("github.com/Yolean/kafka-hook/mytype/with/slashes", headers(record1).get("ce_type")); + ConsumerRecord record2 = it.next(); + assertEquals("github.com/Yolean/kafka-hook/", headers(record2).get("ce_type")); + ConsumerRecord record3 = it.next(); + assertNotEquals("github.com/Yolean/kafka-hook/sub/type/", headers(record3).get("ce_type")); + assertEquals("github.com/Yolean/kafka-hook/sub/type", headers(record3).get("ce_type")); + } + @Test public void testCloudeventsDistributedTracingExtensionWithEnvoyHeaders() { given()