KTable producing empty Streams with runStreams #89
Comments
@jacobBaumbach I noticed you didn't materialize the data stream in your code. If you add
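As a rough sketch only: materializing could follow the `Materialized.as` + `globalTable` pattern used in the tests below, assuming the same `streamBuilder`, `stringSerde`, topic names, and a running `KafkaStreams` instance (`streams`):

```scala
// Sketch: materialize a topic into a named, queryable key-value store.
// Store and topic names here are illustrative.
val materializer =
  Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("my-store")
streamBuilder.globalTable(outTopic, materializer)(Consumed.`with`(stringSerde, stringSerde))

// After the topology has started, the store can be queried:
val store = streams.store("my-store", QueryableStoreTypes.keyValueStore[String, String]())
store.get("hello") // latest value for the key, or null if not (yet) present
```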
I have the same issue with the following dependencies:

```scala
val embeddedKafka        = "net.manub" %% "scalatest-embedded-kafka"         % "1.0.0" % Test
val embeddedKafkaStreams = "net.manub" %% "scalatest-embedded-kafka-streams" % "1.0.0" % Test
```

with Kafka version 1.0.0. I reproduced this issue with the following test:

```scala
package net.manub.embeddedkafka.streams

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.{EmbeddedKafkaConfig, UUIDs}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{Consumed, KafkaStreams, Topology}
import org.apache.kafka.streams.kstream.{Materialized, Produced}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}
import org.scalatest.{Matchers, WordSpec}

class ExampleKafkaStreamsSpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {

  implicit val config =
    EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)

  val (inTopic, outTopic) = ("in", "out")
  val stringSerde: Serde[String] = Serdes.String()

  "A Kafka streams test" should {
    "be easy to run with streams and consumer lifecycle management" in {
      // Pipe the input topic straight to the output topic.
      val streamBuilder = new StreamsBuilderS
      val stream: KStreamS[String, String] =
        streamBuilder.stream[String, String](inTopic)(Consumed.`with`(stringSerde, stringSerde))
      stream.to(outTopic)(Produced.`with`(stringSerde, stringSerde))

      // Materialize the output topic into a queryable global table.
      val materializer =
        Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("my-store")
      streamBuilder.globalTable(outTopic, materializer)(Consumed.`with`(stringSerde, stringSerde))

      myRunStreams(Seq(inTopic, outTopic), streamBuilder.build()) { kstream =>
        publishToKafka(inTopic, "hello", "world")
        publishToKafka(inTopic, "foo", "bar")
        publishToKafka(inTopic, "baz", "yaz")

        // Query the materialized store for one of the published keys.
        val value = kstream.store("my-store", QueryableStoreTypes.keyValueStore[String, String]())
        value.get("hello") should be("world")
      }
    }

    /**
      * Copy-paste from runStreams to expose the underlying KafkaStreams instance.
      */
    def myRunStreams(topicsToCreate: Seq[String], topology: Topology, extraConfig: Map[String, AnyRef] = Map.empty)(
        block: KafkaStreams => Any)(implicit config: EmbeddedKafkaConfig): Any =
      withRunningKafka {
        topicsToCreate.foreach(topic => createCustomTopic(topic))
        val streamId = UUIDs.newUuid().toString
        val streams  = new KafkaStreams(topology, streamConfig(streamId, extraConfig))
        streams.start()
        try {
          block(streams)
        } finally {
          streams.close()
        }
      }(config)
  }
}
```

By the way, I have two questions:
If I add:

```scala
package net.manub.embeddedkafka.streams

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.{EmbeddedKafkaConfig, UUIDs}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{Consumed, KafkaStreams, Topology}
import org.apache.kafka.streams.kstream.{Materialized, Produced}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}
import org.scalatest.{Matchers, WordSpec}

class ExampleKafkaStreamsSpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {

  implicit val config =
    EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)

  val (inTopic, outTopic) = ("in", "out")
  val stringSerde: Serde[String] = Serdes.String()

  "A Kafka streams test" should {
    "be easy to run with streams and consumer lifecycle management" in {
      val streamBuilder = new StreamsBuilderS
      val stream: KStreamS[String, String] =
        streamBuilder.stream[String, String](inTopic)(Consumed.`with`(stringSerde, stringSerde))
      stream.to(outTopic)(Produced.`with`(stringSerde, stringSerde))

      val materializer =
        Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("my-store")
      streamBuilder.globalTable(outTopic, materializer)(Consumed.`with`(stringSerde, stringSerde))

      myRunStreams(Seq(inTopic, outTopic), streamBuilder.build()) { kstream =>
        // Fetch the store handle before publishing this time.
        val value = kstream.store("my-store", QueryableStoreTypes.keyValueStore[String, String]())

        publishToKafka(inTopic, "hello", "world")
        publishToKafka(inTopic, "foo", "bar")
        publishToKafka(inTopic, "baz", "yaz")

        // Print the first message that arrives on the output topic.
        println(consumeFirstMessageFrom[String](outTopic))

        value.get("hello") should be("world")

        // Dump everything the store currently holds.
        import scala.collection.JavaConverters._
        value.all().asScala.foreach(println)
      }
    }

    /**
      * Code coming from runStreams, copied to expose the KafkaStreams instance.
      */
    def myRunStreams(topicsToCreate: Seq[String], topology: Topology, extraConfig: Map[String, AnyRef] = Map.empty)(
        block: KafkaStreams => Any)(implicit config: EmbeddedKafkaConfig): Any =
      withRunningKafka {
        topicsToCreate.foreach(topic => createCustomTopic(topic))
        val streamId = UUIDs.newUuid().toString
        val streams  = new KafkaStreams(topology, streamConfig(streamId, extraConfig))
        streams.start()
        try {
          block(streams)
        } finally {
          streams.close()
        }
      }(config)
  }
}
```
We finally decided to use ScalaTest's Eventually.eventually. This resolves the empty-stream issue. Maybe you have an alternative to that.
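For reference, a minimal sketch of that approach, assuming `org.scalatest.concurrent.Eventually` is in scope and reusing the store and `kstream` handle from the test above (the timeout is illustrative):

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Seconds, Span}

// Retry the store lookup until the streams application has processed the
// published records (or the timeout expires), instead of asserting once.
eventually(timeout(Span(10, Seconds))) {
  val store = kstream.store("my-store", QueryableStoreTypes.keyValueStore[String, String]())
  store.get("hello") should be("world")
}
```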
Hello, I am using `runStreams` in my tests to check whether my KTable is accurately tracking a user session. When I print the contents of my KTable I see the expected output, but the consumer returns an empty stream. Are KTables currently not supported, am I setting up my tests incorrectly, or is it something else? Thanks a lot for your help!
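For comparison, a minimal sketch of the kind of test being described, assuming a topology that writes the table's output to `outTopic` and the library's `runStreams` helper (whose signature is inferred here from the `myRunStreams` copy above); topic names and values are illustrative:

```scala
// Sketch: run the topology, publish to the input topic, and read the
// result back from the output topic. The read may need to be retried
// (see the Eventually suggestion above), since processing is asynchronous.
runStreams(Seq(inTopic, outTopic), streamBuilder.build()) {
  publishToKafka(inTopic, "hello", "world")
  consumeFirstMessageFrom[String](outTopic) should be("world")
}
```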