From 364ace7a7e670815aa0cf8fccb3de081fa3ad1b9 Mon Sep 17 00:00:00 2001
From: Quinten Parker <77176931+quintenp01@users.noreply.github.com>
Date: Tue, 14 Jan 2025 13:56:43 -0800
Subject: [PATCH] Fix error handling and partition config bugs in
 KafkaMessagingProvider.ensureTopic (#5527)

---
 .../connector/kafka/KafkaMessagingProvider.scala         | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index f61f6ec7a6e..dd388952708 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -41,6 +41,8 @@ case class KafkaConfig(replicationFactor: Short, consumerLagCheckInterval: Finit
 object KafkaMessagingProvider extends MessagingProvider {
   import KafkaConfiguration._
 
+  private val topicPartitionsConfigKey = "partitions"
+
   def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
     implicit logging: Logging,
     actorSystem: ActorSystem): MessageConsumer =
@@ -64,12 +66,13 @@ object KafkaMessagingProvider extends MessagingProvider {
 
     Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
       .flatMap(client => {
-        val partitions = topicConfig.getOrElse("partitions", "1").toInt
-        val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
+        val partitions = topicConfig.getOrElse(topicPartitionsConfigKey, "1").toInt
+        val safeTopicConfig = topicConfig - topicPartitionsConfigKey
+        val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(safeTopicConfig.asJava)
 
         def createTopic(retries: Int = 5): Try[Unit] = {
           Try(client.listTopics().names().get())
-            .map(topics =>
+            .flatMap(topics =>
               if (topics.contains(topic)) {
                 Success(logging.info(this, s"$topic already exists and the user can see it, skipping creation."))
               } else {