Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka healthcheck #436

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ lazy val skafka = (project in file("skafka")
scalatest % Test,
Cats.laws % Test,
discipline % Test,
CatsEffect.effectTestKit % Test,
`scala-java8-compat`,
`collection-compat`
)))
Expand Down
27 changes: 14 additions & 13 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ import sbt._

object Dependencies {

val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5"
val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7"
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.9.0"
val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0"
val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0"
val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
val scalatest = "org.scalatest" %% "scalatest" % "3.2.17"
val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2"
val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0"
val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5"
val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7"
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.11.0"
val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0"
val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0"
val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
val scalatest = "org.scalatest" %% "scalatest" % "3.2.17"
val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2"
val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0"

object Kafka {
private val version = "3.4.0"
val clients = "org.apache.kafka" % "kafka-clients" % version
val clients = "org.apache.kafka" % "kafka-clients" % version
}

object Logback {
Expand All @@ -38,8 +38,9 @@ object Dependencies {

object CatsEffect {
private val version = "3.4.8"
val effect = "org.typelevel" %% "cats-effect" % version
val effectStd = "org.typelevel" %% "cats-effect-std" % version
val effect = "org.typelevel" %% "cats-effect" % version
val effectStd = "org.typelevel" %% "cats-effect-std" % version
val effectTestKit = "org.typelevel" %% "cats-effect-testkit" % version
}

object Smetrics {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package com.evolutiongaming.skafka

import cats.effect._
import cats.data.{NonEmptySet => Nes}
import cats.effect.syntax.all._
import cats.syntax.all._
import cats.{Applicative, Functor, Monad}
import com.evolutiongaming.catshelper.{FromTry, Log, LogOf, RandomIdOf}
import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig, ConsumerOf, Consumer => SKafkaConsumer}
import com.evolutiongaming.skafka.producer.{ProducerConfig, ProducerRecord, ProducerOf, Producer => SKafkaProducer}

import scala.concurrent.CancellationException
import scala.concurrent.duration._

/**
* Provides a health check mechanism that repeatedly sends and consumes messages to/from Kafka.
*/
trait KafkaHealthCheck[F[_]] {

/**
* Returns the last error that occurred during the health check.
*/
def error: F[Option[Throwable]]

/**
* Blocks a fiber until the health check is done.
*/
def done: F[Unit]
}

object KafkaHealthCheck {

def empty[F[_]: Applicative]: KafkaHealthCheck[F] = new KafkaHealthCheck[F] {

def error = none[Throwable].pure[F]

def done = ().pure[F]
}

def of[F[_]: Temporal: LogOf: ConsumerOf: ProducerOf: RandomIdOf: FromTry](
config: Config,
consumerConfig: ConsumerConfig,
producerConfig: ProducerConfig
): Resource[F, KafkaHealthCheck[F]] = {

val result = for {
log <- LogOf[F].apply(KafkaHealthCheck.getClass)
randomId <- RandomIdOf[F].apply
} yield {
val key = randomId.value

val consumer = Consumer.of[F](key, consumerConfig)

val producer = Producer.of[F](config.topic, producerConfig)

of(key = key, config = config, stop = false.pure[F], producer = producer, consumer = consumer, log = log)
}

result
.toResource
.flatten
}

def of[F[_]: Temporal](
key: String,
config: Config,
stop: F[Boolean],
producer: Resource[F, Producer[F]],
consumer: Resource[F, Consumer[F]],
log: Log[F]
): Resource[F, KafkaHealthCheck[F]] = {

val result = for {
ref <- Ref.of[F, Option[Throwable]](None)
fiber <- (producer, consumer)
.tupled
.use { case (producer, consumer) => run(key, config, stop, producer, consumer, ref.set, log) }
.start
} yield {
val result = new KafkaHealthCheck[F] {
def error = ref.get
def done = fiber.join.flatMap {
case Outcome.Succeeded(_) => Temporal[F].unit
case Outcome.Errored(e) => Temporal[F].raiseError(e)
case Outcome.Canceled() => Temporal[F].raiseError(new CancellationException("HealthCheck cancelled"))
}
}
(result, fiber.cancel)
}

Resource(result)
}

def run[F[_]: Temporal](
key: String,
config: Config,
stop: F[Boolean],
producer: Producer[F],
consumer: Consumer[F],
set: Option[Throwable] => F[Unit],
log: Log[F]
): F[Unit] = {

val sleep = Temporal[F].sleep(config.interval)

def produce(value: String) = {
val record = Record(key = key.some, value = value.some)
for {
_ <- log.debug(s"$key send $value")
_ <- producer.send(record)
} yield {}
}

def produceConsume(n: Long) = {
val value = n.toString

def consume(retry: Long) = {
for {
records <- consumer.poll(config.pollTimeout)
found = records.find { record => record.key.contains_(key) && record.value.contains_(value) }
result <- found.fold {
for {
_ <- sleep
_ <- produce(s"$n:$retry")
} yield {
(retry + 1).asLeft[Unit]
}
} { _ =>
().asRight[Long].pure[F]
}
} yield result
}

val produceConsume = for {
_ <- produce(value)
_ <- 0L.tailRecM(consume)
} yield {}

produceConsume
.timeout(config.timeout)
.redeem(_.some, _ => none[Throwable])
}

def check(n: Long) = {
for {
error <- produceConsume(n)
_ <- error.fold(().pure[F]) { error => log.error(s"$n failed with $error") }
_ <- set(error)
_ <- sleep
stop <- stop
} yield {
if (stop) ().asRight[Long]
else (n + 1).asLeft[Unit]
}
}

for {
_ <- Temporal[F].sleep(config.initial)
_ <- consumer.subscribe(config.topic)
_ <- consumer.poll(config.interval)
_ <- produceConsume(0L) // warmup
_ <- 1L.tailRecM(check)
} yield {}
}

trait Producer[F[_]] {
def send(record: Record): F[Unit]
}

object Producer {

def apply[F[_]](implicit F: Producer[F]): Producer[F] = F

def apply[F[_]: Monad: FromTry](topic: Topic, producer: SKafkaProducer[F]): Producer[F] = {
new Producer[F] {
def send(record: Record) = {
val record1 = ProducerRecord[String, String](topic = topic, key = record.key, value = record.value)
producer.send(record1).void
}
}
}

def of[F[_]: Monad: ProducerOf: FromTry](topic: Topic, config: ProducerConfig): Resource[F, Producer[F]] = {
for {
producer <- implicitly[ProducerOf[F]].apply(config)
} yield {
Producer[F](topic = topic, producer = producer)
}
}
}

trait Consumer[F[_]] {

def subscribe(topic: Topic): F[Unit]

def poll(timeout: FiniteDuration): F[Iterable[Record]]
}

object Consumer {

def apply[F[_]](implicit F: Consumer[F]): Consumer[F] = F

def apply[F[_]: Functor](consumer: SKafkaConsumer[F, String, String]): Consumer[F] = {

new Consumer[F] {

def subscribe(topic: Topic) = {
consumer.subscribe(Nes.of(topic))
}

def poll(timeout: FiniteDuration) = {
for {
records <- consumer.poll(timeout)
} yield for {
record <- records.values.values.flatMap(_.toList)
} yield {
Record(key = record.key.map(_.value), value = record.value.map(_.value))
}
}
}
}

def of[F[_]: Monad: ConsumerOf: FromTry](key: String, config: ConsumerConfig): Resource[F, Consumer[F]] = {
val config1 = {
val groupId = config.common.clientId.fold(key) { clientId => s"$clientId-$key" }
config.copy(groupId = groupId.some, autoOffsetReset = AutoOffsetReset.Latest)
}

for {
consumer <- implicitly[ConsumerOf[F]].apply[String, String](config1)
} yield {
Consumer[F](consumer)
}
}
}

final case class Record(key: Option[String], value: Option[String])

final case class Config(
topic: Topic = "healthcheck",
initial: FiniteDuration = 10.seconds,
interval: FiniteDuration = 1.second,
timeout: FiniteDuration = 2.minutes,
pollTimeout: FiniteDuration = 10.millis
)

object Config {
val default: Config = Config()
}
}
Loading
Loading