diff --git a/docs/src/main/mdoc/consumers.md b/docs/src/main/mdoc/consumers.md index c050673ee..5ff0ce042 100644 --- a/docs/src/main/mdoc/consumers.md +++ b/docs/src/main/mdoc/consumers.md @@ -122,6 +122,23 @@ ConsumerSettings( .withGroupId("group") ``` +### Sasl setting + +When interacting with a Kafka host requiring authentication via SASL (Confluent Cloud, for example), you can use the `withPlainSasl` sugar function: + +```scala mdoc:silent +ConsumerSettings( + keyDeserializer = Deserializer[IO, String], + valueDeserializer = Deserializer[IO, String] +).withAutoOffsetReset(AutoOffsetReset.Earliest) + .withBootstrapServers("pkc-instanceName.us-west1.gcp.confluent.cloud:9092") + .withGroupId("group") + .withPlainSasl( + , + +) +``` + [`ConsumerSettings`][consumersettings] provides functions for configuring both the Java Kafka consumer and options specific to the library. If functions for configuring certain properties of the Java Kafka consumer is missing, we can instead use `withProperty` or `withProperties` together with constants from [`ConsumerConfig`][consumerconfig]. Available properties for the Java Kafka consumer are described in the [documentation](http://kafka.apache.org/documentation/#consumerconfigs). ### Default Settings diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala index 73fb7e18e..fc4d16df2 100644 --- a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala @@ -9,7 +9,9 @@ package fs2.kafka import cats.Show import cats.effect.Resource import fs2.kafka.security.KafkaCredentialStore +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.requests.OffsetFetchResponse import scala.concurrent.ExecutionContext @@ -401,6 +403,12 @@ sealed abstract class ConsumerSettings[F[_], K, V] { * Includes the credentials properties from the provided [[KafkaCredentialStore]] */ def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V] + + /** + * provides SASL credentials to Kafka host. A typical use case for this would be + * interacting with Confluent Cloud. + */ + def withPlainSasl(usernameToken: String, passwordToken: String): ConsumerSettings[F, K, V] } object ConsumerSettings { @@ -417,6 +425,15 @@ object ConsumerSettings { override val recordMetadata: ConsumerRecord[K, V] => String, override val maxPrefetchBatches: Int ) extends ConsumerSettings[F, K, V] { + + def withPlainSasl(usernameToken: String, passwordToken: String): ConsumerSettings[F, K, V] = + withProperties( + SaslConfigs.SASL_MECHANISM -> "PLAIN", + SaslConfigs.SASL_JAAS_CONFIG -> + s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="${usernameToken}" password="${passwordToken}";""", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SASL_SSL" + ) + override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] = copy(customBlockingContext = Some(ec)) diff --git a/modules/core/src/test/scala/fs2/kafka/ConsumerSettingsSpec.scala b/modules/core/src/test/scala/fs2/kafka/ConsumerSettingsSpec.scala index 633e50eaf..eee193257 100644 --- a/modules/core/src/test/scala/fs2/kafka/ConsumerSettingsSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/ConsumerSettingsSpec.scala @@ -11,12 +11,34 @@ import cats.effect.kernel.Resource import cats.effect.unsafe.implicits.global import cats.syntax.all._ import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs import scala.concurrent.ExecutionContext import scala.concurrent.duration._ final class ConsumerSettingsSpec extends BaseSpec { describe("ConsumerSettings") { + + it("should provide withPlainSasl") { + + val usernameToken = "unToken" + val passwordToken = "pwToken" + val saslSettings = settings.withPlainSasl(usernameToken, passwordToken) + + assert { + saslSettings + .properties(SaslConfigs.SASL_MECHANISM) + .contains("PLAIN") && + saslSettings + .properties(SaslConfigs.SASL_JAAS_CONFIG) + .contains(s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="${usernameToken}" password="${passwordToken}";""") && + saslSettings + .properties(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + .contains("SASL_SSL") + } + } + it("should be able to set a custom blocking context") { assert { settings.customBlockingContext.isEmpty &&