KTable producing empty Streams with runStreams #89

Open
jacobBaumbach opened this issue Sep 11, 2017 · 4 comments

Comments

@jacobBaumbach

Hello, I am using runStreams in my tests to see whether my KTable is accurately tracking a user session. I am printing the results of my KTable and I am getting the expected output, but the consumer is returning an empty stream. Are KTables currently not supported, am I setting up my tests incorrectly or something else? Thanks a lot for your help!

@jiminhsieh
Contributor

@jacobBaumbach I noticed you didn't materialize the data stream in your code. If you add data.to(stringSerdes, stringSerdes, outputTopic), you should get a non-empty stream.

@tiboun

tiboun commented Aug 31, 2018

@jiminhsieh @manub

I have the same issue with the following dependencies:

val embeddedKafka = "net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % Test
val embeddedKafkaStreams = "net.manub" %% "scalatest-embedded-kafka-streams" % "1.0.0" % Test

with Kafka version 1.0.0.

I reproduced this issue with the following test:

package net.manub.embeddedkafka.streams

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.{EmbeddedKafkaConfig, UUIDs}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{Consumed, KafkaStreams, Topology}
import org.apache.kafka.streams.kstream.{Materialized, Produced}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}
import org.scalatest.{Matchers, WordSpec}

class ExampleKafkaStreamsSpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {

  implicit val config =
    EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)

  val (inTopic, outTopic) = ("in", "out")

  val stringSerde: Serde[String] = Serdes.String()

  "A Kafka streams test" should {
    "be easy to run with streams and consumer lifecycle management" in {
      val streamBuilder = new StreamsBuilderS
      val stream: KStreamS[String, String] =
        streamBuilder.stream[String, String](inTopic)(Consumed.`with`(stringSerde, stringSerde))

      stream.to(outTopic)(Produced.`with`(stringSerde, stringSerde))
      val materializer =
        Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("my-store")
      streamBuilder.globalTable(outTopic, materializer)(Consumed.`with`(stringSerde, stringSerde))

      myRunStreams(Seq(inTopic, outTopic), streamBuilder.build()) { kstream =>
        publishToKafka(inTopic, "hello", "world")
        publishToKafka(inTopic, "foo", "bar")
        publishToKafka(inTopic, "baz", "yaz")
        val value = kstream.store("my-store", QueryableStoreTypes.keyValueStore[String, String]())
        value.get("hello") should be("world")
      }
    }

    /**
      * Copied from runStreams to expose the KafkaStreams instance
      */
    def myRunStreams(topicsToCreate: Seq[String], topology: Topology, extraConfig: Map[String, AnyRef] = Map.empty)(
        block: KafkaStreams => Any)(implicit config: EmbeddedKafkaConfig): Any =
      withRunningKafka {
        topicsToCreate.foreach(topic => createCustomTopic(topic))
        val streamId = UUIDs.newUuid().toString
        val streams = new KafkaStreams(topology, streamConfig(streamId, extraConfig))
        streams.start()
        try {
          block(streams)
        } finally {
          streams.close()
        }
      }(config)
  }
}

By the way, I have two questions:

  • Is the KafkaStreams instance built by runStreams intentionally not exposed?
  • Could we have a runStreamsOnFoundPort? runStreams currently only uses withRunningKafka.

@tiboun

tiboun commented Aug 31, 2018

If I add consumeFirstMessageFrom[String](outTopic) before getting the value of hello, then it works.

package net.manub.embeddedkafka.streams

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.{EmbeddedKafkaConfig, UUIDs}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{Consumed, KafkaStreams, Topology}
import org.apache.kafka.streams.kstream.{Materialized, Produced}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}
import org.scalatest.{Matchers, WordSpec}

class ExampleKafkaStreamsSpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {

  implicit val config =
    EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)

  val (inTopic, outTopic) = ("in", "out")

  val stringSerde: Serde[String] = Serdes.String()

  "A Kafka streams test" should {
    "be easy to run with streams and consumer lifecycle management" in {
      val streamBuilder = new StreamsBuilderS
      val stream: KStreamS[String, String] =
        streamBuilder.stream[String, String](inTopic)(Consumed.`with`(stringSerde, stringSerde))

      stream.to(outTopic)(Produced.`with`(stringSerde, stringSerde))
      val materializer =
        Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("my-store")
      streamBuilder.globalTable(outTopic, materializer)(Consumed.`with`(stringSerde, stringSerde))

      myRunStreams(Seq(inTopic, outTopic), streamBuilder.build()) { kstream =>
        val value = kstream.store("my-store", QueryableStoreTypes.keyValueStore[String, String]())
        publishToKafka(inTopic, "hello", "world")
        publishToKafka(inTopic, "foo", "bar")
        publishToKafka(inTopic, "baz", "yaz")
        println(consumeFirstMessageFrom[String](outTopic))
        value.get("hello") should be("world")
        import scala.collection.JavaConverters._
        value.all().asScala.foreach(println)
      }
    }

    /**
      * Copied from runStreams to expose the KafkaStreams instance
      */
    def myRunStreams(topicsToCreate: Seq[String], topology: Topology, extraConfig: Map[String, AnyRef] = Map.empty)(
        block: KafkaStreams => Any)(implicit config: EmbeddedKafkaConfig): Any =
      withRunningKafka {
        topicsToCreate.foreach(topic => createCustomTopic(topic))
        val streamId = UUIDs.newUuid().toString
        val streams = new KafkaStreams(topology, streamConfig(streamId, extraConfig))
        streams.start()
        try {
          block(streams)
        } finally {
          streams.close()
        }
      }(config)
  }
}

@tiboun

tiboun commented Aug 31, 2018

We finally decided to use ScalaTest's Eventually.eventually, which resolves the empty-stream issue. Perhaps you know of an alternative?
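Eventually.eventually works by re-running a block until it stops throwing or a timeout expires, which is why it hides the race between publishToKafka and the store being updated asynchronously by the streams threads. A minimal stdlib-only sketch of that retry loop follows; the EventuallySketch object and its eventually helper are hypothetical stand-ins (not ScalaTest or scalatest-embedded-kafka API), and a ConcurrentHashMap filled by a background thread plays the role of "my-store".

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.control.NonFatal

object EventuallySketch {

  // Hypothetical helper: re-runs `block` until it stops throwing or `timeout` elapses,
  // then rethrows the last failure. A stdlib-only stand-in for ScalaTest's eventually.
  def eventually[T](timeout: FiniteDuration, interval: FiniteDuration)(block: => T): T = {
    val deadline = timeout.fromNow

    @tailrec
    def attempt(): T = {
      val outcome: Either[Throwable, T] =
        try Right(block)
        catch { case NonFatal(e) => Left(e) }
      outcome match {
        case Right(value)                        => value
        case Left(error) if deadline.isOverdue() => throw error
        case Left(_) =>
          Thread.sleep(interval.toMillis)
          attempt()
      }
    }

    attempt()
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for "my-store": populated asynchronously, as a Kafka Streams state store would be.
    val store = new ConcurrentHashMap[String, String]()
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.schedule(new Runnable {
      def run(): Unit = store.put("hello", "world")
    }, 300, TimeUnit.MILLISECONDS)

    // Reading once right away would race the background writer; polling waits for it instead.
    val value = eventually(timeout = 5.seconds, interval = 50.millis) {
      val v = store.get("hello")
      require(v == "world", s"store not updated yet: $v")
      v
    }
    println(value) // prints "world"
    scheduler.shutdown()
  }
}
```

In the real test, the ScalaTest equivalent would be roughly eventually(timeout(Span(10, Seconds))) { value.get("hello") should be("world") }, with import org.scalatest.concurrent.Eventually._ and import org.scalatest.time.{Seconds, Span} in scope. Putting the kstream.store(...) lookup inside the block as well should also cover the case where KafkaStreams throws InvalidStateStoreException because the store is not queryable yet.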
