Skip to content

Commit

Permalink
Implement withRunningKafkaOnFoundPort (#76)
Browse files Browse the repository at this point in the history
* Make withRunningKafka generic

* Factor out withTempDir helper method

* Factor out withRunningZooKeeper helper method

* Implement withRunningKafkaOnFoundPort

This is useful for writing tests that configure kafka/zookeeper to
listen on port 0, i.e. listen on an arbitrary available port.
  • Loading branch information
mikemintz authored and manub committed Jun 27, 2017
1 parent 09d50c5 commit 53e88e4
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 19 deletions.
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,27 @@ class MySpec extends WordSpec with EmbeddedKafka {
}
```

If you want to run ZooKeeper and Kafka on arbitrary available ports, you can
use the `withRunningKafkaOnFoundPort` method. This is useful to make tests more
reliable, especially when running tests in parallel or on machines where other
tests or services may be running with port numbers you can't control.

```scala
class MySpec extends WordSpec with EmbeddedKafka {

"runs with embedded kafka on arbitrary available ports" should {

val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
// now a kafka broker is listening on actualConfig.kafkaPort
publishStringMessageToKafka("topic", "message")
consumeFirstStringMessageFrom("topic") shouldBe "message"
}

}
```

The same implicit `EmbeddedKafkaConfig` is used to define custom consumer or producer properties

```scala
Expand All @@ -102,7 +123,7 @@ class MySpec extends WordSpec with EmbeddedKafka {
}
```

This works for both `withRunningKafka` and `EmbeddedKafka.start()`
This works for `withRunningKafka`, `withRunningKafkaOnFoundPort`, and `EmbeddedKafka.start()`

Also, it is now possible to provide custom properties to the broker while starting Kafka. `EmbeddedKafkaConfig` has a
`customBrokerProperties` field which can be used to provide extra properties contained in a `Map[String, String]`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,26 +129,64 @@ sealed trait EmbeddedKafkaSupport {
* @param body the function to execute
* @param config an implicit [[EmbeddedKafkaConfig]]
*/
def withRunningKafka(body: => Any)(
implicit config: EmbeddedKafkaConfig): Any = {

def cleanLogs(directories: Directory*): Unit = {
directories.foreach(_.deleteRecursively())
def withRunningKafka[T](body: => T)(
implicit config: EmbeddedKafkaConfig): T = {
withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
withTempDir("kafka") { kafkaLogsDir =>
val broker = startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir)
try {
body
} finally {
broker.shutdown()
broker.awaitShutdown()
}
}
}
}

val zkLogsDir = Directory.makeTemp("zookeeper-logs")
val kafkaLogsDir = Directory.makeTemp("kafka")
/**
* Starts a ZooKeeper instance and a Kafka broker, then executes the body passed as a parameter.
* The actual ZooKeeper and Kafka ports will be detected and inserted into a copied version of
* the EmbeddedKafkaConfig that gets passed to body. This is useful if you set either or both
* port to 0, which will listen on an arbitrary available port.
*
* @param config the user-defined [[EmbeddedKafkaConfig]]
* @param body the function to execute, given an [[EmbeddedKafkaConfig]] with the actual
* ports Kafka and ZooKeeper are running on
*/
def withRunningKafkaOnFoundPort[T](config: EmbeddedKafkaConfig)(body: EmbeddedKafkaConfig => T): T = {
withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
withTempDir("kafka") { kafkaLogsDir =>
val broker: KafkaServer = startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir)
val kafkaPort = broker.boundPort(broker.config.listeners.head.listenerName)
val actualConfig = config.copy(kafkaPort = kafkaPort, zooKeeperPort = zkPort)
try {
body(actualConfig)
} finally {
broker.shutdown()
broker.awaitShutdown()
}
}
}
}

val factory = startZooKeeper(config.zooKeeperPort, zkLogsDir)
val broker = startKafka(config, kafkaLogsDir)
private def withRunningZooKeeper[T](port: Int)(body: Int => T): T = {
withTempDir("zookeeper-logs") { zkLogsDir =>
val factory = startZooKeeper(port, zkLogsDir)
try {
body(factory.getLocalPort)
} finally {
factory.shutdown()
}
}
}

private def withTempDir[T](prefix: String)(body: Directory => T): T = {
val dir = Directory.makeTemp(prefix)
try {
body
body(dir)
} finally {
broker.shutdown()
broker.awaitShutdown()
factory.shutdown()
cleanLogs(zkLogsDir, kafkaLogsDir)
dir.deleteRecursively()
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package net.manub.embeddedkafka

class EmbeddedKafkaWithRunningKafkaOnFoundPortSpec
extends EmbeddedKafkaSpecSupport
with EmbeddedKafka {

"the withRunningKafkaOnFoundPort method" should {
"start and stop Kafka and Zookeeper successfully on non-zero ports" in {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 12346)
val actualConfig = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig =>
actualConfig shouldBe userDefinedConfig
bothKafkaAndZkAreAvailable(actualConfig)
actualConfig
}
bothKafkaAndZkAreNotAvailable(actualConfig)
}

"start and stop multiple Kafka and Zookeeper successfully on arbitrary available ports" in {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
val actualConfig1 = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig1 =>
bothKafkaAndZkAreAvailable(actualConfig1)
publishStringMessageToKafka("topic", "message1")(actualConfig1)
consumeFirstStringMessageFrom("topic")(actualConfig1) shouldBe "message1"
val actualConfig2 = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig2 =>
bothKafkaAndZkAreAvailable(actualConfig2)
publishStringMessageToKafka("topic", "message2")(actualConfig2)
consumeFirstStringMessageFrom("topic")(actualConfig2) shouldBe "message2"
val allConfigs = Seq(userDefinedConfig, actualConfig1, actualConfig2)
// Confirm both actual configs are running on separate non-zero ports, but otherwise equal
allConfigs.map(_.kafkaPort).distinct should have size 3
allConfigs.map(_.zooKeeperPort).distinct should have size 3
allConfigs.map(_.copy(kafkaPort = 0, zooKeeperPort = 0)).distinct should have size 1
actualConfig2
}
bothKafkaAndZkAreNotAvailable(actualConfig2)
actualConfig1
}
bothKafkaAndZkAreNotAvailable(actualConfig1)
}

"work with a simple example using implicits" in {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
publishStringMessageToKafka("topic", "message")
consumeFirstStringMessageFrom("topic") shouldBe "message"
}
}
}

private def bothKafkaAndZkAreAvailable(config: EmbeddedKafkaConfig): Unit = {
kafkaIsAvailable(config.kafkaPort)
zookeeperIsAvailable(config.zooKeeperPort)
}

private def bothKafkaAndZkAreNotAvailable(config: EmbeddedKafkaConfig): Unit = {
kafkaIsNotAvailable(config.kafkaPort)
zookeeperIsNotAvailable(config.zooKeeperPort)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ abstract class EmbeddedKafkaSpecSupport
expectMsg(1 second, ConnectionSuccessful)
}

def kafkaIsNotAvailable(): Unit = {
def kafkaIsNotAvailable(kafkaPort: Int = 6001): Unit = {
system.actorOf(
TcpClient.props(new InetSocketAddress("localhost", 6001), testActor))
TcpClient.props(new InetSocketAddress("localhost", kafkaPort), testActor))
expectMsg(1 second, ConnectionFailed)
}

def zookeeperIsNotAvailable(): Unit = {
def zookeeperIsNotAvailable(zookeeperPort: Int = 6000): Unit = {
system.actorOf(
TcpClient.props(new InetSocketAddress("localhost", 6000), testActor))
TcpClient.props(new InetSocketAddress("localhost", zookeeperPort), testActor))
expectMsg(1 second, ConnectionFailed)
}
}
Expand Down

0 comments on commit 53e88e4

Please sign in to comment.