diff --git a/.bsp/sbt.json b/.bsp/sbt.json deleted file mode 100644 index 889c5517..00000000 --- a/.bsp/sbt.json +++ /dev/null @@ -1 +0,0 @@ -{"name":"sbt","version":"1.4.7","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java","-Xms100m","-Xmx100m","-classpath","/usr/local/Cellar/sbt/1.3.3/libexec/bin/sbt-launch.jar","xsbt.boot.Boot","-bsp"]} \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 00000000..15f55dc3 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,45 @@ +name: build + +on: [push, pull_request] + +jobs: + + tests: + name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests + runs-on: ubuntu-latest + + strategy: + fail-fast: true + matrix: + java: [8] + scala: [2.11.12, 2.12.15] + + steps: + - uses: actions/checkout@v2 + - uses: olafurpg/setup-scala@v10 + with: + java-version: "adopt@1.${{ matrix.java }}" + + - name: Cache SBT Coursier directory + uses: actions/cache@v1 + with: + path: ~/.cache/coursier/v1 + key: ${{ runner.os }}-coursier-${{ hashFiles('**/*.sbt') }} + restore-keys: | + ${{ runner.os }}-coursier- + - name: Cache SBT directory + uses: actions/cache@v1 + with: + path: ~/.sbt + key: | + ${{ runner.os }}-sbt-${{ hashFiles('project/build.properties') }}-${{ hashFiles('project/plugins.sbt') }} + restore-keys: ${{ runner.os }}-sbt- + + - name: Run Tests for Kafka 0.10.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka10/test + + - name: Run Tests for Kafka 0.11.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka11/test + + - name: Run Tests for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka1x/test diff --git a/.gitignore b/.gitignore index 1310ac33..de2c6fed 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ project/plugins/project/ .scala_dependencies .worksheet .idea + +.bsp/ diff --git a/build.sbt b/build.sbt index 5984497a..e9e36e69 100644 --- a/build.sbt +++ b/build.sbt @@ -197,8 +197,9 @@ lazy val commonDependencies = Seq( "ch.qos.logback" % "logback-classic" % "1.2.3" % "test", "org.scalatest" %% "scalatest" % "3.0.9" % "test", "org.scalacheck" %% "scalacheck" % "1.15.2" % "test", - "io.github.embeddedkafka" %% "embedded-kafka" % "2.4.1" force() + "io.github.embeddedkafka" %% "embedded-kafka" % "2.1.0" % Test force() ), + dependencyOverrides += "io.github.embeddedkafka" %% "embedded-kafka" % "2.1.0" % Test ) lazy val monixKafka = project.in(file(".")) diff --git a/kafka-0.11.x/src/test/resources/logback.xml b/kafka-0.11.x/src/test/resources/logback.xml index 2beb83c9..5a3e8a64 100644 --- a/kafka-0.11.x/src/test/resources/logback.xml +++ b/kafka-0.11.x/src/test/resources/logback.xml @@ -5,7 +5,7 @@ - + diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala index 9268df54..41ea84c3 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerResource.scala @@ -25,11 +25,12 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsume import scala.concurrent.blocking import scala.util.matching.Regex -/** Exposes an `Observable` that consumes a Kafka stream by - * means of a Kafka Consumer client. 
+/** Acquires and releases a [[KafkaConsumer]] within a [[Resource]] + * is exposed in form of [[KafkaConsumerObservable]], which consumes + * and emits records from the specified topic. * - * In order to get initialized, it needs a configuration. See the - * [[KafkaConsumerConfig]] needed and see `monix/kafka/default.conf`, + * In order to get initialized, it needs a configuration. + * @see the [[KafkaConsumerConfig]] needed and `monix/kafka/default.conf`, * (in the resource files) that is exposing all default values. */ object KafkaConsumerResource { diff --git a/kafka-1.0.x/src/test/resources/logback-test.xml b/kafka-1.0.x/src/test/resources/logback-test.xml index cc97f771..157645e0 100644 --- a/kafka-1.0.x/src/test/resources/logback-test.xml +++ b/kafka-1.0.x/src/test/resources/logback-test.xml @@ -5,7 +5,7 @@ - + diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala b/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala index e7a5d1a7..80dd020c 100644 --- a/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala +++ b/kafka-1.0.x/src/test/scala/monix/kafka/KafkaConsumerResourceSpec.scala @@ -16,13 +16,13 @@ class KafkaConsumerResourceSpec extends FunSuite with KafkaTestKit with ScalaChe val consumerConf: KafkaConsumerConfig = KafkaConsumerConfig.default.copy( bootstrapServers = List("127.0.0.1:6001"), - groupId = "failing-logic", + groupId = "monix-closeable-consumer-test", autoOffsetReset = AutoOffsetReset.Earliest ) val producerCfg = KafkaProducerConfig.default.copy( bootstrapServers = List("127.0.0.1:6001"), - clientId = "monix-kafka-producer-test" + clientId = "monix-closeable-consumer-test" ) test("async commit fails when observable was already cancelled") { diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala index 7f0a8c7b..6a03f5b4 100644 --- a/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala +++ b/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala @@ -1,6 +1,5 @@ package monix.kafka -import cats.effect.Resource import monix.eval.Task import monix.kafka.config.AutoOffsetReset import monix.reactive.Observable @@ -8,8 +7,8 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.scalatest.{FunSuite, Matchers} import scala.concurrent.duration._ +import scala.concurrent.Await import monix.execution.Scheduler.Implicits.global -import net.manub.embeddedkafka.EmbeddedKafka import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition import org.scalacheck.Gen @@ -30,43 +29,45 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe commit <- Gen.oneOf(commitCallbacks) } yield CommittableOffset(new TopicPartition("topic", partition), offset, commit) - private def logic(bootstrapServer: String, topic: String) = { - val kafkaConfig: KafkaConsumerConfig = KafkaConsumerConfig.default.copy( - bootstrapServers = List(bootstrapServer), - groupId = "failing-logic", - autoOffsetReset = AutoOffsetReset.Earliest - ) - KafkaConsumerObservable - .manualCommit[String, String](kafkaConfig, List(topic)) - .timeoutOnSlowUpstreamTo(6.seconds, Observable.empty) - .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) => batch.updated(message.committableOffset) } - .map{completeBatch => - {Task.unit >> Task.sleep(3.second) >> Task.evalAsync(println("Committing async!!!")) >> 
completeBatch.commitAsync()}.runSyncUnsafe() - } - .headL - } + test("merge by commit callback works") { + forAll(Gen.nonEmptyListOf(committableOffsetsGen)) { offsets => + val partitions = offsets.map(_.topicPartition) + val received: List[CommittableOffsetBatch] = CommittableOffsetBatch.mergeByCommitCallback(offsets) - test("async commit finalizes successfully after cancellation") { - EmbeddedKafka.start() - val batchSize = 10 + received.foreach { batch => partitions should contain allElementsOf batch.offsets.keys } - val topicName = "random_topic" + received.size should be <= 4 + } + } + + test("merge by commit callback for multiple consumers") { + withRunningKafka { + val count = 10000 + val topicName = "monix-kafka-merge-by-commit" val producerCfg = KafkaProducerConfig.default.copy( bootstrapServers = List("127.0.0.1:6001"), - clientId = "monix-kafka-producer-test" + clientId = "monix-kafka-1-0-producer-test" ) - val t = for { - _ <- Resource.liftF(Task(KafkaProducer[String, String](producerCfg, io))).use { producer => - Task(producer.send(new ProducerRecord(topicName, "message1"))) >> - Task(producer.send(new ProducerRecord(topicName, "message2"))) - } - _ <- logic("127.0.0.1:6001", topicName) - } yield () - t.runSyncUnsafe() + val producer = KafkaProducerSink[String, String](producerCfg, io) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = Observable + .range(0, 4) + .mergeMap(i => createConsumer(i.toInt, topicName).take(500)) + .bufferTumbling(2000) + .map(CommittableOffsetBatch.mergeByCommitCallback) + .map { offsetBatches => assert(offsetBatches.length == 4) } + .completedL - EmbeddedKafka.stop() + Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + } } private def createConsumer(i: Int, topicName: String): Observable[CommittableOffset] = { diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala new file mode 100644 index 00000000..e1941115 --- /dev/null +++ b/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala @@ -0,0 +1,135 @@ +package monix.kafka + +import java.util + +import monix.eval.Task +import monix.kafka.config.AutoOffsetReset +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.scalatest.FunSuite +import org.apache.kafka.common.serialization.{Serializer => KafkaSerializer} +import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer} + +import scala.concurrent.duration._ +import scala.concurrent.Await +import monix.execution.Scheduler.Implicits.global +import monix.execution.exceptions.DummyException + +class SerializationTest extends FunSuite with KafkaTestKit { + + val producerCfg = KafkaProducerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + clientId = "monix-kafka-1-0-serialization-test" + ) + + val consumerCfg = KafkaConsumerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + groupId = "kafka-tests", + clientId = "monix-kafka-1-0-serialization-test", + autoOffsetReset = AutoOffsetReset.Earliest + ) + + test("serialization/deserialization using kafka.common.serialization") { + withRunningKafka { + val topicName = "monix-kafka-serialization-tests" + val count = 10000 + + implicit val serializer: KafkaSerializer[A] = new ASerializer + implicit val deserializer: KafkaDeserializer[A] = new ADeserializer + + val producer = 
KafkaProducerSink[String, A](producerCfg, io) + val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", A(msg.toString))) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .map(_.value()) + .toListL + + val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + assert(result.map(_.value.toInt).sum === (0 until count).sum) + } + } + + test("allow to fail the stream if serialization throws") { + withRunningKafka { + val topicName = "monix-kafka-serialization-failing-tests" + val dummy = DummyException("boom") + + implicit val serializer: KafkaSerializer[A] = new AFailingSerializer + + val producer = KafkaProducerSink[String, A](producerCfg, io, (_: Throwable) => Task.raiseError(dummy)) + + val pushT = Observable + .evalOnce(new ProducerRecord(topicName, "obs", A(1.toString))) + .bufferIntrospective(1024) + .consumeWith(producer) + + assertThrows[DummyException] { + Await.result(pushT.runToFuture, 60.seconds) + } + } + } + + test("allow to recover from serialization errors") { + withRunningKafka { + val topicName = "monix-kafka-serialization-continuing-tests" + val count = 100 + + implicit val serializer: KafkaSerializer[A] = new AHalfFailingSerializer + implicit val deserializer: KafkaDeserializer[A] = new ADeserializer + + val producer = KafkaProducerSink[String, A](producerCfg, io) + val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count / 2) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", A(msg.toString))) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .map(_.value()) + .toListL + + val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + assert(result.map(_.value.toInt).sum === (0 until count).filter(_ % 2 == 0).sum) + } + } + +} + +case class A(value: String) + +class ASerializer extends KafkaSerializer[A] { + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () + + override def serialize(topic: String, data: A): Array[Byte] = + if (data == null) null else data.value.getBytes + + override def close(): Unit = () +} + +class ADeserializer extends KafkaDeserializer[A] { + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () + + override def deserialize(topic: String, data: Array[Byte]): A = if (data == null) null else A(new String(data)) + + override def close(): Unit = () +} + +class AFailingSerializer extends ASerializer { + override def serialize(topic: String, data: A): Array[Byte] = throw new RuntimeException("fail") +} + +class AHalfFailingSerializer extends ASerializer { + + override def serialize(topic: String, data: A): Array[Byte] = { + if (data.value.toInt % 2 == 0) super.serialize(topic, data) + else throw new RuntimeException("fail") + } +}
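
Note on the error handling exercised by the new SerializationTest: with the default KafkaProducerSink the stream keeps going when an individual record fails to serialize (the "recover from serialization errors" test above only ever sees the even-numbered records), whereas the variant that takes an error callback lets the caller fail the whole stream on the first send error. A minimal sketch under those assumptions, meant to sit in the same test file and reusing `A`, `AFailingSerializer` and `producerCfg` from above; `io`, `lenientSink` and `strictSink` are local names introduced here, not library API:

  import monix.eval.Task
  import monix.execution.Scheduler
  import org.apache.kafka.common.serialization.{Serializer => KafkaSerializer}

  // Stand-in for the test kit's io scheduler.
  val io: Scheduler = Scheduler.io("serialization-sketch")

  // Kafka-native serializer picked up implicitly, exactly as in the tests above.
  implicit val serializer: KafkaSerializer[A] = new AFailingSerializer

  // Default sink: a failed send is swallowed and the remaining records are still produced.
  val lenientSink = KafkaProducerSink[String, A](producerCfg, io)

  // Sink with an error callback: re-raising the error fails the whole stream instead.
  val strictSink =
    KafkaProducerSink[String, A](producerCfg, io, (e: Throwable) => Task.raiseError(e))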
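
On the reworked MergeByCommitCallbackTest: `CommittableOffsetBatch.mergeByCommitCallback` groups offsets by the callback that would commit them, so offsets observed from different consumers land in separate batches, which is why merging the streams of four consumers is expected to yield four batches. A small sketch of committing such merged batches, again assumed to live alongside the test code above, and assuming (as that code suggests) that `mergeByCommitCallback` returns a `List[CommittableOffsetBatch]` and that `commitAsync()` on a batch returns a `Task`:

  import monix.eval.Task

  // Group a mixed list of offsets (possibly from several consumers) and commit
  // each resulting batch through its own consumer's commit callback.
  def commitAll(offsets: List[CommittableOffset]): Task[Unit] =
    Task
      .traverse(CommittableOffsetBatch.mergeByCommitCallback(offsets))(_.commitAsync())
      .map(_ => ())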