From 516d028cc6fe99e26c0ad0f3f49aca36e8949594 Mon Sep 17 00:00:00 2001 From: Harish Yayi Date: Wed, 15 Apr 2020 11:05:23 -0400 Subject: [PATCH] java-spring-boot2: add kafka template (#742) * java-spring-boot2 - add kafka template * parameterize KafkaTemplate and some clean up * java-spring-boot2 - changes in README.md * java-spring-boot2 - update readme with TLS support * java-spring-boot2 - clean up Co-authored-by: Sandy Koh --- incubator/java-spring-boot2/stack.yaml | 2 +- .../templates/kafka/.gitignore | 28 +++ .../templates/kafka/README.md | 169 ++++++++++++++++++ .../java-spring-boot2/templates/kafka/pom.xml | 46 +++++ .../main/java/application/KafkaConsumer.java | 22 +++ .../java/application/LivenessEndpoint.java | 31 ++++ .../kafka/src/main/java/application/Main.java | 13 ++ .../application/config/KafkaProducer.java | 31 ++++ .../resources/application-test.properties | 11 ++ .../src/main/resources/application.properties | 14 ++ .../src/main/resources/public/index.html | 23 +++ .../java/application/KafkaConsumerTest.java | 52 ++++++ .../src/test/java/application/MainTests.java | 74 ++++++++ 13 files changed, 515 insertions(+), 1 deletion(-) create mode 100644 incubator/java-spring-boot2/templates/kafka/.gitignore create mode 100644 incubator/java-spring-boot2/templates/kafka/README.md create mode 100644 incubator/java-spring-boot2/templates/kafka/pom.xml create mode 100644 incubator/java-spring-boot2/templates/kafka/src/main/java/application/KafkaConsumer.java create mode 100644 incubator/java-spring-boot2/templates/kafka/src/main/java/application/LivenessEndpoint.java create mode 100644 incubator/java-spring-boot2/templates/kafka/src/main/java/application/Main.java create mode 100644 incubator/java-spring-boot2/templates/kafka/src/main/java/application/config/KafkaProducer.java create mode 100644 incubator/java-spring-boot2/templates/kafka/src/main/resources/application-test.properties create mode 100644 
incubator/java-spring-boot2/templates/kafka/src/main/resources/application.properties create mode 100644 incubator/java-spring-boot2/templates/kafka/src/main/resources/public/index.html create mode 100644 incubator/java-spring-boot2/templates/kafka/src/test/java/application/KafkaConsumerTest.java create mode 100644 incubator/java-spring-boot2/templates/kafka/src/test/java/application/MainTests.java diff --git a/incubator/java-spring-boot2/stack.yaml b/incubator/java-spring-boot2/stack.yaml index 28fc5c4f1..274363c32 100644 --- a/incubator/java-spring-boot2/stack.yaml +++ b/incubator/java-spring-boot2/stack.yaml @@ -1,5 +1,5 @@ name: Spring Boot® -version: 0.3.28 +version: 0.3.29 description: Spring Boot using OpenJ9 and Maven license: Apache-2.0 language: java diff --git a/incubator/java-spring-boot2/templates/kafka/.gitignore b/incubator/java-spring-boot2/templates/kafka/.gitignore new file mode 100644 index 000000000..d45495184 --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/.gitignore @@ -0,0 +1,28 @@ +/target/* +.appsody-spring-trigger + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +/build/ + +### VS Code ### +.vscode/ diff --git a/incubator/java-spring-boot2/templates/kafka/README.md b/incubator/java-spring-boot2/templates/kafka/README.md new file mode 100644 index 000000000..ac5c958d5 --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/README.md @@ -0,0 +1,169 @@ +# Kafka Template + +The java-spring-boot2 `kafka` template provides a consistent way of developing Spring Boot applications which connect to Kafka. 
This template is an extension of the `default` template and uses [spring-kafka](https://spring.io/projects/spring-kafka#overview) to connect to the Kafka instance running on Kubernetes managed by the [Strimzi](https://strimzi.io/) Kafka operator. + +The `kafka` template provides a `pom.xml` file that references the parent POM defined by the stack, dependencies that enable the Spring Boot application to connect to Kafka, a simple producer that publishes a message to the Kafka topic and a simple consumer that consumes the messages published to the Kafka topic by the producer. It also provides a basic liveness endpoint, and a set of unit tests that ensure enabled actuator endpoints work properly: `/actuator/health`, `/actuator/metrics`, `/actuator/prometheus` and `/actuator/liveness` + +## Getting Started + +1. Create a new folder in your local directory and initialize it using the Appsody CLI, e.g.: + +``` +mkdir my-project +cd my-project +appsody init java-spring-boot2 kafka +``` +This will initialize a Spring Boot 2 project using the kafka template. + +2. Once your project has been initialized you can then run your application using the following command: + +``` +appsody run --network ${DOCKER_NETWORK_NAME} --docker-options "--env KAFKA_BOOTSTRAP_SERVERS=${KAFKA_BOOTSTRAP_SERVERS}" +``` +E.g.: +``` +appsody run --network kafka_default --docker-options "--env KAFKA_BOOTSTRAP_SERVERS=kafka:9092" +``` +`DOCKER_NETWORK_NAME` is the name of the Docker network in which the Kafka container is running. + +This template expects the `KAFKA_BOOTSTRAP_SERVERS` environment variable to be set to the addresses of the bootstrap servers of Kafka. + +This launches a Docker container that will run your application in the foreground, exposing it on port 8080. You should see that the producer publishes a message to the Kafka topic and the consumer reads it. The application will be restarted automatically when changes are detected. + +3.
You should be able to access the following endpoints, as they are exposed by your template application by default: + +* Health endpoint: http://localhost:8080/actuator/health +* Liveness endpoint: http://localhost:8080/actuator/liveness +* Metrics endpoint: http://localhost:8080/actuator/metrics +* Prometheus endpoint: http://localhost:8080/actuator/prometheus + +4. To deploy the application to Kubernetes run the following command: +``` +appsody deploy +``` +Make sure to add the `KAFKA_BOOTSTRAP_SERVERS` environment variable in the `app-deploy.yaml` before running the above command + +``` +env: + - name: KAFKA_BOOTSTRAP_SERVERS + value: ${KAFKA_BOOTSTRAP_SERVERS} +``` + +If you are trying to connect to a Kafka instance managed by Strimzi Kafka operator, the value of `KAFKA_BOOTSTRAP_SERVERS` should be a fully qualified service hostname. + +E.g: my-cluster-kafka-bootstrap.strimzi.svc.cluster.local:9092 + +* `my-cluster` is the Kafka resource name. +* `kafka-bootstrap` is the Broker load balancer name. +* `strimzi` is the namespace in which Kafka instance is deployed. +* `9092` is the PLAINTEXT port. + +5. 
To deploy the application that connects to Kafka managed by the Strimzi operator where the brokers support TLS client authentication + +Add the following properties to `application.properties` + +``` +spring.kafka.properties.security.protocol=SSL +spring.kafka.properties.ssl.protocol=TLSv1.2 +spring.kafka.properties.ssl.truststore.location=/etc/secrets/keystores/truststore.p12 +spring.kafka.properties.ssl.truststore.password=${TRUSTSTORE_PASSWORD} +spring.kafka.properties.ssl.truststore.type=PKCS12 +spring.kafka.properties.ssl.keystore.location=/etc/secrets/keystores/keystore.p12 +spring.kafka.properties.ssl.keystore.password=${KEYSTORE_PASSWORD} +spring.kafka.properties.ssl.keystore.type=PKCS12 +spring.kafka.properties.ssl.key.password=${KEYSTORE_PASSWORD} +spring.kafka.properties.ssl.endpoint.identification.algorithm= +``` + +`TRUSTSTORE_PASSWORD` is the password that you have used when creating the truststore. + +`KEYSTORE_PASSWORD` is the password that you have used when creating the keystore. + +Next, add the following in the `app-deploy.yaml` under the `spec` section + +* Add the following volumes + +``` +volumes: +# emptyDir volume to store the keystore and truststore files so that the application container can eventually read them. +- emptyDir: {} + name: keystore-volume +# this is the secret that is created when the kafka user is created +- name: my-user-credentials + secret: + secretName: my-user +# secret that holds CA certificate created by the operator for the brokers +- name: my-cluster-cluster-ca-cert + secret: + secretName: my-cluster-cluster-ca-cert +``` +* Volume mount the `keystore-volume` + +``` +volumeMounts: +- mountPath: /etc/secrets/keystores + name: keystore-volume +``` +* Add `KAFKA_BOOTSTRAP_SERVERS` environment variable. E.g.: + +``` +env: +- name: KAFKA_BOOTSTRAP_SERVERS + value: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9093 +``` +`9093` is the TLS port.
+ +* Add `initContainers` that generate the keystore and truststore which will eventually be used by the application container. + +``` +initContainers: +- args: + - -c + - echo $ca_bundle && csplit -z -f crt- $ca_bundle '/-----BEGIN CERTIFICATE-----/' + '{*}' && for file in crt-*; do keytool -import -noprompt -keystore $truststore_jks + -file $file -storepass $password -storetype PKCS12 -alias service-$file; done + command: + - /bin/bash + env: + - name: ca_bundle + value: /etc/secrets/my-cluster-cluster-ca-cert/ca.crt + - name: truststore_jks + value: /etc/secrets/keystores/truststore.p12 + - name: password + value: ${TRUSTSTORE_PASSWORD} + image: registry.access.redhat.com/redhat-sso-7/sso71-openshift:1.1-16 + name: pem-to-truststore + volumeMounts: + - mountPath: /etc/secrets/keystores + name: keystore-volume + - mountPath: /etc/secrets/my-user + name: my-user-credentials + readOnly: true + - mountPath: /etc/secrets/my-cluster-cluster-ca-cert + name: my-cluster-cluster-ca-cert + readOnly: true +- args: + - -c + - openssl pkcs12 -export -inkey $keyfile -in $crtfile -out $keystore_pkcs12 -password + pass:$password -name "name" + command: + - /bin/bash + env: + - name: keyfile + value: /etc/secrets/my-user/user.key + - name: crtfile + value: /etc/secrets/my-user/user.crt + - name: keystore_pkcs12 + value: /etc/secrets/keystores/keystore.p12 + - name: password + value: ${KEYSTORE_PASSWORD} + image: registry.access.redhat.com/redhat-sso-7/sso71-openshift:1.1-16 + name: pem-to-keystore + volumeMounts: + - mountPath: /etc/secrets/keystores + name: keystore-volume + - mountPath: /etc/secrets/my-user + name: my-user-credentials + readOnly: true +``` +** Here `my-user` is the kafka user and `my-cluster` is the kafka cluster name. 
diff --git a/incubator/java-spring-boot2/templates/kafka/pom.xml b/incubator/java-spring-boot2/templates/kafka/pom.xml new file mode 100644 index 000000000..092f16dd3 --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/pom.xml @@ -0,0 +1,46 @@ + + + 4.0.0 + + + {{.stack.parentpomgroup}} + {{.stack.parentpomid}} + {{.stack.parentpomrange}} + + + + dev.appsody + default-kafka-application + 0.0.1-SNAPSHOT + jar + + + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.kafka + spring-kafka + + + org.springframework.kafka + spring-kafka-test + test + + + + + diff --git a/incubator/java-spring-boot2/templates/kafka/src/main/java/application/KafkaConsumer.java b/incubator/java-spring-boot2/templates/kafka/src/main/java/application/KafkaConsumer.java new file mode 100644 index 000000000..4fb5ea6a0 --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/src/main/java/application/KafkaConsumer.java @@ -0,0 +1,22 @@ +package application; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +import java.util.concurrent.CountDownLatch; + +// a simple kafka consumer +@Service +public class KafkaConsumer { + private final CountDownLatch countDownLatch = new CountDownLatch(1); + + @KafkaListener(topics = "orders", groupId = "orders-service") + public void receiveString(String message) { + System.out.println("Receiving message = " + message); + countDownLatch.countDown(); + } + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } +} diff --git a/incubator/java-spring-boot2/templates/kafka/src/main/java/application/LivenessEndpoint.java b/incubator/java-spring-boot2/templates/kafka/src/main/java/application/LivenessEndpoint.java new file mode 100644 index 000000000..c719db763 --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/src/main/java/application/LivenessEndpoint.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2019 IBM Corp. 
All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package application; + +import org.springframework.boot.actuate.endpoint.annotation.Endpoint; +import org.springframework.boot.actuate.endpoint.annotation.ReadOperation; +import org.springframework.stereotype.Component; + +// Simple custom liveness check +@Endpoint(id = "liveness") +@Component +public class LivenessEndpoint { + + @ReadOperation + public String testLiveness() { + return "{\"status\":\"UP\"}"; + } + +} diff --git a/incubator/java-spring-boot2/templates/kafka/src/main/java/application/Main.java b/incubator/java-spring-boot2/templates/kafka/src/main/java/application/Main.java new file mode 100644 index 000000000..144bfffe7 --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/src/main/java/application/Main.java @@ -0,0 +1,13 @@ +package application; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Main { + + public static void main(String[] args) { + SpringApplication.run(Main.class, args); + } + +} diff --git a/incubator/java-spring-boot2/templates/kafka/src/main/java/application/config/KafkaProducer.java b/incubator/java-spring-boot2/templates/kafka/src/main/java/application/config/KafkaProducer.java new file mode 100644 index 000000000..12ee915ec --- /dev/null +++ 
b/incubator/java-spring-boot2/templates/kafka/src/main/java/application/config/KafkaProducer.java @@ -0,0 +1,31 @@ +package application.config; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; + +import application.KafkaConsumer; + +import java.util.UUID; + +@Configuration +public class KafkaProducer { + + @Autowired + KafkaTemplate kafkaTemplate; + + private static String TOPIC_NAME = "orders"; + + // a simple kafka producer that publishes a message to the "orders" topic after the application is initialized + @Bean + public CommandLineRunner kafkaCommandLineRunner(KafkaConsumer kafkaConsumer) { + return args -> { + String data = "testData:" + UUID.randomUUID(); + System.out.println("Sending message to kafka = " + data); + kafkaTemplate.send(TOPIC_NAME, data); + kafkaConsumer.getCountDownLatch().await(); + }; + } +} diff --git a/incubator/java-spring-boot2/templates/kafka/src/main/resources/application-test.properties b/incubator/java-spring-boot2/templates/kafka/src/main/resources/application-test.properties new file mode 100644 index 000000000..30bd17e17 --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/src/main/resources/application-test.properties @@ -0,0 +1,11 @@ +# spring.embedded.kafka.brokers system property is set by Embedded Kafka server to the addresses of the bootstrap servers +spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers} +spring.kafka.consumer.group-id=orders-service +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer + +# spring.embedded.kafka.brokers system property is set by Embedded 
Kafka server to the addresses of the bootstrap servers +spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers} +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer diff --git a/incubator/java-spring-boot2/templates/kafka/src/main/resources/application.properties b/incubator/java-spring-boot2/templates/kafka/src/main/resources/application.properties new file mode 100644 index 000000000..dfe205afc --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/src/main/resources/application.properties @@ -0,0 +1,14 @@ +#enable the actuator endpoints for health, metrics, and prometheus. +management.endpoints.web.exposure.include=health,metrics,prometheus,liveness +opentracing.jaeger.log-spans=false + +spring.kafka.consumer.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS} +spring.kafka.consumer.group-id=orders-service +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer + +spring.kafka.producer.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS} +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.consumer.enable-auto-commit=true diff --git a/incubator/java-spring-boot2/templates/kafka/src/main/resources/public/index.html b/incubator/java-spring-boot2/templates/kafka/src/main/resources/public/index.html new file mode 100644 index 000000000..47302f250 --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/src/main/resources/public/index.html @@ -0,0 +1,23 @@ + + + + Hello from Appsody! + + +

Hello from Appsody!

+

Next steps with Spring Boot 2: +

+

+ + diff --git a/incubator/java-spring-boot2/templates/kafka/src/test/java/application/KafkaConsumerTest.java b/incubator/java-spring-boot2/templates/kafka/src/test/java/application/KafkaConsumerTest.java new file mode 100644 index 000000000..9bbf58c61 --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/src/test/java/application/KafkaConsumerTest.java @@ -0,0 +1,52 @@ +package application; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.TimeUnit; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.rule.EmbeddedKafkaRule; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.context.annotation.FilterType; + + +@EmbeddedKafka(topics = {"orders"}) +@DirtiesContext +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) +@ActiveProfiles("test") +@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = CommandLineRunner.class)) +public class KafkaConsumerTest { + + private static String TOPIC_NAME = "orders"; + + @Autowired + KafkaTemplate kafkaTemplate; + + @Autowired + KafkaConsumer consumer; + + @ClassRule + public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, TOPIC_NAME); + + @Test + public void testReceive() throws InterruptedException { + // send the message 
+ kafkaTemplate.send("orders", "hello"); + consumer.getCountDownLatch().await(10000, TimeUnit.MILLISECONDS); + // check that message was delivered + assertThat(consumer.getCountDownLatch().getCount()).isEqualTo(0); + } + +} diff --git a/incubator/java-spring-boot2/templates/kafka/src/test/java/application/MainTests.java b/incubator/java-spring-boot2/templates/kafka/src/test/java/application/MainTests.java new file mode 100644 index 000000000..463546620 --- /dev/null +++ b/incubator/java-spring-boot2/templates/kafka/src/test/java/application/MainTests.java @@ -0,0 +1,74 @@ +package application; + +import static org.assertj.core.api.Assertions.assertThat; +import java.util.List; +import java.util.Map; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.rule.EmbeddedKafkaRule; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + + +@EmbeddedKafka(topics = {"orders"}) +@DirtiesContext +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) +@ActiveProfiles("test") +public class MainTests { + + @Autowired + private TestRestTemplate restTemplate; + + private static String TOPIC_NAME = "orders"; + + @ClassRule + public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC_NAME); + + @Test + public void testHealthEndpoint() { + ResponseEntity entity = 
this.restTemplate.getForEntity("/actuator/health", String.class); + assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(entity.getBody()).contains("\"status\":\"UP\""); + } + + @Test + public void testLivenessEndpoint() { + ResponseEntity entity = this.restTemplate.getForEntity("/actuator/liveness", String.class); + assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(entity.getBody()).contains("\"status\":\"UP\""); + } + + @Test + @SuppressWarnings("unchecked") + public void testMetricsEndpoint() { + testLivenessEndpoint(); // access a page + + @SuppressWarnings("rawtypes") + ResponseEntity entity = this.restTemplate.getForEntity("/actuator/metrics", Map.class); + assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK); + + Map body = entity.getBody(); + assertThat(body).containsKey("names"); + assertThat((List) body.get("names")).contains("jvm.buffer.count"); + } + + @Test + public void testPrometheusEndpoint() { + testLivenessEndpoint(); // access a page + + ResponseEntity entity = this.restTemplate.getForEntity("/actuator/prometheus", String.class); + assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(entity.getBody()).contains("# TYPE jvm_buffer_count_buffers gauge"); + } +}