From 4d4ad1df89c4bf9732913a3d9404e109cc681514 Mon Sep 17 00:00:00 2001 From: Bernard Tison Date: Sun, 20 Sep 2020 18:03:06 +0200 Subject: [PATCH] Use Kafka Streams and Infinispan to build a local materialized view of Responders. --- README.md | 7 +- etc/application.properties | 13 +- pom.xml | 28 +++- .../responder/simulator/ResponderService.java | 55 ------- .../responder/simulator/Simulator.java | 18 +-- .../Configuration.java | 2 +- .../ResponderLocationRepository.java | 1 + .../streams/ResponderNotFoundException.java | 4 + .../simulator/streams/ResponderService.java | 55 +++++++ .../simulator/streams/TopologyProducer.java | 104 ++++++++++++ .../infinispan/InfinispanKeyValueStore.java | 148 ++++++++++++++++++ .../InfinispanKeyValueStoreSupplier.java | 30 ++++ src/main/resources/application.properties | 5 + .../simulator/MissionEventSourceTest.java | 3 + .../responder/simulator/RestApiTest.java | 5 +- .../simulator/streams/KafkaResource.java | 41 +++++ src/test/resources/application.properties | 10 +- 17 files changed, 450 insertions(+), 79 deletions(-) delete mode 100644 src/main/java/com/redhat/emergency/response/responder/simulator/ResponderService.java rename src/main/java/com/redhat/emergency/response/responder/simulator/{repository => infinispan}/Configuration.java (95%) create mode 100644 src/main/java/com/redhat/emergency/response/responder/simulator/streams/ResponderNotFoundException.java create mode 100644 src/main/java/com/redhat/emergency/response/responder/simulator/streams/ResponderService.java create mode 100644 src/main/java/com/redhat/emergency/response/responder/simulator/streams/TopologyProducer.java create mode 100644 src/main/java/com/redhat/emergency/response/responder/simulator/streams/infinispan/InfinispanKeyValueStore.java create mode 100644 src/main/java/com/redhat/emergency/response/responder/simulator/streams/infinispan/InfinispanKeyValueStoreSupplier.java create mode 100644 src/test/java/com/redhat/emergency/response/responder/simulator/streams/KafkaResource.java diff --git a/README.md b/README.md index a762f36..b55bd14 100644 --- a/README.md +++ b/README.md @@ -2,4 +2,9 @@ Simulates the movement of responders based on the messages received from the mission service. -Implemented with Quarkus \ No newline at end of file +Implemented with Quarkus + +#### Implementation details + +The service uses KStreams to build a local materialized view of the responders by handling _ResponderCreatedEvent_ and _ResponderDeletedEvent_ messages. +KStreams is configured to use Infinispan as repository for the materialized view. diff --git a/etc/application.properties b/etc/application.properties index 54b9a0d..0aad6ac 100644 --- a/etc/application.properties +++ b/etc/application.properties @@ -5,11 +5,16 @@ mp.messaging.incoming.mission-event.group.id=responder-simulator-quarkus mp.messaging.outgoing.responder-location-update.topic=topic-responder-location-update -responder-service.url={{ responder_service_application_name }}.{{ namespace_services }}.svc:8080 -responder-service.request-uri=/responder/ - quarkus.infinispan-client.server-list={{ datagrid_server_list }} quarkus.infinispan-client.auth-username={{ datagrid_username }} quarkus.infinispan-client.auth-password={{ datagrid_password }} -infinispan.cache.responder-simulator=responder-simulator \ No newline at end of file +infinispan.cache.responder-simulator=responder-simulator + +quarkus.kafka-streams.bootstrap-servers={{ kafka_bootstrap_address }} +quarkus.kafka-streams.application-id=responder-simulator-kstreams +quarkus.kafka-streams.topics=topic-responder-event + +infinispan.streams.store=responder-store + +kafka.topic.responder-event=topic-responder-event \ No newline at end of file diff --git a/pom.xml b/pom.xml index 94d2c57..eae639f 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ 2.26.3 2.0.4 2.18.0 + 1.1.0.Final @@ -55,8 +56,8 @@ quarkus-vertx-web - io.smallrye.reactive - smallrye-mutiny-vertx-web-client + io.quarkus + quarkus-kafka-streams org.apache.commons @@ -96,6 +97,29 @@ ${json-unit.version} test + + org.apache.kafka + kafka-streams-test-utils + test + + + io.debezium + debezium-core + ${debezium.version} + test + + + io.debezium + debezium-core + ${debezium.version} + test-jar + test + + + org.apache.kafka + kafka_2.12 + test + diff --git a/src/main/java/com/redhat/emergency/response/responder/simulator/ResponderService.java b/src/main/java/com/redhat/emergency/response/responder/simulator/ResponderService.java deleted file mode 100644 index cc835a0..0000000 --- a/src/main/java/com/redhat/emergency/response/responder/simulator/ResponderService.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.redhat.emergency.response.responder.simulator; - -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.event.Observes; -import javax.inject.Inject; - -import io.quarkus.runtime.StartupEvent; -import io.smallrye.mutiny.Uni; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.client.WebClientOptions; -import io.vertx.mutiny.core.Vertx; -import io.vertx.mutiny.ext.web.client.WebClient; -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@ApplicationScoped -public class ResponderService { - - private static final Logger log = LoggerFactory.getLogger(ResponderService.class); - - @ConfigProperty(name = "responder-service.url") - String serviceUrl; - - @ConfigProperty(name = "responder-service.request-uri") - String requestUri; - - @Inject - Vertx vertx; - - private WebClient client; - - void onStart(@Observes StartupEvent e) { - int servicePort = serviceUrl.contains(":") ? Integer.parseInt(serviceUrl.substring(serviceUrl.indexOf(":") + 1)) : 8080; - String serviceHost = serviceUrl.contains(":") ? serviceUrl.substring(0, serviceUrl.indexOf(":")) : serviceUrl; - client = WebClient.create(vertx, new WebClientOptions().setDefaultHost(serviceHost).setDefaultPort(servicePort).setMaxPoolSize(100).setHttp2MaxPoolSize(100)); - } - - public Uni isPerson(String responderId) { - return client.get(requestUri + responderId).send().onItem().transform(resp -> { - if (resp.statusCode() != 200) { - log.error("Error when calling responder service. Return code = " + resp.statusCode()); - return false; - } - JsonObject json = resp.bodyAsJsonObject(); - if (!json.containsKey("person")) { - log.error("Error when calling responder service. Response does not contain property 'person'. Response: " + json.encode()); - return false; - } - log.debug("Responder " + responderId + ": person = " + json.getBoolean("person")); - return json.getBoolean("person"); - }); - } - -} diff --git a/src/main/java/com/redhat/emergency/response/responder/simulator/Simulator.java b/src/main/java/com/redhat/emergency/response/responder/simulator/Simulator.java index 8fcce3a..61402a2 100644 --- a/src/main/java/com/redhat/emergency/response/responder/simulator/Simulator.java +++ b/src/main/java/com/redhat/emergency/response/responder/simulator/Simulator.java @@ -3,7 +3,6 @@ import java.math.BigDecimal; import java.time.Duration; import java.util.List; -import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; @@ -12,6 +11,7 @@ import com.redhat.emergency.response.responder.simulator.model.MissionStep; import com.redhat.emergency.response.responder.simulator.model.ResponderLocation; import com.redhat.emergency.response.responder.simulator.repository.ResponderLocationRepository; +import com.redhat.emergency.response.responder.simulator.streams.ResponderService; import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -43,9 +43,6 @@ public class Simulator { @ConfigProperty(name = "simulator.distance.base") double baseDistance; - @ConfigProperty(name = "simulator.distance.variation") - double distanceVariation; - private final UnicastProcessor> processor = UnicastProcessor.create(); @ConsumeEvent("simulator-mission-created") @@ -127,20 +124,15 @@ private void processLocationUpdate(String key) { private Uni toResponderLocation(JsonObject json) { - return responderService.isPerson(json.getString("responderId")) - .onItem().transform(person -> { + return responderService.responder(json.getString("responderId")) + .onItem().transform(responder -> { List steps = json.getJsonArray("steps").stream().map(o -> (JsonObject) o) .map(j -> new MissionStep(new Coordinates(BigDecimal.valueOf(j.getDouble("lat")), BigDecimal.valueOf(j.getDouble("lon"))), j.getBoolean("wayPoint"), j.getBoolean("destination"))).collect(Collectors.toList()); Coordinates currentPosition = new Coordinates(BigDecimal.valueOf(json.getDouble("responderStartLat")), BigDecimal.valueOf((json.getDouble("responderStartLong")))); - return new ResponderLocation(json.getString("id"), json.getString("responderId"), json.getString("incidentId"), steps, currentPosition, person, distanceUnit()); + return new ResponderLocation(json.getString("id"), json.getString("responderId"), json.getString("incidentId"), + steps, currentPosition, responder.getBoolean("person", false), responder.getDouble("distanceUnit", baseDistance)); }); } - - private double distanceUnit() { - double max = baseDistance * (1 + distanceVariation); - double min = baseDistance * (1 - distanceVariation); - return ThreadLocalRandom.current().nextDouble(max - min) + min; - } } diff --git a/src/main/java/com/redhat/emergency/response/responder/simulator/repository/Configuration.java b/src/main/java/com/redhat/emergency/response/responder/simulator/infinispan/Configuration.java similarity index 95% rename from src/main/java/com/redhat/emergency/response/responder/simulator/repository/Configuration.java rename to src/main/java/com/redhat/emergency/response/responder/simulator/infinispan/Configuration.java index d2c7812..75e293d 100644 --- a/src/main/java/com/redhat/emergency/response/responder/simulator/repository/Configuration.java +++ b/src/main/java/com/redhat/emergency/response/responder/simulator/infinispan/Configuration.java @@ -1,4 +1,4 @@ -package com.redhat.emergency.response.responder.simulator.repository; +package com.redhat.emergency.response.responder.simulator.infinispan; import org.infinispan.commons.configuration.BasicConfiguration; diff --git a/src/main/java/com/redhat/emergency/response/responder/simulator/repository/ResponderLocationRepository.java b/src/main/java/com/redhat/emergency/response/responder/simulator/repository/ResponderLocationRepository.java index f379fe0..8b80636 100644 --- a/src/main/java/com/redhat/emergency/response/responder/simulator/repository/ResponderLocationRepository.java +++ b/src/main/java/com/redhat/emergency/response/responder/simulator/repository/ResponderLocationRepository.java @@ -4,6 +4,7 @@ import javax.enterprise.event.Observes; import javax.inject.Inject; +import com.redhat.emergency.response.responder.simulator.infinispan.Configuration; import com.redhat.emergency.response.responder.simulator.model.ResponderLocation; import io.quarkus.runtime.StartupEvent; import io.vertx.core.json.Json; diff --git a/src/main/java/com/redhat/emergency/response/responder/simulator/streams/ResponderNotFoundException.java b/src/main/java/com/redhat/emergency/response/responder/simulator/streams/ResponderNotFoundException.java new file mode 100644 index 0000000..418609e --- /dev/null +++ b/src/main/java/com/redhat/emergency/response/responder/simulator/streams/ResponderNotFoundException.java @@ -0,0 +1,4 @@ +package com.redhat.emergency.response.responder.simulator.streams; + +public class ResponderNotFoundException extends RuntimeException { +} diff --git a/src/main/java/com/redhat/emergency/response/responder/simulator/streams/ResponderService.java b/src/main/java/com/redhat/emergency/response/responder/simulator/streams/ResponderService.java new file mode 100644 index 0000000..5fc5cda --- /dev/null +++ b/src/main/java/com/redhat/emergency/response/responder/simulator/streams/ResponderService.java @@ -0,0 +1,55 @@ +package com.redhat.emergency.response.responder.simulator.streams; + +import java.time.Duration; +import java.util.function.Supplier; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import io.smallrye.mutiny.Uni; +import io.vertx.core.json.JsonObject; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class ResponderService { + + private static final Logger log = LoggerFactory.getLogger(ResponderService.class); + + @Inject + KafkaStreams streams; + + @ConfigProperty(name = "infinispan.streams.store", defaultValue = "responder-store") + String storeName; + + public Uni responder(String id) { + return Uni.createFrom().item(() -> doGetResponder(id)) + .onFailure(ResponderNotFoundException.class).retry().withBackOff(Duration.ofMillis(500), Duration.ofMillis(1000)).atMost(10) + .onFailure().recoverWithItem((Supplier) JsonObject::new); + } + + private JsonObject doGetResponder(String id) { + String responderStr = responderStore().get(id); + if (responderStr == null) { + log.warn("Responder with id " + id + " not found in the responder store"); + throw new ResponderNotFoundException(); + } + return new JsonObject(responderStr); + } + + private ReadOnlyKeyValueStore responderStore() { + while (true) { + try { + return streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); + } catch (InvalidStateStoreException e) { + // ignore, store not ready yet + } + } + } + +} diff --git a/src/main/java/com/redhat/emergency/response/responder/simulator/streams/TopologyProducer.java b/src/main/java/com/redhat/emergency/response/responder/simulator/streams/TopologyProducer.java new file mode 100644 index 0000000..404616b --- /dev/null +++ b/src/main/java/com/redhat/emergency/response/responder/simulator/streams/TopologyProducer.java @@ -0,0 +1,104 @@ +package com.redhat.emergency.response.responder.simulator.streams; + +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.Produces; +import javax.inject.Inject; + +import com.redhat.emergency.response.responder.simulator.streams.infinispan.InfinispanKeyValueStoreSupplier; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class TopologyProducer { + + private static final Logger log = LoggerFactory.getLogger(TopologyProducer.class); + + private static final String RESPONDERS_CREATED_EVENT = "RespondersCreatedEvent"; + private static final String RESPONDERS_DELETED_EVENT = "RespondersDeletedEvent"; + private static final String[] ACCEPTED_MESSAGE_TYPES = {RESPONDERS_CREATED_EVENT, RESPONDERS_DELETED_EVENT}; + + private static final String ACTION_CREATE = "create"; + private static final String ACTION_DELETE = "delete"; + + + @ConfigProperty(name = "kafka.topic.responder-event") + String responderEventTopic; + + @ConfigProperty(name = "simulator.distance.base") + double baseDistance; + + @ConfigProperty(name = "simulator.distance.variation") + double distanceVariation; + + @Inject + InfinispanKeyValueStoreSupplier keyValueStoreSupplier; + + @Produces + public Topology buildTopology() { + + StreamsBuilder builder = new StreamsBuilder(); + + builder.stream(responderEventTopic, Consumed.with(Serdes.String(), Serdes.String())) + .mapValues(value -> { + try { + JsonObject json = new JsonObject(value); + log.debug("Processing message: " + value); + return json; + } catch (Exception e) { + log.warn("Unexpected message which is not a valid JSON object"); + return new JsonObject(); + } + }) + .filter((key, value) -> { + String messageType = value.getString("messageType"); + if (Arrays.asList(ACCEPTED_MESSAGE_TYPES).contains(messageType)) { + return true; + } + log.debug("Message with type '" + messageType + "' is ignored"); + return false; + }) + .flatMapValues((ValueMapper>) value -> { + if (RESPONDERS_CREATED_EVENT.equals(value.getString("messageType"))) { + JsonArray responders = value.getJsonObject("body").getJsonArray("responders"); + return responders.stream().map(o -> (JsonObject)o).map(resp -> new JsonObject().put("action", ACTION_CREATE) + .put("id", resp.getString("id")) + .put("responder", resp.put("distanceUnit", distanceUnit()))) + .collect(Collectors.toList()); + } else { + JsonArray responderIds = value.getJsonObject("body").getJsonArray("responders"); + return responderIds.stream().map(id -> new JsonObject().put("action", ACTION_DELETE) + .put("id", id)).collect(Collectors.toList()); + } + }) + .map((KeyValueMapper>) (key, value) -> { + if (ACTION_CREATE.equalsIgnoreCase(value.getString("action"))) { + return new KeyValue<>(value.getString("id"), value.getJsonObject("responder").encode()); + } else { + return new KeyValue<>(value.getString("id"), null); + } + }) + .toTable(Materialized.as(keyValueStoreSupplier).withKeySerde(Serdes.String()).withValueSerde(Serdes.String())); + + return builder.build(); + } + + private double distanceUnit() { + double max = baseDistance * (1 + distanceVariation); + double min = baseDistance * (1 - distanceVariation); + return ThreadLocalRandom.current().nextDouble(max - min) + min; + } +} diff --git a/src/main/java/com/redhat/emergency/response/responder/simulator/streams/infinispan/InfinispanKeyValueStore.java b/src/main/java/com/redhat/emergency/response/responder/simulator/streams/infinispan/InfinispanKeyValueStore.java new file mode 100644 index 0000000..6b779cb --- /dev/null +++ b/src/main/java/com/redhat/emergency/response/responder/simulator/streams/infinispan/InfinispanKeyValueStore.java @@ -0,0 +1,148 @@ +package com.redhat.emergency.response.responder.simulator.streams.infinispan; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.event.Observes; +import javax.inject.Inject; + +import com.redhat.emergency.response.responder.simulator.infinispan.Configuration; +import io.quarkus.runtime.StartupEvent; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class InfinispanKeyValueStore implements KeyValueStore { + + private static final Logger log = LoggerFactory.getLogger(InfinispanKeyValueStore.class); + + private volatile boolean open = false; + + @Inject + RemoteCacheManager cacheManager; + + @ConfigProperty(name = "infinispan.streams.store", defaultValue = "responder-store") + String storeName; + + @ConfigProperty(name = "infinispan.cache.create.lazy", defaultValue = "false") + boolean lazy; + + volatile RemoteCache cache; + + void onStart(@Observes StartupEvent e) { + // do not initialize the cache at startup when remote cache is not available, e.g. in QuarkusTests + if (!lazy) { + log.info("Creating remote cache"); + cache = initCache(); + } + } + + @Override + public void put(Bytes key, byte[] value) { + if (value == null) { + delete(key); + } else { + getCache().putIfAbsent(key.get(), value); + } + + } + + @Override + public byte[] putIfAbsent(Bytes key, byte[] value) { + return getCache().putIfAbsent(key.get(), value); + } + + @Override + public void putAll(List> entries) { + if (entries == null) { + return; + } + Map keyValueMap = entries.stream().collect(Collectors.toMap(k -> k.key.get(), v -> v.value)); + getCache().putAll(keyValueMap); + } + + @Override + public byte[] delete(Bytes key) { + return getCache().remove(key.get()); + } + + @Override + public String name() { + return storeName; + } + + @Override + public void init(ProcessorContext context, StateStore root) { + if (root != null) { + context.register(root, (key, value) -> put(Bytes.wrap(key), value)); + } + open = true; + } + + @Override + public void flush() { + // do nothing + } + + @Override + public void close() { + // do nothing; + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public byte[] get(Bytes key) { + return getCache().get(key.get()); + } + + @Override + public KeyValueIterator range(Bytes from, Bytes to) { + throw new UnsupportedOperationException("Method range not implemented)"); + } + + @Override + public KeyValueIterator all() { + throw new UnsupportedOperationException("Method all not implemented)"); + } + + @Override + public long approximateNumEntries() { + return getCache().size(); + } + + private RemoteCache getCache() { + RemoteCache cache = this.cache; + if (cache == null) { + synchronized(this) { + if (this.cache == null) { + this.cache = cache = initCache(); + } + } + } + return cache; + } + + private RemoteCache initCache() { + Configuration configuration = Configuration.builder().name(storeName).mode("SYNC").owners(2).build(); + return cacheManager.administration().getOrCreateCache(storeName, configuration); + } +} diff --git a/src/main/java/com/redhat/emergency/response/responder/simulator/streams/infinispan/InfinispanKeyValueStoreSupplier.java b/src/main/java/com/redhat/emergency/response/responder/simulator/streams/infinispan/InfinispanKeyValueStoreSupplier.java new file mode 100644 index 0000000..4bf87dd --- /dev/null +++ b/src/main/java/com/redhat/emergency/response/responder/simulator/streams/infinispan/InfinispanKeyValueStoreSupplier.java @@ -0,0 +1,30 @@ +package com.redhat.emergency.response.responder.simulator.streams.infinispan; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; + +@ApplicationScoped +public class InfinispanKeyValueStoreSupplier implements KeyValueBytesStoreSupplier { + + @Inject + InfinispanKeyValueStore keyValueStore; + + @Override + public String name() { + return keyValueStore.name(); + } + + @Override + public KeyValueStore get() { + return keyValueStore; + } + + @Override + public String metricsScope() { + return keyValueStore.name(); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 7add276..51e7a50 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -19,3 +19,8 @@ simulator.delay=10000 simulator.distance.base=1500.0 simulator.distance.variation=0.2 +kafka-streams.cache.max.bytes.buffering=10240 +kafka-streams.commit.interval.ms=1000 +kafka-streams.metadata.max.age.ms=500 +kafka-streams.auto.offset.reset=earliest + diff --git a/src/test/java/com/redhat/emergency/response/responder/simulator/MissionEventSourceTest.java b/src/test/java/com/redhat/emergency/response/responder/simulator/MissionEventSourceTest.java index 06bc1e4..69bdf96 100644 --- a/src/test/java/com/redhat/emergency/response/responder/simulator/MissionEventSourceTest.java +++ b/src/test/java/com/redhat/emergency/response/responder/simulator/MissionEventSourceTest.java @@ -12,6 +12,8 @@ import javax.inject.Inject; +import com.redhat.emergency.response.responder.simulator.streams.KafkaResource; +import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.mockito.InjectMock; import io.vertx.core.json.JsonObject; @@ -22,6 +24,7 @@ import org.mockito.Captor; @QuarkusTest +@QuarkusTestResource(KafkaResource.class) public class MissionEventSourceTest { @InjectMock diff --git a/src/test/java/com/redhat/emergency/response/responder/simulator/RestApiTest.java b/src/test/java/com/redhat/emergency/response/responder/simulator/RestApiTest.java index a93d9e8..c1d1692 100644 --- a/src/test/java/com/redhat/emergency/response/responder/simulator/RestApiTest.java +++ b/src/test/java/com/redhat/emergency/response/responder/simulator/RestApiTest.java @@ -9,6 +9,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.openMocks; +import com.redhat.emergency.response.responder.simulator.streams.KafkaResource; +import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.mockito.InjectMock; import io.restassured.RestAssured; @@ -20,6 +22,7 @@ import org.mockito.Captor; @QuarkusTest +@QuarkusTestResource(KafkaResource.class) public class RestApiTest { @InjectMock @@ -66,6 +69,4 @@ void testMissionStatusMissingFields() { .statusCode(500); verify(simulator, never()).processResponderLocationStatus(any()); } - - } diff --git a/src/test/java/com/redhat/emergency/response/responder/simulator/streams/KafkaResource.java b/src/test/java/com/redhat/emergency/response/responder/simulator/streams/KafkaResource.java new file mode 100644 index 0000000..3f61ecf --- /dev/null +++ b/src/test/java/com/redhat/emergency/response/responder/simulator/streams/KafkaResource.java @@ -0,0 +1,41 @@ +package com.redhat.emergency.response.responder.simulator.streams; + +import java.io.File; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +import io.debezium.kafka.KafkaCluster; +import io.debezium.util.Testing; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class KafkaResource implements QuarkusTestResourceLifecycleManager { + + private KafkaCluster kafka; + + @Override + public Map start() { + try { + Properties props = new Properties(); + props.setProperty("zookeeper.connection.timeout.ms", "45000"); + File directory = Testing.Files.createTestingDirectory("kafka-data", true); + kafka = new KafkaCluster().withPorts(2182, 9092) + .addBrokers(1) + .usingDirectory(directory) + .deleteDataUponShutdown(true) + .withKafkaConfiguration(props) + .deleteDataPriorToStartup(true) + .startup(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return Collections.emptyMap(); + } + + @Override + public void stop() { + if (kafka != null) { + kafka.shutdown(); + } + } +} diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index d9d7735..ffec8a0 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -7,4 +7,12 @@ simulator.distance.variation=0.2 responder-service.url=localhost:8080 responder-service.request-uri=/responder/ -infinispan.cache.create.lazy=true \ No newline at end of file +infinispan.cache.create.lazy=true + +quarkus.kafka-streams.bootstrap-servers=localhost:9092 +quarkus.kafka-streams.application-id=responder-simulator +quarkus.kafka-streams.topics=topic-responder-event + +kafka-streams.state.dir=target/data/kafka-data/stores + +kafka.topic.responder-event=topic-responder-event \ No newline at end of file