Skip to content

Commit

Permalink
Add chunk-based consumer API (#1281)
Browse files Browse the repository at this point in the history
* Add first draft of a chunk-based consumer API

* Add missing methods to mirror the API surface from `KafkaConsume`

This also introduces a `CommitNow` object instead of `Unit`

* Provide a single `consumeChunk` method instead of mirroring the full consumer API

* Add test for `consumeChunk`

* Add Scaladoc for `consumeChunk`

* Describe `consumeChunk` in the docs

* Add Stream extension method for `consumeChunk`

* Use `consumeChunk` in the quick example in the docs

* Tweaks to the docs

* Tweak the docs

* Rename private method

* Use `mapAccumulate` instead of `foldLeft`

* Run prePR to fix headers

* Fix compile error

* Fix compilation issues with different Scala versions

* Update docs/src/main/mdoc/quick-example.md

Co-authored-by: Alan Artigao Carreño <[email protected]>

* Update docs/src/main/mdoc/consumers.md

Co-authored-by: Alan Artigao Carreño <[email protected]>

* Update docs/src/main/mdoc/quick-example.md

Co-authored-by: Alan Artigao Carreño <[email protected]>

* Use `onlyOrError` instead of `>> F.never`

---------

Co-authored-by: Alan Artigao Carreño <[email protected]>
  • Loading branch information
L7R7 and aartigao authored Feb 16, 2024
1 parent f36c355 commit b4dd5a4
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 30 deletions.
42 changes: 42 additions & 0 deletions docs/src/main/mdoc/consumers.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import scala.concurrent.duration._

import cats.effect._
import cats.syntax.all._
import fs2._
import fs2.kafka._
import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow
```

## Deserializers
Expand Down Expand Up @@ -272,6 +274,46 @@ object ConsumerMapAsyncExample extends IOApp.Simple {

Offsets commits are managed manually, which is important for ensuring at-least-once delivery. This means that, by [default](#default-settings), automatic offset commits are disabled. If you're sure you don't need at-least-once delivery, you can re-enable automatic offset commits using `withEnableAutoCommit` on [`ConsumerSettings`][consumersettings], and then ignore the [`CommittableOffset`][committableoffset] part of [`CommittableConsumerRecord`][committableconsumerrecord], keeping only the [`ConsumerRecord`][consumerrecord].

### Working on `Chunk`

Use cases that require at-least-once delivery make it necessary to commit the offset of messages only after the message has been successfully processed. Implementing this correctly can be challenging, especially when the business logic requires advanced data manipulation with concurrency, batching, filtering and the like:

- When consuming multiple messages from the same partition concurrently, a consumer might lose messages if the commits happen out of order and a message that is not the last one on its partition can't be processed and has to be retried.
- When filtering messages, it's important to still commit the offset of the filtered message because if this message is the latest one on its partition, it will get re-sent infinitely.
- For performance reasons, it makes sense to batch the offsets when committing them.

The recommended pattern for these use cases is by working on the `Chunk`s of records that are part of the `Stream`. The library supports that with the `consumeChunk` method:

```scala mdoc:silent
object ConsumerChunkExample extends IOApp.Simple {
val run: IO[Unit] = {
def processRecords(records: Chunk[ConsumerRecord[String, String]]): IO[CommitNow] =
records.traverse(record => IO.println(s"Processing record: $record")).as(CommitNow)

KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.consumeChunk(processRecords)
}
}
```

Note that this method uses `partitionedStream`, which means that all the partitions assigned to the consumer will be processed concurrently.

As a user, you don't have to care about the offset commits, all you have to do is implement a function that processes all records in the `Chunk`, and return a `IO[CommitNow]`. After this action finished, the offsets for all messages in the `Chunk` will be committed. `CommitNow` is basically the same as `Unit`, but helps in making it clear when the processing of messages has been finished and it's time to commit.

This brings several benefits:

- **Correctness:** You can focus on implementing your business logic, without having to worry about offset commits or propagating the correct offsets through your code. Offsets are committed correctly afterwards.
- **Performance:** Typical performance improvements are bulk-writes to a database, or using concurrency to speed things up. These patterns can be used liberally when working on the records in a `Chunk`, without having to sacrifice correctness.
- **Flexibility:** Besides using batching and concurrency, you might want to filter out messages, or process them in a different order than they appear on the partitions. As long as you work on a single `Chunk` and make sure that the processing is finished when you return `CommitNow`, you can do all that.
- A concrete example that makes use of these ideas is to group all the messages in the `Chunk` by key and then only process the last message for each key (basically doing what Kafka's log compaction does). In many occasions, it's also possible to process the messages for different keys concurrently, which drastically increases the available concurrency.

If the chunk size doesn't fit your needs, the first way to start tuning is the `max.poll.records` config property of your consumer.

### Committing manually

If `consumeChunk` doesn't work for you, you can always commit your offsets manually.

Offset commits are usually done in batches for performance reasons. We normally don't need to commit every offset, but only the last processed offset. There is a trade-off in how much reprocessing we have to do when we restart versus the performance implication of committing more frequently. Depending on our situation, we'll then choose an appropriate frequency for offset commits.

We should keep the [`CommittableOffset`][committableoffset] in our `Stream` once we've finished processing the record. For at-least-once delivery, it's essential that offset commits preserve topic-partition ordering, so we have to make sure we keep offsets in the same order as we receive them. There is one convenience function for the most common batch committing scenario, `commitBatchWithin`.
Expand Down
45 changes: 15 additions & 30 deletions docs/src/main/mdoc/quick-example.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,37 @@ Following is an example showing how to:
- use `commitBatchWithin` to commit consumed offsets in batches.

```scala mdoc
import scala.concurrent.duration._

import cats.effect.{IO, IOApp}
import fs2._
import fs2.kafka._
import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow

object Main extends IOApp.Simple {

val run: IO[Unit] = {
def processRecord(record: ConsumerRecord[String, String]): IO[(String, String)] =
IO.pure(record.key -> record.value)

val consumerSettings =
ConsumerSettings[IO, String, String]
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost:9092")
.withGroupId("group")

val producerSettings =
ProducerSettings[IO, String, String].withBootstrapServers("localhost:9092")
ProducerSettings[IO, String, String]
.withBootstrapServers("localhost:9092")

def processRecords(producer: KafkaProducer[IO, String, String])(records: Chunk[ConsumerRecord[String, String]]): IO[CommitNow] = {
val producerRecords = records.map(consumerRecord => ProducerRecord("topic", consumerRecord.key, consumerRecord.value))
producer.produce(producerRecords).flatten.as(CommitNow)
}

val stream =
KafkaConsumer
.stream(consumerSettings)
.subscribeTo("topic")
.records
.mapAsync(25) { committable =>
processRecord(committable.record).map { case (key, value) =>
val record = ProducerRecord("topic", key, value)
committable.offset -> ProducerRecords.one(record)
}
}
.through { offsetsAndProducerRecords =>
KafkaProducer
.stream(producerSettings)
.flatMap { producer =>
offsetsAndProducerRecords
.evalMap { case (offset, producerRecord) =>
producer.produce(producerRecord).map(_.as(offset))
}
.parEvalMap(Int.MaxValue)(identity)
}
}
.through(commitBatchWithin(500, 15.seconds))
KafkaProducer.stream(producerSettings).evalMap { producer =>
KafkaConsumer
.stream(consumerSettings)
.subscribeTo("topic")
.consumeChunk(chunk => processRecords(producer)(chunk))
}

stream.compile.drain
}

}
```
10 changes: 10 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import cats.effect.std.*
import cats.syntax.all.*
import fs2.{Chunk, Stream}
import fs2.kafka.consumer.*
import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow
import fs2.kafka.instances.*
import fs2.kafka.internal.*
import fs2.kafka.internal.converters.collection.*
Expand Down Expand Up @@ -63,6 +64,7 @@ import org.apache.kafka.common.{Metric, MetricName, PartitionInfo, TopicPartitio
*/
sealed abstract class KafkaConsumer[F[_], K, V]
extends KafkaConsume[F, K, V]
with KafkaConsumeChunk[F, K, V]
with KafkaAssignment[F]
with KafkaOffsetsV2[F]
with KafkaSubscription[F]
Expand Down Expand Up @@ -817,6 +819,14 @@ object KafkaConsumer {
def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] =
self.flatMap(_.partitionedRecords)

/**
* Consume from all assigned partitions concurrently, processing the messages in `Chunk`s. See
* [[KafkaConsumeChunk#consumeChunk]]
*/
def consumeChunk(processor: Chunk[ConsumerRecord[K, V]] => F[CommitNow])(implicit
F: Concurrent[F]
): F[Nothing] = self.evalMap(_.consumeChunk(processor)).compile.onlyOrError

}

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2018-2024 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/

package fs2.kafka.consumer

import cats.effect.Concurrent
import cats.syntax.flatMap.*
import cats.Monad
import fs2.*
import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow
import fs2.kafka.CommittableConsumerRecord
import fs2.kafka.CommittableOffsetBatch
import fs2.kafka.ConsumerRecord

trait KafkaConsumeChunk[F[_], K, V] extends KafkaConsume[F, K, V] {

/**
* Consume from all assigned partitions concurrently, processing the records in `Chunk`s. For
* each `Chunk`, the provided `processor` is called, after that has finished the offsets for all
* messages in the chunk are committed.<br><br>
*
* This method is intended to be used in cases that require at-least-once-delivery, where
* messages have to be processed before offsets are committed. By relying on the methods like
* [[partitionedStream]], [[records]], and similar, you have to correctly implement not only your
* processing logic but also the correct mechanism for committing offsets. This can be tricky to
* do in a correct and efficient way.<br><br>
*
* Working with `Chunk`s of records has several benefits:<br>
* - As a user, you don't have to care about committing offsets correctly. You can focus on
* implementing your business logic<br>
* - It's very straightforward to batch several messages from a `Chunk` together, e.g. for
* efficient writes to a persistent storage<br>
* - You can liberally use logic that involves concurrency, filtering, and re-ordering of
* messages without having to worry about incorrect offset commits<br>
*
* <br>
*
* The `processor` is a function that takes a `Chunk[ConsumerRecord[K, V]]` and returns a
* `F[CommitNow]`. [[CommitNow]] is isomorphic to `Unit`, but helps in transporting the intention
* that processing of a `Chunk` is done, offsets should be committed, and no important processing
* should be done afterwards.<br><br>
*
* The returned value has the type `F[Nothing]`, because it's a never-ending process that doesn't
* terminate, and therefore doesn't return a result.
*
* @note
* This method does not make any use of Kafka's auto-commit feature, it implements "manual"
* commits in a way that suits most of the common use cases.
* @note
* you have to first use `subscribe` or `assign` the consumer before using this `Stream`. If
* you forgot to subscribe, there will be a [[NotSubscribedException]] raised in the `Stream`.
* @see
* [[partitionedStream]]
* @see
* [[CommitNow]]
*/
final def consumeChunk(
processor: Chunk[ConsumerRecord[K, V]] => F[CommitNow]
)(implicit F: Concurrent[F]): F[Nothing] = partitionedStream
.map(
_.chunks.evalMap(consume(processor))
)
.parJoinUnbounded
.drain
.compile
.onlyOrError

private def consume(processor: Chunk[ConsumerRecord[K, V]] => F[CommitNow])(
chunk: Chunk[CommittableConsumerRecord[F, K, V]]
)(implicit F: Monad[F]): F[Unit] = {
val (offsets, records) = chunk
.mapAccumulate(CommittableOffsetBatch.empty)((offsetBatch, committableRecord) =>
(offsetBatch.updated(committableRecord.offset), committableRecord.record)
)

processor(records) >> offsets.commit
}

}

object KafkaConsumeChunk {

type CommitNow = CommitNow.type

/**
* Token to indicate that a `Chunk` has been processed and the corresponding offsets are ready to
* be committed.<br>
*
* Isomorphic to `Unit`, but more intention revealing.
*/
object CommitNow

}
38 changes: 38 additions & 0 deletions modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import cats.effect.unsafe.implicits.global
import cats.effect.Ref
import cats.syntax.all.*
import fs2.concurrent.SignallingRef
import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow
import fs2.kafka.internal.converters.collection.*
import fs2.Stream

Expand Down Expand Up @@ -1174,6 +1175,43 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
}
}

describe("KafkaConsumer#consumeChunk") {
it("should process the messages and commit the offsets") {
withTopic { topic =>
val produced = (0 until 5).map(n => s"key-$n" -> s"value-$n")
publishToKafka(topic, produced)

val consumed = for {
ref <- Ref.of[IO, Vector[(String, String)]](Vector.empty)
_ <- KafkaConsumer
.stream(consumerSettings[IO])
.evalTap(_.assign(topic))
.evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream
.evalMap(
_.consumeChunk(chunk =>
chunk
.traverse(record => ref.getAndUpdate(_ :+ (record.key -> record.value)))
.as(CommitNow)
)
).interruptAfter(10.seconds).compile.drain
res <- ref.get
} yield res

val res = consumed.unsafeRunSync()

(res should contain).theSameElementsInOrderAs(produced)

val topicPartition = new TopicPartition(topic, 0)

val actuallyCommitted = withKafkaConsumer(defaultConsumerProperties) { consumer =>
consumer.committed(Set(topicPartition).asJava).asScala.toMap
}.map { case (k, v) => k -> v.offset() }.toMap

actuallyCommitted shouldBe Map(topicPartition -> 5L)
}
}
}

private def commitTest(
commit: (KafkaConsumer[IO, String, String], CommittableOffsetBatch[IO]) => IO[Unit]
): Assertion =
Expand Down

0 comments on commit b4dd5a4

Please sign in to comment.