-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implements a KafkaConsumerResource (#240)
Trigger ci 1 Remove printline Update scala version build Refinement Bring back serialization test
- Loading branch information
Showing
10 changed files
with
225 additions
and
41 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
name: build | ||
|
||
on: [push, pull_request] | ||
|
||
jobs: | ||
|
||
tests: | ||
name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests | ||
runs-on: ubuntu-latest | ||
|
||
strategy: | ||
fail-fast: true | ||
matrix: | ||
java: [8] | ||
scala: [2.11.12, 2.12.15] | ||
|
||
steps: | ||
- uses: actions/checkout@v2 | ||
- uses: olafurpg/setup-scala@v10 | ||
with: | ||
java-version: "adopt@1.${{ matrix.java }}" | ||
|
||
- name: Cache SBT Coursier directory | ||
uses: actions/cache@v1 | ||
with: | ||
path: ~/.cache/coursier/v1 | ||
key: ${{ runner.os }}-coursier-${{ hashFiles('**/*.sbt') }} | ||
restore-keys: | | ||
${{ runner.os }}-coursier- | ||
- name: Cache SBT directory | ||
uses: actions/cache@v1 | ||
with: | ||
path: ~/.sbt | ||
key: | | ||
${{ runner.os }}-sbt-${{ hashFiles('project/build.properties') }}-${{ hashFiles('project/plugins.sbt') }} | ||
restore-keys: ${{ runner.os }}-sbt- | ||
|
||
- name: Run Tests for Kafka 0.10.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} | ||
run: sbt -J-Xmx6144m kafka10/test | ||
|
||
- name: Run Tests for Kafka 0.11.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} | ||
run: sbt -J-Xmx6144m kafka11/test | ||
|
||
- name: Run Tests for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} | ||
run: sbt -J-Xmx6144m kafka1x/test |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,3 +16,5 @@ project/plugins/project/ | |
.scala_dependencies | ||
.worksheet | ||
.idea | ||
|
||
.bsp/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ | |
</encoder> | ||
</appender> | ||
|
||
<root level="WARN"> | ||
<root level="ERROR"> | ||
<appender-ref ref="STDOUT" /> | ||
</root> | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
135 changes: 135 additions & 0 deletions
135
kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
package monix.kafka | ||
|
||
import java.util | ||
|
||
import monix.eval.Task | ||
import monix.kafka.config.AutoOffsetReset | ||
import monix.reactive.Observable | ||
import org.apache.kafka.clients.producer.ProducerRecord | ||
import org.scalatest.FunSuite | ||
import org.apache.kafka.common.serialization.{Serializer => KafkaSerializer} | ||
import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer} | ||
|
||
import scala.concurrent.duration._ | ||
import scala.concurrent.Await | ||
import monix.execution.Scheduler.Implicits.global | ||
import monix.execution.exceptions.DummyException | ||
|
||
class SerializationTest extends FunSuite with KafkaTestKit { | ||
|
||
val producerCfg = KafkaProducerConfig.default.copy( | ||
bootstrapServers = List("127.0.0.1:6001"), | ||
clientId = "monix-kafka-1-0-serialization-test" | ||
) | ||
|
||
val consumerCfg = KafkaConsumerConfig.default.copy( | ||
bootstrapServers = List("127.0.0.1:6001"), | ||
groupId = "kafka-tests", | ||
clientId = "monix-kafka-1-0-serialization-test", | ||
autoOffsetReset = AutoOffsetReset.Earliest | ||
) | ||
|
||
test("serialization/deserialization using kafka.common.serialization") { | ||
withRunningKafka { | ||
val topicName = "monix-kafka-serialization-tests" | ||
val count = 10000 | ||
|
||
implicit val serializer: KafkaSerializer[A] = new ASerializer | ||
implicit val deserializer: KafkaDeserializer[A] = new ADeserializer | ||
|
||
val producer = KafkaProducerSink[String, A](producerCfg, io) | ||
val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count) | ||
|
||
val pushT = Observable | ||
.range(0, count) | ||
.map(msg => new ProducerRecord(topicName, "obs", A(msg.toString))) | ||
.bufferIntrospective(1024) | ||
.consumeWith(producer) | ||
|
||
val listT = consumer | ||
.map(_.value()) | ||
.toListL | ||
|
||
val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) | ||
assert(result.map(_.value.toInt).sum === (0 until count).sum) | ||
} | ||
} | ||
|
||
test("allow to fail the stream if serialization throws") { | ||
withRunningKafka { | ||
val topicName = "monix-kafka-serialization-failing-tests" | ||
val dummy = DummyException("boom") | ||
|
||
implicit val serializer: KafkaSerializer[A] = new AFailingSerializer | ||
|
||
val producer = KafkaProducerSink[String, A](producerCfg, io, (_: Throwable) => Task.raiseError(dummy)) | ||
|
||
val pushT = Observable | ||
.evalOnce(new ProducerRecord(topicName, "obs", A(1.toString))) | ||
.bufferIntrospective(1024) | ||
.consumeWith(producer) | ||
|
||
assertThrows[DummyException] { | ||
Await.result(pushT.runToFuture, 60.seconds) | ||
} | ||
} | ||
} | ||
|
||
test("allow to recover from serialization errors") { | ||
withRunningKafka { | ||
val topicName = "monix-kafka-serialization-continuing-tests" | ||
val count = 100 | ||
|
||
implicit val serializer: KafkaSerializer[A] = new AHalfFailingSerializer | ||
implicit val deserializer: KafkaDeserializer[A] = new ADeserializer | ||
|
||
val producer = KafkaProducerSink[String, A](producerCfg, io) | ||
val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count / 2) | ||
|
||
val pushT = Observable | ||
.range(0, count) | ||
.map(msg => new ProducerRecord(topicName, "obs", A(msg.toString))) | ||
.bufferIntrospective(1024) | ||
.consumeWith(producer) | ||
|
||
val listT = consumer | ||
.map(_.value()) | ||
.toListL | ||
|
||
val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) | ||
assert(result.map(_.value.toInt).sum === (0 until count).filter(_ % 2 == 0).sum) | ||
} | ||
} | ||
|
||
} | ||
|
||
case class A(value: String) | ||
|
||
class ASerializer extends KafkaSerializer[A] { | ||
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () | ||
|
||
override def serialize(topic: String, data: A): Array[Byte] = | ||
if (data == null) null else data.value.getBytes | ||
|
||
override def close(): Unit = () | ||
} | ||
|
||
class ADeserializer extends KafkaDeserializer[A] { | ||
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () | ||
|
||
override def deserialize(topic: String, data: Array[Byte]): A = if (data == null) null else A(new String(data)) | ||
|
||
override def close(): Unit = () | ||
} | ||
|
||
class AFailingSerializer extends ASerializer { | ||
override def serialize(topic: String, data: A): Array[Byte] = throw new RuntimeException("fail") | ||
} | ||
|
||
class AHalfFailingSerializer extends ASerializer { | ||
|
||
override def serialize(topic: String, data: A): Array[Byte] = { | ||
if (data.value.toInt % 2 == 0) super.serialize(topic, data) | ||
else throw new RuntimeException("fail") | ||
} | ||
} |