diff --git a/README.md b/README.md index c78c91c..23df13b 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,27 @@ class MySpec extends WordSpec with EmbeddedKafka { } ``` +If you want to run ZooKeeper and Kafka on arbitrary available ports, you can +use the `withRunningKafkaOnFoundPort` method. This is useful to make tests more +reliable, especially when running tests in parallel or on machines where other +tests or services may be running with port numbers you can't control. + +```scala +class MySpec extends WordSpec with EmbeddedKafka { + +"runs with embedded kafka on arbitrary available ports" should { + + val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0) + + withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig => + // now a kafka broker is listening on actualConfig.kafkaPort + publishStringMessageToKafka("topic", "message") + consumeFirstStringMessageFrom("topic") shouldBe "message" + } + +} +``` + The same implicit `EmbeddedKafkaConfig` is used to define custom consumer or producer properties ```scala @@ -102,7 +123,7 @@ class MySpec extends WordSpec with EmbeddedKafka { } ``` -This works for both `withRunningKafka` and `EmbeddedKafka.start()` +This works for `withRunningKafka`, `withRunningKafkaOnFoundPort`, and `EmbeddedKafka.start()` Also, it is now possible to provide custom properties to the broker while starting Kafka. `EmbeddedKafkaConfig` has a `customBrokerProperties` field which can be used to provide extra properties contained in a `Map[String, String]`. diff --git a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala index 46bfaee..ba5e1e6 100644 --- a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala +++ b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala @@ -129,26 +129,64 @@ sealed trait EmbeddedKafkaSupport { * @param body the function to execute * @param config an implicit [[EmbeddedKafkaConfig]] */ - def withRunningKafka(body: => Any)( - implicit config: EmbeddedKafkaConfig): Any = { - - def cleanLogs(directories: Directory*): Unit = { - directories.foreach(_.deleteRecursively()) + def withRunningKafka[T](body: => T)( + implicit config: EmbeddedKafkaConfig): T = { + withRunningZooKeeper(config.zooKeeperPort) { zkPort => + withTempDir("kafka") { kafkaLogsDir => + val broker = startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir) + try { + body + } finally { + broker.shutdown() + broker.awaitShutdown() + } + } } + } - val zkLogsDir = Directory.makeTemp("zookeeper-logs") - val kafkaLogsDir = Directory.makeTemp("kafka") + /** + * Starts a ZooKeeper instance and a Kafka broker, then executes the body passed as a parameter. + * The actual ZooKeeper and Kafka ports will be detected and inserted into a copied version of + * the EmbeddedKafkaConfig that gets passed to body. This is useful if you set either or both + * port to 0, which will listen on an arbitrary available port. + * + * @param config the user-defined [[EmbeddedKafkaConfig]] + * @param body the function to execute, given an [[EmbeddedKafkaConfig]] with the actual + * ports Kafka and ZooKeeper are running on + */ + def withRunningKafkaOnFoundPort[T](config: EmbeddedKafkaConfig)(body: EmbeddedKafkaConfig => T): T = { + withRunningZooKeeper(config.zooKeeperPort) { zkPort => + withTempDir("kafka") { kafkaLogsDir => + val broker: KafkaServer = startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir) + val kafkaPort = broker.boundPort(broker.config.listeners.head.listenerName) + val actualConfig = config.copy(kafkaPort = kafkaPort, zooKeeperPort = zkPort) + try { + body(actualConfig) + } finally { + broker.shutdown() + broker.awaitShutdown() + } + } + } + } - val factory = startZooKeeper(config.zooKeeperPort, zkLogsDir) - val broker = startKafka(config, kafkaLogsDir) + private def withRunningZooKeeper[T](port: Int)(body: Int => T): T = { + withTempDir("zookeeper-logs") { zkLogsDir => + val factory = startZooKeeper(port, zkLogsDir) + try { + body(factory.getLocalPort) + } finally { + factory.shutdown() + } + } + } + private def withTempDir[T](prefix: String)(body: Directory => T): T = { + val dir = Directory.makeTemp(prefix) try { - body + body(dir) } finally { - broker.shutdown() - broker.awaitShutdown() - factory.shutdown() - cleanLogs(zkLogsDir, kafkaLogsDir) + dir.deleteRecursively() } } diff --git a/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaWithRunningKafkaOnFoundPortSpec.scala b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaWithRunningKafkaOnFoundPortSpec.scala new file mode 100644 index 0000000..3b842df --- /dev/null +++ b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaWithRunningKafkaOnFoundPortSpec.scala @@ -0,0 +1,59 @@ +package net.manub.embeddedkafka + +class EmbeddedKafkaWithRunningKafkaOnFoundPortSpec + extends EmbeddedKafkaSpecSupport + with EmbeddedKafka { + + "the withRunningKafkaOnFoundPort method" should { + "start and stop Kafka and Zookeeper successfully on non-zero ports" in { + val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 12346) + val actualConfig = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig => + actualConfig shouldBe userDefinedConfig + bothKafkaAndZkAreAvailable(actualConfig) + actualConfig + } + bothKafkaAndZkAreNotAvailable(actualConfig) + } + + "start and stop multiple Kafka and Zookeeper successfully on arbitrary available ports" in { + val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0) + val actualConfig1 = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig1 => + bothKafkaAndZkAreAvailable(actualConfig1) + publishStringMessageToKafka("topic", "message1")(actualConfig1) + consumeFirstStringMessageFrom("topic")(actualConfig1) shouldBe "message1" + val actualConfig2 = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig2 => + bothKafkaAndZkAreAvailable(actualConfig2) + publishStringMessageToKafka("topic", "message2")(actualConfig2) + consumeFirstStringMessageFrom("topic")(actualConfig2) shouldBe "message2" + val allConfigs = Seq(userDefinedConfig, actualConfig1, actualConfig2) + // Confirm both actual configs are running on separate non-zero ports, but otherwise equal + allConfigs.map(_.kafkaPort).distinct should have size 3 + allConfigs.map(_.zooKeeperPort).distinct should have size 3 + allConfigs.map(_.copy(kafkaPort = 0, zooKeeperPort = 0)).distinct should have size 1 + actualConfig2 + } + bothKafkaAndZkAreNotAvailable(actualConfig2) + actualConfig1 + } + bothKafkaAndZkAreNotAvailable(actualConfig1) + } + + "work with a simple example using implicits" in { + val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0) + withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig => + publishStringMessageToKafka("topic", "message") + consumeFirstStringMessageFrom("topic") shouldBe "message" + } + } + } + + private def bothKafkaAndZkAreAvailable(config: EmbeddedKafkaConfig): Unit = { + kafkaIsAvailable(config.kafkaPort) + zookeeperIsAvailable(config.zooKeeperPort) + } + + private def bothKafkaAndZkAreNotAvailable(config: EmbeddedKafkaConfig): Unit = { + kafkaIsNotAvailable(config.kafkaPort) + zookeeperIsNotAvailable(config.zooKeeperPort) + } +} diff --git a/embedded-kafka/src/test/scala/net/manub/embeddedkafka/embeddedKafkaSpecSupport.scala b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/embeddedKafkaSpecSupport.scala index 9ba35ec..b194cc2 100644 --- a/embedded-kafka/src/test/scala/net/manub/embeddedkafka/embeddedKafkaSpecSupport.scala +++ b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/embeddedKafkaSpecSupport.scala @@ -60,15 +60,15 @@ abstract class EmbeddedKafkaSpecSupport expectMsg(1 second, ConnectionSuccessful) } - def kafkaIsNotAvailable(): Unit = { + def kafkaIsNotAvailable(kafkaPort: Int = 6001): Unit = { system.actorOf( - TcpClient.props(new InetSocketAddress("localhost", 6001), testActor)) + TcpClient.props(new InetSocketAddress("localhost", kafkaPort), testActor)) expectMsg(1 second, ConnectionFailed) } - def zookeeperIsNotAvailable(): Unit = { + def zookeeperIsNotAvailable(zookeeperPort: Int = 6000): Unit = { system.actorOf( - TcpClient.props(new InetSocketAddress("localhost", 6000), testActor)) + TcpClient.props(new InetSocketAddress("localhost", zookeeperPort), testActor)) expectMsg(1 second, ConnectionFailed) } }