Skip to content

Commit

Permalink
QU-1366 fix spurious Kafka validation errors
Browse files Browse the repository at this point in the history
* Only check for conflicting keys when conflicts are actually a problem (i.e., before config is finalized)

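* Also exclude KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG from finalized-config validation, because a finalized config does not have them set and the Kafka validator would otherwise report them as spurious errors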

GitOrigin-RevId: d792016d34b8670d71ad077a45d97c990af58baf
  • Loading branch information
emanb29 authored and thatbot-copy[bot] committed Jun 16, 2023
1 parent a3c2645 commit b507137
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import cats.data.NonEmptyList
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.{ConfigDef, ConfigValue}
import org.apache.kafka.common.config.ConfigDef

import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator.{ErrorString, underlyingValidator}
import com.thatdot.quine.routes.KafkaIngest.KafkaProperties
Expand Down Expand Up @@ -60,6 +60,12 @@ case class KafkaSettingsValidator(
}
val validator: ConfigDef = underlyingValidator.get

val unrecognizedPropertiesError: List[String] = properties.keySet.diff(underlyingKnownKeys) match {
case s if s.isEmpty => Nil
case s @ _ =>
List(s"Unrecognized properties: ${s.mkString(",")}")
}

/*
these values have no direct analogues in Kafka settings:
Expand All @@ -68,48 +74,48 @@ case class KafkaSettingsValidator(
- ingest.format
*/
val additionalErrorsFromUnderlying: Seq[String] =
if (assumeConfigIsFinal)
val errors: Seq[String] =
if (assumeConfigIsFinal) {
// config is already merged, so we can rely on the kafka-provided validator for any errors
for {
validatedConfigEntry: ConfigValue <- validator.validate(properties.asJava).asScala
validatedConfigEntry <- validator.validate(properties.asJava).asScala
configName = validatedConfigEntry.name()
// TODO why does a finalized config not have key.deserializer set?
// Does akka tack it on in settings.consumerFactory?
if configName != ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
if configName != ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
err: ErrorString <- validatedConfigEntry.errorMessages.asScala
} yield s"Error in kafka setting $configName: $err"
else Seq.empty

val errors = List(
findConflict(Set(CommonClientConfigs.GROUP_ID_CONFIG), explicitGroupId),
findConflict(
Set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
Some(explicitOffsetCommitting)
),
// bootstrap servers is mandatory on ingest. If it is set in properties, that's a conflict
disallowField(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"Please use the Kafka ingest `bootstrapServers` field."
),
disallowField(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"Please use one of the `format` field cypher options, which rely on their hard-coded deserializers."
),
disallowField(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"Please use one of the `format` field cypher options, which rely on their hard-coded deserializers."
),
disallowField(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"Please use the Kafka ingest `securityProtocol` field."
),
disallowField(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "Please use the Kafka ingest `autoOffsetReset` field."),
properties.keySet.diff(underlyingKnownKeys) match {
case s if s.isEmpty => None
case s @ _ =>
logger.warn(s"Ingest fails validation with errors ${s.mkString(",")}");
Some(s"Unrecognized properties: ${s.mkString(",")}")
} yield s"Error in Kafka setting $configName: $err"
} else {
// config is not yet merged (multiple sources of truth), so we can look for conflicts between the parts of config
List(
findConflict(Set(CommonClientConfigs.GROUP_ID_CONFIG), explicitGroupId),
findConflict(
Set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
Some(explicitOffsetCommitting)
),
// bootstrap servers is mandatory on ingest. If it is set in properties, that's a conflict
disallowField(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"Please use the Kafka ingest `bootstrapServers` field."
),
disallowField(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"Please use one of the `format` field cypher options, which rely on their hard-coded deserializers."
),
disallowField(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"Please use one of the `format` field cypher options, which rely on their hard-coded deserializers."
),
disallowField(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"Please use the Kafka ingest `securityProtocol` field."
),
disallowField(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "Please use the Kafka ingest `autoOffsetReset` field.")
).flatten
}
) collect { case Some(c) => c }

NonEmptyList.fromList(errors ++ additionalErrorsFromUnderlying)
NonEmptyList.fromList(unrecognizedPropertiesError ++ errors)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class KafkaSettingsValidatorTest extends AnyFunSuite {
test("empty settings map accepted") {
assert(KafkaSettingsValidator(Map()).validate().isEmpty)
}
test("final empty settings map rejected") {
assert(KafkaSettingsValidator(Map()).validate(assumeConfigIsFinal = true).nonEmpty)
test("final empty settings map accepted") {
assert(KafkaSettingsValidator(Map()).validate(assumeConfigIsFinal = true).isEmpty)
}

test("Unrecognized setting disallowed") {
Expand Down Expand Up @@ -52,14 +52,6 @@ class KafkaSettingsValidatorTest extends AnyFunSuite {
).validate(false).get.size == 1
)

// finalized config
assert(
KafkaSettingsValidator(
Map("enable.auto.commit" -> "true"),
explicitOffsetCommitting = Some(ExplicitCommit(1000, 1000, 1100))
).validate(true).get.size == 3 // the original error plus two for missing serializer and deserializer
)

}
test("Unsupported settings disallowed") {
//value.deserializer
Expand Down

0 comments on commit b507137

Please sign in to comment.