diff --git a/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java b/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java index 6f1ed062..e438c2e3 100644 --- a/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java +++ b/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java @@ -92,11 +92,11 @@ public CompletionStage> listTopics() { * @param topicName topic name to create * @return a CompletionStage Void */ - public CompletionStage createTopic(String topicName) { + public CompletionStage createTopic(String topicName, int partitions, short replicationFactor) { LOGGER.trace("Create topic thread {}", Thread.currentThread()); - LOGGER.info("Create topic {}", topicName); + LOGGER.info("Create topic {}, partitions {}, replicationFactor {}", topicName, partitions, replicationFactor); CompletableFuture promise = new CompletableFuture<>(); - this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, 2, (short) 1))) + this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, partitions, replicationFactor))) .all() .whenComplete((topic, exception) -> { LOGGER.trace("Create topic callback thread {}", Thread.currentThread()); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java index ac6790a7..32208a9b 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java @@ -184,8 +184,10 @@ public void doGetTopic(RoutingContext routingContext) { */ public void doCreateTopic(RoutingContext routingContext) { String topicName = routingContext.pathParam("topicname"); + int partitions = Integer.parseInt(routingContext.queryParams().get("partitions")); + short replicationFactor = Short.parseShort(routingContext.queryParams().get("replication_factor")); - this.kafkaBridgeAdmin.createTopic(topicName) + this.kafkaBridgeAdmin.createTopic(topicName, partitions, replicationFactor) .whenComplete(((topic, exception) -> { LOGGER.trace("Create topic handler thread {}", Thread.currentThread()); if (exception == null) { diff --git a/src/main/resources/openapi.json b/src/main/resources/openapi.json index 90113f1e..2f5749b2 100644 --- a/src/main/resources/openapi.json +++ b/src/main/resources/openapi.json @@ -753,7 +753,7 @@ } ] }, - "/create-topic/{topicname}": { + "/admin/topics/{topicname}": { "post": { "tags": [ "Topics" @@ -800,6 +800,24 @@ "schema": { "type": "string" } + }, + { + "name": "partitions", + "in": "query", + "description": "Number of partitions for the topic.", + "required": true, + "schema": { + "type": "integer" + } + }, + { + "name": "replication_factor", + "in": "query", + "description": "Replication factor for the topic.", + "required": true, + "schema": { + "type": "integer" + } } ] }, diff --git a/src/main/resources/openapiv2.json b/src/main/resources/openapiv2.json index a0089a14..9d6f53c9 100644 --- a/src/main/resources/openapiv2.json +++ b/src/main/resources/openapiv2.json @@ -686,7 +686,7 @@ } ] }, - "/create-topic/{topicname}": { + "/admin/topics/{topicname}": { "post": { "tags": [ "Topics" @@ -725,6 +725,24 @@ "schema": { "type": "string" } + }, + { + "name": "partitions", + "in": "query", + "description": "Number of partitions for the topic.", + "required": true, + "schema": { + "type": "integer" + } + }, + { + "name": "replication_factor", + "in": "query", + "description": "Replication factor for the topic.", + "required": true, + "schema": { + "type": "integer" + } } ] }, diff --git a/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java b/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java index 9b690569..43be8805 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java @@ -219,7 +219,9 @@ void setupTopic(VertxTestContext context, String topic, int partitions, int coun @Test void createTopicTest(VertxTestContext context) { baseService() - .postRequest("/create-topic/" + topic) + .postRequest("/admin/topics/" + topic) + .addQueryParam("partitions", "1") + .addQueryParam("replication_factor", "1") .as(BodyCodec.jsonArray()) .send(ar -> { context.verify(() -> { diff --git a/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java b/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java index 6f3d395a..6b9f9eef 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java @@ -151,8 +151,8 @@ void openapiTest(VertxTestContext context) { assertThat(paths.containsKey("/topics/{topicname}/partitions/{partitionid}/offsets"), is(true)); assertThat(paths.containsKey("/topics/{topicname}/partitions"), is(true)); assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/topics/{topicname}/partitions/{partitionid}").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.SEND_TO_PARTITION.toString())); - assertThat(paths.containsKey("/create-topic/{topicname}"), is(true)); - assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/create-topic/{topicname}").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.CREATE_TOPIC.toString())); + assertThat(paths.containsKey("/admin/topics/{topicname}"), is(true)); + assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/admin/topics/{topicname}").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.CREATE_TOPIC.toString())); assertThat(paths.containsKey("/healthy"), is(true)); assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/healthy").getJsonObject("get").getString("operationId"), is(HttpOpenApiOperations.HEALTHY.toString())); assertThat(paths.containsKey("/ready"), is(true));