Skip to content

Commit

Permalink
Fix error handling and partition config bugs in KafkaMessagingProvide…
Browse files Browse the repository at this point in the history
…r.ensureTopic (#5527)
  • Loading branch information
quintenp01 authored Jan 14, 2025
1 parent 8f8a4aa commit 364ace7
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ case class KafkaConfig(replicationFactor: Short, consumerLagCheckInterval: Finit
object KafkaMessagingProvider extends MessagingProvider {
import KafkaConfiguration._

private val topicPartitionsConfigKey = "partitions"

def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
implicit logging: Logging,
actorSystem: ActorSystem): MessageConsumer =
Expand All @@ -64,12 +66,13 @@ object KafkaMessagingProvider extends MessagingProvider {

Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
.flatMap(client => {
val partitions = topicConfig.getOrElse("partitions", "1").toInt
val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
val partitions = topicConfig.getOrElse(topicPartitionsConfigKey, "1").toInt
val safeTopicConfig = topicConfig - topicPartitionsConfigKey
val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(safeTopicConfig.asJava)

def createTopic(retries: Int = 5): Try[Unit] = {
Try(client.listTopics().names().get())
.map(topics =>
.flatMap(topics =>
if (topics.contains(topic)) {
Success(logging.info(this, s"$topic already exists and the user can see it, skipping creation."))
} else {
Expand Down

0 comments on commit 364ace7

Please sign in to comment.