Skip to content

Commit

Permalink
Add scaladoc and reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
grzegorz-bielski committed Aug 6, 2024
1 parent 671fe86 commit a5be568
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 37 deletions.
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ object Dependencies {

val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5"
val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7"
// TODO: `3.10.6-SNAPSHOT` is a local release, remote release depends on https://github.com/evolution-gaming/cats-helper/pull/288
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.10.6-SNAPSHOT"
val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0"
val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,36 @@ import com.evolutiongaming.skafka.producer.{ProducerConfig, ProducerRecord, Prod
import scala.concurrent.CancellationException
import scala.concurrent.duration._

/**
* Provides a health check mechanism that repeatedly sends and consumes messages to/from Kafka.
*/
trait KafkaHealthCheck[F[_]] {

/**
* Returns the last error that occurred during the health check.
*
* @return
*/
def error: F[Option[Throwable]]

/**
* Blocks a fiber until the health check is done.
*
* @return
*/
def done: F[Unit]
}

object KafkaHealthCheck {

def empty[F[_] : Applicative]: KafkaHealthCheck[F] = new KafkaHealthCheck[F] {
def empty[F[_]: Applicative]: KafkaHealthCheck[F] = new KafkaHealthCheck[F] {

def error = none[Throwable].pure[F]

def done = ().pure[F]
}


def of[F[_] : Temporal : LogOf : ConsumerOf : ProducerOf : RandomIdOf : FromTry](
def of[F[_]: Temporal: LogOf: ConsumerOf: ProducerOf: RandomIdOf: FromTry](
config: Config,
consumerConfig: ConsumerConfig,
producerConfig: ProducerConfig
Expand All @@ -45,21 +57,15 @@ object KafkaHealthCheck {

val producer = Producer.of[F](config.topic, producerConfig)

of(
key = key,
config = config,
stop = false.pure[F],
producer = producer,
consumer = consumer,
log = log)
of(key = key, config = config, stop = false.pure[F], producer = producer, consumer = consumer, log = log)
}

result
.toResource
.flatten
}

def of[F[_] : Temporal](
def of[F[_]: Temporal](
key: String,
config: Config,
stop: F[Boolean],
Expand All @@ -69,7 +75,7 @@ object KafkaHealthCheck {
): Resource[F, KafkaHealthCheck[F]] = {

val result = for {
ref <- Ref.of[F, Option[Throwable]](None)
ref <- Ref.of[F, Option[Throwable]](None)
fiber <- (producer, consumer)
.tupled
.use { case (producer, consumer) => run(key, config, stop, producer, consumer, ref.set, log) }
Expand All @@ -89,7 +95,7 @@ object KafkaHealthCheck {
Resource(result)
}

def run[F[_] : Temporal](
def run[F[_]: Temporal](
key: String,
config: Config,
stop: F[Boolean],
Expand All @@ -116,7 +122,7 @@ object KafkaHealthCheck {
for {
records <- consumer.poll(config.pollTimeout)
found = records.find { record => record.key.contains_(key) && record.value.contains_(value) }
result <- found.fold {
result <- found.fold {
for {
_ <- sleep
_ <- produce(s"$n:$retry")
Expand All @@ -141,11 +147,11 @@ object KafkaHealthCheck {

def check(n: Long) = {
for {
error <- produceConsume(n)
_ <- error.fold(().pure[F]) { error => log.error(s"$n failed with $error") }
_ <- set(error)
_ <- sleep
stop <- stop
error <- produceConsume(n)
_ <- error.fold(().pure[F]) { error => log.error(s"$n failed with $error") }
_ <- set(error)
_ <- sleep
stop <- stop
} yield {
if (stop) ().asRight[Long]
else (n + 1).asLeft[Unit]
Expand All @@ -161,7 +167,6 @@ object KafkaHealthCheck {
} yield {}
}


trait Producer[F[_]] {
def send(record: Record): F[Unit]
}
Expand All @@ -170,7 +175,7 @@ object KafkaHealthCheck {

def apply[F[_]](implicit F: Producer[F]): Producer[F] = F

def apply[F[_] : Monad : FromTry](topic: Topic, producer: SKafkaProducer[F]): Producer[F] = {
def apply[F[_]: Monad: FromTry](topic: Topic, producer: SKafkaProducer[F]): Producer[F] = {
new Producer[F] {
def send(record: Record) = {
val record1 = ProducerRecord[String, String](topic = topic, key = record.key, value = record.value)
Expand All @@ -179,7 +184,7 @@ object KafkaHealthCheck {
}
}

def of[F[_] : Monad : ProducerOf : FromTry](topic: Topic, config: ProducerConfig): Resource[F, Producer[F]] = {
def of[F[_]: Monad: ProducerOf: FromTry](topic: Topic, config: ProducerConfig): Resource[F, Producer[F]] = {
for {
producer <- implicitly[ProducerOf[F]].apply(config)
} yield {
Expand All @@ -188,19 +193,18 @@ object KafkaHealthCheck {
}
}


trait Consumer[F[_]] {

def subscribe(topic: Topic): F[Unit]

def poll(timeout: FiniteDuration): F[Iterable[Record]]
}

object Consumer {

def apply[F[_]](implicit F: Consumer[F]): Consumer[F] = F

def apply[F[_] : Functor](consumer: SKafkaConsumer[F, String, String]): Consumer[F] = {
def apply[F[_]: Functor](consumer: SKafkaConsumer[F, String, String]): Consumer[F] = {

new Consumer[F] {

Expand All @@ -220,12 +224,10 @@ object KafkaHealthCheck {
}
}

def of[F[_] : Monad : ConsumerOf : FromTry](key: String, config: ConsumerConfig): Resource[F, Consumer[F]] = {
def of[F[_]: Monad: ConsumerOf: FromTry](key: String, config: ConsumerConfig): Resource[F, Consumer[F]] = {
val config1 = {
val groupId = config.common.clientId.fold(key) { clientId => s"$clientId-$key" }
config.copy(
groupId = groupId.some,
autoOffsetReset = AutoOffsetReset.Latest)
config.copy(groupId = groupId.some, autoOffsetReset = AutoOffsetReset.Latest)
}

for {
Expand All @@ -236,16 +238,15 @@ object KafkaHealthCheck {
}
}


final case class Record(key: Option[String], value: Option[String])


final case class Config(
topic: Topic = "healthcheck",
initial: FiniteDuration = 10.seconds,
interval: FiniteDuration = 1.second,
timeout: FiniteDuration = 2.minutes,
pollTimeout: FiniteDuration = 10.millis)
topic: Topic = "healthcheck",
initial: FiniteDuration = 10.seconds,
interval: FiniteDuration = 1.second,
timeout: FiniteDuration = 2.minutes,
pollTimeout: FiniteDuration = 10.millis
)

object Config {
val default: Config = Config()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class KafkaHealthCheckSpec extends AsyncFunSuite with Matchers {
records: List[Record] = List.empty
)

// TODO: mapK ?
def logOf(ref: Ref[IO, State]): Log[IO] = {
def add(log: String): IO[Unit] =
ref.update(state => state.copy(logs = log :: state.logs))
Expand Down

0 comments on commit a5be568

Please sign in to comment.