From cdc3849cfc37b75a5ec498467730fa9085ceb7ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Weber?= Date: Wed, 8 Jan 2025 16:23:40 +0100 Subject: [PATCH] #654 Some code cleanup --- gateleen-kafka/README_kafka.md | 2 +- .../gateleen/kafka/KafkaMessageSender.java | 5 +- .../kafka/KafkaProducerRepository.java | 6 +- .../gateleen/kafka/KafkaHandlerTest.java | 4 +- .../kafka/KafkaProducerRecordBuilderTest.java | 96 +++++++++++-------- 5 files changed, 62 insertions(+), 51 deletions(-) diff --git a/gateleen-kafka/README_kafka.md b/gateleen-kafka/README_kafka.md index 3d9d61c7a..09c0cc44f 100644 --- a/gateleen-kafka/README_kafka.md +++ b/gateleen-kafka/README_kafka.md @@ -48,7 +48,7 @@ The following topic configuration values are required: Besides these required configuration values, additional string values can be added. See documentation from Apache Kafka [here](https://kafka.apache.org/documentation/#producerconfigs). ## Usage -To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all +To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all incoming requests. See [Playground Server](../gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java) and [Runconfig](../gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java). The following sequence diagram shows the setup of the "MainVerticle". The `streamingPath` (KafkaHandler) is configured to `/playground/server/streaming/` diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java index 584a92a7b..dced28eb9 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java @@ -43,12 +43,11 @@ Future sendMessages(KafkaProducer kafkaProducer, Promise promise = Promise.promise(); log.debug("Start processing {} messages for kafka", messages.size()); - @SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627 - List futures = messages.stream() + List> futures = messages.stream() .map(message -> KafkaMessageSender.this.sendMessage(kafkaProducer, message)) .collect(toList()); - CompositeFuture.all(futures).mapEmpty().onComplete(result -> { + Future.all(futures).mapEmpty().onComplete(result -> { if (result.succeeded()) { promise.complete(); log.debug("Batch messages successfully sent to Kafka."); diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRepository.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRepository.java index 445a85bf3..f9408b0dd 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRepository.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRepository.java @@ -46,10 +46,10 @@ Optional, Pattern>> findMatchingKafkaProducer Promise closeAll() { log.info("About to close all kafka producers"); Promise promise = Promise.promise(); - List futures = new ArrayList<>(); + List> futures = new ArrayList<>(); for (Map.Entry> entry : kafkaProducers.entrySet()) { - Promise entryFuture = Promise.promise(); + Promise entryFuture = Promise.promise(); futures.add(entryFuture.future()); entry.getValue().close(event -> { if (event.succeeded()) { @@ -62,7 +62,7 @@ Promise closeAll() { } // wait for all producers to be closed - CompositeFuture.all(futures).onComplete(event -> { + Future.all(futures).onComplete(event -> { kafkaProducers.clear(); promise.complete(); }); diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java index 514a1d7fc..f971a0e84 100644 --- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java @@ -2,7 +2,6 @@ import io.vertx.core.Future; import io.vertx.core.MultiMap; -import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; @@ -30,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import static java.lang.Thread.currentThread; @@ -66,7 +64,7 @@ public class KafkaHandlerTest { private ConfigurationResourceManager configurationResourceManager; private KafkaHandler handler; private MockResourceStorage storage; - private GateleenExceptionFactory exceptionFactory = newGateleenWastefulExceptionFactory(); + private final GateleenExceptionFactory exceptionFactory = newGateleenWastefulExceptionFactory(); private Vertx vertxMock; private final String configResourceUri = "/kafka/topicsConfig"; diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java index 3cb7a63c3..382fbec23 100644 --- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java @@ -8,15 +8,14 @@ import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.kafka.client.producer.KafkaProducerRecord; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.swisspush.gateleen.core.util.JsonObjectUtils; import org.swisspush.gateleen.validation.ValidationException; import java.util.List; +import static org.junit.Assert.assertThrows; import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords; /** @@ -27,35 +26,40 @@ @RunWith(VertxUnitRunner.class) public class KafkaProducerRecordBuilderTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Test - public void buildRecordsInvalidJson() throws ValidationException { - thrown.expect( ValidationException.class ); - thrown.expectMessage("Error while parsing payload"); - buildRecords("myTopic", Buffer.buffer("notValidJson")); + public void buildRecordsInvalidJson(TestContext context) { + Exception exception = assertThrows(ValidationException.class, () -> { + buildRecords("myTopic", Buffer.buffer("notValidJson")); + }); + + context.assertEquals("Error while parsing payload", exception.getMessage()); } @Test - public void buildRecordsMissingRecordsArray() throws ValidationException { - thrown.expect( ValidationException.class ); - thrown.expectMessage("Missing 'records' array"); - buildRecords("myTopic", Buffer.buffer("{}")); + public void buildRecordsMissingRecordsArray(TestContext context) { + Exception exception = assertThrows(ValidationException.class, () -> { + buildRecords("myTopic", Buffer.buffer("{}")); + }); + + context.assertEquals("Missing 'records' array", exception.getMessage()); } @Test - public void buildRecordsNotArray() throws ValidationException { - thrown.expect( ValidationException.class ); - thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects"); - buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}")); + public void buildRecordsNotArray(TestContext context) { + Exception exception = assertThrows(ValidationException.class, () -> { + buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}")); + }); + + context.assertEquals("Property 'records' must be of type JsonArray holding JsonObject objects", exception.getMessage()); } @Test - public void buildRecordsInvalidRecordsType() throws ValidationException { - thrown.expect( ValidationException.class ); - thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects"); - buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}")); + public void buildRecordsInvalidRecordsType(TestContext context) { + Exception exception = assertThrows(ValidationException.class, () -> { + buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}")); + }); + + context.assertEquals("Property 'records' must be of type JsonArray holding JsonObject objects", exception.getMessage()); } @Test @@ -66,38 +70,48 @@ public void buildRecordsEmptyRecordsArray(TestContext context) throws Validation } @Test - public void buildRecordsInvalidKeyType() throws ValidationException { - thrown.expect( ValidationException.class ); - thrown.expectMessage("Property 'key' must be of type String"); - buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}")); + public void buildRecordsInvalidKeyType(TestContext context) { + Exception exception = assertThrows(ValidationException.class, () -> { + buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}")); + }); + + context.assertEquals("Property 'key' must be of type String", exception.getMessage()); } @Test - public void buildRecordsInvalidValueType() throws ValidationException { - thrown.expect( ValidationException.class ); - thrown.expectMessage("Property 'value' must be of type JsonObject"); - buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}")); + public void buildRecordsInvalidValueType(TestContext context) { + Exception exception = assertThrows(ValidationException.class, () -> { + buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}")); + }); + + context.assertEquals("Property 'value' must be of type JsonObject", exception.getMessage()); } @Test - public void buildRecordsMissingValue() throws ValidationException { - thrown.expect( ValidationException.class ); - thrown.expectMessage("Property 'value' is required"); - buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}")); + public void buildRecordsMissingValue(TestContext context) { + Exception exception = assertThrows(ValidationException.class, () -> { + buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}")); + }); + + context.assertEquals("Property 'value' is required", exception.getMessage()); } @Test - public void buildRecordsInvalidHeadersType() throws ValidationException { - thrown.expect( ValidationException.class ); - thrown.expectMessage("Property 'headers' must be of type JsonObject"); - buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}")); + public void buildRecordsInvalidHeadersType(TestContext context) { + Exception exception = assertThrows(ValidationException.class, () -> { + buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}")); + }); + + context.assertEquals("Property 'headers' must be of type JsonObject", exception.getMessage()); } @Test - public void buildRecordsInvalidHeadersValueType() throws ValidationException { - thrown.expect( ValidationException.class ); - thrown.expectMessage("Property 'headers' must be of type JsonObject holding String values only"); - buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}")); + public void buildRecordsInvalidHeadersValueType(TestContext context) { + Exception exception = assertThrows(ValidationException.class, () -> { + buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}")); + }); + + context.assertEquals("Property 'headers' must be of type JsonObject holding String values only", exception.getMessage()); } @Test