
Formatted with scalafmt.
manub committed Feb 26, 2017
1 parent 0057f82 commit d1127eb
Showing 17 changed files with 255 additions and 139 deletions.
Empty file added .scalafmt.conf
18 changes: 8 additions & 10 deletions build.sbt
@@ -19,15 +19,14 @@ lazy val commonSettings = Seq(
   javaOptions += "-Xmx1G"
 )
 
-
 lazy val commonLibrarySettings = libraryDependencies ++= Seq(
-  "org.scalatest" %% "scalatest" % "3.0.1",
-  "org.apache.kafka" %% "kafka" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
-  "org.apache.zookeeper" % "zookeeper" % "3.4.8" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
-  "org.apache.avro" % "avro" % "1.8.1" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
-  "com.typesafe.akka" %% "akka-actor" % akkaVersion % Test,
-  "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test
-)
+  "org.scalatest" %% "scalatest" % "3.0.1",
+  "org.apache.kafka" %% "kafka" % kafkaVersion exclude (slf4jLog4jOrg, slf4jLog4jArtifact),
+  "org.apache.zookeeper" % "zookeeper" % "3.4.8" exclude (slf4jLog4jOrg, slf4jLog4jArtifact),
+  "org.apache.avro" % "avro" % "1.8.1" exclude (slf4jLog4jOrg, slf4jLog4jArtifact),
+  "com.typesafe.akka" %% "akka-actor" % akkaVersion % Test,
+  "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test
+)
 
 lazy val publishSettings = Seq(
   licenses += ("MIT", url("http://opensource.org/licenses/MIT")),
@@ -65,7 +64,6 @@ lazy val root = (project in file("."))
   .settings(publishTo := Some(Resolver.defaultLocal))
   .aggregate(embeddedKafka, kafkaStreams)
 
-
 lazy val embeddedKafka = (project in file("embedded-kafka"))
   .settings(name := "scalatest-embedded-kafka")
   .settings(publishSettings: _*)
@@ -80,6 +78,6 @@ lazy val kafkaStreams = (project in file("kafka-streams"))
   .settings(commonLibrarySettings)
   .settings(releaseSettings: _*)
   .settings(libraryDependencies ++= Seq(
-    "org.apache.kafka" % "kafka-streams" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact)
+    "org.apache.kafka" % "kafka-streams" % kafkaVersion exclude (slf4jLog4jOrg, slf4jLog4jArtifact)
   ))
   .dependsOn(embeddedKafka)
@@ -36,16 +36,16 @@ object ConsumerExtensions {
     * @param topic the topic to consume
     * @return the next batch of messages
     */
-  def getNextBatch(topic: String): Seq[(K, V)] = Try {
-    import scala.collection.JavaConversions._
-    consumer.subscribe(List(topic))
-    consumer.partitionsFor(topic)
-    val records = consumer.poll(2000)
-    // use toList to force eager evaluation. toSeq is lazy
-    records.iterator().toList.map(r => r.key -> r.value)
-  }.recover {
-    case ex: KafkaException => throw new KafkaUnavailableException(ex)
-  }.get
+  def getNextBatch(topic: String): Seq[(K, V)] =
+    Try {
+      import scala.collection.JavaConversions._
+      consumer.subscribe(List(topic))
+      consumer.partitionsFor(topic)
+      val records = consumer.poll(2000)
+      // use toList to force eager evaluation. toSeq is lazy
+      records.iterator().toList.map(r => r.key -> r.value)
+    }.recover {
+      case ex: KafkaException => throw new KafkaUnavailableException(ex)
+    }.get
   }
 }
 
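(For orientation, a minimal usage sketch of the enriched consumer above; it is not part of this commit. It assumes the implicit enrichment in ConsumerExtensions is brought into scope with a wildcard import, and the topic name is illustrative.)

  import org.apache.kafka.clients.consumer.KafkaConsumer
  import net.manub.embeddedkafka.ConsumerExtensions._

  // Hypothetical helper: subscribes to the topic, polls for up to 2 seconds,
  // and surfaces any KafkaException as a KafkaUnavailableException,
  // exactly as getNextBatch is implemented above.
  def drainOneBatch(consumer: KafkaConsumer[String, String]): Seq[(String, String)] =
    consumer.getNextBatch("test-topic")
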
@@ -7,6 +7,7 @@ import org.apache.kafka.common.serialization.Deserializer
 
 /** Utility trait for easily creating Kafka consumers and accessing their consumed messages. */
 trait Consumers {
+
   /** Loaner pattern that allows running a code block with a newly created consumer.
     * The consumer's lifecycle will be automatically handled and closed at the end of the
     * given code block.
@@ -18,8 +19,9 @@ trait Consumers {
     * @tparam T the type of the block's returning result
     * @return the result of the executed block
     */
-  def withConsumer[K: Deserializer, V: Deserializer, T](block: KafkaConsumer[K, V] => T)
-      (implicit config: EmbeddedKafkaConfig): T = {
+  def withConsumer[K: Deserializer, V: Deserializer, T](
+      block: KafkaConsumer[K, V] => T)(
+      implicit config: EmbeddedKafkaConfig): T = {
     val consumer = newConsumer[K, V]()
     try {
       val result = block(consumer)
@@ -35,8 +37,8 @@
     * @tparam T the type of the result of the code block
     * @return the code block result
     */
-  def withStringConsumer[T](block: KafkaConsumer[String, String] => T)
-      (implicit config: EmbeddedKafkaConfig): T = {
+  def withStringConsumer[T](block: KafkaConsumer[String, String] => T)(
+      implicit config: EmbeddedKafkaConfig): T = {
     import net.manub.embeddedkafka.Codecs.stringDeserializer
     withConsumer(block)
   }
@@ -47,13 +49,15 @@
     * @tparam V the type of the consumer's Value
     * @return the new consumer
     */
-  def newConsumer[K: Deserializer, V: Deserializer]()
-      (implicit config: EmbeddedKafkaConfig): KafkaConsumer[K, V] = {
+  def newConsumer[K: Deserializer, V: Deserializer]()(
+      implicit config: EmbeddedKafkaConfig): KafkaConsumer[K, V] = {
     val props = new Properties()
     props.put("group.id", UUIDs.newUuid().toString)
     props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
     props.put("auto.offset.reset", "earliest")
 
-    new KafkaConsumer[K, V](props, implicitly[Deserializer[K]], implicitly[Deserializer[V]])
+    new KafkaConsumer[K, V](props,
+                            implicitly[Deserializer[K]],
+                            implicitly[Deserializer[V]])
   }
 }
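(A usage sketch of the loaner pattern above, not part of this commit: the spec name and topic are illustrative, and getNextBatch relies on the ConsumerExtensions import from earlier in this commit. The consumer is created for the block and closed when the block returns.)

  import net.manub.embeddedkafka.{Consumers, EmbeddedKafkaConfig}
  import net.manub.embeddedkafka.ConsumerExtensions._

  class ConsumersUsage extends Consumers {
    implicit val config = EmbeddedKafkaConfig() // defaults: Kafka on 6001, ZooKeeper on 6000

    // withStringConsumer handles the consumer lifecycle; the block's result is returned.
    def readBatch(): Seq[(String, String)] =
      withStringConsumer { consumer =>
        consumer.getNextBatch("test-topic")
      }
  }
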
@@ -8,9 +8,18 @@ import kafka.admin.AdminUtils
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.ZkUtils
 import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{
+  KafkaProducer,
+  ProducerConfig,
+  ProducerRecord
+}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
+import org.apache.kafka.common.serialization.{
+  Deserializer,
+  Serializer,
+  StringDeserializer,
+  StringSerializer
+}
 import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
 import org.scalatest.Suite
 
@@ -120,7 +129,8 @@ sealed trait EmbeddedKafkaSupport {
     * @param body the function to execute
     * @param config an implicit [[EmbeddedKafkaConfig]]
     */
-  def withRunningKafka(body: => Any)(implicit config: EmbeddedKafkaConfig): Any = {
+  def withRunningKafka(body: => Any)(
+      implicit config: EmbeddedKafkaConfig): Any = {
 
     def cleanLogs(directories: Directory*): Unit = {
       directories.foreach(_.deleteRecursively())
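(For context, a minimal sketch of how withRunningKafka is typically driven from a spec. The EmbeddedKafka mix-in trait and publishStringMessageTo sit in parts of this file that are outside the diff, so treat both as assumptions here.)

  import net.manub.embeddedkafka.EmbeddedKafka
  import org.scalatest.WordSpec

  class RunningKafkaSpec extends WordSpec with EmbeddedKafka {
    "the embedded broker" should {
      "round-trip a message" in {
        // Starts ZooKeeper and Kafka on the implicit config's ports, runs the
        // body, then shuts both down and deletes the logs (see cleanLogs above).
        withRunningKafka {
          publishStringMessageTo("greetings", "hello world") // assumed helper
          assert(consumeFirstStringMessageFrom("greetings") == "hello world")
        }
      }
    }
  }
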
@@ -211,7 +221,8 @@ sealed trait EmbeddedKafkaSupport {
     ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
   )
 
-  private def baseConsumerConfig(implicit config: EmbeddedKafkaConfig) : Properties = {
+  private def baseConsumerConfig(
+      implicit config: EmbeddedKafkaConfig): Properties = {
     val props = new Properties()
     props.put("group.id", s"embedded-kafka-spec")
     props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
@@ -220,13 +231,19 @@
     props
   }
 
-  def consumeFirstStringMessageFrom(topic: String, autoCommit: Boolean = false)(
+  def consumeFirstStringMessageFrom(topic: String,
+                                    autoCommit: Boolean = false)(
       implicit config: EmbeddedKafkaConfig): String =
-    consumeFirstMessageFrom(topic, autoCommit)(config, new StringDeserializer())
+    consumeFirstMessageFrom(topic, autoCommit)(config,
+                                               new StringDeserializer())
 
-  def consumeNumberStringMessagesFrom(topic: String, number: Int, autoCommit: Boolean = false)(
+  def consumeNumberStringMessagesFrom(topic: String,
+                                      number: Int,
+                                      autoCommit: Boolean = false)(
       implicit config: EmbeddedKafkaConfig): List[String] =
-    consumeNumberMessagesFrom(topic, number, autoCommit)(config, new StringDeserializer())
+    consumeNumberMessagesFrom(topic, number, autoCommit)(
+      config,
+      new StringDeserializer())
 
   /**
     * Consumes the first message available in a given topic, deserializing it as a String.
@@ -300,9 +317,11 @@
     * @throws TimeoutException if unable to consume a message within 5 seconds
     * @throws KafkaUnavailableException if unable to connect to Kafka
     */
-  def consumeNumberMessagesFrom[T](topic: String, number: Int, autoCommit: Boolean = false)(
-      implicit config: EmbeddedKafkaConfig,
-      deserializer: Deserializer[T]): List[T] = {
+  def consumeNumberMessagesFrom[T](topic: String,
+                                   number: Int,
+                                   autoCommit: Boolean = false)(
+      implicit config: EmbeddedKafkaConfig,
+      deserializer: Deserializer[T]): List[T] = {
 
     import scala.collection.JavaConverters._
 
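(A sketch of calling the generic variant above: the Deserializer travels through the second parameter list, so an implicit in scope is enough, while the config falls back to the companion default. Spec and topic names are illustrative; the EmbeddedKafka mix-in is assumed as before.)

  import org.apache.kafka.common.serialization.StringDeserializer
  import net.manub.embeddedkafka.EmbeddedKafka
  import org.scalatest.WordSpec

  class ConsumeNumberSpec extends WordSpec with EmbeddedKafka {
    implicit val deserializer = new StringDeserializer()

    "consumeNumberMessagesFrom" should {
      "read three messages without committing offsets" in {
        // autoCommit defaults to false, so offsets stay uncommitted.
        val messages: List[String] =
          consumeNumberMessagesFrom[String]("test-topic", number = 3)
        assert(messages.size == 3)
      }
    }
  }
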
@@ -344,7 +363,6 @@
     }.get
   }
 
-
 object aKafkaProducer {
   private[this] var producers = Vector.empty[KafkaProducer[_, _]]
 
@@ -2,7 +2,8 @@ package net.manub.embeddedkafka
 
 case class EmbeddedKafkaConfig(kafkaPort: Int = 6001,
                                zooKeeperPort: Int = 6000,
-                               customBrokerProperties: Map[String, String] = Map.empty)
+                               customBrokerProperties: Map[String, String] =
+                                 Map.empty)
 
 object EmbeddedKafkaConfig {
   implicit val defaultConfig = EmbeddedKafkaConfig()
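(A short sketch of overriding the defaults shown above; the port numbers and the broker property are illustrative, not part of this commit.)

  import net.manub.embeddedkafka.EmbeddedKafkaConfig

  object CustomConfig {
    // Shadows the implicit default; withRunningKafka and the consume/publish
    // helpers pick this up from implicit scope.
    implicit val customConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
      kafkaPort = 7001,
      zooKeeperPort = 7000,
      customBrokerProperties = Map("auto.create.topics.enable" -> "false")
    )
  }
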
@@ -6,6 +6,7 @@ import java.util.UUID
   * Useful for separating IDs and directories across test cases.
   */
 object UUIDs {
+
   /** Create a new unique ID.
     *
     * @return the unique ID