diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 0fdb64d85e9..2ff98c12c71 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -88,15 +88,41 @@
test
- info.batey.kafka
- kafka-unit
- 0.6
+ org.slf4j
+ log4j-over-slf4j
test
- org.slf4j
- log4j-over-slf4j
+ org.apache.kafka
+ kafka_2.11
+ ${storm.kafka.client.version}
+ test
test
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ ${storm.kafka.client.version}
+ test
+ test
+
+
+ org.apache.kafka
+ kafka_2.11
+ ${storm.kafka.client.version}
+ test
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
new file mode 100644
index 00000000000..4bf1cea04ac
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.*;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+public class KafkaUnit {
+ private KafkaServer kafkaServer;
+ private EmbeddedZookeeper zkServer;
+ private ZkUtils zkUtils;
+ private KafkaProducer producer;
+ private static final String ZK_HOST = "127.0.0.1";
+ private static final String KAFKA_HOST = "127.0.0.1";
+ private static final int KAFKA_PORT = 9092;
+
+ public KafkaUnit() {
+ }
+
+ public void setUp() throws IOException {
+ // setup ZK
+ zkServer = new EmbeddedZookeeper();
+ String zkConnect = ZK_HOST + ":" + zkServer.port();
+ ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ zkUtils = ZkUtils.apply(zkClient, false);
+
+ // setup Broker
+ Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+
+ // setup default Producer
+ createProducer();
+ }
+
+ public void tearDown() {
+ closeProducer();
+ kafkaServer.shutdown();
+ zkUtils.close();
+ zkServer.shutdown();
+ }
+
+ public void createTopic(String topicName) {
+ AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ }
+
+ public int getKafkaPort() {
+ return KAFKA_PORT;
+ }
+
+ private void createProducer() {
+ Properties producerProps = new Properties();
+ producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
+ producerProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ producerProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ producer = new KafkaProducer<>(producerProps);
+ }
+
+ public void createProducer(Serializer keySerializer, Serializer valueSerializer) {
+ Properties producerProps = new Properties();
+ producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
+ producer = new KafkaProducer<>(producerProps, keySerializer, valueSerializer);
+ }
+
+ public void sendMessage(ProducerRecord producerRecord) throws InterruptedException, ExecutionException, TimeoutException {
+ producer.send(producerRecord).get(10, TimeUnit.SECONDS);
+ }
+
+ private void closeProducer() {
+ producer.close();
+ }
+}
\ No newline at end of file
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
new file mode 100644
index 00000000000..6e90c9df7c4
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.junit.rules.ExternalResource;
+
+import java.io.IOException;
+
+
+public class KafkaUnitRule extends ExternalResource {
+
+ private final KafkaUnit kafkaUnit;
+
+ public KafkaUnitRule() {
+ this.kafkaUnit = new KafkaUnit();
+ }
+
+ @Override
+ public void before() throws IOException {
+ kafkaUnit.setUp();
+ }
+
+ @Override
+ public void after() {
+ kafkaUnit.tearDown();
+ }
+
+ public KafkaUnit getKafkaUnit() {
+ return this.kafkaUnit;
+ }
+}
\ No newline at end of file
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index fdc97347b60..58221257339 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -19,9 +19,9 @@
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
-import info.batey.kafka.unit.KafkaUnitRule;
-import kafka.producer.KeyedMessage;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.kafka.KafkaUnitRule;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -31,6 +31,8 @@
import org.mockito.ArgumentCaptor;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.is;
@@ -54,6 +56,7 @@
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
+
public class SingleTopicKafkaSpoutTest {
private class SpoutContext {
@@ -85,25 +88,24 @@ public SpoutContext(KafkaSpout spout,
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaPort(), commitOffsetPeriodMs);
+ KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaUnit().getKafkaPort(), commitOffsetPeriodMs);
this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
}
- private void populateTopicData(String topicName, int msgCount) {
+ void populateTopicData(String topicName, int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
kafkaUnitRule.getKafkaUnit().createTopic(topicName);
- IntStream.range(0, msgCount).forEach(value -> {
- KeyedMessage keyedMessage = new KeyedMessage<>(
- topicName, Integer.toString(value),
- Integer.toString(value));
-
- kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
- });
+ for (int i = 0; i < msgCount; i++) {
+ ProducerRecord producerRecord = new ProducerRecord<>(
+ topicName, Integer.toString(i),
+ Integer.toString(i));
+ kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
+ }
}
- private void initializeSpout(int msgCount) {
+ private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
spout.open(conf, topologyContext, collector);
spout.activate();