From d71bda80352b35a630c2fc55d75b4be0b2ff0b2d Mon Sep 17 00:00:00 2001
From: Davinder Pal
Date: Mon, 26 Dec 2022 23:56:41 +0200
Subject: [PATCH 1/3] * Initial Docs for Kraft

* move docs to separate folder
---
 Vagrantfile                                 | 4 ++--
 NewRelic-Infra.md => docs/NewRelic-Infra.md | 0
 Splunk-Config.md => docs/Splunk-Config.md   | 0
 kafka-manager.md => docs/kafka-manager.md   | 0
 docs/kraft-installation.md                  | 25 +++++++++++++++++++++
 inventory/development/cluster.ini           | 14 ++++++++----
 6 files changed, 37 insertions(+), 6 deletions(-)
 rename NewRelic-Infra.md => docs/NewRelic-Infra.md (100%)
 rename Splunk-Config.md => docs/Splunk-Config.md (100%)
 rename kafka-manager.md => docs/kafka-manager.md (100%)
 create mode 100644 docs/kraft-installation.md

diff --git a/Vagrantfile b/Vagrantfile
index b015230..4b0ea12 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -6,11 +6,11 @@ Vagrant.configure("2") do |config|
   (1..cluster_nodes).each do |i|
     config.vm.define "kafka-#{i}" do |node|
       node.vm.box = "ubuntu/focal64"
-      node.vm.hostname = "kafka#{i}"
+      node.vm.hostname = "kafka#{i}.localhost"
       node.vm.network :private_network, ip: "192.168.56.10#{i}"
       # expose JMX port
       node.vm.network "forwarded_port", guest: 9999, host: "1000#{i}", protocol: "tcp"
-      # node.vm.provision :hosts, :sync_hosts => true
+      node.vm.provision :hosts, :add_localhost_hostnames => false, :sync_hosts => true # required to autogenerate /etc/hosts on all nodes
     end
   end
   # Setting CPU and Memory for All machines
diff --git a/NewRelic-Infra.md b/docs/NewRelic-Infra.md
similarity index 100%
rename from NewRelic-Infra.md
rename to docs/NewRelic-Infra.md
diff --git a/Splunk-Config.md b/docs/Splunk-Config.md
similarity index 100%
rename from Splunk-Config.md
rename to docs/Splunk-Config.md
diff --git a/kafka-manager.md b/docs/kafka-manager.md
similarity index 100%
rename from kafka-manager.md
rename to docs/kafka-manager.md
diff --git a/docs/kraft-installation.md b/docs/kraft-installation.md
new file mode 100644
index 0000000..4640f03
--- /dev/null
+++ b/docs/kraft-installation.md
@@ -0,0 +1,25 @@
+# KRaft-based Installation Notes
+
+## Variables
+* **cluster id** should be a valid `guid/uuid` converted to URL-safe base64 (22 characters, padding stripped).
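+
+Kafka's own storage tool can generate a valid id (commands as in the quickstart linked under References; paths assume an unpacked Kafka tarball, and `<cluster-id>` is a placeholder):
+
+```bash
+# prints a fresh 22-character cluster id, e.g. 8lcyWObISQm7F8E3oahF5w
+bin/kafka-storage.sh random-uuid
+
+# the same id is then used to format the data directory on every node
+bin/kafka-storage.sh format -t <cluster-id> -c config/kraft/server.properties
+```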
+We can use https://www.fileformat.info/tool/guid-base64.htm or use python + +```python +import uuid +import base64 + +base64.urlsafe_b64encode(uuid.uuid4().bytes).decode("utf-8").strip("=") +``` + +```bash +$ python3 +Python 3.10.6 (main, Nov 14 2022, 16:10:14) [GCC 11.3.0] on linux +>>> import uuid +>>> import base64 +>>> base64.urlsafe_b64encode(uuid.uuid4().bytes).decode("utf-8").strip("=") +'8lcyWObISQm7F8E3oahF5w' +``` + + +## References +* https://kafka.apache.org/documentation/#quickstart_startserver diff --git a/inventory/development/cluster.ini b/inventory/development/cluster.ini index 7fd7950..f6e7c6b 100644 --- a/inventory/development/cluster.ini +++ b/inventory/development/cluster.ini @@ -6,17 +6,23 @@ ansible_user=vagrant #ansible_ssh_private_key_file = ~/.vagrant.d/insecure_private_key [clusterNodes] -192.168.56.10[1:3] kafkaBrokerRackId=1 +kafka1.localhost ansible_host=192.168.56.101 +kafka2.localhost ansible_host=192.168.56.102 +kafka3.localhost ansible_host=192.168.56.103 [clusterAddNodes] [clusterRemoveNodes] -192.168.56.10[1:3] +kafka1.localhost ansible_host=192.168.56.101 +kafka2.localhost ansible_host=192.168.56.102 +kafka3.localhost ansible_host=192.168.56.103 [kafka-manager] -192.168.56.101 +kafka1.localhost ansible_host=192.168.56.101 [kafka-mirror-maker] [kafka-mirror-maker-remove-nodes] -192.168.56.10[1:3] \ No newline at end of file +kafka1.localhost ansible_host=192.168.56.101 +kafka2.localhost ansible_host=192.168.56.102 +kafka3.localhost ansible_host=192.168.56.103 \ No newline at end of file From 156d7b1378caea62fe5a921e56717ffce357eb1b Mon Sep 17 00:00:00 2001 From: Davinder Pal Date: Fri, 10 Mar 2023 22:57:16 +0200 Subject: [PATCH 2/3] * update vagrant * update kafka version * update java version to 17 --- Vagrantfile | 2 +- inventory/development/group_vars/all.yml | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Vagrantfile b/Vagrantfile index 4b0ea12..ba9c543 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -5,7 +5,7 @@ Vagrant.configure("2") do |config| (1..cluster_nodes).each do |i| config.vm.define "kafka-#{i}" do |node| - node.vm.box = "ubuntu/focal64" + node.vm.box = "ubuntu/jammy64" node.vm.hostname = "kafka#{i}.localhost" node.vm.network :private_network, ip: "192.168.56.10#{i}" # expose JMX port diff --git a/inventory/development/group_vars/all.yml b/inventory/development/group_vars/all.yml index 1f5396e..3678256 100644 --- a/inventory/development/group_vars/all.yml +++ b/inventory/development/group_vars/all.yml @@ -1,6 +1,6 @@ --- -javaVersion: 11 +javaVersion: 17 updateJava: false # only when you need to update java version useSystemFirewall: false @@ -36,12 +36,12 @@ kafkaZookeeperConnectionTimeoutMs: 10000 # please use name like -kafka, so it get distinct name in NewRelic kafkaClusterName: development-kafka -kafkaVersion: 3.2.1 +kafkaVersion: 3.4.0 kafkaScalaVersion: 2.13 kafkaTarLocation: "~/projects/kafka-cluster-ansible/kafka_{{ kafkaScalaVersion }}-{{ kafkaVersion }}.tgz" # below will be used while decommission/downgrade only -kafkaOldVersion: 3.0.0 +kafkaOldVersion: 3.2.1 kafkaOldScalaVersion: 2.13 ### Production Optimized Parameters From 8aa32d9d9b82c9195b2a64a02a5494efa39c9f7f Mon Sep 17 00:00:00 2001 From: Davinder Pal Date: Tue, 11 Apr 2023 23:23:32 +0300 Subject: [PATCH 3/3] commit before pc reset --- Readme.md | 2 + clusterAddNodes.yml | 2 +- docs/migrations/from_3_3_to_3_4.md | 0 inventory/development/cluster-raft.ini | 27 ++++ .../templates/kraft/broker.properties | 129 ++++++++++++++++++ 
 .../templates/kraft/controller.properties | 122 +++++++++++++++++
 .../templates/kraft/server.properties     | 122 +++++++++++++++++
 7 files changed, 403 insertions(+), 1 deletion(-)
 create mode 100644 docs/migrations/from_3_3_to_3_4.md
 create mode 100644 inventory/development/cluster-raft.ini
 create mode 100644 roles/configure/templates/kraft/broker.properties
 create mode 100644 roles/configure/templates/kraft/controller.properties
 create mode 100644 roles/configure/templates/kraft/server.properties

diff --git a/Readme.md b/Readme.md
index 4bb2422..d6b2a2a 100644
--- a/Readme.md
+++ b/Readme.md
@@ -24,6 +24,7 @@ It is a group of playbooks to manage apache kafka.
 
 * **Open Source Web-UI** https://github.com/yahoo/kafka-manager
 * **Zookeeper Installation** https://github.com/116davinder/zookeeper-cluster-ansible
+* [Migration Docs](./docs/migrations/)
 
 ## **Development Setup with Vagrant**
 
@@ -183,6 +184,7 @@ It will be installed in similar way to apache kafka but it will start apache kaf
 
 ### **Kafka Tested Version**
 * 2.x
+* 3.x
 
 ### **Tested OS**
 * CentOS 7
diff --git a/clusterAddNodes.yml b/clusterAddNodes.yml
index d2991fa..9d86603 100644
--- a/clusterAddNodes.yml
+++ b/clusterAddNodes.yml
@@ -1,6 +1,6 @@
 ---
 
-- hosts: clusterAddNodes
+- hosts: "{{ target | default('clusterAddNodes') }}"
   gather_facts: true
   become: true
   serial: 1
diff --git a/docs/migrations/from_3_3_to_3_4.md b/docs/migrations/from_3_3_to_3_4.md
new file mode 100644
index 0000000..e69de29
diff --git a/inventory/development/cluster-raft.ini b/inventory/development/cluster-raft.ini
new file mode 100644
index 0000000..75a4022
--- /dev/null
+++ b/inventory/development/cluster-raft.ini
@@ -0,0 +1,27 @@
+[all:vars]
+ansible_connection=ssh
+ansible_become_method=sudo
+ansible_become=true
+ansible_user=vagrant
+#ansible_ssh_private_key_file = ~/.vagrant.d/insecure_private_key
+
+[clusterNodes:children]
+clusterController
+clusterBroker
+
+[clusterController]
+kafka1.localhost ansible_host=192.168.56.101
+
+[clusterBroker]
+kafka2.localhost ansible_host=192.168.56.102
+
+[clusterAddBrokerNodes]
+kafka3.localhost ansible_host=192.168.56.103
+
+[clusterRemoveNodes]
+kafka1.localhost ansible_host=192.168.56.101
+kafka2.localhost ansible_host=192.168.56.102
+kafka3.localhost ansible_host=192.168.56.103
+
+[kafka-manager]
+kafka1.localhost ansible_host=192.168.56.101
diff --git a/roles/configure/templates/kraft/broker.properties b/roles/configure/templates/kraft/broker.properties
new file mode 100644
index 0000000..13b8595
--- /dev/null
+++ b/roles/configure/templates/kraft/broker.properties
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This configuration file is intended for use in KRaft mode, where
+# Apache ZooKeeper is not present. See config/kraft/README.md for details.
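+#
+# Note: this template expects `kafkaPort` and `kafkaDataDir` to be supplied by
+# the playbook (e.g. via group_vars); they are used below without a default()
+# fallback, while every other value falls back to the default shown inline.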
+# + +############################# Server Basics ############################# + +# The role of this server. Setting this puts us in KRaft mode +process.roles=broker + +# The node id associated with this instance's roles +node.id=2 + +# The connect string for the controller quorum +controller.quorum.voters=1@localhost:9093 + +############################# Socket Server Settings ############################# + +# The address the socket server listens on. If not configured, the host name will be equal to the value of +# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092. +# FORMAT: +# listeners = listener_name://host_name:port +# EXAMPLE: +# listeners = PLAINTEXT://your.host.name:9092 +listeners = PLAINTEXT://:{{ kafkaPort }} + +# Name of listener used for communication between brokers. +inter.broker.listener.name=PLAINTEXT + +# Listener name, hostname and port the broker will advertise to clients. +# If not set, it uses the value for "listeners". +advertised.listeners = PLAINTEXT://:{{ kafkaPort }} + +# A comma-separated list of the names of the listeners used by the controller. +# This is required if running in KRaft mode. On a node with `process.roles=broker`, only the first listed listener will be used by the broker. +controller.listener.names=CONTROLLER + +# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details +listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + +# The number of threads that the server uses for receiving requests from the network and sending responses to the network +num.network.threads={{ kafkaNumNetworkThreads | default(3) }} + +# The number of threads that the server uses for processing requests, which may include disk I/O +num.io.threads={{ kafkaNumIoThreads | default(8) }} + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes={{ kafkaSocketSendBufferBytes | default(102400) }} + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes={{ kafkaSocetReceiveBufferBytes | default(102400) }} + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes={{ kafkaSocetRequestMaxBytes | default(104857600) }} + + +############################# Log Basics ############################# + +# A comma separated list of directories under which to store log files +log.dirs = {{ kafkaDataDir }} + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={{ kafkaNumPartitions | default(1) }} + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir={{ kafkaNumRecoveryThreadsPerDataDir | default(1) }} + +############################# Internal Topic Settings ############################# +# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" +# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. 
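+# For example, a 3-node cluster could override these in group_vars
+# (illustrative values, not defined anywhere in this patch):
+#   kafkaOffsetsTopicReplicationFactor: 3
+#   kafkaTransactionStateLogReplicationFactor: 3
+#   kafkaTransactionStateLogMinIsr: 2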
+offsets.topic.replication.factor={{ kafkaOffsetsTopicReplicationFactor | default(1) }} +transaction.state.log.replication.factor={{ kafkaTransactionStateLogReplicationFactor | default(1) }} +transaction.state.log.min.isr={{ kafkaTransactionStateLogMinIsr | default(1) }} + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval.messages={{ kafkaLogFlushIntervalMessages | default(10000)}} + +# The maximum amount of time a message can sit in a log before we force a flush +log.flush.interval.ms={{ kafkaLogFlushIntervalMs | default(1000) }} + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion due to age +log.retention.hours={{ kafkaLogRetentionHours | default(168) }} + +# A size-based retention policy for logs. Segments are pruned from the log unless the remaining +# segments drop below log.retention.bytes. Functions independently of log.retention.hours. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes={{ kafkaLogSegmentBytes | default(1073741824) }} + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms={{ kafkalogRetentionCheckIntervalMs | default(300000) }} diff --git a/roles/configure/templates/kraft/controller.properties b/roles/configure/templates/kraft/controller.properties new file mode 100644 index 0000000..95e98ae --- /dev/null +++ b/roles/configure/templates/kraft/controller.properties @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# +# This configuration file is intended for use in KRaft mode, where +# Apache ZooKeeper is not present. See config/kraft/README.md for details. +# + +############################# Server Basics ############################# + +# The role of this server. Setting this puts us in KRaft mode +process.roles=controller + +# The node id associated with this instance's roles +node.id=1 + +# The connect string for the controller quorum +controller.quorum.voters=1@localhost:9093 + +############################# Socket Server Settings ############################# + +# The address the socket server listens on. +# Note that only the controller listeners are allowed here when `process.roles=controller`, and this listener should be consistent with `controller.quorum.voters` value. +# FORMAT: +# listeners = listener_name://host_name:port +# EXAMPLE: +# listeners = PLAINTEXT://your.host.name:9092 +listeners=CONTROLLER://:{{ kafkaControllerPort | default(9093) }} + +# A comma-separated list of the names of the listeners used by the controller. +# This is required if running in KRaft mode. +controller.listener.names=CONTROLLER + +# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details +#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + +# The number of threads that the server uses for receiving requests from the network and sending responses to the network +num.network.threads={{ kafkaNumNetworkThreads | default(3) }} + +# The number of threads that the server uses for processing requests, which may include disk I/O +num.io.threads={{ kafkaNumIoThreads | default(8) }} + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes={{ kafkaSocketSendBufferBytes | default(102400) }} + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes={{ kafkaSocetReceiveBufferBytes | default(102400) }} + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes={{ kafkaSocetRequestMaxBytes | default(104857600) }} + + +############################# Log Basics ############################# + +# A comma separated list of directories under which to store log files +log.dirs = {{ kafkaDataDir }} + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={{ kafkaNumPartitions | default(1) }} + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir={{ kafkaNumRecoveryThreadsPerDataDir | default(1) }} + +############################# Internal Topic Settings ############################# +# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" +# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. 
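+# A common pairing is transaction.state.log.min.isr = replication factor - 1
+# (for example, replication factor 3 with min.isr 2).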
+offsets.topic.replication.factor={{ kafkaOffsetsTopicReplicationFactor | default(1) }} +transaction.state.log.replication.factor={{ kafkaTransactionStateLogReplicationFactor | default(1) }} +transaction.state.log.min.isr={{ kafkaTransactionStateLogMinIsr | default(1) }} + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval.messages={{ kafkaLogFlushIntervalMessages | default(10000)}} + +# The maximum amount of time a message can sit in a log before we force a flush +log.flush.interval.ms={{ kafkaLogFlushIntervalMs | default(1000) }} + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion due to age +log.retention.hours={{ kafkaLogRetentionHours | default(168) }} + +# A size-based retention policy for logs. Segments are pruned from the log unless the remaining +# segments drop below log.retention.bytes. Functions independently of log.retention.hours. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes={{ kafkaLogSegmentBytes | default(1073741824) }} + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms={{ kafkalogRetentionCheckIntervalMs | default(300000) }} diff --git a/roles/configure/templates/kraft/server.properties b/roles/configure/templates/kraft/server.properties new file mode 100644 index 0000000..556b82d --- /dev/null +++ b/roles/configure/templates/kraft/server.properties @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This configuration file is intended for use in KRaft mode, where
+# Apache ZooKeeper is not present. See config/kraft/README.md for details.
+#
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KRaft mode
+process.roles={{ kafkaProcessRoles | default('broker') }}
+
+# The node id associated with this instance's roles
+node.id=1
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on.
+# Note that only the controller listeners are allowed here when `process.roles=controller`, and this listener should be consistent with `controller.quorum.voters` value.
+# FORMAT:
+# listeners = listener_name://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+listeners=CONTROLLER://:{{ kafkaControllerPort | default(9093) }}
+
+# A comma-separated list of the names of the listeners used by the controller.
+# This is required if running in KRaft mode.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads={{ kafkaNumNetworkThreads | default(3) }}
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads={{ kafkaNumIoThreads | default(8) }}
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes={{ kafkaSocketSendBufferBytes | default(102400) }}
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes={{ kafkaSocetReceiveBufferBytes | default(102400) }}
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes={{ kafkaSocetRequestMaxBytes | default(104857600) }}
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs = {{ kafkaDataDir }}
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={{ kafkaNumPartitions | default(1) }}
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir={{ kafkaNumRecoveryThreadsPerDataDir | default(1) }}
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+offsets.topic.replication.factor={{ kafkaOffsetsTopicReplicationFactor | default(1) }} +transaction.state.log.replication.factor={{ kafkaTransactionStateLogReplicationFactor | default(1) }} +transaction.state.log.min.isr={{ kafkaTransactionStateLogMinIsr | default(1) }} + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval.messages={{ kafkaLogFlushIntervalMessages | default(10000)}} + +# The maximum amount of time a message can sit in a log before we force a flush +log.flush.interval.ms={{ kafkaLogFlushIntervalMs | default(1000) }} + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion due to age +log.retention.hours={{ kafkaLogRetentionHours | default(168) }} + +# A size-based retention policy for logs. Segments are pruned from the log unless the remaining +# segments drop below log.retention.bytes. Functions independently of log.retention.hours. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes={{ kafkaLogSegmentBytes | default(1073741824) }} + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms={{ kafkalogRetentionCheckIntervalMs | default(300000) }}
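+
+# The retention settings above can be tuned per environment through the same
+# variables (illustrative group_vars values, not defined in this patch):
+#   kafkaLogRetentionHours: 72
+#   kafkaLogSegmentBytes: 536870912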