STORM-2353: Replace kafka-unit by kafka_2.11 and kafka-clients to test kafka-clients:0.10.1.1 #1935
Conversation
This is a good replacement. Only have a few nitpicks.
public void createProducer() {
    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", KAFKA_HOST + ":" + KAFKA_PORT);
Nitpick: Could use constants from https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html
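For example, a minimal sketch of that nitpick (it assumes the same KAFKA_HOST/KAFKA_PORT fields used in the diff):

    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties producerProps = new Properties();
    // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG is the named constant for "bootstrap.servers"
    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);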
nice!
    producer = new KafkaProducer<>(producerProps);
}

public void createProducer(Class keySerializer, Class valueSerializer) {
Nitpick: It wouldn't hurt if these parameters were serializable, in case someone wants to use KafkaUnit for a local cluster test.
removed!
Sorry, I misunderstood the code. You can put it back if you want. You might want to change the signature to createProducer(Serializer, Serializer) and use this constructor, though: https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#KafkaProducer(java.util.Properties,%20org.apache.kafka.common.serialization.Serializer,%20org.apache.kafka.common.serialization.Serializer)
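For illustration, a rough sketch of what that could look like (the producer field type and the KAFKA_HOST/KAFKA_PORT fields are assumed from the diff above):

    import org.apache.kafka.common.serialization.Serializer;

    private KafkaProducer<String, String> producer;

    public void createProducer(Serializer<String> keySerializer, Serializer<String> valueSerializer) {
        Properties producerProps = new Properties();
        producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
        // The (Properties, Serializer, Serializer) constructor avoids the serializer class-name properties entirely
        producer = new KafkaProducer<>(producerProps, keySerializer, valueSerializer);
    }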
Ah, I didn't know about the Serializer interface. I will add it.
}

public void sendMessage(ProducerRecord producerRecord) {
    producer.send(producerRecord);
This is asynchronous. We should either return the future or wait for it here, so the test code doesn't proceed before the message is actually sent.
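One way to do that, as a sketch (the String/String type parameters are assumed from the rest of the test helper):

    // Option 1: hand the Future back so the caller decides when and how long to block
    // (Future is java.util.concurrent.Future, RecordMetadata is from org.apache.kafka.clients.producer)
    public Future<RecordMetadata> sendMessage(ProducerRecord<String, String> producerRecord) {
        return producer.send(producerRecord);
    }

Option 2, blocking inside sendMessage with a timeout, is sketched further down.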
@@ -54,18 +54,20 @@ public SpoutContext(KafkaSpout<String, String> spout,
void populateTopicData(String topicName, int msgCount) {
    kafkaUnitRule.getKafkaUnit().createTopic(topicName);

    kafkaUnitRule.getKafkaUnit().createProducer();
It would be nicer if the producer were created/closed in setup/teardown for the Rule
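Roughly like this, as a sketch only — it assumes the rule extends JUnit's ExternalResource and that KafkaUnit has setUp/tearDown/closeProducer methods, which may differ from the actual code in this PR:

    import org.junit.rules.ExternalResource;

    public class KafkaUnitRule extends ExternalResource {
        private final KafkaUnit kafkaUnit = new KafkaUnit();

        @Override
        protected void before() throws Throwable {
            kafkaUnit.setUp();          // start embedded ZooKeeper/Kafka (assumed method)
            kafkaUnit.createProducer(); // one producer for the whole test
        }

        @Override
        protected void after() {
            kafkaUnit.closeProducer();  // hypothetical close method
            kafkaUnit.tearDown();       // assumed method
        }

        public KafkaUnit getKafkaUnit() {
            return kafkaUnit;
        }
    }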
ah cool, the code is cleaner.
public void sendMessage(ProducerRecord producerRecord) {
    try {
        producer.send(producerRecord).get();
Please add a timeout of some sort
    try {
        producer.send(producerRecord).get();
    } catch (InterruptedException e) {
        e.printStackTrace();
Don't catch these. Throw Exception from sendMessage, then tests using this can also throw Exception so the test fails if this happens.
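In other words, something like this sketch, which also folds in the timeout suggestion above (the 10-second bound is arbitrary):

    import java.util.concurrent.TimeUnit;

    // Propagate the exception so the calling test fails instead of just logging a stack trace
    public void sendMessage(ProducerRecord<String, String> producerRecord) throws Exception {
        producer.send(producerRecord).get(10, TimeUnit.SECONDS);
    }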
my bad!
LGTM, good work :)
Thank you for your patience @srdo
@srdo should I resolve conflicts by merging?
@lukess Yes, just do a
May I know the next step for the review? :) My Kafka server is on 0.10.1, and I'm releasing storm-kafka-client with kafka_client 0.10.1 from my branch. I cannot release from the origin master/1.x branch without skipping tests. Thanks!
I'm not sure what you mean about not being able to release on master/1.x. The next step is that someone with write access needs to review. @HeartSaVioR do you have time to take a look?
@srdo the issue is that if I clone the storm repo from GitHub and follow the docs, the build fails with:

shouldContinueWithSlowDoubleAcks(org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest) Time elapsed: 1.651 sec <<< ERROR!
shouldEmitAllMessages(org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest) Time elapsed: 2.465 sec <<< ERROR!
shouldReplayFirstTupleFailedOutOfOrder(org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest) Time elapsed: 1.648 sec <<< ERROR!

Results :
Tests in error:
Tests run: 12, Failures: 0, Errors: 4, Skipped:

It only works if I skip the tests. I know the doc does say "storm-kafka-client module only supports 0.10 or newer kafka client API", not "storm-kafka-client module only supports 0.10 or newer kafka client library". It's just not that clear to me. :)
@lukess Okay, I get it. Tests work on this branch, but not on master when run with Kafka 0.10.1.x. It seems like it might be an issue with kafka-unit not being updated for 0.10.1.x: chbatey/kafka-unit#43. Since kafka-unit is being replaced in this PR anyway, we can ignore that issue.
@srdo exactly! I'm testing 0.10.1.1 and have already found some issues, like STORM-2361, STORM-2343, and KAFKA-4405. I think bumping the Kafka client version is very important for the high-level API.
2361 is for the older Kafka spout (storm-kafka instead of storm-kafka-client), and 2343 is a bug in the spout code, not the Kafka client. I agree that we should get this branch merged though :)
Yes, because of 2343 I'm still using storm-kafka in production to communicate with Kafka 0.10.1.1 (so I hit the 2361 issue too). After 2343 is fixed, I'll have to verify KAFKA-4405 as well before bumping to Kafka 0.10.2.0.
import org.junit.Before;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
import static org.mockito.Mockito.*;
Please avoid using wildcard imports.
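For example, importing only the Mockito members the test actually uses (this exact set is a guess; adjust it to the test):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.never;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;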
@lukess It looks good. Could you address the minor nits and also squash the commits into one?
@HeartSaVioR thanks! I'm figuring out how to squash into one commit, since there's a merge commit in the branch.
@lukess
@HeartSaVioR Yes, thanks. I'm going through the conflicts and don't know where they come from.
+1
@lukess
…t kafka-clients:0.10.1.1
@HeartSaVioR I think the conflict was because of the merge, so I reset the last two commits, rebased from the latest apache:master, and then squashed. I think this should be clean now.
Great! +1 |
This PR is for testing kafka-clients:0.10.1.1 on storm-kafka-client. I replace kafka-unit with kafka_2.11 and kafka-clients, and add a TestRule as suggested by @srdo.
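For context, a usage sketch of how a test might drive the rule end to end (the topic name, key, and value are made up; the rule and KafkaUnit method names follow the conversation above):

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.junit.Rule;
    import org.junit.Test;

    public class SingleTopicKafkaSpoutTest {

        @Rule
        public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();

        @Test
        public void shouldEmitAllMessages() throws Exception {
            kafkaUnitRule.getKafkaUnit().createTopic("test-topic");
            kafkaUnitRule.getKafkaUnit().sendMessage(new ProducerRecord<>("test-topic", "key", "value"));
            // ... open the spout against the embedded broker and assert on the emitted tuples
        }
    }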