From 94738460ca3f498bdac2b610af350692cfb5a525 Mon Sep 17 00:00:00 2001 From: lukess Date: Thu, 9 Feb 2017 16:35:01 -0800 Subject: [PATCH] STORM-2353: Replace kafka-unit by kafka_2.11 and kafka-clients to test kafka-clients:0.10.1.1 --- external/storm-kafka-client/pom.xml | 36 +++++- .../org/apache/storm/kafka/KafkaUnit.java | 111 ++++++++++++++++++ .../org/apache/storm/kafka/KafkaUnitRule.java | 46 ++++++++ .../spout/SingleTopicKafkaSpoutTest.java | 26 ++-- 4 files changed, 202 insertions(+), 17 deletions(-) create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index 0fdb64d85e9..2ff98c12c71 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -88,15 +88,41 @@ test - info.batey.kafka - kafka-unit - 0.6 + org.slf4j + log4j-over-slf4j test - org.slf4j - log4j-over-slf4j + org.apache.kafka + kafka_2.11 + ${storm.kafka.client.version} + test test + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.kafka + kafka-clients + ${storm.kafka.client.version} + test + test + + + org.apache.kafka + kafka_2.11 + ${storm.kafka.client.version} + test + + + org.slf4j + slf4j-log4j12 + + diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java new file mode 100644 index 00000000000..4bf1cea04ac --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.*; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serializer; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + +public class KafkaUnit { + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private ZkUtils zkUtils; + private KafkaProducer producer; + private static final String ZK_HOST = "127.0.0.1"; + private static final String KAFKA_HOST = "127.0.0.1"; + private static final int KAFKA_PORT = 9092; + + public KafkaUnit() { + } + + public void setUp() throws IOException { + // setup ZK + zkServer = new EmbeddedZookeeper(); + String zkConnect = ZK_HOST + ":" + zkServer.port(); + ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT)); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // setup default Producer + createProducer(); + } + + public void tearDown() { + closeProducer(); + kafkaServer.shutdown(); + zkUtils.close(); + zkServer.shutdown(); + } + + public void createTopic(String topicName) { + AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + } + + public int getKafkaPort() { + return KAFKA_PORT; + } + + private void createProducer() { + Properties producerProps = new Properties(); + producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT); + producerProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer<>(producerProps); + } + + public void createProducer(Serializer keySerializer, Serializer valueSerializer) { + Properties producerProps = new Properties(); + producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT); + producer = new KafkaProducer<>(producerProps, keySerializer, valueSerializer); + } + + public void sendMessage(ProducerRecord producerRecord) throws InterruptedException, ExecutionException, TimeoutException { + producer.send(producerRecord).get(10, TimeUnit.SECONDS); + } + + private void closeProducer() { + producer.close(); + } +} \ No newline at end of file diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java new file mode 100644 index 00000000000..6e90c9df7c4 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.junit.rules.ExternalResource; + +import java.io.IOException; + + +public class KafkaUnitRule extends ExternalResource { + + private final KafkaUnit kafkaUnit; + + public KafkaUnitRule() { + this.kafkaUnit = new KafkaUnit(); + } + + @Override + public void before() throws IOException { + kafkaUnit.setUp(); + } + + @Override + public void after() { + kafkaUnit.tearDown(); + } + + public KafkaUnit getKafkaUnit() { + return this.kafkaUnit; + } +} \ No newline at end of file diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index fdc97347b60..58221257339 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -19,9 +19,9 @@ import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig; -import info.batey.kafka.unit.KafkaUnitRule; -import kafka.producer.KeyedMessage; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.storm.kafka.KafkaUnitRule; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -31,6 +31,8 @@ import org.mockito.ArgumentCaptor; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.is; @@ -54,6 +56,7 @@ import org.mockito.Captor; import org.mockito.MockitoAnnotations; + public class SingleTopicKafkaSpoutTest { private class SpoutContext { @@ -85,25 +88,24 @@ public SpoutContext(KafkaSpout spout, @Before public void setUp() { MockitoAnnotations.initMocks(this); - KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaPort(), commitOffsetPeriodMs); + KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaUnit().getKafkaPort(), commitOffsetPeriodMs); this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy; this.spout = new KafkaSpout<>(spoutConfig, consumerFactory); } - private void populateTopicData(String topicName, int msgCount) { + void populateTopicData(String topicName, int msgCount) throws InterruptedException, ExecutionException, TimeoutException { kafkaUnitRule.getKafkaUnit().createTopic(topicName); - IntStream.range(0, msgCount).forEach(value -> { - KeyedMessage keyedMessage = new KeyedMessage<>( - topicName, Integer.toString(value), - Integer.toString(value)); - - kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage); - }); + for (int i = 0; i < msgCount; i++) { + ProducerRecord producerRecord = new ProducerRecord<>( + topicName, Integer.toString(i), + Integer.toString(i)); + kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord); + } } - private void initializeSpout(int msgCount) { + private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); spout.open(conf, topologyContext, collector); spout.activate();