diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 39296373..9fd06ab5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,6 +4,7 @@ object Dependencies { val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5" val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7" + // TODO: `3.10.6-SNAPSHOT` is a local release, remote release depends on https://github.com/evolution-gaming/cats-helper/pull/288 val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.10.6-SNAPSHOT" val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0" val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0" diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala index 7393af29..78738ef8 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala @@ -12,24 +12,36 @@ import com.evolutiongaming.skafka.producer.{ProducerConfig, ProducerRecord, Prod import scala.concurrent.CancellationException import scala.concurrent.duration._ +/** + * Provides a health check mechanism that repeatedly sends and consumes messages to/from Kafka. + */ trait KafkaHealthCheck[F[_]] { + /** + * Returns the last error that occurred during the health check. + * + * @return + */ def error: F[Option[Throwable]] + /** + * Blocks a fiber until the health check is done. + * + * @return + */ def done: F[Unit] } object KafkaHealthCheck { - def empty[F[_] : Applicative]: KafkaHealthCheck[F] = new KafkaHealthCheck[F] { + def empty[F[_]: Applicative]: KafkaHealthCheck[F] = new KafkaHealthCheck[F] { def error = none[Throwable].pure[F] def done = ().pure[F] } - - def of[F[_] : Temporal : LogOf : ConsumerOf : ProducerOf : RandomIdOf : FromTry]( + def of[F[_]: Temporal: LogOf: ConsumerOf: ProducerOf: RandomIdOf: FromTry]( config: Config, consumerConfig: ConsumerConfig, producerConfig: ProducerConfig @@ -45,13 +57,7 @@ object KafkaHealthCheck { val producer = Producer.of[F](config.topic, producerConfig) - of( - key = key, - config = config, - stop = false.pure[F], - producer = producer, - consumer = consumer, - log = log) + of(key = key, config = config, stop = false.pure[F], producer = producer, consumer = consumer, log = log) } result @@ -59,7 +65,7 @@ object KafkaHealthCheck { .flatten } - def of[F[_] : Temporal]( + def of[F[_]: Temporal]( key: String, config: Config, stop: F[Boolean], @@ -69,7 +75,7 @@ object KafkaHealthCheck { ): Resource[F, KafkaHealthCheck[F]] = { val result = for { - ref <- Ref.of[F, Option[Throwable]](None) + ref <- Ref.of[F, Option[Throwable]](None) fiber <- (producer, consumer) .tupled .use { case (producer, consumer) => run(key, config, stop, producer, consumer, ref.set, log) } @@ -89,7 +95,7 @@ object KafkaHealthCheck { Resource(result) } - def run[F[_] : Temporal]( + def run[F[_]: Temporal]( key: String, config: Config, stop: F[Boolean], @@ -116,7 +122,7 @@ object KafkaHealthCheck { for { records <- consumer.poll(config.pollTimeout) found = records.find { record => record.key.contains_(key) && record.value.contains_(value) } - result <- found.fold { + result <- found.fold { for { _ <- sleep _ <- produce(s"$n:$retry") @@ -141,11 +147,11 @@ object KafkaHealthCheck { def check(n: Long) = { for { - error <- produceConsume(n) - _ <- error.fold(().pure[F]) { error => log.error(s"$n failed with $error") } - _ <- set(error) - _ <- sleep - stop <- stop + error <- produceConsume(n) + _ <- error.fold(().pure[F]) { error => log.error(s"$n failed with $error") } + _ <- set(error) + _ <- sleep + stop <- stop } yield { if (stop) ().asRight[Long] else (n + 1).asLeft[Unit] @@ -161,7 +167,6 @@ object KafkaHealthCheck { } yield {} } - trait Producer[F[_]] { def send(record: Record): F[Unit] } @@ -170,7 +175,7 @@ object KafkaHealthCheck { def apply[F[_]](implicit F: Producer[F]): Producer[F] = F - def apply[F[_] : Monad : FromTry](topic: Topic, producer: SKafkaProducer[F]): Producer[F] = { + def apply[F[_]: Monad: FromTry](topic: Topic, producer: SKafkaProducer[F]): Producer[F] = { new Producer[F] { def send(record: Record) = { val record1 = ProducerRecord[String, String](topic = topic, key = record.key, value = record.value) @@ -179,7 +184,7 @@ object KafkaHealthCheck { } } - def of[F[_] : Monad : ProducerOf : FromTry](topic: Topic, config: ProducerConfig): Resource[F, Producer[F]] = { + def of[F[_]: Monad: ProducerOf: FromTry](topic: Topic, config: ProducerConfig): Resource[F, Producer[F]] = { for { producer <- implicitly[ProducerOf[F]].apply(config) } yield { @@ -188,11 +193,10 @@ object KafkaHealthCheck { } } - trait Consumer[F[_]] { def subscribe(topic: Topic): F[Unit] - + def poll(timeout: FiniteDuration): F[Iterable[Record]] } @@ -200,7 +204,7 @@ object KafkaHealthCheck { def apply[F[_]](implicit F: Consumer[F]): Consumer[F] = F - def apply[F[_] : Functor](consumer: SKafkaConsumer[F, String, String]): Consumer[F] = { + def apply[F[_]: Functor](consumer: SKafkaConsumer[F, String, String]): Consumer[F] = { new Consumer[F] { @@ -220,12 +224,10 @@ object KafkaHealthCheck { } } - def of[F[_] : Monad : ConsumerOf : FromTry](key: String, config: ConsumerConfig): Resource[F, Consumer[F]] = { + def of[F[_]: Monad: ConsumerOf: FromTry](key: String, config: ConsumerConfig): Resource[F, Consumer[F]] = { val config1 = { val groupId = config.common.clientId.fold(key) { clientId => s"$clientId-$key" } - config.copy( - groupId = groupId.some, - autoOffsetReset = AutoOffsetReset.Latest) + config.copy(groupId = groupId.some, autoOffsetReset = AutoOffsetReset.Latest) } for { @@ -236,16 +238,15 @@ object KafkaHealthCheck { } } - final case class Record(key: Option[String], value: Option[String]) - final case class Config( - topic: Topic = "healthcheck", - initial: FiniteDuration = 10.seconds, - interval: FiniteDuration = 1.second, - timeout: FiniteDuration = 2.minutes, - pollTimeout: FiniteDuration = 10.millis) + topic: Topic = "healthcheck", + initial: FiniteDuration = 10.seconds, + interval: FiniteDuration = 1.second, + timeout: FiniteDuration = 2.minutes, + pollTimeout: FiniteDuration = 10.millis + ) object Config { val default: Config = Config() diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala index 37977b5d..b24b5931 100644 --- a/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala @@ -57,7 +57,6 @@ class KafkaHealthCheckSpec extends AsyncFunSuite with Matchers { records: List[Record] = List.empty ) - // TODO: mapK ? def logOf(ref: Ref[IO, State]): Log[IO] = { def add(log: String): IO[Unit] = ref.update(state => state.copy(logs = log :: state.logs))