diff --git a/README.md b/README.md
index 28801750e..89fecce6b 100644
--- a/README.md
+++ b/README.md
@@ -25,7 +25,7 @@ KubeBlocks add-ons.
| etcd | etcd-3.5.15<br>etcd-3.5.6 | Etcd is a strongly consistent, distributed key-value store that provides a reliable way to store data that needs to be accessed by a distributed system or cluster of machines. | free6om |
| greptimedb | greptimedb-0.3.2 | An open-source, cloud-native, distributed time-series database with PromQL/SQL/Python supported. | GreptimeTeam sh2 |
| influxdb | influxdb-2.7.4 | InfluxDB(TM) is an open source time-series database. It is a core component of the TICK (Telegraf, InfluxDB(TM), Chronograf, Kapacitor) stack. | |
-| kafka | kafka-broker-3.3.2<br>kafka-combine-3.3.2<br>kafka-controller-3.3.2<br>kafka-exporter-1.6.0 | Apache Kafka is a distributed streaming platform designed to build real-time pipelines and can be used as a message broker or as a replacement for a log aggregation solution for big data applications. | caiq1nyu |
+| kafka | kafka-broker-2.7.0<br>kafka-broker-3.3.2<br>kafka-combine-3.3.2<br>kafka-controller-3.3.2<br>kafka-exporter-1.6.0 | Apache Kafka is a distributed streaming platform designed to build real-time pipelines and can be used as a message broker or as a replacement for a log aggregation solution for big data applications. | caiq1nyu |
| loki | loki-1.0.0 | Loki is a horizontally-scalable, highly-available, multi-tenant log aggregation system inspired by Prometheus. It is designed to be very cost effective and easy to operate. | Chen-speculation |
| mariadb | mariadb-10.6.15 | MariaDB is a high performance open source relational database management system that is widely used for web and application servers | yinmin |
| milvus | milvus-v2.3.2 | A cloud-native vector database, storage for next generation AI applications. | leon-inf |
diff --git a/addons/kafka/configs/2.7/kafka-27-server-constraint.cue b/addons/kafka/configs/2.7/kafka-27-server-constraint.cue
new file mode 100644
index 000000000..ad77737c6
--- /dev/null
+++ b/addons/kafka/configs/2.7/kafka-27-server-constraint.cue
@@ -0,0 +1,222 @@
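+// Every field below is optional ("?:") and carries a type plus a CUE default ("*"),
+// e.g. `"background.threads"?: int | *10` reads: optional int, default 10.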
+#KafkaParameter: {
+ "advertised.host.name"?: string | *""
+ "advertised.listeners"?: string | *""
+ "advertised.port"?: int | *9092
+ "alter.config.policy.class.name"?: string | *""
+ "alter.log.dirs.replication.quota.window.num"?: int | *11
+ "alter.log.dirs.replication.quota.window.size.seconds"?: int | *1
+ "authorizer.class.name"?: string | *""
+ "auto.create.topics.enable"?: bool | *true
+ "auto.leader.rebalance.enable"?: bool | *true
+ "background.threads"?: int | *10
+ "broker.id.generation.enable"?: bool | *true
+ "broker.id"?: int | *-1
+ "broker.rack"?: string | *""
+ "client.quota.callback.class"?: string | *""
+ "compression.type"?: string | *"producer"
+ "connection.failed.authentication.delay.ms"?: int | *100
+ "connections.max.idle.ms"?: int | *600000
+ "connections.max.reauth.ms"?: int | *0
+ "control.plane.listener.name"?: string | *""
+ "controlled.shutdown.enable"?: bool | *true
+ "controlled.shutdown.max.retries"?: int | *3
+ "controlled.shutdown.retry.backoff.ms"?: int | *5000
+ "controller.quota.window.num"?: int | *11
+ "controller.quota.window.size.seconds"?: int | *1
+ "controller.socket.timeout.ms"?: int | *30000
+ "create.topic.policy.class.name"?: string | *""
+ "default.replication.factor"?: int | *1
+ "delegation.token.expiry.check.interval.ms"?: int | *3600000
+ "delegation.token.expiry.time.ms"?: int | *86400000
+ "delegation.token.master.key"?: string | *""
+ "delegation.token.max.lifetime.ms"?: int | *604800000
+ "delete.records.purgatory.purge.interval.requests"?: int | *1
+ "delete.topic.enable"?: bool | *true
+ "fetch.max.bytes"?: int | *57671680
+ "fetch.purgatory.purge.interval.requests"?: int | *1000
+ "group.initial.rebalance.delay.ms"?: int | *3000
+ "group.max.session.timeout.ms"?: int | *1800000
+ "group.max.size"?: int | *2147483647
+ "group.min.session.timeout.ms"?: int | *6000
+ "host.name"?: string | *""
+ "inter.broker.listener.name"?: string | *""
+ "inter.broker.protocol.version"?: string | *"2.7-IV2"
+ "kafka.metrics.polling.interval.secs"?: int | *10
+ "kafka.metrics.reporters"?: string | *""
+ "leader.imbalance.check.interval.seconds"?: int | *300
+ "leader.imbalance.per.broker.percentage"?: int | *10
+ "listener.security.protocol.map"?: string | *"PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL"
+ "listeners"?: string | *""
+ "log.cleaner.backoff.ms"?: int | *15000
+ "log.cleaner.dedupe.buffer.size"?: int | *134217728
+ "log.cleaner.delete.retention.ms"?: int | *86400000
+ "log.cleaner.enable"?: bool | *true
+ "log.cleaner.io.buffer.load.factor"?: float | *0.9
+ "log.cleaner.io.buffer.size"?: int | *524288
+ "log.cleaner.io.max.bytes.per.second"?: float | *1.7976931348623157e308
+ "log.cleaner.max.compaction.lag.ms"?: int | *9223372036854775807
+ "log.cleaner.min.cleanable.ratio"?: float | *0.5
+ "log.cleaner.min.compaction.lag.ms"?: int | *0
+ "log.cleaner.threads"?: int | *1
+ "log.cleanup.policy"?: string | *"delete"
+ "log.dir"?: string | *"/tmp/kafka-logs"
+ "log.dirs"?: string | *""
+ "log.flush.interval.messages"?: int | *9223372036854775807
+ "log.flush.interval.ms"?: int | *0
+ "log.flush.offset.checkpoint.interval.ms"?: int | *60000
+ "log.flush.scheduler.interval.ms"?: int | *9223372036854775807
+ "log.flush.start.offset.checkpoint.interval.ms"?: int | *60000
+ "log.index.interval.bytes"?: int | *4096
+ "log.index.size.max.bytes"?: int | *10485760
+ "log.message.downconversion.enable"?: bool | *true
+ "log.message.format.version"?: string | *"2.7-IV2"
+ "log.message.timestamp.difference.max.ms"?: int | *9223372036854775807
+ "log.message.timestamp.type"?: string | *"CreateTime"
+ "log.preallocate"?: bool | *false
+ "log.retention.bytes"?: int | *-1
+ "log.retention.check.interval.ms"?: int | *300000
+ "log.retention.hours"?: int | *168
+ "log.retention.minutes"?: int | *0
+ "log.retention.ms"?: int | *0
+ "log.roll.hours"?: int | *168
+ "log.roll.jitter.hours"?: int | *0
+ "log.roll.jitter.ms"?: int | *0
+ "log.roll.ms"?: int | *0
+ "log.segment.bytes"?: int | *1073741824
+ "log.segment.delete.delay.ms"?: int | *60000
+ "max.connection.creation.rate"?: int | *2147483647
+ "max.connections.per.ip.overrides"?: string | *""
+ "max.connections.per.ip"?: int | *2147483647
+ "max.connections"?: int | *2147483647
+ "max.incremental.fetch.session.cache.slots"?: int | *1000
+ "message.max.bytes"?: int | *1048588
+ "metric.reporters"?: string | *""
+ "metrics.num.samples"?: int | *2
+ "metrics.recording.level"?: string | *"INFO"
+ "metrics.sample.window.ms"?: int | *30000
+ "min.insync.replicas"?: int | *1
+ "num.io.threads"?: int | *8
+ "num.network.threads"?: int | *3
+ "num.partitions"?: int | *1
+ "num.recovery.threads.per.data.dir"?: int | *1
+ "num.replica.alter.log.dirs.threads"?: int | *0
+ "num.replica.fetchers"?: int | *1
+ "offset.metadata.max.bytes"?: int | *4096
+ "offsets.commit.required.acks"?: int | *-1
+ "offsets.commit.timeout.ms"?: int | *5000
+ "offsets.load.buffer.size"?: int | *5242880
+ "offsets.retention.check.interval.ms"?: int | *600000
+ "offsets.retention.minutes"?: int | *10080
+ "offsets.topic.compression.codec"?: int | *0
+ "offsets.topic.num.partitions"?: int | *50
+ "offsets.topic.replication.factor"?: int | *3
+ "offsets.topic.segment.bytes"?: int | *104857600
+ "password.encoder.cipher.algorithm"?: string | *"AES/CBC/PKCS5Padding"
+ "password.encoder.iterations"?: int | *4096
+ "password.encoder.key.length"?: int | *128
+ "password.encoder.keyfactory.algorithm"?: string | *""
+ "password.encoder.old.secret"?: string | *""
+ "password.encoder.secret"?: string | *""
+ "port"?: int | *9092
+ "principal.builder.class"?: string | *""
+ "producer.purgatory.purge.interval.requests"?: int | *1000
+ "queued.max.request.bytes"?: int | *-1
+ "queued.max.requests"?: int | *500
+ "quota.consumer.default"?: int | *9223372036854775807
+ "quota.producer.default"?: int | *9223372036854775807
+ "quota.window.num"?: int | *11
+ "quota.window.size.seconds"?: int | *1
+ "replica.fetch.backoff.ms"?: int | *1000
+ "replica.fetch.max.bytes"?: int | *1048576
+ "replica.fetch.min.bytes"?: int | *1
+ "replica.fetch.response.max.bytes"?: int | *10485760
+ "replica.fetch.wait.max.ms"?: int | *500
+ "replica.high.watermark.checkpoint.interval.ms"?: int | *5000
+ "replica.lag.time.max.ms"?: int | *30000
+ "replica.selector.class"?: string | *""
+ "replica.socket.receive.buffer.bytes"?: int | *65536
+ "replica.socket.timeout.ms"?: int | *30000
+ "replication.quota.window.num"?: int | *11
+ "replication.quota.window.size.seconds"?: int | *1
+ "request.timeout.ms"?: int | *30000
+ "reserved.broker.max.id"?: int | *1000
+ "sasl.client.callback.handler.class"?: string | *""
+ "sasl.enabled.mechanisms"?: string | *"GSSAPI"
+ "sasl.jaas.config"?: string | *""
+ "sasl.kerberos.kinit.cmd"?: string | *"/usr/bin/kinit"
+ "sasl.kerberos.min.time.before.relogin"?: int | *60000
+ "sasl.kerberos.principal.to.local.rules"?: string | *"DEFAULT"
+ "sasl.kerberos.service.name"?: string | *""
+ "sasl.kerberos.ticket.renew.jitter"?: float | *0.05
+ "sasl.kerberos.ticket.renew.window.factor"?: float | *0.8
+ "sasl.login.callback.handler.class"?: string | *""
+ "sasl.login.class"?: string | *""
+ "sasl.login.refresh.buffer.seconds"?: int | *300
+ "sasl.login.refresh.min.period.seconds"?: int | *60
+ "sasl.login.refresh.window.factor"?: float | *0.8
+ "sasl.login.refresh.window.jitter"?: float | *0.05
+ "sasl.mechanism.inter.broker.protocol"?: string | *"GSSAPI"
+ "sasl.server.callback.handler.class"?: string | *""
+ "security.inter.broker.protocol"?: string | *"PLAINTEXT"
+ "security.providers"?: string | *""
+ "socket.connection.setup.timeout.max.ms"?: int | *127000
+ "socket.connection.setup.timeout.ms"?: int | *10000
+ "socket.receive.buffer.bytes"?: int | *102400
+ "socket.request.max.bytes"?: int | *104857600
+ "socket.send.buffer.bytes"?: int | *102400
+ "ssl.cipher.suites"?: string | *""
+ "ssl.client.auth"?: string | *"none"
+ "ssl.enabled.protocols"?: string | *"TLSv1.2"
+ "ssl.endpoint.identification.algorithm"?: string | *"https"
+ "ssl.engine.factory.class"?: string | *""
+ "ssl.key.password"?: string | *""
+ "ssl.keymanager.algorithm"?: string | *"SunX509"
+ "ssl.keystore.certificate.chain"?: string | *""
+ "ssl.keystore.key"?: string | *""
+ "ssl.keystore.location"?: string | *""
+ "ssl.keystore.password"?: string | *""
+ "ssl.keystore.type"?: string | *"JKS"
+ "ssl.principal.mapping.rules"?: string | *"DEFAULT"
+ "ssl.protocol"?: string | *"TLSv1.2"
+ "ssl.provider"?: string | *""
+ "ssl.secure.random.implementation"?: string | *""
+ "ssl.trustmanager.algorithm"?: string | *"PKIX"
+ "ssl.truststore.certificates"?: string | *""
+ "ssl.truststore.location"?: string | *""
+ "ssl.truststore.password"?: string | *""
+ "ssl.truststore.type"?: string | *"JKS"
+ "transaction.abort.timed.out.transaction.cleanup.interval.ms"?: int | *10000
+ "transaction.max.timeout.ms"?: int | *900000
+ "transaction.remove.expired.transaction.cleanup.interval.ms"?: int | *3600000
+ "transaction.state.log.load.buffer.size"?: int | *5242880
+ "transaction.state.log.min.isr"?: int | *2
+ "transaction.state.log.num.partitions"?: int | *50
+ "transaction.state.log.replication.factor"?: int | *3
+ "transaction.state.log.segment.bytes"?: int | *104857600
+ "transactional.id.expiration.ms"?: int | *604800000
+ "unclean.leader.election.enable"?: bool | *false
+ "zookeeper.clientCnxnSocket"?: string | *""
+ "zookeeper.connect"?: string | *""
+ "zookeeper.connection.timeout.ms"?: int | *0
+ "zookeeper.max.in.flight.requests"?: int | *10
+ "zookeeper.session.timeout.ms"?: int | *18000
+ "zookeeper.set.acl"?: bool | *false
+ "zookeeper.ssl.cipher.suites"?: string | *""
+ "zookeeper.ssl.client.enable"?: bool | *false
+ "zookeeper.ssl.crl.enable"?: bool | *false
+ "zookeeper.ssl.enabled.protocols"?: string | *""
+ "zookeeper.ssl.endpoint.identification.algorithm"?: string | *"HTTPS"
+ "zookeeper.ssl.keystore.location"?: string | *""
+ "zookeeper.ssl.keystore.password"?: string | *""
+ "zookeeper.ssl.keystore.type"?: string | *""
+ "zookeeper.ssl.ocsp.enable"?: bool | *false
+ "zookeeper.ssl.protocol"?: string | *"TLSv1.2"
+ "zookeeper.ssl.truststore.location"?: string | *""
+ "zookeeper.ssl.truststore.password"?: string | *""
+ "zookeeper.ssl.truststore.type"?: string | *""
+ "zookeeper.sync.time.ms"?: int | *2000
+ ...
+}
+
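+// Usage sketch (hypothetical values): user-supplied properties unify with the
+// schema, so `#KafkaParameter & {"log.retention.hours": 72}` validates, while
+// a mistyped value such as `"log.retention.hours": "72h"` fails type checking.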
+configuration: #KafkaParameter & {
+}
\ No newline at end of file
diff --git a/addons/kafka/configs/2.7/kafka-27-server.prop.tpl b/addons/kafka/configs/2.7/kafka-27-server.prop.tpl
new file mode 100644
index 000000000..00dc927c1
--- /dev/null
+++ b/addons/kafka/configs/2.7/kafka-27-server.prop.tpl
@@ -0,0 +1,219 @@
+# broker config, generated according to https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaConfig.scala#L984
+
+# advertised.host.name=
+# advertised.listeners=
+# advertised.port=
+# alter.config.policy.class.name=
+alter.log.dirs.replication.quota.window.num=11
+alter.log.dirs.replication.quota.window.size.seconds=1
+# authorizer.class.name=
+auto.create.topics.enable=true
+auto.leader.rebalance.enable=true
+background.threads=10
+broker.id.generation.enable=true
+# broker.id=-1
+# broker.rack=
+# client.quota.callback.class=
+compression.type=producer
+connection.failed.authentication.delay.ms=100
+connections.max.idle.ms=600000
+connections.max.reauth.ms=0
+# control.plane.listener.name=
+controlled.shutdown.enable=true
+controlled.shutdown.max.retries=3
+controlled.shutdown.retry.backoff.ms=5000
+controller.quota.window.num=11
+controller.quota.window.size.seconds=1
+controller.socket.timeout.ms=30000
+# create.topic.policy.class.name=
+default.replication.factor=1
+delegation.token.expiry.check.interval.ms=3600000
+delegation.token.expiry.time.ms=86400000
+# delegation.token.master.key=
+delegation.token.max.lifetime.ms=604800000
+delete.records.purgatory.purge.interval.requests=1
+delete.topic.enable=true
+fetch.max.bytes=57671680
+fetch.purgatory.purge.interval.requests=1000
+group.initial.rebalance.delay.ms=3000
+group.max.session.timeout.ms=1800000
+group.max.size=2147483647
+group.min.session.timeout.ms=6000
+# host.name=
+# inter.broker.listener.name=
+inter.broker.protocol.version=2.7-IV2
+kafka.metrics.polling.interval.secs=10
+# kafka.metrics.reporters=
+leader.imbalance.check.interval.seconds=300
+leader.imbalance.per.broker.percentage=10
+# listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+# listeners=
+log.cleaner.backoff.ms=15000
+log.cleaner.dedupe.buffer.size=134217728
+log.cleaner.delete.retention.ms=86400000
+log.cleaner.enable=true
+log.cleaner.io.buffer.load.factor=0.9
+log.cleaner.io.buffer.size=524288
+log.cleaner.io.max.bytes.per.second=1.7976931348623157E308
+log.cleaner.max.compaction.lag.ms=9223372036854775807
+log.cleaner.min.cleanable.ratio=0.5
+log.cleaner.min.compaction.lag.ms=0
+log.cleaner.threads=1
+log.cleanup.policy=delete
+log.dir=/tmp/kafka-logs
+# log.dirs=
+log.flush.interval.messages=9223372036854775807
+log.flush.interval.ms=1000
+log.flush.offset.checkpoint.interval.ms=60000
+log.flush.scheduler.interval.ms=9223372036854775807
+log.flush.start.offset.checkpoint.interval.ms=60000
+log.index.interval.bytes=4096
+log.index.size.max.bytes=10485760
+log.message.downconversion.enable=true
+log.message.format.version=2.7-IV2
+log.message.timestamp.difference.max.ms=9223372036854775807
+log.message.timestamp.type=CreateTime
+log.preallocate=false
+log.retention.bytes=-1
+log.retention.check.interval.ms=300000
+log.retention.hours=168
+# log.retention.minutes=
+# log.retention.ms=
+log.roll.hours=168
+log.roll.jitter.hours=0
+# log.roll.jitter.ms=
+# log.roll.ms=
+log.segment.bytes=1073741824
+log.segment.delete.delay.ms=60000
+max.connection.creation.rate=2147483647
+# max.connections.per.ip.overrides=
+max.connections.per.ip=2147483647
+max.connections=2147483647
+max.incremental.fetch.session.cache.slots=1000
+message.max.bytes=1048588
+metric.reporters=
+metrics.num.samples=2
+metrics.recording.level=INFO
+metrics.sample.window.ms=30000
+min.insync.replicas=1
+num.io.threads=8
+num.network.threads=3
+num.partitions=1
+num.recovery.threads.per.data.dir=1
+# num.replica.alter.log.dirs.threads=
+num.replica.fetchers=1
+offset.metadata.max.bytes=4096
+offsets.commit.required.acks=-1
+offsets.commit.timeout.ms=5000
+offsets.load.buffer.size=5242880
+offsets.retention.check.interval.ms=600000
+offsets.retention.minutes=10080
+offsets.topic.compression.codec=0
+offsets.topic.num.partitions=50
+offsets.topic.replication.factor=3
+offsets.topic.segment.bytes=104857600
+password.encoder.cipher.algorithm=AES/CBC/PKCS5Padding
+password.encoder.iterations=4096
+password.encoder.key.length=128
+# password.encoder.keyfactory.algorithm=
+# password.encoder.old.secret=
+# password.encoder.secret=
+port=9092
+# principal.builder.class=
+producer.purgatory.purge.interval.requests=1000
+queued.max.request.bytes=-1
+queued.max.requests=500
+quota.consumer.default=9223372036854775807
+quota.producer.default=9223372036854775807
+quota.window.num=11
+quota.window.size.seconds=1
+replica.fetch.backoff.ms=1000
+replica.fetch.max.bytes=1048576
+replica.fetch.min.bytes=1
+replica.fetch.response.max.bytes=10485760
+replica.fetch.wait.max.ms=500
+replica.high.watermark.checkpoint.interval.ms=5000
+replica.lag.time.max.ms=30000
+# replica.selector.class=
+replica.socket.receive.buffer.bytes=65536
+replica.socket.timeout.ms=30000
+replication.quota.window.num=11
+replication.quota.window.size.seconds=1
+request.timeout.ms=30000
+reserved.broker.max.id=1000
+# sasl.client.callback.handler.class=
+sasl.enabled.mechanisms=GSSAPI
+# sasl.jaas.config=
+sasl.kerberos.kinit.cmd=/usr/bin/kinit
+sasl.kerberos.min.time.before.relogin=60000
+sasl.kerberos.principal.to.local.rules=DEFAULT
+# sasl.kerberos.service.name=
+sasl.kerberos.ticket.renew.jitter=0.05
+sasl.kerberos.ticket.renew.window.factor=0.8
+# sasl.login.callback.handler.class=
+# sasl.login.class=
+sasl.login.refresh.buffer.seconds=300
+sasl.login.refresh.min.period.seconds=60
+sasl.login.refresh.window.factor=0.8
+sasl.login.refresh.window.jitter=0.05
+sasl.mechanism.inter.broker.protocol=GSSAPI
+# sasl.server.callback.handler.class=
+security.inter.broker.protocol=PLAINTEXT
+# security.providers=
+socket.connection.setup.timeout.max.ms=127000
+socket.connection.setup.timeout.ms=10000
+socket.receive.buffer.bytes=102400
+socket.request.max.bytes=104857600
+socket.send.buffer.bytes=102400
+# ssl.cipher.suites=
+ssl.client.auth=none
+ssl.enabled.protocols=TLSv1.2
+ssl.endpoint.identification.algorithm=https
+# ssl.engine.factory.class=
+# ssl.key.password=
+ssl.keymanager.algorithm=SunX509
+# ssl.keystore.certificate.chain=
+# ssl.keystore.key=
+# ssl.keystore.location=
+# ssl.keystore.password=
+# ssl.keystore.type=JKS
+ssl.principal.mapping.rules=DEFAULT
+ssl.protocol=TLSv1.2
+# ssl.provider=
+# ssl.secure.random.implementation=
+ssl.trustmanager.algorithm=PKIX
+# ssl.truststore.certificates=
+# ssl.truststore.location=
+# ssl.truststore.password=
+# ssl.truststore.type=JKS
+transaction.abort.timed.out.transaction.cleanup.interval.ms=10000
+transaction.max.timeout.ms=900000
+transaction.remove.expired.transaction.cleanup.interval.ms=3600000
+transaction.state.log.load.buffer.size=5242880
+transaction.state.log.min.isr=2
+transaction.state.log.num.partitions=50
+transaction.state.log.replication.factor=3
+transaction.state.log.segment.bytes=104857600
+transactional.id.expiration.ms=604800000
+unclean.leader.election.enable=false
+# zookeeper.clientCnxnSocket=
+# zookeeper.connect=
+# zookeeper.connection.timeout.ms=
+# zookeeper.max.in.flight.requests=10
+# zookeeper.session.timeout.ms=18000
+# zookeeper.set.acl=false
+# zookeeper.ssl.cipher.suites=
+# zookeeper.ssl.client.enable=false
+# zookeeper.ssl.crl.enable=false
+# zookeeper.ssl.enabled.protocols=
+# zookeeper.ssl.endpoint.identification.algorithm=HTTPS
+# zookeeper.ssl.keystore.location=
+# zookeeper.ssl.keystore.password=
+# zookeeper.ssl.keystore.type=
+# zookeeper.ssl.ocsp.enable=false
+# zookeeper.ssl.protocol=TLSv1.2
+# zookeeper.ssl.truststore.location=
+# zookeeper.ssl.truststore.password=
+# zookeeper.ssl.truststore.type=
+# zookeeper.sync.time.ms=2000
+
diff --git a/addons/kafka/scripts/kafka-27-server-setup.sh b/addons/kafka/scripts/kafka-27-server-setup.sh
new file mode 100644
index 000000000..ee81186b8
--- /dev/null
+++ b/addons/kafka/scripts/kafka-27-server-setup.sh
@@ -0,0 +1,247 @@
+#!/bin/bash
+
+# shellcheck disable=SC2153
+# shellcheck disable=SC2034
+ut_mode="false"
+test || __() {
+  # when running outside unit-test mode, enable "set -ex".
+ set -ex;
+}
+
+kafka_config_certs_path="/opt/bitnami/kafka/config/certs"
+kafka_config_path="/opt/bitnami/kafka/config"
+
+load_common_library() {
+  # common.sh is mounted at the path defined in cmpd.spec.scripts
+ common_library_file="/scripts/common.sh"
+ # shellcheck disable=SC1090
+ source "${common_library_file}"
+}
+
+set_tls_configuration_if_needed() {
+  ## check the TLS_ENABLED and TLS_CERT_PATH env variables
+  ## TODO: how to pass TLS_ENABLED and TLS_CERT_PATH to kafka-server-setup.sh? This is not supported yet.
+ if [[ -z "$TLS_ENABLED" ]] || [[ -z "$TLS_CERT_PATH" ]]; then
+ echo "TLS_ENABLED or TLS_CERT_PATH is not set, skipping TLS configuration"
+ return 0
+ fi
+
+ # override TLS and auth settings
+ export KAFKA_TLS_TYPE="PEM"
+ echo "[tls]KAFKA_TLS_TYPE=$KAFKA_TLS_TYPE"
+ export KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=""
+ echo "[tls]KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=$KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM"
+ export KAFKA_CERTIFICATE_PASSWORD=""
+ echo "[tls]KAFKA_CERTIFICATE_PASSWORD=$KAFKA_CERTIFICATE_PASSWORD"
+ export KAFKA_TLS_CLIENT_AUTH=none
+ echo "[tls]KAFKA_TLS_CLIENT_AUTH=$KAFKA_TLS_CLIENT_AUTH"
+
+ # override TLS protocol
+ export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,CLIENT:SSL
+ echo "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=$KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP"
+  # TODO: enable encrypted transmission inside the service
+ #export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INTERNAL:SSL,CLIENT:SSL
+ #export KAFKA_CFG_SECURITY_INTER_BROKER_PROTOCOL=SSL
+ #echo "KAFKA_CFG_SECURITY_INTER_BROKER_PROTOCOL=SSL"
+
+ mkdir -p "$kafka_config_certs_path"
+ PEM_CA="$TLS_CERT_PATH/ca.crt"
+ PEM_CERT="$TLS_CERT_PATH/tls.crt"
+ PEM_KEY="$TLS_CERT_PATH/tls.key"
+ if [[ -f "$PEM_CERT" ]] && [[ -f "$PEM_KEY" ]]; then
+ CERT_DIR="$kafka_config_certs_path"
+ PEM_CA_LOCATION="${CERT_DIR}/kafka.truststore.pem"
+ PEM_CERT_LOCATION="${CERT_DIR}/kafka.keystore.pem"
+ if [[ -f "$PEM_CA" ]]; then
+ cp "$PEM_CA" "$PEM_CA_LOCATION"
+ cp "$PEM_CERT" "$PEM_CERT_LOCATION"
+ else
+ echo "[tls]PEM_CA not provided, and auth.tls.pemChainIncluded was not true. One of these values must be set when using PEM type for TLS." >&2
+ return 1
+ fi
+
+    # Ensure the key uses PEM format with PKCS#8
+ openssl pkcs8 -topk8 -nocrypt -in "$PEM_KEY" > "${CERT_DIR}/kafka.keystore.key"
+    # combine the certificate and private key for client use
+    cat "${CERT_DIR}/kafka.keystore.key" "${PEM_CERT_LOCATION}" > "${CERT_DIR}/client.combined.key"
+ else
+ echo "[tls]Couldn't find the expected PEM files! They are mandatory when encryption via TLS is enabled." >&2
+ return 1
+ fi
+ return 0
+}
+
+convert_server_properties_to_env_var() {
+  # cfg settings via props:
+  # convert server.properties entries into 'export KAFKA_CFG_{prop}' env variables
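+  # e.g. (illustrative) the line "log.retention.hours=168" becomes
+  # `export KAFKA_CFG_LOG_RETENTION_HOURS=168`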
+ SERVER_PROP_PATH=${SERVER_PROP_PATH:-/bitnami/kafka/config/server.properties}
+ SERVER_PROP_FILE=${SERVER_PROP_FILE:-server.properties}
+
+ if [[ -f "$SERVER_PROP_FILE" ]]; then
+ IFS='='
+ while read -r line; do
+ if [[ "$line" =~ ^#.* ]]; then
+ continue
+ fi
+ echo "convert prop ${line}"
+ read -ra kv <<< "$line"
+ len=${#kv[@]}
+      if [[ $len != 2 ]]; then
+        echo "line '${line}' is not a single key=value pair; skipped"
+        continue
+      fi
+ env_suffix=${kv[0]^^}
+ env_suffix=${env_suffix//./_}
+      env_suffix=$(eval echo "${env_suffix}")
+      env_value=$(eval echo "${kv[1]}")
+ export KAFKA_CFG_${env_suffix}="${env_value}"
+ echo "[cfg]export KAFKA_CFG_${env_suffix}=${env_value}"
+    done < "$SERVER_PROP_FILE"
+ unset IFS
+ fi
+}
+
+override_sasl_configuration() {
+ # override SASL settings
+ if [[ "true" == "$KB_KAFKA_ENABLE_SASL" ]]; then
+ # bitnami default jaas setting: /opt/bitnami/kafka/config/kafka_jaas.conf
+ if [[ "${KB_KAFKA_SASL_CONFIG_PATH}" ]]; then
+      cp "${KB_KAFKA_SASL_CONFIG_PATH}" "$kafka_config_path/kafka_jaas.conf" 2>/dev/null
+ echo "[sasl]do: cp ${KB_KAFKA_SASL_CONFIG_PATH} $kafka_config_path/kafka_jaas.conf "
+ fi
+ export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT
+ echo "[sasl]KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=$KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP"
+ export KAFKA_CFG_SASL_ENABLED_MECHANISMS="PLAIN"
+ echo "[sasl]export KAFKA_CFG_SASL_ENABLED_MECHANISMS=${KAFKA_CFG_SASL_ENABLED_MECHANISMS}"
+ export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL="PLAIN"
+ echo "[sasl]export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=${KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL}"
+ fi
+}
+
+set_jvm_configuration() {
+ # jvm setting
+ if [[ -n "$KB_KAFKA_BROKER_HEAP" ]]; then
+    export KAFKA_HEAP_OPTS="${KB_KAFKA_BROKER_HEAP}"
+ echo "[jvm][KB_KAFKA_BROKER_HEAP]export KAFKA_HEAP_OPTS=${KB_KAFKA_BROKER_HEAP}"
+ fi
+}
+
+extract_ordinal_from_object_name() {
+ local object_name="$1"
+ local ordinal="${object_name##*-}"
+ echo "$ordinal"
+}
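+# e.g. extract_ordinal_from_object_name "kafka-broker-2" prints "2"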
+
+parse_advertised_svc_if_exist() {
+ local pod_name="${MY_POD_NAME}"
+
+ if [[ -z "${BROKER_ADVERTISED_PORT}" ]]; then
+ echo "Environment variable BROKER_ADVERTISED_PORT not found. Ignoring."
+ return 0
+ fi
+
+ # the value format of BROKER_ADVERTISED_PORT is "pod1Svc:advertisedPort1,pod2Svc:advertisedPort2,..."
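+  # e.g. (illustrative) BROKER_ADVERTISED_PORT="kafka-advertised-listener-0:30001,kafka-advertised-listener-1:30002"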
+ IFS=',' read -ra advertised_ports <<< "${BROKER_ADVERTISED_PORT}"
+  echo "find advertised_ports:${advertised_ports[*]}"
+ local found=false
+ pod_name_ordinal=$(extract_ordinal_from_object_name "$pod_name")
+ echo "find pod_name_ordinal:${pod_name_ordinal}"
+ for advertised_port in "${advertised_ports[@]}"; do
+ IFS=':' read -ra parts <<< "$advertised_port"
+ local svc_name="${parts[0]}"
+ local port="${parts[1]}"
+ svc_name_ordinal=$(extract_ordinal_from_object_name "$svc_name")
+ echo "find svc_name:${svc_name},port:${port},svc_name_ordinal:${svc_name_ordinal}"
+ if [[ "$svc_name_ordinal" == "$pod_name_ordinal" ]]; then
+ echo "Found matching svcName and port for podName '$pod_name', BROKER_ADVERTISED_PORT: $BROKER_ADVERTISED_PORT. svcName: $svc_name, port: $port."
+ advertised_svc_port_value="$port"
+ advertised_svc_host_value="$MY_POD_HOST_IP"
+ found=true
+ break
+ fi
+ done
+
+ if [[ "$found" == false ]]; then
+ echo "Error: No matching svcName and port found for podName '$pod_name', BROKER_ADVERTISED_PORT: $BROKER_ADVERTISED_PORT. Exiting." >&2
+ return 1
+ fi
+}
+
+set_cfg_metadata() {
+ # set advertised.listeners for broker
+ current_pod_fqdn=$(get_target_pod_fqdn_from_pod_fqdn_vars "$POD_FQDN_LIST" "$MY_POD_NAME")
+ if is_empty "$current_pod_fqdn"; then
+ echo "Error: Failed to get current pod: $MY_POD_NAME fqdn from pod fqdn list: $POD_FQDN_LIST. Exiting." >&2
+ return 1
+ fi
+
+ if ! parse_advertised_svc_if_exist ; then
+ echo "Error: Failed to parse advertised svc from BROKER_ADVERTISED_PORT: $BROKER_ADVERTISED_PORT. Exiting." >&2
+ return 1
+ fi
+
+  # TODO: currently only NodePort and ClusterIP network modes are supported; LoadBalancer is not supported yet and needs future support.
+ if [ -n "$advertised_svc_host_value" ] && [ -n "$advertised_svc_port_value" ] && [ "$advertised_svc_port_value" != "9092" ]; then
+    # NodePort enabled: use node IP + mapped port as the client connection
+ nodeport_domain="${advertised_svc_host_value}:${advertised_svc_port_value}"
+ export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${nodeport_domain}"
+ echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
+ elif [ "${KB_BROKER_DIRECT_POD_ACCESS}" == "true" ]; then
+ export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${MY_POD_IP}:9092"
+ echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
+ else
+    # default: use the headless service URL as the client connection
+ export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${current_pod_fqdn}:9092"
+ echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
+ fi
+
+  # override the node.id setting:
+  # ids increment from a configured base (BROKER_MIN_NODE_ID) to avoid conflicts with controller ids
+  INDEX=$(echo "$MY_POD_NAME" | grep -o "\-[0-9]\+\$")
+  INDEX=${INDEX#-}
+  BROKER_NODE_ID=$(( INDEX + BROKER_MIN_NODE_ID ))
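+  # e.g. pod "kafka-broker-1" with BROKER_MIN_NODE_ID=100 yields node.id 101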
+ export KAFKA_CFG_NODE_ID="$BROKER_NODE_ID"
+ export KAFKA_CFG_BROKER_ID="$BROKER_NODE_ID"
+ echo "[cfg]KAFKA_CFG_NODE_ID=$KAFKA_CFG_NODE_ID"
+}
+
+set_zookeeper_connect() {
+ # Check if KB_KAFKA_ZOOKEEPER_CONN is set
+ if [[ -z "$KB_KAFKA_ZOOKEEPER_CONN" ]]; then
+ echo "Error: KB_KAFKA_ZOOKEEPER_CONN is not set"
+ return 1
+ fi
+
+ if [ -n "$KB_KAFKA_ZK_SUB_PATH" ]; then
+ # Set KAFKA_CFG_ZOOKEEPER_CONNECT to the concat of KB_KAFKA_ZOOKEEPER_CONN and KB_KAFKA_ZK_SUB_PATH
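+    # e.g. "zk-0.zk-hs.default:2181" with sub path "mycluster" yields
+    # "zk-0.zk-hs.default:2181/mycluster" (a ZooKeeper chroot-style path)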
+ export KAFKA_CFG_ZOOKEEPER_CONNECT="$KB_KAFKA_ZOOKEEPER_CONN/$KB_KAFKA_ZK_SUB_PATH"
+ else
+ # Set KAFKA_CFG_ZOOKEEPER_CONNECT to the value of KB_KAFKA_ZOOKEEPER_CONN
+ export KAFKA_CFG_ZOOKEEPER_CONNECT="$KB_KAFKA_ZOOKEEPER_CONN"
+ fi
+
+ # Optionally, print the value to verify
+ echo "[cfg]export KAFKA_CFG_ZOOKEEPER_CONNECT=$KAFKA_CFG_ZOOKEEPER_CONNECT,for kafka-server."
+}
+
+start_server() {
+ load_common_library
+ set_tls_configuration_if_needed
+ convert_server_properties_to_env_var
+ override_sasl_configuration
+ set_jvm_configuration
+ set_zookeeper_connect
+ set_cfg_metadata
+
+ exec /entrypoint.sh /run.sh
+}
+
+# This is magic for the shellspec ut framework.
+# Sometimes functions are defined in a single shell script and you want to
+# test them without running the script. When this script is sourced from
+# shellspec, the __SOURCED__ variable is defined (holding the script path)
+# and execution ends here.
+${__SOURCED__:+false} : || return 0
+
+# main
+start_server
\ No newline at end of file
diff --git a/addons/kafka/scripts/kafka-server-setup.sh b/addons/kafka/scripts/kafka-server-setup.sh
index f2fedb3e5..f04ecd832 100644
--- a/addons/kafka/scripts/kafka-server-setup.sh
+++ b/addons/kafka/scripts/kafka-server-setup.sh
@@ -212,7 +212,7 @@ set_cfg_metadata() {
export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${nodeport_domain}"
echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
elif [ "${KB_BROKER_DIRECT_POD_ACCESS}" == "true" ]; then
- export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${KB_POD_IP}:9092"
+ export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${MY_POD_IP}:9092"
echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
else
# default, use headless service url as client connection
diff --git a/addons/kafka/templates/_helpers.tpl b/addons/kafka/templates/_helpers.tpl
index 3269388a4..449dce14a 100644
--- a/addons/kafka/templates/_helpers.tpl
+++ b/addons/kafka/templates/_helpers.tpl
@@ -121,6 +121,20 @@ Define kafka-broker component definition regex pattern
^kafka-broker-
{{- end -}}
+{{/*
+Define kafka-broker component definition name
+*/}}
+{{- define "kafka2-broker.componentDefName" -}}
+kafka27-broker-{{ .Chart.Version }}
+{{- end -}}
+
+{{/*
+Define kafka-broker component definition regex pattern
+*/}}
+{{- define "kafka2-broker.cmpdRegexpPattern" -}}
+^kafka27-broker-
+{{- end -}}
+
{{/*
Define kafka config constraint name
*/}}
@@ -128,6 +142,13 @@ Define kafka config constraint name
kafka-config-constraints
{{- end -}}
+{{/*
+Define kafka config constraint name
+*/}}
+{{- define "kafka2.configConstraintName" -}}
+kafka2-config-constraints
+{{- end -}}
+
{{/*
Define kafka configuration tpl name
*/}}
@@ -135,6 +156,13 @@ Define kafka configuration tpl name
kafka-configuration-tpl
{{- end -}}
+{{/*
+Define kafka2 configuration tpl name
+*/}}
+{{- define "kafka2.configurationTplName" -}}
+kafka27-configuration-tpl-{{ .Chart.Version }}
+{{- end -}}
+
{{/*
Define kafka jmx configuration tpl name
*/}}
@@ -149,6 +177,20 @@ Define kafka server scripts tpl name
kafka-server-scripts-tpl
{{- end -}}
+{{/*
+Define kafka 2.x server scripts tpl name
+*/}}
+{{- define "kafka2.serverScriptsTplName" -}}
+kafka2-server-scripts-tpl
+{{- end -}}
+
+{{/*
+Define kafka 2.x broker env configmap tpl name
+*/}}
+{{- define "kafka2.brokerEnvTplName" -}}
+kafka2-broker-env-tpl
+{{- end -}}
+
{{/*
Define kafka tools scripts tpl name
*/}}
diff --git a/addons/kafka/templates/clusterdefinition.yaml b/addons/kafka/templates/clusterdefinition.yaml
index d3405f4a2..5b71d256f 100644
--- a/addons/kafka/templates/clusterdefinition.yaml
+++ b/addons/kafka/templates/clusterdefinition.yaml
@@ -72,3 +72,7 @@ spec:
update:
- kafka-broker
- kafka-controller
+ - name: kafka2-external-zk
+ components:
+ - name: kafka-broker
+ compDef: {{ include "kafka2-broker.cmpdRegexpPattern" . }}
\ No newline at end of file
diff --git a/addons/kafka/templates/cmpd-broker-27.yaml b/addons/kafka/templates/cmpd-broker-27.yaml
new file mode 100644
index 000000000..186e8bd48
--- /dev/null
+++ b/addons/kafka/templates/cmpd-broker-27.yaml
@@ -0,0 +1,246 @@
+apiVersion: apps.kubeblocks.io/v1
+kind: ComponentDefinition
+metadata:
+ name: {{ include "kafka2-broker.componentDefName" . }}
+ labels:
+ {{- include "kafka.labels" . | nindent 4 }}
+ {{- if .Values.commonLabels }}
+ {{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }}
+ {{- end }}
+ annotations:
+ {{- include "kafka.annotations" . | nindent 4 }}
+ {{- if .Values.commonAnnotations }}
+ {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
+ {{- end }}
+spec:
+ provider: kubeblocks
+ description: Kafka 2.x broker component definition
+ serviceKind: kafka
+ serviceVersion: {{ .Values.defaultKafka2ServiceVersion.broker }}
+ services:
+ - name: advertised-listener
+ serviceName: advertised-listener
+ podService: true
+ spec:
+ type: ClusterIP
+ ports:
+ - name: broker
+ port: 9092
+ targetPort: kafka-client
+ serviceRefDeclarations:
+ - name: kafkaZookeeper
+ serviceRefDeclarationSpecs:
+ - serviceKind: zookeeper
+ serviceVersion: ^3\.[5-9]\.\d{1,2}$
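+          # matches ZooKeeper 3.5.x through 3.9.x, e.g. "3.8.5"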
+ optional: false
+ vars:
+ - name: CLUSTER_UID
+ valueFrom:
+ clusterVarRef:
+ clusterUID: Required
+ - name: SUPER_USER
+ valueFrom:
+ credentialVarRef:
+ name: admin
+ username: Required
+ optional: false
+ - name: POD_FQDN_LIST
+ valueFrom:
+ componentVarRef:
+ optional: false
+ podFQDNs: Required
+ - name: POD_NAME_LIST
+ valueFrom:
+ componentVarRef:
+ optional: false
+ podNames: Required
+ - name: COMPONENT_NAME
+ valueFrom:
+ componentVarRef:
+ optional: false
+ componentName: Required
+ - name: CLUSTER_NAME
+ valueFrom:
+ clusterVarRef:
+ clusterName: Required
+ - name: KB_KAFKA_ZOOKEEPER_CONN
+ valueFrom:
+ serviceRefVarRef:
+ name: kafkaZookeeper
+ endpoint: Required
+ optional: false
+  ## TODO: currently only NodePort and ClusterIP network modes are supported; LoadBalancer is not supported yet and needs future support.
+ - name: BROKER_ADVERTISED_PORT
+ valueFrom:
+ serviceVarRef:
+ compDef: {{ include "kafka2-broker.cmpdRegexpPattern" . }}
+ name: advertised-listener
+ optional: true
+ port:
+ name: broker
+ option: Optional
+ systemAccounts:
+ - name: client
+ secretRef:
+ name: {{ include "kafka.defaultClientSystemAccountSecretName" . }}
+ namespace: {{ .Release.Namespace }}
+ - name: admin
+ secretRef:
+ name: {{ include "kafka.defaultSuperUserSystemAccountSecretName" . }}
+ namespace: {{ .Release.Namespace }}
+  ## serial is not used because rsm does not yet support kafka role detection; the missing role label during restart would affect pod restarts
+ updateStrategy: BestEffortParallel
+ configs:
+ - name: kafka-configuration-tpl
+ constraintRef: {{ include "kafka2.configConstraintName" . }}
+ templateRef: {{ include "kafka2.configurationTplName" . }}
+ volumeName: kafka-config
+ namespace: {{ .Release.Namespace }}
+ - name: kafka-jmx-configuration-tpl
+ templateRef: {{ include "kafka.jmxConfigurationTplName" . }}
+ volumeName: jmx-config
+ namespace: {{ .Release.Namespace }}
+ scripts:
+ - name: kafka-scripts-tpl
+ templateRef: {{ include "kafka2.serverScriptsTplName" . }}
+ volumeName: scripts
+ namespace: {{ .Release.Namespace }}
+ defaultMode: 0755
+ - name: kafka-tools-scripts-tpl
+ templateRef: {{ include "kafka.toolsScriptsTplName" . }}
+ volumeName: tools
+ namespace: {{ .Release.Namespace }}
+ defaultMode: 0755
+ runtime:
+ securityContext:
+ fsGroup: 1001
+ containers:
+ - name: kafka
+        image: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.kafka.repository }}:{{ .Values.images.kafka2.tag }}
+ imagePullPolicy: {{ default "IfNotPresent" .Values.images.pullPolicy }}
+ securityContext:
+ allowPrivilegeEscalation: false
+ runAsNonRoot: true
+ runAsUser: 1001
+ command:
+ - /scripts/kafka-server-setup.sh
+ env:
+ - name: BITNAMI_DEBUG
+ value: {{ .Values.debugEnabled | quote }}
+ - name: MY_POD_IP
+ valueFrom:
+ fieldRef:
+ apiVersion: v1
+ fieldPath: status.podIP
+ - name: MY_POD_NAME
+ valueFrom:
+ fieldRef:
+ apiVersion: v1
+ fieldPath: metadata.name
+ - name: MY_POD_HOST_IP
+ valueFrom:
+ fieldRef:
+ apiVersion: v1
+ fieldPath: status.hostIP
+ - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
+ value: "INTERNAL"
+ - name: KB_KAFKA_ZK_SUB_PATH
+ value: $(CLUSTER_NAME)
+        - name: KAFKA_CFG_LISTENERS # listener definitions for internal and client traffic
+ value: "INTERNAL://:9094,CLIENT://:9092"
+ - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
+ value: "INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT"
+ - name: KAFKA_CFG_ADVERTISED_LISTENERS
+ value: "INTERNAL://$(MY_POD_IP):9094,CLIENT://$(MY_POD_IP):9092"
+ - name: KAFKA_CFG_INITIAL_BROKER_REGISTRATION_TIMEOUT_MS
+ value: "240000"
+ - name: ALLOW_PLAINTEXT_LISTENER
+ value: "yes"
+ - name: JMX_PORT
+ value: "5555"
+ - name: KAFKA_VOLUME_DIR
+ value: "/bitnami/kafka"
+ - name: KAFKA_CFG_METADATA_LOG_DIR
+ value: "/bitnami/kafka/metadata"
+ - name: KAFKA_LOG_DIR
+ value: "/bitnami/kafka/data"
+ - name: KAFKA_HEAP_OPTS
+ #value: "-Xmx1024m -Xms1024m"
+ value: "-XshowSettings:vm -XX:MaxRAMPercentage=100 -Ddepth=64"
+ - name: SERVER_PROP_FILE
+ value: /scripts/server.properties
+ - name: KAFKA_KRAFT_CLUSTER_ID
+ value: $(CLUSTER_UID)
+ - name: KAFKA_CFG_SUPER_USERS
+ value: "User:$(SUPER_USER)"
+ # - name: KB_KAFKA_ENABLE_SASL # enable the SASL with plain mode
+ # value: "true"
+ - name: KB_KAFKA_SASL_CONFIG_PATH # specify the SASL jaas users
+ value: /tools/server-jaas.properties
+ - name: BROKER_MIN_NODE_ID
+ value: {{ .Values.kafkaBroker.minNodeId | quote }}
+ ports:
+ - name: kafka-client
+ containerPort: 9092
+ - name: kafka-internal
+ containerPort: 9094
+ livenessProbe:
+ failureThreshold: 3
+ initialDelaySeconds: 10
+ periodSeconds: 10
+ successThreshold: 1
+ timeoutSeconds: 5
+ tcpSocket:
+ port: kafka-client
+ startupProbe:
+ failureThreshold: 30
+ initialDelaySeconds: 5
+ periodSeconds: 10
+ successThreshold: 1
+ timeoutSeconds: 5
+ tcpSocket:
+ port: kafka-client
+ volumeMounts:
+ - name: data
+ mountPath: /bitnami/kafka
+ - name: metadata
+ mountPath: /bitnami/kafka/metadata
+ - name: scripts
+ mountPath: /scripts/kafka-server-setup.sh
+ subPath: kafka-server-setup.sh
+ - name: scripts
+ mountPath: /scripts/common.sh
+ subPath: common.sh
+ - name: kafka-config
+ mountPath: /scripts/server.properties
+ subPath: server.properties
+ - name: tools
+ mountPath: /tools/client-ssl.properties
+ subPath: client-ssl.properties
+ - name: tools
+ mountPath: /tools/server-jaas.properties
+ subPath: server-jaas.properties
+ - name: jmx-exporter
+ image: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.jmxExporter.repository }}:{{ .Values.images.jmxExporter.tag }}
+ imagePullPolicy: {{ default "IfNotPresent" .Values.images.pullPolicy }}
+ securityContext:
+ runAsNonRoot: true
+ runAsUser: 1001
+ command:
+ - java
+ args:
+ - -XX:MaxRAMPercentage=100
+ - -XshowSettings:vm
+ - -jar
+ - jmx_prometheus_httpserver.jar
+ - "5556"
+ - /etc/jmx-kafka/jmx-kafka-prometheus.yml
+ ports:
+ - name: metrics
+ containerPort: 5556
+ env:
+ - name: SERVICE_PORT
+ value: "5556"
+ volumeMounts:
+ - name: jmx-config
+ mountPath: /etc/jmx-kafka
diff --git a/addons/kafka/templates/cmpv-broker.yaml b/addons/kafka/templates/cmpv-broker.yaml
index d80234b28..68a63e257 100644
--- a/addons/kafka/templates/cmpv-broker.yaml
+++ b/addons/kafka/templates/cmpv-broker.yaml
@@ -12,9 +12,18 @@ spec:
- {{ include "kafka-broker.cmpdRegexpPattern" . }}
releases:
- 3.3.2-debian-11-r54
+ - compDefs:
+ - {{ include "kafka2-broker.cmpdRegexpPattern" . }}
+ releases:
+ - 2.7.0-debian-10-r124
releases:
- name: 3.3.2-debian-11-r54
changes:
serviceVersion: 3.3.2
images:
kafka: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.kafka.repository }}:{{ default .Chart.AppVersion .Values.images.kafka.tag }}
+ - name: 2.7.0-debian-10-r124
+ changes:
+ serviceVersion: 2.7.0
+ images:
+        kafka: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.kafka.repository }}:{{ .Values.images.kafka2.tag }}
diff --git a/addons/kafka/templates/configconstraint.yaml b/addons/kafka/templates/configconstraint.yaml
index a3007454f..27a1449d6 100644
--- a/addons/kafka/templates/configconstraint.yaml
+++ b/addons/kafka/templates/configconstraint.yaml
@@ -22,4 +22,32 @@ spec:
{{- .Files.Get "configs/kafka-server-constraint.cue" | nindent 6 }}
fileFormatConfig:
- format: properties
\ No newline at end of file
+ format: properties
+
+---
+apiVersion: apps.kubeblocks.io/v1beta1
+kind: ConfigConstraint
+metadata:
+ name: {{ include "kafka2.configConstraintName" . }}
+ labels:
+ {{- include "kafka.labels" . | nindent 4 }}
+ {{- if .Values.commonLabels }}
+ {{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }}
+ {{- end }}
+ {{- if .Values.commonAnnotations }}
+ annotations: {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
+ {{- end }}
+spec:
+  # parametersSchema imposes restrictions on engine parameter values
+  parametersSchema:
+    # top-level kafka configuration type
+    topLevelKey: KafkaParameter
+
+  # schemaInJSON: auto-generated from the CUE script
+ cue: |-
+ {{- .Files.Get "configs/2.7/kafka-27-server-constraint.cue" | nindent 6 }}
+
+ fileFormatConfig:
+ format: properties
+
diff --git a/addons/kafka/templates/configuration-template.yaml b/addons/kafka/templates/configuration-template.yaml
index 5c2f565c1..722e57592 100644
--- a/addons/kafka/templates/configuration-template.yaml
+++ b/addons/kafka/templates/configuration-template.yaml
@@ -14,6 +14,23 @@ data:
server.properties: |-
{{- .Files.Get "configs/kafka-server.prop.tpl" | nindent 4 }}
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: {{ include "kafka2.configurationTplName" . }}
+ namespace: {{ .Release.Namespace | quote }}
+ labels: {{- include "common.labels.standard" . | nindent 4 }}
+ {{- if .Values.commonLabels }}
+ {{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }}
+ {{- end }}
+ {{- if .Values.commonAnnotations }}
+ annotations: {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
+ {{- end }}
+data:
+ server.properties: |-
+ {{- .Files.Get "configs/2.7/kafka-27-server.prop.tpl" | nindent 4 }}
+
---
apiVersion: v1
kind: ConfigMap
diff --git a/addons/kafka/templates/scripts-template.yaml b/addons/kafka/templates/scripts-template.yaml
index aa5a5caa8..725880825 100644
--- a/addons/kafka/templates/scripts-template.yaml
+++ b/addons/kafka/templates/scripts-template.yaml
@@ -24,6 +24,30 @@ data:
---
apiVersion: v1
kind: ConfigMap
+metadata:
+ name: {{ include "kafka2.serverScriptsTplName" . }}
+ namespace: {{ .Release.Namespace | quote }}
+ labels: {{- include "common.labels.standard" . | nindent 4 }}
+ {{- if .Values.commonLabels }}
+ {{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }}
+ {{- end }}
+ {{- if .Values.commonAnnotations }}
+ annotations: {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
+ {{- end }}
+data:
+ common.sh: |-
+ #!/bin/bash
+ {{- include "kblib.compvars.get_target_pod_fqdn_from_pod_fqdn_vars" $ | nindent 4 }}
+ {{- include "kblib.strings.is_empty" $ | nindent 4 }}
+ kafka-server-setup.sh: |-
+ {{- .Files.Get "scripts/kafka-27-server-setup.sh" | nindent 4 }}
+ kafka-exporter-setup.sh: |-
+ {{- .Files.Get "scripts/kafka-exporter-setup.sh" | nindent 4 }}
+ kafka-env.sh: |-
+ {{- .Files.Get "scripts/kafka-env.sh" | nindent 4 }}
+---
+apiVersion: v1
+kind: ConfigMap
metadata:
name: {{ include "kafka.toolsScriptsTplName" . }}
namespace: {{ .Release.Namespace | quote }}
diff --git a/addons/kafka/values.yaml b/addons/kafka/values.yaml
index 0f6226897..83a7a9ab7 100644
--- a/addons/kafka/values.yaml
+++ b/addons/kafka/values.yaml
@@ -20,6 +20,8 @@ images:
repository: bitnami/kafka
# tag: 3.4.0-debian-11-r22
tag: 3.3.2-debian-11-r54
+ kafka2:
+ tag: 2.7.0-debian-10-r124
kafkaExporter:
repository: bitnami/kafka-exporter
tag: 1.6.0-debian-11-r67
@@ -34,6 +36,10 @@ defaultServiceVersion:
broker: 3.3.2
exporter: 1.6.0
+## @param defaultKafka2ServiceVersion defines the default serviceVersion of each kafka 2.x Component
+defaultKafka2ServiceVersion:
+ broker: 2.7.0
+
## @param debugEnabled enables containers' debug logging
##
debugEnabled: true
diff --git a/examples/kafka/cluster-2x-ext-zk-svc-descriptor.yaml b/examples/kafka/cluster-2x-ext-zk-svc-descriptor.yaml
new file mode 100644
index 000000000..bbfbe32d9
--- /dev/null
+++ b/examples/kafka/cluster-2x-ext-zk-svc-descriptor.yaml
@@ -0,0 +1,88 @@
+apiVersion: apps.kubeblocks.io/v1
+kind: Cluster
+metadata:
+ name: kafka2-ext-zk-descriptor
+ namespace: default
+spec:
+ terminationPolicy: Delete
+ # Specifies the name of the ClusterDefinition to use when creating a Cluster.
+ # Note: DO NOT UPDATE THIS FIELD
+  # The value must be `kafka` to create a Kafka Cluster
+ clusterDef: kafka
+ # Specifies the name of the ClusterTopology to be used when creating the
+ # Cluster.
+ # - combined: combined Kafka controller (KRaft) and broker in one Component
+ # - combined_monitor: combined mode with monitor component
+ # - separated: separated KRaft and Broker Components.
+ # - separated_monitor: separated mode with monitor component
+ # Valid options are: [combined,combined_monitor,separated,separated_monitor,kafka2-external-zk]
+ topology: kafka2-external-zk
+ services:
+ - name: bootstrap
+ serviceName: bootstrap
+ componentSelector: kafka-broker
+ spec:
+ type: ClusterIP
+ ports:
+ - name: kafka-client
+ targetPort: 9092
+ port: 9092
+ componentSpecs:
+ - name: kafka-broker
+ componentDef: kafka27-broker
+ serviceRefs:
+ - name: kafkaZookeeper
+ namespace: default
+ serviceDescriptor: kafka-cluster-zookeeper-service
+ tls: false
+ replicas: 3
+ serviceAccountName: kb-kafka-cluster
+ env:
+ - name: KB_BROKER_DIRECT_POD_ACCESS
+ value: 'true'
+ - name: KAFKA_HEAP_OPTS
+ value: "-Xmx500m -Xms500m"
+ - name: KB_KAFKA_ZK_SUB_PATH
+        value: "test" # sub path to use when sharing zk; defaults to the cluster name, set an empty string to use the zk root
+ resources:
+ limits:
+ cpu: '0.5'
+ memory: 0.5Gi
+ requests:
+ cpu: '0.5'
+ memory: 0.5Gi
+ volumeClaimTemplates:
+ - name: data
+ spec:
+ accessModes:
+ - ReadWriteOnce
+ resources:
+ requests:
+ storage: 20Gi
+ - name: metadata
+ spec:
+ storageClassName: null
+ accessModes:
+ - ReadWriteOnce
+ resources:
+ requests:
+ storage: 5Gi
+---
+apiVersion: apps.kubeblocks.io/v1
+kind: ServiceDescriptor
+metadata:
+ name: kafka-cluster-zookeeper-service
+ namespace: default
+spec:
+ # Specifies the type or nature of the service.
+ # Should represent a well-known application cluster type, such as {mysql, redis, zookeeper}.
+ serviceKind: zookeeper
+ # Represents the version of the service reference.
+ serviceVersion: 3.8.5
+ # Represents the endpoint of the service connection credential.
+ endpoint:
+ # your external zk endpoints here
+ value: "zookeeper-cluster-zookeeper-0.zookeeper-cluster-zookeeper-headless.default:2181,zookeeper-cluster-zookeeper-1.zookeeper-cluster-zookeeper-headless.default:2181,zookeeper-cluster-zookeeper-2.zookeeper-cluster-zookeeper-headless.default:2181"
+ # Represents the port of the service connection credential.
+ port:
+ value: "2181"
\ No newline at end of file
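+# To try this example (assuming KubeBlocks and the kafka addon are installed,
+# and the external ZooKeeper endpoints above are reachable):
+#   kubectl apply -f examples/kafka/cluster-2x-ext-zk-svc-descriptor.yaml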