Skip to content

Commit

Permalink
Use Kafka Streams and Infinispan to build a local materialized view o…
Browse files Browse the repository at this point in the history
…f Responders.
  • Loading branch information
btison committed Sep 21, 2020
1 parent cef178e commit 4d4ad1d
Show file tree
Hide file tree
Showing 17 changed files with 450 additions and 79 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,9 @@

Simulates the movement of responders based on the messages received from the mission service.

Implemented with Quarkus
Implemented with Quarkus

#### Implementation details

The service uses Kafka Streams to build a local materialized view of the responders by handling _RespondersCreatedEvent_ and _RespondersDeletedEvent_ messages (the message types consumed by the topology).
Kafka Streams is configured to use Infinispan as the repository for the materialized view.
13 changes: 9 additions & 4 deletions etc/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ mp.messaging.incoming.mission-event.group.id=responder-simulator-quarkus

mp.messaging.outgoing.responder-location-update.topic=topic-responder-location-update

responder-service.url={{ responder_service_application_name }}.{{ namespace_services }}.svc:8080
responder-service.request-uri=/responder/

quarkus.infinispan-client.server-list={{ datagrid_server_list }}
quarkus.infinispan-client.auth-username={{ datagrid_username }}
quarkus.infinispan-client.auth-password={{ datagrid_password }}

infinispan.cache.responder-simulator=responder-simulator
infinispan.cache.responder-simulator=responder-simulator

quarkus.kafka-streams.bootstrap-servers={{ kafka_bootstrap_address }}
quarkus.kafka-streams.application-id=responder-simulator-kstreams
quarkus.kafka-streams.topics=topic-responder-event

infinispan.streams.store=responder-store

kafka.topic.responder-event=topic-responder-event
28 changes: 26 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<wiremock.version>2.26.3</wiremock.version>
<smallrye.version>2.0.4</smallrye.version>
<json-unit.version>2.18.0</json-unit.version>
<debezium.version>1.1.0.Final</debezium.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -55,8 +56,8 @@
<artifactId>quarkus-vertx-web</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down Expand Up @@ -96,6 +97,29 @@
<version>${json-unit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.math.BigDecimal;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
Expand All @@ -12,6 +11,7 @@
import com.redhat.emergency.response.responder.simulator.model.MissionStep;
import com.redhat.emergency.response.responder.simulator.model.ResponderLocation;
import com.redhat.emergency.response.responder.simulator.repository.ResponderLocationRepository;
import com.redhat.emergency.response.responder.simulator.streams.ResponderService;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -43,9 +43,6 @@ public class Simulator {
@ConfigProperty(name = "simulator.distance.base")
double baseDistance;

@ConfigProperty(name = "simulator.distance.variation")
double distanceVariation;

private final UnicastProcessor<Pair<String, ResponderLocation>> processor = UnicastProcessor.create();

@ConsumeEvent("simulator-mission-created")
Expand Down Expand Up @@ -127,20 +124,15 @@ private void processLocationUpdate(String key) {


private Uni<ResponderLocation> toResponderLocation(JsonObject json) {
return responderService.isPerson(json.getString("responderId"))
.onItem().transform(person -> {
return responderService.responder(json.getString("responderId"))
.onItem().transform(responder -> {
List<MissionStep> steps = json.getJsonArray("steps").stream().map(o -> (JsonObject) o)
.map(j -> new MissionStep(new Coordinates(BigDecimal.valueOf(j.getDouble("lat")), BigDecimal.valueOf(j.getDouble("lon"))),
j.getBoolean("wayPoint"), j.getBoolean("destination"))).collect(Collectors.toList());
Coordinates currentPosition = new Coordinates(BigDecimal.valueOf(json.getDouble("responderStartLat")),
BigDecimal.valueOf((json.getDouble("responderStartLong"))));
return new ResponderLocation(json.getString("id"), json.getString("responderId"), json.getString("incidentId"), steps, currentPosition, person, distanceUnit());
return new ResponderLocation(json.getString("id"), json.getString("responderId"), json.getString("incidentId"),
steps, currentPosition, responder.getBoolean("person", false), responder.getDouble("distanceUnit", baseDistance));
});
}

private double distanceUnit() {
double max = baseDistance * (1 + distanceVariation);
double min = baseDistance * (1 - distanceVariation);
return ThreadLocalRandom.current().nextDouble(max - min) + min;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.redhat.emergency.response.responder.simulator.repository;
package com.redhat.emergency.response.responder.simulator.infinispan;

import org.infinispan.commons.configuration.BasicConfiguration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import javax.enterprise.event.Observes;
import javax.inject.Inject;

import com.redhat.emergency.response.responder.simulator.infinispan.Configuration;
import com.redhat.emergency.response.responder.simulator.model.ResponderLocation;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.json.Json;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.redhat.emergency.response.responder.simulator.streams;

/**
 * Unchecked exception signaling that a responder id was not present in the
 * Kafka Streams state store. Used as a retryable failure type by the
 * Mutiny retry/backoff pipeline in {@code ResponderService#responder}.
 */
public class ResponderNotFoundException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    /** No-detail constructor, kept for existing callers. */
    public ResponderNotFoundException() {
        super();
    }

    /**
     * @param message detail message, e.g. the missing responder id
     */
    public ResponderNotFoundException(String message) {
        super(message);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.redhat.emergency.response.responder.simulator.streams;

import java.time.Duration;
import java.util.function.Supplier;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class ResponderService {

    private static final Logger log = LoggerFactory.getLogger(ResponderService.class);

    /** Interactive-query handle to the Kafka Streams runtime (injected by Quarkus). */
    @Inject
    KafkaStreams streams;

    /** Name of the state store that materializes the responder view. */
    @ConfigProperty(name = "infinispan.streams.store", defaultValue = "responder-store")
    String storeName;

    /**
     * Looks up a responder by id in the local materialized view.
     * <p>
     * The store is populated asynchronously from the responder-event topic, so a
     * miss is retried with backoff (500ms-1s, at most 10 attempts) to give the
     * topology time to catch up. If the responder still cannot be found, the Uni
     * recovers with an empty {@link JsonObject} rather than failing.
     *
     * @param id responder id (key in the state store)
     * @return a Uni emitting the responder as JSON, or an empty JsonObject when not found
     */
    public Uni<JsonObject> responder(String id) {
        return Uni.createFrom().item(() -> doGetResponder(id))
                .onFailure(ResponderNotFoundException.class).retry().withBackOff(Duration.ofMillis(500), Duration.ofMillis(1000)).atMost(10)
                .onFailure().recoverWithItem((Supplier<JsonObject>) JsonObject::new);
    }

    /**
     * Single store lookup. Throws {@link ResponderNotFoundException} on a miss so
     * the caller's retry policy can kick in.
     */
    private JsonObject doGetResponder(String id) {
        String responderStr = responderStore().get(id);
        if (responderStr == null) {
            log.warn("Responder with id {} not found in the responder store", id);
            throw new ResponderNotFoundException();
        }
        return new JsonObject(responderStr);
    }

    /**
     * Obtains the read-only key-value store, waiting until the streams application
     * has transitioned to a queryable state.
     */
    private ReadOnlyKeyValueStore<String, String> responderStore() {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
            } catch (InvalidStateStoreException e) {
                // Store not queryable yet (streams app starting or rebalancing).
                // Back off briefly instead of busy-spinning on the CPU.
                try {
                    Thread.sleep(100);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for state store " + storeName, ie);
                }
            }
        }
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.redhat.emergency.response.responder.simulator.streams;

import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;

import com.redhat.emergency.response.responder.simulator.streams.infinispan.InfinispanKeyValueStoreSupplier;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class TopologyProducer {

    private static final Logger log = LoggerFactory.getLogger(TopologyProducer.class);

    // Message types handled by this topology; everything else is filtered out.
    private static final String RESPONDERS_CREATED_EVENT = "RespondersCreatedEvent";
    private static final String RESPONDERS_DELETED_EVENT = "RespondersDeletedEvent";
    private static final String[] ACCEPTED_MESSAGE_TYPES = {RESPONDERS_CREATED_EVENT, RESPONDERS_DELETED_EVENT};

    // Internal per-responder actions derived from the batch events above.
    private static final String ACTION_CREATE = "create";
    private static final String ACTION_DELETE = "delete";


    /** Topic carrying responder lifecycle events. */
    @ConfigProperty(name = "kafka.topic.responder-event")
    String responderEventTopic;

    /** Base distance unit assigned to each responder. */
    @ConfigProperty(name = "simulator.distance.base")
    double baseDistance;

    /** Relative variation applied symmetrically around the base distance. */
    @ConfigProperty(name = "simulator.distance.variation")
    double distanceVariation;

    /** Supplies the Infinispan-backed key-value store used to materialize the table. */
    @Inject
    InfinispanKeyValueStoreSupplier keyValueStoreSupplier;

    /**
     * Builds the Kafka Streams topology: consume responder events, keep only
     * created/deleted messages, fan each batch event out to one record per
     * responder, and materialize the result as a table in the Infinispan-backed
     * store (a null value acts as a tombstone deleting the entry).
     *
     * @return the topology, produced as a CDI bean for the Quarkus Kafka Streams extension
     */
    @Produces
    public Topology buildTopology() {

        StreamsBuilder builder = new StreamsBuilder();

        builder.stream(responderEventTopic, Consumed.with(Serdes.String(), Serdes.String()))
                // Parse the raw payload; malformed messages become an empty JsonObject
                // and are dropped by the messageType filter below.
                .mapValues(value -> {
                    try {
                        JsonObject json = new JsonObject(value);
                        log.debug("Processing message: " + value);
                        return json;
                    } catch (Exception e) {
                        log.warn("Unexpected message which is not a valid JSON object");
                        return new JsonObject();
                    }
                })
                .filter((key, value) -> {
                    String messageType = value.getString("messageType");
                    if (Arrays.asList(ACCEPTED_MESSAGE_TYPES).contains(messageType)) {
                        return true;
                    }
                    log.debug("Message with type '" + messageType + "' is ignored");
                    return false;
                })
                // One batch event carries many responders: expand to one action record each.
                .flatMapValues((ValueMapper<JsonObject, Iterable<JsonObject>>) value -> {
                    if (RESPONDERS_CREATED_EVENT.equals(value.getString("messageType"))) {
                        JsonArray responders = value.getJsonObject("body").getJsonArray("responders");
                        return responders.stream().map(o -> (JsonObject)o).map(resp -> new JsonObject().put("action", ACTION_CREATE)
                                .put("id", resp.getString("id"))
                                // Each responder gets a randomized distance unit at creation time.
                                .put("responder", resp.put("distanceUnit", distanceUnit())))
                                .collect(Collectors.toList());
                    } else {
                        JsonArray responderIds = value.getJsonObject("body").getJsonArray("responders");
                        return responderIds.stream().map(id -> new JsonObject().put("action", ACTION_DELETE)
                                .put("id", id)).collect(Collectors.toList());
                    }
                })
                // Re-key by responder id; deletes map to a null value (table tombstone).
                .map((KeyValueMapper<String, JsonObject, KeyValue<String, String>>) (key, value) -> {
                    if (ACTION_CREATE.equalsIgnoreCase(value.getString("action"))) {
                        return new KeyValue<>(value.getString("id"), value.getJsonObject("responder").encode());
                    } else {
                        return new KeyValue<>(value.getString("id"), null);
                    }
                })
                .toTable(Materialized.<String, String>as(keyValueStoreSupplier).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));

        return builder.build();
    }

    /**
     * Returns a distance unit drawn uniformly from
     * [base * (1 - variation), base * (1 + variation)).
     * When the configured variation is zero (or negative), simply returns the
     * base distance: ThreadLocalRandom.nextDouble requires a strictly positive
     * span and would otherwise throw IllegalArgumentException.
     */
    private double distanceUnit() {
        double max = baseDistance * (1 + distanceVariation);
        double min = baseDistance * (1 - distanceVariation);
        if (max <= min) {
            return baseDistance;
        }
        return ThreadLocalRandom.current().nextDouble(min, max);
    }
}
Loading

0 comments on commit 4d4ad1d

Please sign in to comment.