Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SASL config properties sugar function to ConsumerSettings #1226

Open
wants to merge 4 commits into
base: series/3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/src/main/mdoc/consumers.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ ConsumerSettings(
.withGroupId("group")
```

### Sasl setting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
### Sasl setting
### SASL settings


When interacting with a Kafka host requiring authentication via SASL (Confluent Cloud, for example), you can use the `withPlainSasl` sugar function:

```scala mdoc:silent
ConsumerSettings(
keyDeserializer = Deserializer[IO, String],
valueDeserializer = Deserializer[IO, String]
).withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("pkc-instanceName.us-west1.gcp.confluent.cloud:9092")
.withGroupId("group")
.withPlainSasl(
<idToken>,
<passwordToken>
Comment on lines +137 to +138
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, I don't think mentioning token is needed.

Also, here you are using id instead of username. I'd suggest to use the same convention everywhere.

)
```

[`ConsumerSettings`][consumersettings] provides functions for configuring both the Java Kafka consumer and options specific to the library. If functions for configuring certain properties of the Java Kafka consumer is missing, we can instead use `withProperty` or `withProperties` together with constants from [`ConsumerConfig`][consumerconfig]. Available properties for the Java Kafka consumer are described in the [documentation](http://kafka.apache.org/documentation/#consumerconfigs).

### Default Settings
Expand Down
17 changes: 17 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ package fs2.kafka
import cats.Show
import cats.effect.Resource
import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.requests.OffsetFetchResponse

import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -401,6 +403,12 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
* Includes the credentials properties from the provided [[KafkaCredentialStore]]
*/
def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V]

/**
* provides SASL credentials to Kafka host. A typical use case for this would be
* interacting with Confluent Cloud.
Comment on lines +408 to +409
Copy link
Contributor

@aartigao aartigao Sep 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel the second sentence is not needed. A user should be aware of what SASL is at this point (either by reading the docs or because he/she knows what's doing).

Suggested change
* provides SASL credentials to Kafka host. A typical use case for this would be
* interacting with Confluent Cloud.
* Provides SASL credentials to Kafka host.

*/
def withPlainSasl(usernameToken: String, passwordToken: String): ConsumerSettings[F, K, V]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be a new KafkaCredentialStore rather than being directly on the consumer settings? That would also add support for the producer and admin client.

}

object ConsumerSettings {
Expand All @@ -417,6 +425,15 @@ object ConsumerSettings {
override val recordMetadata: ConsumerRecord[K, V] => String,
override val maxPrefetchBatches: Int
) extends ConsumerSettings[F, K, V] {

def withPlainSasl(usernameToken: String, passwordToken: String): ConsumerSettings[F, K, V] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've never worked with SASL but from what I can see here in PlainLoginModule there's no reference to any token.

Why not removing that prefix? 👇🏽

Suggested change
def withPlainSasl(usernameToken: String, passwordToken: String): ConsumerSettings[F, K, V] =
def withPlainSasl(username: String, password: String): ConsumerSettings[F, K, V] =

withProperties(
SaslConfigs.SASL_MECHANISM -> "PLAIN",
SaslConfigs.SASL_JAAS_CONFIG ->
s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="${usernameToken}" password="${passwordToken}";""",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SASL_SSL"
)

override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] =
copy(customBlockingContext = Some(ec))

Expand Down
22 changes: 22 additions & 0 deletions modules/core/src/test/scala/fs2/kafka/ConsumerSettingsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,34 @@ import cats.effect.kernel.Resource
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

final class ConsumerSettingsSpec extends BaseSpec {
describe("ConsumerSettings") {

it("should provide withPlainSasl") {

val usernameToken = "unToken"
val passwordToken = "pwToken"
val saslSettings = settings.withPlainSasl(usernameToken, passwordToken)

assert {
saslSettings
.properties(SaslConfigs.SASL_MECHANISM)
.contains("PLAIN") &&
saslSettings
.properties(SaslConfigs.SASL_JAAS_CONFIG)
.contains(s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="${usernameToken}" password="${passwordToken}";""") &&
saslSettings
.properties(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
.contains("SASL_SSL")
}
}

it("should be able to set a custom blocking context") {
assert {
settings.customBlockingContext.isEmpty &&
Expand Down