More specific errors than Throwable for Consumer #1361

Draft · wants to merge 1 commit into base: series/3.x
@@ -2,6 +2,7 @@ package zio.kafka.example

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._
+import zio.kafka.consumer.Consumer.ConsumerError
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
import zio.kafka.serde.Serde
@@ -49,7 +50,7 @@ object Main extends ZIOAppDefault {
.withGroupId("test")
}

-private val runConsumerStream: ZIO[Consumer, Throwable, Unit] =
+private val runConsumerStream: ZIO[Consumer, ConsumerError, Unit] =
for {
_ <- ZIO.logInfo("Consuming messages...")
consumed <- Consumer
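With `runConsumerStream` now failing with `ConsumerError` instead of `Throwable`, the example app can recover from specific failure modes by case. A minimal sketch of what a call site gains (the handler below is illustrative, not part of this diff):

```scala
import zio._
import zio.kafka.consumer.Consumer
import zio.kafka.consumer.Consumer.{ AuthenticationError, ConsumerError }

// Hypothetical: recover from one specific, now-typed failure mode.
val handled: ZIO[Consumer, ConsumerError, Unit] =
  runConsumerStream.catchSome { case AuthenticationError(cause) =>
    ZIO.logWarning(s"Authentication failed, stopping consumption: ${cause.getMessage}")
  }
```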
@@ -4,7 +4,7 @@ import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._
import zio.kafka.admin._
-import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
+import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, ConsumerError, OffsetRetrieval }
import zio.kafka.consumer._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.producer._
@@ -192,7 +192,7 @@ object KafkaTestUtils {
* Utility function to make a Consumer. It requires a ConsumerSettings layer.
*/
@deprecated("Use [[KafkaTestUtils.minimalConsumer]] instead", "2.3.1")
-def simpleConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, Throwable, Consumer] =
+def simpleConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, ConsumerError, Consumer] =
ZLayer.makeSome[ConsumerSettings, Consumer](
ZLayer.succeed(diagnostics) >>> Consumer.live
)
@@ -203,7 +203,7 @@
* "minimal" because, unlike the other functions returning a `ZLayer[..., ..., Consumer]` of this file, you need to
* provide the `ConsumerSettings` layer yourself.
*/
-def minimalConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, Throwable, Consumer] =
+def minimalConsumer(diagnostics: Diagnostics = Diagnostics.NoOp): ZLayer[ConsumerSettings, ConsumerError, Consumer] =
ZLayer.makeSome[ConsumerSettings, Consumer](
ZLayer.succeed(diagnostics) >>> Consumer.live
)
@@ -223,7 +223,7 @@
maxRebalanceDuration: Duration = 3.minutes,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
properties: Map[String, String] = Map.empty
-): ZLayer[Kafka, Throwable, Consumer] =
+): ZLayer[Kafka, ConsumerError, Consumer] =
(ZLayer(
consumerSettings(
clientId = clientId,
@@ -253,7 +253,7 @@
rebalanceSafeCommits: Boolean = false,
properties: Map[String, String] = Map.empty,
rebalanceListener: RebalanceListener = RebalanceListener.noop
-): ZLayer[Kafka, Throwable, Consumer] =
+): ZLayer[Kafka, ConsumerError, Consumer] =
(ZLayer(
transactionalConsumerSettings(
groupId = groupId,
@@ -274,7 +274,7 @@
*/
def consumeWithStrings(clientId: String, groupId: Option[String] = None, subscription: Subscription)(
r: ConsumerRecord[String, String] => URIO[Any, Unit]
-): RIO[Kafka, Unit] =
+): ZIO[Kafka, ConsumerError, Unit] =
consumerSettings(clientId, groupId, None).flatMap { settings =>
Consumer.consumeWith[Any, Any, String, String](
settings,
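Because the test-utility layers and effects above now fail with `ConsumerError`, tests no longer need `RIO`/`Throwable` signatures. A rough usage sketch (client id and topic are placeholders; the `Kafka` import assumes the test-kit's module layout):

```scala
import zio._
import zio.kafka.consumer.Consumer.ConsumerError
import zio.kafka.consumer.Subscription
import zio.kafka.testkit.Kafka // assumption: adjust to where the Kafka test service lives

// Hypothetical test snippet: the error channel is ConsumerError, not Throwable.
val consumed: ZIO[Kafka, ConsumerError, Unit] =
  KafkaTestUtils.consumeWithStrings(
    clientId = "test-client",
    subscription = Subscription.topics("test-topic")
  )(record => ZIO.logInfo(s"${record.key}: ${record.value}"))
```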
@@ -2,18 +2,19 @@ package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
+import zio.kafka.consumer.Consumer.{ CommitError, DeserializationError }
import zio.kafka.serde.Deserializer
-import zio.{ RIO, Task }
+import zio.{ IO, ZIO }

final case class CommittableRecord[K, V](
record: ConsumerRecord[K, V],
-private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
+private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => IO[CommitError, Unit],
private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
) {
def deserializeWith[R, K1, V1](
keyDeserializer: Deserializer[R, K1],
valueDeserializer: Deserializer[R, V1]
-)(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): RIO[R, CommittableRecord[K1, V1]] =
+)(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): ZIO[R, DeserializationError, CommittableRecord[K1, V1]] =
for {
key <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key())
value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value())
@@ -52,7 +53,7 @@ final case class CommittableRecord[K, V](
object CommittableRecord {
def apply[K, V](
record: ConsumerRecord[K, V],
-commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
+commitHandle: Map[TopicPartition, OffsetAndMetadata] => IO[CommitError, Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
): CommittableRecord[K, V] =
new CommittableRecord(
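With the commit handle typed as `IO[CommitError, Unit]`, retry policies can target specific commit failures instead of matching on `Throwable`. A minimal sketch, assuming the `CommitError` hierarchy introduced in the `Consumer.scala` diff below:

```scala
import zio._
import zio.kafka.consumer.Consumer.CommitError
import zio.kafka.consumer.Consumer.CommitError.CommitTimeout

// Hypothetical: retry only timed-out commits, with capped exponential backoff.
def commitWithRetry(commit: IO[CommitError, Unit]): IO[CommitError, Unit] =
  commit.retry(
    Schedule.recurWhile[CommitError](_ == CommitTimeout) &&
      Schedule.exponential(100.millis) &&
      Schedule.recurs(5)
  )
```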
100 changes: 75 additions & 25 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -7,7 +7,9 @@ import org.apache.kafka.clients.consumer.{
OffsetAndTimestamp
}
import org.apache.kafka.common._
+import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
import zio._
+import zio.kafka.consumer.Consumer.{ CommitError, ConsumerError, PartitionStreamError }
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.diagnostics.Diagnostics.ConcurrentDiagnostics
@@ -17,7 +19,6 @@ import zio.kafka.utils.SslHelper
import zio.stream._

import scala.jdk.CollectionConverters._
-import scala.util.control.NoStackTrace

trait Consumer {

@@ -68,7 +69,7 @@ trait Consumer {
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
-): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]
+): Stream[ConsumerError, Chunk[(TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])]]

/**
* Create a stream with messages on the subscribed topic-partitions by topic-partition
@@ -91,7 +92,7 @@
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
-): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]
+): Stream[ConsumerError, (TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])]

/**
* Create a stream with all messages on the subscribed topic-partitions
@@ -116,7 +117,7 @@
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int = 4
-): ZStream[R, Throwable, CommittableRecord[K, V]]
+): ZStream[R, ConsumerError, CommittableRecord[K, V]]

/**
* Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit
@@ -131,10 +132,10 @@
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
-commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
+commitRetryPolicy: Schedule[Any, CommitError, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
)(
f: ConsumerRecord[K, V] => URIO[R1, Unit]
-): ZIO[R & R1, Throwable, Unit]
+): ZIO[R & R1, ConsumerError, Unit]

/**
* Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest
@@ -161,12 +162,56 @@
}

object Consumer {
-case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace
+sealed trait ConsumerError {
+  def message: String
+}
+
+final case class AuthorizationError(cause: AuthorizationException) extends ConsumerError {
+  override def message: String = cause.getMessage
+}
+
+final case class AuthenticationError(cause: AuthenticationException) extends ConsumerError {
+  override def message: String = cause.getMessage
+}
+
+final case class SslValidationError(message: String) extends ConsumerError
+final case class UnknownConsumerException(cause: Throwable) extends ConsumerError {
+  override def message: String = cause.getMessage
+}
+sealed trait SubscriptionError extends ConsumerError
+final case class InvalidSubscriptionUnion(subscriptions: Chunk[Subscription]) extends SubscriptionError {
+  override def message: String = s"Unable to calculate union of subscriptions: ${subscriptions.mkString(",")}"
+}
+final case class InvalidTopic(topic: String, validationError: String) extends SubscriptionError {
+  override def message: String = s"Invalid topic '${topic}': ${validationError}"
+}
+final case class GetManualOffsetsError(cause: Throwable) extends ConsumerError {
+  override def message: String = s"Unable to retrieve manual offsets: ${cause.getMessage}"
+}
+sealed trait PartitionStreamError extends ConsumerError
+final case class DeserializationError(message: String, cause: Option[Throwable] = None) extends PartitionStreamError
+final case class PartitionStreamPullTimeout(topicPartition: TopicPartition, maxPollInterval: Duration)
+    extends PartitionStreamError {
+  override def message: String =
+    s"No records were polled for more than $maxPollInterval for topic partition $topicPartition. " +
+      "Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
+      "needs more time."
+}
+
+sealed trait CommitError extends ConsumerError
+object CommitError {
+  final case object CommitTimeout extends CommitError {
+    override def message: String = "Commit timed out"
+  }
+  final case class UnknownCommitException(cause: Throwable) extends CommitError {
+    override def message: String = cause.getMessage
+  }
+}

val offsetBatches: ZSink[Any, Nothing, Offset, Nothing, OffsetBatch] =
ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ add _)

-def live: RLayer[ConsumerSettings & Diagnostics, Consumer] =
+def live: ZLayer[ConsumerSettings & Diagnostics, ConsumerError, Consumer] =
ZLayer.scoped {
for {
settings <- ZIO.service[ConsumerSettings]
@@ -185,13 +230,15 @@
def make(
settings: ConsumerSettings,
diagnostics: Diagnostics = Diagnostics.NoOp
-): ZIO[Scope, Throwable, Consumer] =
+): ZIO[Scope, SslValidationError, Consumer] =
for {
wrappedDiagnostics <- ConcurrentDiagnostics.make(diagnostics)
_ <- ZIO.addFinalizer(wrappedDiagnostics.emit(Finalization.ConsumerFinalized))
-_ <- SslHelper.validateEndpoint(settings.driverSettings)
-consumerAccess <- ConsumerAccess.make(settings)
-runloopAccess <- RunloopAccess.make(settings, consumerAccess, wrappedDiagnostics)
+_ <- SslHelper
+       .validateEndpoint(settings.driverSettings)
+       .mapError(e => SslValidationError(e.getMessage))
+consumerAccess <- ConsumerAccess.make(settings)
+runloopAccess <- RunloopAccess.make(settings, consumerAccess, wrappedDiagnostics)
} yield new ConsumerLive(consumerAccess, runloopAccess)

/**
Expand All @@ -203,7 +250,7 @@ object Consumer {
javaConsumer: JConsumer[Array[Byte], Array[Byte]],
settings: ConsumerSettings,
diagnostics: Diagnostics = Diagnostics.NoOp
-): ZIO[Scope, Throwable, Consumer] =
+): ZIO[Scope, ConsumerError, Consumer] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
consumerAccess <- ConsumerAccess.make(javaConsumer)
@@ -258,7 +305,9 @@ object Consumer {
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
-): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] =
+): ZStream[Consumer, ConsumerError, Chunk[
+  (TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])
+]] =
ZStream.serviceWithStream[Consumer](_.partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer))

/**
@@ -270,8 +319,8 @@
valueDeserializer: Deserializer[R, V]
): ZStream[
Consumer,
-Throwable,
-(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])
+ConsumerError,
+(TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])
] =
ZStream.serviceWithStream[Consumer](_.partitionedStream(subscription, keyDeserializer, valueDeserializer))

@@ -283,7 +332,7 @@
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int = 4
-): ZStream[R & Consumer, Throwable, CommittableRecord[K, V]] =
+): ZStream[R & Consumer, ConsumerError, CommittableRecord[K, V]] =
ZStream.serviceWithStream[Consumer](
_.plainStream(subscription, keyDeserializer, valueDeserializer, bufferSize)
)
@@ -354,7 +403,7 @@ object Consumer {
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
-)(f: ConsumerRecord[K, V] => URIO[R1, Unit]): RIO[R & R1, Unit] =
+)(f: ConsumerRecord[K, V] => URIO[R1, Unit]): ZIO[R & R1, ConsumerError, Unit] =
ZIO.scoped[R & R1] {
Consumer
.make(settings)
@@ -435,6 +484,7 @@ private[consumer] final class ConsumerLive private[consumer] (
override def assignment: Task[Set[TopicPartition]] =
consumer.withConsumer(_.assignment().asScala.toSet)

+// TODO should we change all of these too..?
override def beginningOffsets(
partitions: Set[TopicPartition],
timeout: Duration = Duration.Infinity
@@ -469,7 +519,7 @@
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
-): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = {
+): Stream[ConsumerError, Chunk[(TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])]] = {
val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray)

ZStream.unwrapScoped {
@@ -481,9 +531,9 @@
.map {
_.collect {
case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) =>
-val stream: ZStream[R, Throwable, CommittableRecord[K, V]] =
+val stream: ZStream[R, PartitionStreamError, CommittableRecord[K, V]] =
if (onlyByteArraySerdes)
-partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]]
+partitionStream.asInstanceOf[ZStream[R, PartitionStreamError, CommittableRecord[K, V]]]
else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer)))

tp -> stream
@@ -496,15 +546,15 @@
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
-): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] =
+): ZStream[Any, ConsumerError, (TopicPartition, ZStream[R, PartitionStreamError, CommittableRecord[K, V]])] =
partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer).flattenChunks

override def plainStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int
-): ZStream[R, Throwable, CommittableRecord[K, V]] =
+): ZStream[R, ConsumerError, CommittableRecord[K, V]] =
partitionedStream(subscription, keyDeserializer, valueDeserializer).flatMapPar(
n = Int.MaxValue,
bufferSize = bufferSize
@@ ... @@
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
-commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
+commitRetryPolicy: Schedule[Any, CommitError, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
)(
f: ConsumerRecord[K, V] => URIO[R1, Unit]
-): ZIO[R & R1, Throwable, Unit] =
+): ZIO[R & R1, ConsumerError, Unit] =
for {
r <- ZIO.environment[R & R1]
_ <- partitionedStream(subscription, keyDeserializer, valueDeserializer)
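The sealed `ConsumerError` hierarchy above lets callers fold over failures exhaustively, with the compiler flagging any unhandled case. A minimal sketch of such a handler (`describe` is illustrative only):

```scala
import zio.kafka.consumer.Consumer._

// Hypothetical exhaustive handler over the new error ADT; a missing case
// would trigger a non-exhaustive match warning at compile time.
def describe(error: ConsumerError): String = error match {
  case AuthorizationError(cause)       => s"not authorized: ${cause.getMessage}"
  case AuthenticationError(cause)      => s"authentication failed: ${cause.getMessage}"
  case SslValidationError(message)     => s"SSL validation failed: $message"
  case sub: SubscriptionError          => s"invalid subscription: ${sub.message}"
  case GetManualOffsetsError(cause)    => s"manual offsets: ${cause.getMessage}"
  case stream: PartitionStreamError    => s"partition stream failed: ${stream.message}"
  case commit: CommitError             => s"commit failed: ${commit.message}"
  case UnknownConsumerException(cause) => s"unexpected: ${cause.getMessage}"
}
```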
@@ -2,7 +2,7 @@ package zio.kafka.consumer

import org.apache.kafka.clients.consumer.ConsumerConfig
import zio._
-import zio.kafka.consumer.Consumer.OffsetRetrieval
+import zio.kafka.consumer.Consumer.{ ConsumerError, OffsetRetrieval }
import zio.kafka.consumer.fetch.{ FetchStrategy, QueueSizeBasedFetchStrategy }
import zio.kafka.security.KafkaCredentialStore
import zio.metrics.MetricLabel
@@ -31,7 +31,7 @@ final case class ConsumerSettings(
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(),
metricLabels: Set[MetricLabel] = Set.empty,
runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis),
-authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
+authErrorRetrySchedule: Schedule[Any, ConsumerError, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
) {
// Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType:
require(
@@ -316,7 +316,7 @@ final case class ConsumerSettings(
*
* The default is {{{Schedule.recurs(5) && Schedule.spaced(500.millis)}}}, i.e. retry up to 5 times, spaced by 500 ms.
*/
-def withAuthErrorRetrySchedule(authErrorRetrySchedule: Schedule[Any, Throwable, Any]): ConsumerSettings =
+def withAuthErrorRetrySchedule(authErrorRetrySchedule: Schedule[Any, ConsumerError, Any]): ConsumerSettings =
copy(authErrorRetrySchedule = authErrorRetrySchedule)

}
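And since `authErrorRetrySchedule` now receives a `ConsumerError`, the retry policy can discriminate on the failure itself. A minimal sketch, assuming the error types from `Consumer.scala` above (broker address and retry counts are placeholders):

```scala
import zio._
import zio.kafka.consumer.Consumer.{ AuthenticationError, AuthorizationError, ConsumerError }
import zio.kafka.consumer.ConsumerSettings

// Hypothetical settings: retry only auth-related failures, up to 10 times.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("example-group")
    .withAuthErrorRetrySchedule(
      Schedule.recurWhile[ConsumerError] {
        case _: AuthenticationError | _: AuthorizationError => true
        case _                                              => false
      } && Schedule.spaced(1.second) && Schedule.recurs(10)
    )
```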

This file was deleted.
