Skip to content

Commit

Permalink
#654 Some code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mcweba committed Jan 8, 2025
1 parent 3396e85 commit cdc3849
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 51 deletions.
2 changes: 1 addition & 1 deletion gateleen-kafka/README_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The following topic configuration values are required:
Besides these required configuration values, additional string values can be added. See documentation from Apache Kafka [here](https://kafka.apache.org/documentation/#producerconfigs).

## Usage
To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
incoming requests. See [Playground Server](../gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java) and [Runconfig](../gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java).

The following sequence diagram shows the setup of the "MainVerticle". The `streamingPath` (KafkaHandler) is configured to `/playground/server/streaming/`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ Future<Void> sendMessages(KafkaProducer<String, String> kafkaProducer,
Promise<Void> promise = Promise.promise();
log.debug("Start processing {} messages for kafka", messages.size());

@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
List<Future> futures = messages.stream()
List<Future<Void>> futures = messages.stream()
.map(message -> KafkaMessageSender.this.sendMessage(kafkaProducer, message))
.collect(toList());

CompositeFuture.all(futures).<Void>mapEmpty().onComplete(result -> {
Future.all(futures).<Void>mapEmpty().onComplete(result -> {
if (result.succeeded()) {
promise.complete();
log.debug("Batch messages successfully sent to Kafka.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ Optional<Pair<KafkaProducer<String, String>, Pattern>> findMatchingKafkaProducer
Promise<Void> closeAll() {
log.info("About to close all kafka producers");
Promise<Void> promise = Promise.promise();
List<Future> futures = new ArrayList<>();
List<Future<Void>> futures = new ArrayList<>();

for (Map.Entry<Pattern, KafkaProducer<String, String>> entry : kafkaProducers.entrySet()) {
Promise entryFuture = Promise.promise();
Promise<Void> entryFuture = Promise.promise();
futures.add(entryFuture.future());
entry.getValue().close(event -> {
if (event.succeeded()) {
Expand All @@ -62,7 +62,7 @@ Promise<Void> closeAll() {
}

// wait for all producers to be closed
CompositeFuture.all(futures).onComplete(event -> {
Future.all(futures).onComplete(event -> {
kafkaProducers.clear();
promise.complete();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
Expand Down Expand Up @@ -30,7 +29,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;

import static java.lang.Thread.currentThread;
Expand Down Expand Up @@ -66,7 +64,7 @@ public class KafkaHandlerTest {
private ConfigurationResourceManager configurationResourceManager;
private KafkaHandler handler;
private MockResourceStorage storage;
private GateleenExceptionFactory exceptionFactory = newGateleenWastefulExceptionFactory();
private final GateleenExceptionFactory exceptionFactory = newGateleenWastefulExceptionFactory();
private Vertx vertxMock;

private final String configResourceUri = "/kafka/topicsConfig";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.swisspush.gateleen.core.util.JsonObjectUtils;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.List;

import static org.junit.Assert.assertThrows;
import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords;

/**
Expand All @@ -27,35 +26,40 @@
@RunWith(VertxUnitRunner.class)
public class KafkaProducerRecordBuilderTest {

@Rule
public ExpectedException thrown = ExpectedException.none();

@Test
public void buildRecordsInvalidJson() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Error while parsing payload");
buildRecords("myTopic", Buffer.buffer("notValidJson"));
public void buildRecordsInvalidJson(TestContext context) {
Exception exception = assertThrows(ValidationException.class, () -> {
buildRecords("myTopic", Buffer.buffer("notValidJson"));
});

context.assertEquals("Error while parsing payload", exception.getMessage());
}

@Test
public void buildRecordsMissingRecordsArray() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Missing 'records' array");
buildRecords("myTopic", Buffer.buffer("{}"));
public void buildRecordsMissingRecordsArray(TestContext context) {
Exception exception = assertThrows(ValidationException.class, () -> {
buildRecords("myTopic", Buffer.buffer("{}"));
});

context.assertEquals("Missing 'records' array", exception.getMessage());
}

@Test
public void buildRecordsNotArray() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects");
buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}"));
public void buildRecordsNotArray(TestContext context) {
Exception exception = assertThrows(ValidationException.class, () -> {
buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}"));
});

context.assertEquals("Property 'records' must be of type JsonArray holding JsonObject objects", exception.getMessage());
}

@Test
public void buildRecordsInvalidRecordsType() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects");
buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}"));
public void buildRecordsInvalidRecordsType(TestContext context) {
Exception exception = assertThrows(ValidationException.class, () -> {
buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}"));
});

context.assertEquals("Property 'records' must be of type JsonArray holding JsonObject objects", exception.getMessage());
}

@Test
Expand All @@ -66,38 +70,48 @@ public void buildRecordsEmptyRecordsArray(TestContext context) throws Validation
}

@Test
public void buildRecordsInvalidKeyType() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'key' must be of type String");
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}"));
public void buildRecordsInvalidKeyType(TestContext context) {
Exception exception = assertThrows(ValidationException.class, () -> {
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}"));
});

context.assertEquals("Property 'key' must be of type String", exception.getMessage());
}

@Test
public void buildRecordsInvalidValueType() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'value' must be of type JsonObject");
buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}"));
public void buildRecordsInvalidValueType(TestContext context) {
Exception exception = assertThrows(ValidationException.class, () -> {
buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}"));
});

context.assertEquals("Property 'value' must be of type JsonObject", exception.getMessage());
}

@Test
public void buildRecordsMissingValue() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'value' is required");
buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}"));
public void buildRecordsMissingValue(TestContext context) {
Exception exception = assertThrows(ValidationException.class, () -> {
buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}"));
});

context.assertEquals("Property 'value' is required", exception.getMessage());
}

@Test
public void buildRecordsInvalidHeadersType() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'headers' must be of type JsonObject");
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}"));
public void buildRecordsInvalidHeadersType(TestContext context) {
Exception exception = assertThrows(ValidationException.class, () -> {
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}"));
});

context.assertEquals("Property 'headers' must be of type JsonObject", exception.getMessage());
}

@Test
public void buildRecordsInvalidHeadersValueType() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'headers' must be of type JsonObject holding String values only");
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}"));
public void buildRecordsInvalidHeadersValueType(TestContext context) {
Exception exception = assertThrows(ValidationException.class, () -> {
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}"));
});

context.assertEquals("Property 'headers' must be of type JsonObject holding String values only", exception.getMessage());
}

@Test
Expand Down

0 comments on commit cdc3849

Please sign in to comment.