diff --git a/lib/pom.xml b/lib/pom.xml
index 3ffb75f..6bef2e8 100644
--- a/lib/pom.xml
+++ b/lib/pom.xml
@@ -10,10 +10,10 @@
kafka-hook
-
- org.jboss.spec.javax.ws.rs
- jboss-jaxrs-api_2.1_spec
- ${jaxrs.version}
+
+ io.vertx
+ vertx-core
+ ${vertx.version}
com.fasterxml.jackson.core
diff --git a/lib/src/main/java/se/yolean/kafka/hook/ProduceFailed.java b/lib/src/main/java/se/yolean/kafka/hook/ProduceFailed.java
new file mode 100644
index 0000000..3b19248
--- /dev/null
+++ b/lib/src/main/java/se/yolean/kafka/hook/ProduceFailed.java
@@ -0,0 +1,23 @@
+package se.yolean.kafka.hook;
+
+import se.yolean.kafka.hook.types.v1.HookError;
+
+
+/**
+ * Means we failed to fulfil the primary responsibility of kafka-hook.
+ */
+public class ProduceFailed extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ private final HookError error;
+
+ public ProduceFailed(HookError error) {
+ this.error = error;
+ }
+
+ public HookError getError() {
+ return error;
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 5c42085..9af08ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
3.8.1
io.quarkus
- 2.0.1.Final
+ 3.9.4
2.11.3
1.5.6
5.7.0
diff --git a/rest/pom.xml b/rest/pom.xml
index 81575bb..758e76d 100644
--- a/rest/pom.xml
+++ b/rest/pom.xml
@@ -34,11 +34,11 @@
io.quarkus
- quarkus-resteasy
+ quarkus-vertx-web
io.quarkus
- quarkus-resteasy-jackson
+ quarkus-jackson
io.quarkus
diff --git a/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRestResource.java b/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRestResource.java
deleted file mode 100644
index 95ade98..0000000
--- a/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRestResource.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package se.yolean.kafka.hook.rest;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import javax.inject.Inject;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriInfo;
-
-import se.yolean.kafka.hook.http.KafkaHookResource;
-
-@Produces(MediaType.APPLICATION_JSON)
-@Path("/hook/v1")
-public class KafkaHookRestResource {
-
- @Inject KafkaHookResource hook;
-
- @POST
- public Response produce(@Context HttpHeaders headers, @Context UriInfo uri, InputStream payload) throws IOException {
- return hook.produce(headers, uri, "", payload);
- }
-
- @POST
- @Path("/{type}")
- public Response produce(@Context HttpHeaders headers, @Context UriInfo uri, @PathParam("type") String type, InputStream payload)
- // if we fail to read the payload, which would be very strange
- throws IOException {
- return hook.produce(headers, uri, type, payload);
- }
-
-}
diff --git a/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRoutes.java b/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRoutes.java
new file mode 100644
index 0000000..a677605
--- /dev/null
+++ b/rest/src/main/java/se/yolean/kafka/hook/rest/KafkaHookRoutes.java
@@ -0,0 +1,38 @@
+package se.yolean.kafka.hook.rest;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import io.quarkus.vertx.web.Route;
+import io.quarkus.vertx.web.Route.HandlerType;
+import io.vertx.codegen.annotations.Nullable;
+import io.vertx.core.MultiMap;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServerResponse;
+import io.vertx.ext.web.RoutingContext;
+import se.yolean.kafka.hook.ProduceFailed;
+import se.yolean.kafka.hook.types.v1.HookReceipt;
+
+@ApplicationScoped
+public class KafkaHookRoutes {
+
+ @Route(methods = HttpMethod.POST, regex = ".*/v1/?(.*)", type = Route.HandlerType.BLOCKING, produces = "application/json")
+ public HookReceipt produce(RoutingContext ctx) throws IOException {
+ String type = ctx.pathParam("param0");
+ MultiMap headers = ctx.request().headers();
+ @Nullable
+ Buffer payload = ctx.getBody();
+ return hook.produce(headers, uri, "", payload);
+ }
+
+ @Route(type = HandlerType.FAILURE, produces = "application/json")
+ void unsupported(ProduceFailed e, HttpServerResponse response) {
+ response.setStatusCode(500).end(e.getError());
+ }
+
+ // TODO pixy drop-in, stuff like: "/{topic: ^[^/]+$}/messages"
+
+}
diff --git a/rest/src/main/java/se/yolean/kafka/hook/rest/PixyDropInReplacementResource.java b/rest/src/main/java/se/yolean/kafka/hook/rest/PixyDropInReplacementResource.java
deleted file mode 100644
index 2527432..0000000
--- a/rest/src/main/java/se/yolean/kafka/hook/rest/PixyDropInReplacementResource.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package se.yolean.kafka.hook.rest;
-
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.Response;
-
-// TODO can we make a drop-in replacement for https://github.com/mailgun/kafka-pixy#produce ?
-// Excluding /clusters endpoints
-
-@Path("/topics")
-public class PixyDropInReplacementResource {
-
- @POST
- @Path("/{topic: ^[^/]+$}/messages")
- public Response pixy() {
- throw new UnsupportedOperationException("Not implemented");
- }
-
-}
diff --git a/rest/src/test/java/se/yolean/kafka/hook/rest/KafkaHookResourceIntegrationTest.java b/rest/src/test/java/se/yolean/kafka/hook/rest/KafkaHookResourceIntegrationTest.java
index 0b70b1d..a52bff8 100644
--- a/rest/src/test/java/se/yolean/kafka/hook/rest/KafkaHookResourceIntegrationTest.java
+++ b/rest/src/test/java/se/yolean/kafka/hook/rest/KafkaHookResourceIntegrationTest.java
@@ -29,6 +29,7 @@
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import java.io.UnsupportedEncodingException;
import java.time.Duration;
@@ -160,6 +161,44 @@ public void testProduceString() throws UnsupportedEncodingException {
assertEquals("github.com/Yolean/kafka-hook/mytype", headers(record2).get("ce_type"));
}
+ @Test
+ public void testProduceAlternativeUrls() throws UnsupportedEncodingException {
+ given()
+ .contentType(ContentType.TEXT)
+ .accept(ContentType.JSON)
+ .body("test1".getBytes())
+ .when().post("/hook/v1/mytype/with/slashes")
+ .then()
+ .body(containsString("\"offset\":" + (startOffset)))
+ .statusCode(200);
+ given()
+ .contentType(ContentType.TEXT)
+ .accept(ContentType.JSON)
+ .body("test2".getBytes())
+ .when().post("/some-prefix/v1/hook")
+ .then()
+ .body(containsString("\"offset\":" + (startOffset + 1)))
+ .statusCode(200);
+ given()
+ .contentType(ContentType.TEXT)
+ .accept(ContentType.JSON)
+ .body("test3".getBytes())
+ .when().post("/some-prefix/v1/hook/sub/type/")
+ .then()
+ .body(containsString("\"offset\":" + (startOffset + 2)))
+ .statusCode(200);
+ waitBetweenPolls();
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
+ Iterator> it = records.iterator();
+ ConsumerRecord record1 = it.next();
+ assertEquals("github.com/Yolean/kafka-hook/mytype/with/slashes", headers(record1).get("ce_type"));
+ ConsumerRecord record2 = it.next();
+ assertEquals("github.com/Yolean/kafka-hook/", headers(record2).get("ce_type"));
+ ConsumerRecord record3 = it.next();
+ assertNotEquals("github.com/Yolean/kafka-hook/sub/type/", headers(record3).get("ce_type"));
+ assertEquals("github.com/Yolean/kafka-hook/sub/type", headers(record3).get("ce_type"));
+ }
+
@Test
public void testCloudeventsDistributedTracingExtensionWithEnvoyHeaders() {
given()