Example: Self Balancing when Cluster Nodes Change
You can create a Topic with 16 partitions and send it a balanced load. While stopping and starting Brokers, observe whether partitions are reassigned automatically among the Brokers. This reassignment is a built-in feature of AutoMQ that keeps data distributed evenly across the cluster. By monitoring the partition distribution and the load on each Broker, you can verify that self-balancing works as expected.
After successfully installing AutoMQ using Cluster Deployment on Linux▸, you will get a list of Bootstrap Server addresses similar to the following:
192.168.0.1:9092,192.168.0.2:9092
If you installed it via Deploy Locally▸, the Bootstrap Server addresses will be:
broker1:9092,broker2:9092
In all steps, replace the Bootstrap Server addresses with the actual addresses you obtained.
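One way to avoid editing every command by hand is to keep the addresses in a shell variable. The sketch below is only a convenience: the variable name `BOOTSTRAP_SERVERS` and the helper `describe_topic_args` are illustrative names, not part of AutoMQ or Kafka.

```shell
# Hypothetical convenience variable: set it once to the addresses you obtained.
# The default matches the Deploy Locally addresses shown above.
BOOTSTRAP_SERVERS="${BOOTSTRAP_SERVERS:-broker1:9092,broker2:9092}"

# Hypothetical helper: print the kafka-topics.sh arguments that describe a
# topic on the configured Bootstrap Servers.
describe_topic_args() {
  printf '%s %s %s %s %s\n' --topic "$1" --describe --bootstrap-server "$BOOTSTRAP_SERVERS"
}
```

The printed arguments can then be appended to the kafka-topics.sh path used in the commands of this example.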
- Linux/Mac/Windows Subsystem for Linux
- Docker. If downloading the container image is slow, refer to Docker Hub Mirror Configuration▸
- Ensure that the self-balancing feature is enabled for your AutoMQ cluster. If using local deployment mode, this feature is enabled automatically. For other deployment modes, add the parameter --override autobalancer.controller.enable=true when starting the Controller.
Create a Topic with 16 partitions:
CMD='docker run --network automq_net automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --partitions 16 --create --topic self-balancing-topic --bootstrap-server broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
View the partition distribution:
CMD='docker run --network automq_net automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --topic self-balancing-topic --describe --bootstrap-server broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
Topic: self-balancing-topic TopicId: AjoAB22YRRq7w6MdtZ4hDA PartitionCount: 16 ReplicationFactor: 1 Configs: min.insync.replicas=1,elasticstream.replication.factor=1,segment.bytes=1073741824
Topic: self-balancing-topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 6 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 7 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 8 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 9 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 10 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 11 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 12 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 13 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 14 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 15 Leader: 2 Replicas: 2 Isr: 2
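To compare distributions at a glance, the describe output can be reduced to a per-Broker partition count. The following is a minimal sketch: the function name `count_leaders` is illustrative, and it assumes the output format shown above, read from standard input.

```shell
# Count how many partitions each Broker leads.
# Reads `kafka-topics.sh --describe` output on stdin.
count_leaders() {
  awk '
    {
      # The Broker ID is the field right after the "Leader:" label.
      for (i = 1; i < NF; i++) {
        if ($i == "Leader:") count[$(i + 1)]++
      }
    }
    END {
      for (id in count) printf "broker %s: %d partitions\n", id, count[id]
    }' | sort -k2,2n
}
```

Pipe the describe output into `count_leaders`. For the output above it reports 8 partitions for Broker 1 and 8 for Broker 2.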
Start a producer that sends 1,024,000 records of 1024 bytes each at 5120 records per second:
CMD='docker run --network automq_net automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-producer-perf-test.sh --topic self-balancing-topic --num-records=1024000 --throughput 5120 --record-size 1024 --producer-props bootstrap.servers=broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
Start a consumer:
CMD='docker run --network automq_net automqinc/automq:1.0.4 /bin/bash -c "/opt/kafka/kafka/bin/kafka-consumer-perf-test.sh --topic self-balancing-topic --show-detailed-stats --timeout 300000 --messages=1024000 --reporting-interval 1000 --bootstrap-server=broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
Shut down one Broker so that its partitions are reassigned to the other Brokers. After the shutdown, you can observe the producer and consumer recover.
docker stop broker2
When shutting down the broker, you will see the following logs from the producer:
[2024-04-29 05:00:03,436] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 49732 on topic-partition self-balancing-topic-7, retrying (2147483641 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-04-29 05:00:03,438] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition self-balancing-topic-7 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
After waiting a few seconds, you will see that production and consumption return to normal.
[2024-05-07 11:56:08,920] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition self-balancing-topic-3 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-07 11:56:08,920] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 42141 on topic-partition self-balancing-topic-3, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-07 11:56:08,920] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition self-balancing-topic-3 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-07 11:56:08,588] 25693 records sent, 5138.6 records/sec (5.02 MB/sec), 8.9 ms avg latency, 1246.0 ms max latency.
[2024-05-07 11:56:13,589] 25607 records sent, 5120.4 records/sec (5.00 MB/sec), 1.8 ms avg latency, 44.0 ms max latency.
[2024-05-07 11:56:18,591] 25621 records sent, 5121.1 records/sec (5.00 MB/sec), 1.6 ms avg latency, 10.0 ms max latency.
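The recovery is easiest to see in the maximum latency of each report line. As a rough sketch, the helper below keeps only the report lines and marks those whose maximum latency exceeds a threshold. The name `latency_report` and the default threshold of 1000 ms are illustrative choices, and the parsing assumes the report format shown above.

```shell
# Summarize producer report lines read from stdin and mark latency spikes.
# $1: max-latency threshold in milliseconds (illustrative default: 1000).
# WARN lines and anything else without "records sent" are skipped.
latency_report() {
  awk -v limit="${1:-1000}" '
    /records sent/ {
      rate = ""; max = ""
      for (i = 2; i <= NF; i++) {
        if ($i == "records/sec") rate = $(i - 1)   # value precedes the unit
        if ($i == "max")         max  = $(i - 2)   # "<value> ms max latency"
      }
      if (rate == "" || max == "") next
      # Use the time of day when the line starts with a [date time] stamp.
      t = ($1 ~ /^\[/) ? $2 : "-"
      sub(/\]$/, "", t)
      flag = (max + 0 > limit + 0) ? " SPIKE" : ""
      printf "%s %s rec/s, max %s ms%s\n", t, rate, max, flag
    }'
}
```

With the default threshold, only the first report line above (1246.0 ms max latency) is marked; the following lines show latency back in the normal range.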
After the producer resumes writing, check the partition distribution again. All partitions are now on broker1: AutoMQ quickly and automatically reassigned the partitions of the stopped Broker and rebalanced the traffic.
CMD='docker run --network automq_net automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --topic self-balancing-topic --describe --bootstrap-server broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
Topic: self-balancing-topic TopicId: AjoAB22YRRq7w6MdtZ4hDA PartitionCount: 16 ReplicationFactor: 1 Configs: min.insync.replicas=1,elasticstream.replication.factor=1,segment.bytes=1073741824
Topic: self-balancing-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 5 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 6 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 7 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 8 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 9 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 10 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 11 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 12 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 13 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 14 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 15 Leader: 1 Replicas: 1 Isr: 1
Restart broker2 to trigger automatic partition reassignment. The producer and consumer resume after several seconds of retrying.
docker start broker2
At this point, confirm the partition distribution again, and you will see that the partitions have completed automatic reassignment.
Topic: self-balancing-topic TopicId: AjoAB22YRRq7w6MdtZ4hDA PartitionCount: 16 ReplicationFactor: 1 Configs: min.insync.replicas=1,elasticstream.replication.factor=1,segment.bytes=1073741824
Topic: self-balancing-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 4 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 5 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 6 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 7 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 8 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 9 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 10 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 11 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 12 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 13 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 14 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 15 Leader: 1 Replicas: 1 Isr: 1
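Whether the cluster is balanced again can be expressed as a single number: the gap between the most and least loaded Brokers, counted in partitions. The sketch below is one way to compute it. The function name `leader_spread` is illustrative, and the Broker IDs are passed explicitly so that a Broker leading no partitions counts as zero.

```shell
# Print the difference between the largest and smallest number of partitions
# led by the given Broker IDs.
# Usage: <describe output> | leader_spread BROKER_ID...
leader_spread() {
  [ "$#" -gt 0 ] || { echo "usage: leader_spread BROKER_ID..." >&2; return 2; }
  awk -v ids="$*" '
    BEGIN { n = split(ids, want, " ") }
    {
      for (i = 1; i < NF; i++) if ($i == "Leader:") count[$(i + 1)]++
    }
    END {
      min = -1; max = 0
      for (k = 1; k <= n; k++) {
        c = count[want[k]] + 0          # absent Brokers count as zero
        if (min < 0 || c < min) min = c
        if (c > max) max = c
      }
      print max - min
    }'
}
```

For the output above, `leader_spread 1 2` prints 2 (9 partitions on Broker 1, 7 on Broker 2). While broker2 was stopped, the same call on the earlier output would print 16.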
If you installed AutoMQ using Cluster Deployment on Linux, run the same steps against the Bootstrap Server addresses of your cluster. Create a Topic with 16 partitions:
CMD='docker run automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --partitions 16 --create --topic self-balancing-topic --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
View the partition distribution:
CMD='docker run automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --topic self-balancing-topic --describe --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
Topic: self-balancing-topic TopicId: 7mYxhYliSMq-FCrl4pcD5Q PartitionCount: 16 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: self-balancing-topic Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: self-balancing-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: self-balancing-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 4 Leader: 3 Replicas: 3 Isr: 3
Topic: self-balancing-topic Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 6 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 7 Leader: 3 Replicas: 3 Isr: 3
Topic: self-balancing-topic Partition: 8 Leader: 0 Replicas: 0 Isr: 0
Topic: self-balancing-topic Partition: 9 Leader: 4 Replicas: 4 Isr: 4
Topic: self-balancing-topic Partition: 10 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 11 Leader: 3 Replicas: 3 Isr: 3
Topic: self-balancing-topic Partition: 12 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 13 Leader: 4 Replicas: 4 Isr: 4
Topic: self-balancing-topic Partition: 14 Leader: 0 Replicas: 0 Isr: 0
Topic: self-balancing-topic Partition: 15 Leader: 3 Replicas: 3 Isr: 3
Start a producer:
CMD='docker run automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-producer-perf-test.sh --topic self-balancing-topic --num-records=1024000 --throughput 5120 --record-size 1024 --producer-props bootstrap.servers=192.168.0.1:9092,192.168.0.2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
Start a consumer:
CMD='docker run automqinc/automq:1.0.4 /bin/bash -c "/opt/kafka/kafka/bin/kafka-consumer-perf-test.sh --topic self-balancing-topic --show-detailed-stats --timeout 300000 --messages=1024000 --reporting-interval 1000 --bootstrap-server=192.168.0.1:9092,192.168.0.2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
Shut down a Broker to reassign its partitions to other Brokers. After shutting it down, you can observe the recovery process of producers and consumers.
jps | grep Kafka | awk '{print $1}' | xargs kill
When shutting down the broker, you will see the following logs from the producer:
[2024-04-29 05:00:03,436] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 49732 on topic-partition self-balancing-topic-7, retrying (2147483641 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-04-29 05:00:03,438] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition self-balancing-topic-7 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
After waiting for a few seconds, you will see the production and consumption return to normal.
[2024-05-07 10:59:16,474] 25616 records sent, 5123.2 records/sec (5.00 MB/sec), 1.7 ms avg latency, 16.0 ms max latency.
[2024-05-07 10:59:26,238] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 121226 on topic-partition self-balancing-topic-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-07 10:59:26,240] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition self-balancing-topic-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-07 10:59:26,241] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 121227 on topic-partition self-balancing-topic-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-07 10:59:26,241] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition self-balancing-topic-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-07 10:59:21,477] 25599 records sent, 5114.7 records/sec (4.99 MB/sec), 1.8 ms avg latency, 19.0 ms max latency.
[2024-05-07 10:59:26,486] 25667 records sent, 5132.4 records/sec (5.01 MB/sec), 4.8 ms avg latency, 2284.0 ms max latency.
After the producer resumes writing, recheck the partition distribution. No partition is led by the stopped Broker any longer (Broker 3 in this example): its partitions now have leaders on the remaining Brokers. AutoMQ automatically and swiftly completed the reassignment of partitions and the self-balancing of traffic from the stopped Broker.
CMD='docker run automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --topic self-balancing-topic --describe --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
Topic: self-balancing-topic TopicId: 7mYxhYliSMq-FCrl4pcD5Q PartitionCount: 16 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: self-balancing-topic Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: self-balancing-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: self-balancing-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 6 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 7 Leader: 0 Replicas: 0 Isr: 0
Topic: self-balancing-topic Partition: 8 Leader: 0 Replicas: 0 Isr: 0
Topic: self-balancing-topic Partition: 9 Leader: 4 Replicas: 4 Isr: 4
Topic: self-balancing-topic Partition: 10 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 11 Leader: 4 Replicas: 4 Isr: 4
Topic: self-balancing-topic Partition: 12 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 13 Leader: 4 Replicas: 4 Isr: 4
Topic: self-balancing-topic Partition: 14 Leader: 0 Replicas: 0 Isr: 0
Topic: self-balancing-topic Partition: 15 Leader: 2 Replicas: 2 Isr: 2
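With more than two Brokers it is harder to spot by eye which partitions moved. One possible approach is to save the describe output before and after the shutdown and compare the leaders. The sketch below assumes two such saved files; the function name `moved_partitions` and the file arguments are illustrative.

```shell
# List partitions whose leader differs between two saved describe outputs.
# Usage: moved_partitions BEFORE_FILE AFTER_FILE
moved_partitions() {
  awk '
    # Return the field that follows the given label on the current line.
    function field_after(label,   i) {
      for (i = 1; i < NF; i++) if ($i == label) return $(i + 1)
      return ""
    }
    {
      p = field_after("Partition:")
      l = field_after("Leader:")
      if (p == "" || l == "") next          # skip the topic summary line
      if (FILENAME == ARGV[1]) {
        before[p] = l                       # first file: remember the leader
      } else if ((p in before) && before[p] != l) {
        printf "partition %s: %s -> %s\n", p, before[p], l
      }
    }' "$1" "$2"
}
```

Applied to the two outputs in this example, it lists partitions 4, 7, 11, and 15, all of which were led by Broker 3 before the shutdown.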
Restart the stopped Broker to trigger the automatic reassignment of partitions. After several seconds of retries, the producers and consumers will resume normal operation.
Refer to Cluster Deployment on Linux▸ for the method to start the Broker.
At this point, recheck the partition distribution, and you will find that the partitions have already completed automatic reassignment.
Topic: self-balancing-topic TopicId: 7mYxhYliSMq-FCrl4pcD5Q PartitionCount: 16 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: self-balancing-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 2 Leader: 3 Replicas: 3 Isr: 3
Topic: self-balancing-topic Partition: 3 Leader: 4 Replicas: 4 Isr: 4
Topic: self-balancing-topic Partition: 4 Leader: 0 Replicas: 0 Isr: 0
Topic: self-balancing-topic Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 6 Leader: 4 Replicas: 4 Isr: 4
Topic: self-balancing-topic Partition: 7 Leader: 3 Replicas: 3 Isr: 3
Topic: self-balancing-topic Partition: 8 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 9 Leader: 0 Replicas: 0 Isr: 0
Topic: self-balancing-topic Partition: 10 Leader: 1 Replicas: 1 Isr: 1
Topic: self-balancing-topic Partition: 11 Leader: 4 Replicas: 4 Isr: 4
Topic: self-balancing-topic Partition: 12 Leader: 3 Replicas: 3 Isr: 3
Topic: self-balancing-topic Partition: 13 Leader: 2 Replicas: 2 Isr: 2
Topic: self-balancing-topic Partition: 14 Leader: 0 Replicas: 0 Isr: 0
Topic: self-balancing-topic Partition: 15 Leader: 1 Replicas: 1 Isr: 1