Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-2353: Replace kafka-unit by kafka_2.11 and kafka-clients to test kafka-clients:0.10.1.1 #1935

Merged
merged 1 commit into from
Mar 9, 2017

Conversation

lukess
Copy link
Contributor

@lukess lukess commented Feb 10, 2017

This pr is for testing kafka-clients:0.10.1.1 on storm-kafka-client. I replace the kafka-unit by kafka_2.11 and kafka-clients, and add TestRule as suggested by @srdo.

Copy link
Contributor

@srdo srdo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good replacement. Only have a few nitpicks.


public void createProducer() {
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", KAFKA_HOST + ":" + KAFKA_PORT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

producer = new KafkaProducer<>(producerProps);
}

public void createProducer(Class keySerializer, Class valueSerializer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: Wouldn't hurt if these parameters were serializable if someone wants to use KafkaUnit for a localcluster test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I misunderstood the code. You can put it back if you want. You might want to change the signature to createProducer(Serializer, Serializer) and use this constructor though https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#KafkaProducer(java.util.Properties,%20org.apache.kafka.common.serialization.Serializer,%20org.apache.kafka.common.serialization.Serializer)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I did not know the Serializer interface. I will add it.

}

public void sendMessage(ProducerRecord producerRecord) {
producer.send(producerRecord);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is asynchronous. We should either return the future or wait for it here, so the test code doesn't proceed before the message is actually sent.

@@ -54,18 +54,20 @@ public SpoutContext(KafkaSpout<String, String> spout,
void populateTopicData(String topicName, int msgCount) {
kafkaUnitRule.getKafkaUnit().createTopic(topicName);

kafkaUnitRule.getKafkaUnit().createProducer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nicer if the producer were created/closed in setup/teardown for the Rule

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah cool, the code is cleaner.


public void sendMessage(ProducerRecord producerRecord) {
try {
producer.send(producerRecord).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a timeout of some sort

try {
producer.send(producerRecord).get();
} catch (InterruptedException e) {
e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't catch these. Throw Exception from sendMessage, then tests using this can also throw Exception so the test fails if this happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad!

@srdo
Copy link
Contributor

srdo commented Feb 10, 2017

LGTM, good work :)

@lukess
Copy link
Contributor Author

lukess commented Feb 10, 2017

Thank You for your patience @srdo

@lukess
Copy link
Contributor Author

lukess commented Feb 15, 2017

@srdo should I resolve conflicts by merge?

@srdo
Copy link
Contributor

srdo commented Feb 16, 2017

@lukess Yes, just do a git pull. Before this gets merged, the commits should be squashed down to one commit (do something like this http://stackoverflow.com/a/5201642).

@lukess
Copy link
Contributor Author

lukess commented Feb 17, 2017

May I know the next step for review? :) My Kafka server is on 0.10.1, and I'm releasing storm-kafka-client with kafka_client 0.10.1 from my branch. I cannot release on origin master/1.x branch without skip tests. Thanks!

@srdo
Copy link
Contributor

srdo commented Feb 17, 2017

I'm not sure what you mean about not being able to release on master/1.x.

The next step is that someone with write access needs to review. @HeartSaVioR do you have time to take a look?

@lukess
Copy link
Contributor Author

lukess commented Feb 17, 2017

@srdo the issue is if I clone a storm repo from github, and follow doc would fail if
mvn clean package -Dstorm.kafka.client.version=0.10.1.0
mvn clean package -Dstorm.kafka.client.version=0.10.1.1

`
Running org.apache.storm.kafka.bolt.KafkaBoltTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.175 sec - in org.apache.storm.kafka.bolt.KafkaBoltTest
Running org.apache.storm.kafka.spout.KafkaSpoutRebalanceTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.055 sec - in org.apache.storm.kafka.spout.KafkaSpoutRebalanceTest
Running org.apache.storm.kafka.spout.KafkaSpoutConfigTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0 sec - in org.apache.storm.kafka.spout.KafkaSpoutConfigTest
Running org.apache.storm.kafka.spout.ByTopicRecordTranslatorTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.001 sec - in org.apache.storm.kafka.spout.ByTopicRecordTranslatorTest
Running org.apache.storm.kafka.spout.DefaultRecordTranslatorTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.002 sec - in org.apache.storm.kafka.spout.DefaultRecordTranslatorTest
Running org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest
Tests run: 4, Failures: 0, Errors: 4, Skipped: 0, Time elapsed: 7.591 sec <<< FAILURE! - in org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest
shouldReplayInOrderFailedMessages(org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest) Time elapsed: 1.826 sec <<< ERROR!
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:96)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at info.batey.kafka.unit.KafkaUnit.sendMessages(KafkaUnit.java:263)
at org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest.lambda$populateTopicData$0(SingleTopicKafkaSpoutTest.java:102)

shouldContinueWithSlowDoubleAcks(org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest) Time elapsed: 1.651 sec <<< ERROR!
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:96)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at info.batey.kafka.unit.KafkaUnit.sendMessages(KafkaUnit.java:263)
at org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest.lambda$populateTopicData$0(SingleTopicKafkaSpoutTest.java:102)

shouldEmitAllMessages(org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest) Time elapsed: 2.465 sec <<< ERROR!
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:96)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at info.batey.kafka.unit.KafkaUnit.sendMessages(KafkaUnit.java:263)
at org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest.lambda$populateTopicData$0(SingleTopicKafkaSpoutTest.java:102)

shouldReplayFirstTupleFailedOutOfOrder(org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest) Time elapsed: 1.648 sec <<< ERROR!
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:96)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at info.batey.kafka.unit.KafkaUnit.sendMessages(KafkaUnit.java:263)
at org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest.lambda$populateTopicData$0(SingleTopicKafkaSpoutTest.java:102)

Results :

Tests in error:
SingleTopicKafkaSpoutTest.shouldContinueWithSlowDoubleAcks:129->initializeSpout:107->populateTopicData:97->lambda$populateTopicData$0:102 » FailedToSendMessage
SingleTopicKafkaSpoutTest.shouldEmitAllMessages:168->initializeSpout:107->populateTopicData:97->lambda$populateTopicData$0:102 » FailedToSendMessage
SingleTopicKafkaSpoutTest.shouldReplayFirstTupleFailedOutOfOrder:237->initializeSpout:107->populateTopicData:97->lambda$populateTopicData$0:102 » FailedToSendMessage
SingleTopicKafkaSpoutTest.shouldReplayInOrderFailedMessages:196->initializeSpout:107->populateTopicData:97->lambda$populateTopicData$0:102 » FailedToSendMessage

Tests run: 12, Failures: 0, Errors: 4, Skipped:
`

It only works if
mvn clean package -Dstorm.kafka.client.version=0.10.0.0
mvn clean package -Dstorm.kafka.client.version=0.10.0.1

I know the doc does say storm-kafka-client module only supports 0.10 or newer kafka client API not storm-kafka-client module only supports 0.10 or newer kafka client library. Just not that clear to me. :)

@srdo
Copy link
Contributor

srdo commented Feb 19, 2017

@lukess Okay, I get it. Tests work on this branch, but not on master when run with Kafka 0.10.1.x. It seems like it might be an issue with kafka-unit not being updated for 0.10.1.x. chbatey/kafka-unit#43. Since kafka-unit is being replaced in this PR anyway, we can ignore that issue.

@lukess
Copy link
Contributor Author

lukess commented Feb 21, 2017

@srdo exactly! I'm testing the 0.10.1.1, and found some issues like STORM-2361, STORM-2343, and KAFKA-4405... already. I think bump kafka client version is very important for high level API.

@srdo
Copy link
Contributor

srdo commented Feb 21, 2017

2361 is for the older Kafka spout (storm-kafka instead of storm-kafka-client), and 2343 is a bug in the spout code, not the Kafka client. I agree that we should get this branch merged though :)

@lukess
Copy link
Contributor Author

lukess commented Feb 21, 2017

Yes, because of 2343 I'm still using storm-kafka in production to communicate with Kafka 0.10.1.1 (so I got 2361 issue too). After 2343 fixed I have to verify KAFKA-4405 too before bump Kafka 0.10.2.0.

import org.junit.Before;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
import static org.mockito.Mockito.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please avoid using wildcard import.

@HeartSaVioR
Copy link
Contributor

@lukess It looks good. Could you address minor nits and also squash commits into one?

@lukess
Copy link
Contributor Author

lukess commented Mar 9, 2017

@HeartSaVioR thanks! I'm studying how to squash into one because of a merge commit.

@HeartSaVioR
Copy link
Contributor

@lukess
If you have hard time to squash I can do it while merging. Please let me know if you want to.

@lukess
Copy link
Contributor Author

lukess commented Mar 9, 2017

@HeartSaVioR Yes, thanks. I'm going through conflicts and don't know where they come from.

@HeartSaVioR
Copy link
Contributor

+1

@HeartSaVioR
Copy link
Contributor

@lukess
There seemed to have bad conflicts here. Might be better to create new branch and rework based on current master branch, if you really don't mind. Please mention me then I'll give +1 to new PR.

@lukess
Copy link
Contributor Author

lukess commented Mar 9, 2017

@HeartSaVioR I think the conflict because of merge so I reset last two commits and rebase from last apache:master, and then squash. I think this should be clean.

@HeartSaVioR
Copy link
Contributor

Great! +1

@asfgit asfgit merged commit 9473846 into apache:master Mar 9, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants