Skip to content

Commit

Permalink
Merge pull request #911 from fd4s/settings-with-serdes
Browse files Browse the repository at this point in the history
  • Loading branch information
bplommer authored Mar 21, 2022
2 parents 36c2882 + f1e5053 commit f503479
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 4 deletions.
6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,11 @@ lazy val mimaSettings = Seq(
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaProducerConnection.metrics"),

// package-private
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.KafkaProducer.from")
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.KafkaProducer.from"),

// sealed
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ConsumerSettings.withDeserializers"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers")
)
// format: on
}
Expand Down
32 changes: 32 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package fs2.kafka

import cats.effect.Sync
import cats.{Applicative, Show}
import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.consumer.ConsumerConfig
Expand Down Expand Up @@ -47,6 +48,14 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def valueDeserializer: F[Deserializer[F, V]]

/** Creates a new `ConsumerSettings` instance that replaces the serializers with those provided.
* Note that this will remove any custom `recordMetadata` configuration.
**/
def withDeserializers[K0, V0](
keyDeserializer: F[Deserializer[F, K0]],
valueDeserializer: F[Deserializer[F, V0]]
): ConsumerSettings[F, K0, V0]

/**
* A custom `ExecutionContext` to use for blocking Kafka operations. If not
* provided, a default single-threaded `ExecutionContext` will be created
Expand Down Expand Up @@ -360,6 +369,8 @@ sealed abstract class ConsumerSettings[F[_], K, V] {

/**
* Creates a new [[ConsumerSettings]] with the specified [[recordMetadata]].
* Note that replacing the serializers via `withSerializers` will reset
* this to the default.
*/
def withRecordMetadata(recordMetadata: ConsumerRecord[K, V] => String): ConsumerSettings[F, K, V]

Expand Down Expand Up @@ -527,6 +538,16 @@ object ConsumerSettings {

override def toString: String =
s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)"

override def withDeserializers[K0, V0](
keyDeserializer: F[Deserializer[F, K0]],
valueDeserializer: F[Deserializer[F, V0]]
): ConsumerSettings[F, K0, V0] =
copy(
keyDeserializer = keyDeserializer,
valueDeserializer = valueDeserializer,
recordMetadata = _ => OffsetFetchResponse.NO_METADATA
)
}

private[this] def create[F[_], K, V](
Expand Down Expand Up @@ -587,6 +608,17 @@ object ConsumerSettings {
valueDeserializer = valueDeserializer.forValue
)

/**
* Create a `ConsumerSettings` instance using placeholder deserializers that return unit.
* These can be subsequently replaced using `withDeserializers`, allowing configuration of
* deserializers to be decoupled from other configuration.
*/
def unit[F[_]](implicit F: Sync[F]): ConsumerSettings[F, Unit, Unit] =
create(
keyDeserializer = F.pure(Deserializer.unit),
valueDeserializer = F.pure(Deserializer.unit)
)

implicit def consumerSettingsShow[F[_], K, V]: Show[ConsumerSettings[F, K, V]] =
Show.fromToString
}
31 changes: 28 additions & 3 deletions modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package fs2.kafka

import cats.effect.Sync
import cats.{Applicative, Show}
import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.producer.ProducerConfig
Expand Down Expand Up @@ -38,6 +39,14 @@ sealed abstract class ProducerSettings[F[_], K, V] {
*/
def valueSerializer: F[Serializer[F, V]]

/**
* Replace the serializers with those provided in the arguments.
*/
def withSerializers[K1, V1](
keySerializer: F[Serializer[F, K1]],
valueSerializer: F[Serializer[F, V1]]
): ProducerSettings[F, K1, V1]

/**
* A custom [[ExecutionContext]] to use for blocking Kafka operations.
* If not provided, the default blocking ExecutionContext provided by
Expand Down Expand Up @@ -309,6 +318,12 @@ object ProducerSettings {

override def toString: String =
s"ProducerSettings(closeTimeout = $closeTimeout)"

override def withSerializers[K1, V1](
keySerializer: F[Serializer[F, K1]],
valueSerializer: F[Serializer[F, V1]]
): ProducerSettings[F, K1, V1] =
copy(keySerializer = keySerializer, valueSerializer = valueSerializer)
}

private[this] def create[F[_], K, V](
Expand Down Expand Up @@ -357,10 +372,20 @@ object ProducerSettings {
implicit keySerializer: RecordSerializer[F, K],
valueSerializer: RecordSerializer[F, V]
): ProducerSettings[F, K, V] =
create(
keySerializer = keySerializer.forKey,
valueSerializer = valueSerializer.forValue
create(keySerializer = keySerializer.forKey, valueSerializer = valueSerializer.forValue)

/**
* Create a `ProducerSettings` instance using placeholder serializers that serialize nothing.
* These can be subsequently replaced using `withSerializers`, allowing configuration of
* serializers to be decoupled from other configuration.
*/
def nothing[F[_]](implicit F: Sync[F]): ProducerSettings[F, Nothing, Nothing] = {
val nothingSerializer = F.pure(Serializer.fail[F, Nothing](new AssertionError("impossible")))
create[F, Nothing, Nothing](
keySerializer = nothingSerializer,
valueSerializer = nothingSerializer
)
}

implicit def producerSettingsShow[F[_], K, V]: Show[ProducerSettings[F, K, V]] =
Show.fromToString
Expand Down

0 comments on commit f503479

Please sign in to comment.