Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements a KafkaConsumerResource to solve consumer already closed i… #306

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: build

on: [push, pull_request]

jobs:

tests:
name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests
runs-on: ubuntu-latest

strategy:
fail-fast: true
matrix:
java: [8]
scala: [2.11.12, 2.12.15]

steps:
- uses: actions/checkout@v2
- uses: olafurpg/setup-scala@v10
with:
java-version: "adopt@1.${{ matrix.java }}"

- name: Cache SBT Coursier directory
uses: actions/cache@v1
with:
path: ~/.cache/coursier/v1
key: ${{ runner.os }}-coursier-${{ hashFiles('**/*.sbt') }}
restore-keys: |
${{ runner.os }}-coursier-
- name: Cache SBT directory
uses: actions/cache@v1
with:
path: ~/.sbt
key: |
${{ runner.os }}-sbt-${{ hashFiles('project/build.properties') }}-${{ hashFiles('project/plugins.sbt') }}
restore-keys: ${{ runner.os }}-sbt-

- name: Run Tests for Kafka 0.10.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m kafka10/test

- name: Run Tests for Kafka 0.11.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m kafka11/test

- name: Run Tests for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m kafka1x/test
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ project/plugins/project/
.scala_dependencies
.worksheet
.idea

.bsp/
33 changes: 11 additions & 22 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ lazy val warnUnusedImport = Seq(

lazy val sharedSettings = warnUnusedImport ++ Seq(
organization := "io.monix",
scalaVersion := "2.12.14",
crossScalaVersions := Seq("2.11.12", "2.12.14", "2.13.6"),

scalaVersion := "2.12.15",
crossScalaVersions := Seq("2.11.12", "2.12.15", "2.13.6"),
scalacOptions ++= Seq(
// warnings
"-unchecked", // able additional warnings where generated code depends on assumptions
Expand Down Expand Up @@ -84,11 +83,11 @@ lazy val sharedSettings = warnUnusedImport ++ Seq(
scalacOptions ++= Seq(
// Turns all warnings into errors ;-)
// TODO: enable after fixing deprecations for Scala 2.13
"-Xfatal-warnings",
//"-Xfatal-warnings",
// Enables linter options
"-Xlint:adapted-args", // warn if an argument list is modified to match the receiver
"-Xlint:nullary-unit", // warn when nullary methods return Unit
"-Xlint:nullary-override", // warn when non-nullary `def f()' overrides nullary `def f'
//"-Xlint:nullary-unit", // warn when nullary methods return Unit
//"-Xlint:nullary-override", // warn when non-nullary `def f()' overrides nullary `def f'
"-Xlint:infer-any", // warn when a type argument is inferred to be `Any`
"-Xlint:missing-interpolator", // a string literal appears to be missing an interpolator id
"-Xlint:doc-detached", // a ScalaDoc comment appears to be detached from its element
Expand Down Expand Up @@ -197,8 +196,10 @@ lazy val commonDependencies = Seq(
// For testing ...
"ch.qos.logback" % "logback-classic" % "1.2.3" % "test",
"org.scalatest" %% "scalatest" % "3.0.9" % "test",
"org.scalacheck" %% "scalacheck" % "1.15.2" % "test"
)
"org.scalacheck" %% "scalacheck" % "1.15.2" % "test",
"io.github.embeddedkafka" %% "embedded-kafka" % "2.1.0" % Test force()
),
dependencyOverrides += "io.github.embeddedkafka" %% "embedded-kafka" % "2.1.0" % Test
)

lazy val monixKafka = project.in(file("."))
Expand All @@ -212,10 +213,6 @@ lazy val kafka1x = project.in(file("kafka-1.0.x"))
.settings(mimaSettings("monix-kafka-1x"))
.settings(
name := "monix-kafka-1x",
libraryDependencies ++= {
if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("log4j", "log4j"))
else Seq.empty[ModuleID]
},
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.0.2" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j")
)

Expand All @@ -225,10 +222,6 @@ lazy val kafka11 = project.in(file("kafka-0.11.x"))
.settings(mimaSettings("monix-kafka-11"))
.settings(
name := "monix-kafka-11",
libraryDependencies ++= {
if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("log4j", "log4j"))
else Seq.empty[ModuleID]
},
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.3" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j")
)

Expand All @@ -237,12 +230,8 @@ lazy val kafka10 = project.in(file("kafka-0.10.x"))
.settings(commonDependencies)
.settings(mimaSettings("monix-kafka-10"))
.settings(
name := "monix-kafka-10",
libraryDependencies ++= {
if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "0.16.0" % "test" exclude ("log4j", "log4j"))
else Seq.empty[ModuleID]
},
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" force(),
//dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" // exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j")
)

lazy val kafka9 = project.in(file("kafka-0.9.x"))
Expand Down
2 changes: 1 addition & 1 deletion kafka-0.11.x/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</encoder>
</appender>

<root level="WARN">
<root level="ERROR">
<appender-ref ref="STDOUT" />
</root>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import scala.util.matching.Regex
trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
protected def config: KafkaConsumerConfig
protected def consumer: Task[Consumer[K, V]]
protected val shouldClose: Boolean

/** Creates a task that polls the source, then feeds the downstream
* subscriber, returning the resulting acknowledgement
Expand Down Expand Up @@ -90,7 +91,7 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = {
// Forced asynchronous boundary
val cancelTask = Task.evalAsync {
consumer.synchronized(blocking(consumer.close()))
if (shouldClose) { consumer.synchronized(blocking(consumer.close())) }
}

// By applying memoization, we are turning this
Expand All @@ -113,10 +114,11 @@ object KafkaConsumerObservable {
* `org.apache.kafka.clients.consumer.KafkaConsumer`
* instance to use for consuming from Kafka
*/
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
def apply[K, V](
cfg: KafkaConsumerConfig,
consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] =
new KafkaConsumerObservableAutoCommit[K, V](cfg, consumer)
new KafkaConsumerObservableAutoCommit[K, V](cfg, consumer, true)

/** Builds a [[KafkaConsumerObservable]] instance.
*
Expand All @@ -126,6 +128,7 @@ object KafkaConsumerObservable {
*
* @param topics is the list of Kafka topics to subscribe to.
*/
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
K: Deserializer[K],
V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = {
Expand All @@ -142,6 +145,7 @@ object KafkaConsumerObservable {
*
* @param topicsRegex is the pattern of Kafka topics to subscribe to.
*/
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
K: Deserializer[K],
V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = {
Expand Down Expand Up @@ -173,12 +177,13 @@ object KafkaConsumerObservable {
* `org.apache.kafka.clients.consumer.KafkaConsumer`
* instance to use for consuming from Kafka
*/
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
def manualCommit[K, V](
cfg: KafkaConsumerConfig,
consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {

val manualCommitConfig = cfg.copy(observableCommitOrder = ObservableCommitOrder.NoAck, enableAutoCommit = false)
new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer)
new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer, shouldClose = true)
}

/** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
Expand All @@ -202,6 +207,7 @@ object KafkaConsumerObservable {
*
* @param topics is the list of Kafka topics to subscribe to.
*/
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
K: Deserializer[K],
V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {
Expand Down Expand Up @@ -231,6 +237,7 @@ object KafkaConsumerObservable {
*
* @param topicsRegex is the pattern of Kafka topics to subscribe to.
*/
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
K: Deserializer[K],
V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import scala.util.{Failure, Success}
*/
final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] (
override protected val config: KafkaConsumerConfig,
override protected val consumer: Task[Consumer[K, V]])
override protected val consumer: Task[Consumer[K, V]],
override protected val shouldClose: Boolean)
extends KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] {

/* Based on the [[KafkaConsumerConfig.observableCommitType]] it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import scala.jdk.CollectionConverters._
*/
final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
override protected val config: KafkaConsumerConfig,
override protected val consumer: Task[Consumer[K, V]])
override protected val consumer: Task[Consumer[K, V]],
override protected val shouldClose: Boolean)
extends KafkaConsumerObservable[K, V, CommittableMessage[K, V]] {

// Caching value to save CPU cycles
Expand Down
Loading