Example: Continuous Data Self Balancing

lyx edited this page Jan 17, 2025 · 1 revision

This example creates a Topic with multiple partitions and manually reassigns the partitions to a single node to create an uneven distribution. It then sends an even load to all partitions so that you can observe whether the partitions are automatically reassigned across Brokers. This automatic reassignment is a built-in feature of AutoMQ that keeps data balanced across the cluster. By monitoring the partition distribution and the load on the Brokers, you can verify that self-balancing works as expected.

After installing AutoMQ using the Cluster Deployment on Linux method, you will get a list of Bootstrap Server addresses similar to the following:

192.168.0.1:9092,192.168.0.2:9092

If you installed using Deploy Locally, the Bootstrap Server address will be:

broker1:9092,broker2:9092

In all steps, make sure to replace the Bootstrap Server address with the actual address you obtained.

Prerequisites

  • Linux/Mac/Windows Subsystem for Linux

  • Docker

If downloading the container image is slow, refer to Docker Hub Mirror Configuration.

  • Ensure that your AutoMQ cluster has the self-balancing feature enabled. For local deployment mode, this feature is enabled automatically. For other deployment modes, add the parameter --override autobalancer.controller.enable=true when starting the Controller.

Create a Topic


CMD='docker run  --network automq_net  automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --partitions 8 --create --topic continuous-self-balancing-topic --bootstrap-server broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD

Check Partition Distribution


CMD='docker run  --network automq_net   automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --topic continuous-self-balancing-topic --describe --bootstrap-server broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD


Topic: continuous-self-balancing-topic TopicId: DNZe6gBQTrCOEAruQ_y2tg PartitionCount: 8       ReplicationFactor: 1    Configs: min.insync.replicas=1,segment.bytes=1073741824
    Topic: continuous-self-balancing-topic Partition: 0    Leader: 2   Replicas: 2     Isr: 2
    Topic: continuous-self-balancing-topic Partition: 1    Leader: 1   Replicas: 1     Isr: 1
    Topic: continuous-self-balancing-topic Partition: 2    Leader: 1   Replicas: 1     Isr: 1
    Topic: continuous-self-balancing-topic Partition: 3    Leader: 2   Replicas: 2     Isr: 2
    Topic: continuous-self-balancing-topic Partition: 4    Leader: 1   Replicas: 1     Isr: 1
    Topic: continuous-self-balancing-topic Partition: 5    Leader: 2   Replicas: 2     Isr: 2
    Topic: continuous-self-balancing-topic Partition: 6    Leader: 1   Replicas: 1     Isr: 1
    Topic: continuous-self-balancing-topic Partition: 7    Leader: 2   Replicas: 2     Isr: 2  

Manual Reassignment of Partitions

To make continuous data self-balancing easy to observe, manually reassign all partitions to node2 (broker ID 2).


echo '{
    "partitions": [
        {"topic": "continuous-self-balancing-topic", "partition": 0, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 1, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 2, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 3, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 4, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 5, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 6, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 7, "replicas": [2]}
    ],
    "version": 1
}' > move.json && (CMD='docker run --network automq_net -v $(pwd)/move.json:/move.json automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server  broker1:9092,broker2:9092 --reassignment-json-file /move.json --execute"' ; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD) && rm move.json
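The reassignment plan above lists every partition by hand. As a minimal sketch, the same plan could be generated with a small shell function. The name `gen_reassignment` and its arguments are hypothetical and not part of AutoMQ or Kafka. It assumes partitions are numbered from 0 and that each partition has a single replica, as in this example.

```shell
# Hypothetical helper (not part of AutoMQ or Kafka): print a reassignment
# plan that moves partitions 0..(count-1) of a topic to one broker.
# Arguments: topic name, partition count, target broker ID.
gen_reassignment() {
    topic="$1"
    count="$2"
    broker="$3"
    printf '{"partitions": ['
    i=0
    while [ "$i" -lt "$count" ]; do
        # Separate entries with a comma, except before the first one.
        if [ "$i" -gt 0 ]; then
            printf ', '
        fi
        printf '{"topic": "%s", "partition": %d, "replicas": [%d]}' \
            "$topic" "$i" "$broker"
        i=$((i + 1))
    done
    printf '], "version": 1}\n'
}
```

For example, `gen_reassignment continuous-self-balancing-topic 8 2 > move.json` would write a single-line JSON plan with the same content as the file created above.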

The partition distribution after manual reassignment is as follows:


Topic: continuous-self-balancing-topic TopicId: HtVB3bM7TYaNKKKmm7khQw PartitionCount: 8   ReplicationFactor: 1    Configs: min.insync.replicas=1,segment.bytes=1073741824
    Topic: continuous-self-balancing-topic Partition: 0    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 1    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 2    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 3    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 4    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 5    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 6    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 7    Leader: 2   Replicas: 2 Isr: 2

Start the Producer


CMD='docker run  --network automq_net  automqinc/automq:latest /bin/bash -c  "/opt/kafka/kafka/bin/kafka-producer-perf-test.sh --topic continuous-self-balancing-topic --num-records=1024000 --throughput 5120 --record-size 1024 --producer-props bootstrap.servers=broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
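To see what load the producer command requests, the arithmetic can be sketched in shell. The function name `perf_test_summary` is hypothetical. It assumes `--throughput` is a target in records per second and `--record-size` is in bytes, and that the target rate is sustained for the whole run.

```shell
# Hypothetical helper: estimate run time and byte rate of a producer
# perf test from its record count, target records/sec, and record size.
perf_test_summary() {
    records="$1"
    throughput="$2"
    size="$3"
    echo "duration_s=$((records / throughput)) bytes_per_s=$((throughput * size))"
}

# Values taken from the producer command above.
perf_test_summary 1024000 5120 1024
```

With the values from this example, the run lasts roughly 200 seconds at 5242880 bytes per second (5 MiB/s) in total, spread across all partitions of the Topic.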

Start the Consumer


CMD='docker run --network automq_net  automqinc/automq:1.0.4 /bin/bash -c "/opt/kafka/kafka/bin/kafka-consumer-perf-test.sh --topic continuous-self-balancing-topic --show-detailed-stats --timeout 300000 --messages=1024000 --reporting-interval 1000 --bootstrap-server=broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD

Check the Partition Distribution Again

After some time, you will observe the producer generating the following logs.


[2024-05-16 10:29:50,448] 25622 records sent, 5123.4 records/sec (5.00 MB/sec), 15.7 ms avg latency, 41.0 ms max latency.
[2024-05-16 10:30:00,372] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10354 on topic-partition continuous-self-balancing-topic-7, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,373] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-7 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,373] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10354 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,373] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10356 on topic-partition continuous-self-balancing-topic-7, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-7 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10356 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10356 on topic-partition continuous-self-balancing-topic-6, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-6 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,385] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10357 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,385] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10358 on topic-partition continuous-self-balancing-topic-7, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-7 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10358 on topic-partition continuous-self-balancing-topic-6, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-6 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10358 on topic-partition continuous-self-balancing-topic-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,398] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10360 on topic-partition continuous-self-balancing-topic-6, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,398] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-6 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,398] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10360 on topic-partition continuous-self-balancing-topic-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,398] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,411] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10361 on topic-partition continuous-self-balancing-topic-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,412] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,412] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10362 on topic-partition continuous-self-balancing-topic-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,412] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:29:55,450] 25327 records sent, 5064.4 records/sec (4.95 MB/sec), 15.3 ms avg latency, 80.0 ms max latency.

After several seconds, the producer resumes normal operation. Then, check the partition distribution again.


Topic: continuous-self-balancing-topic TopicId: HtVB3bM7TYaNKKKmm7khQw PartitionCount: 8   ReplicationFactor: 1    Configs: min.insync.replicas=1,segment.bytes=1073741824
    Topic: continuous-self-balancing-topic Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: continuous-self-balancing-topic Partition: 1    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 2    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 3    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 4    Leader: 1   Replicas: 1 Isr: 1
    Topic: continuous-self-balancing-topic Partition: 5    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 6    Leader: 1   Replicas: 1 Isr: 1
    Topic: continuous-self-balancing-topic Partition: 7    Leader: 1   Replicas: 1 Isr: 1
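A listing like the one above can be condensed into a per-broker count of partition leaders. The following is a minimal sketch that assumes the `--describe` output format shown in this example. The function name `count_leaders` is hypothetical. Brokers that lead no partitions do not appear in its output, and a leader count is only a rough indicator because this example describes balancing in terms of load.

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin
# and print "<broker id> <number of partitions it leads>" per broker.
count_leaders() {
    awk '{
        # The value following the "Leader:" field is the leading broker ID.
        for (i = 1; i < NF; i++)
            if ($i == "Leader:") count[$(i + 1)]++
    }
    END { for (b in count) print b, count[b] }' | sort -n
}
```

For the listing above, this prints `1 4` and `2 4`: each of the two Brokers leads four partitions.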

Because all partitions were reassigned to node2, all messages were sent to node2, which created a local hotspot there. This triggered AutoMQ's self-balancing, and AutoMQ reassigned partitions until the load was balanced across all nodes.
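Because self-balancing only starts after the load has been observed for some time, the partition check may need to be repeated. A generic polling helper is one way to do that. This is a sketch only: the name `retry_until` is hypothetical, and the check command passed to it is up to you.

```shell
# Hypothetical helper: run a command repeatedly until it succeeds.
# Arguments: maximum attempts, delay in seconds, then the command to run.
# Returns 0 as soon as the command succeeds, 1 if all attempts fail.
retry_until() {
    attempts="$1"
    delay="$2"
    shift 2
    n=0
    while [ "$n" -lt "$attempts" ]; do
        "$@" && return 0
        n=$((n + 1))
        sleep "$delay"
    done
    return 1
}

# Usage (my_balance_check is a placeholder for your own check command):
#   retry_until 30 10 my_balance_check
```

The check command could, for instance, run the describe command from the earlier step and succeed once more than one Broker appears as a leader.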

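The following steps repeat the same procedure for a cluster installed using the Cluster Deployment on Linux method, with 192.168.0.1:9092,192.168.0.2:9092 as the example Bootstrap Server address.

Create a Topic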


CMD='docker run automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --partitions 16 --create --topic continuous-self-balancing-topic --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD

Check Partition Distribution


CMD='docker run  automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --topic continuous-self-balancing-topic --describe --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD


Topic: continuous-self-balancing-topic        TopicId: h_uuZ0WNSJ2guoCGLMMuIg        PartitionCount: 16        ReplicationFactor: 1        Configs: min.insync.replicas=1,segment.bytes=1073741824
        Topic: continuous-self-balancing-topic        Partition: 0        Leader: 4        Replicas: 4        Isr: 4
        Topic: continuous-self-balancing-topic        Partition: 1        Leader: 0        Replicas: 0        Isr: 0
        Topic: continuous-self-balancing-topic        Partition: 2        Leader: 1        Replicas: 1        Isr: 1
        Topic: continuous-self-balancing-topic        Partition: 3        Leader: 2        Replicas: 2        Isr: 2
        Topic: continuous-self-balancing-topic        Partition: 4        Leader: 3        Replicas: 3        Isr: 3
        Topic: continuous-self-balancing-topic        Partition: 5        Leader: 4        Replicas: 4        Isr: 4
        Topic: continuous-self-balancing-topic        Partition: 6        Leader: 1        Replicas: 1        Isr: 1
        Topic: continuous-self-balancing-topic        Partition: 7        Leader: 0        Replicas: 0        Isr: 0
        Topic: continuous-self-balancing-topic        Partition: 8        Leader: 3        Replicas: 3        Isr: 3
        Topic: continuous-self-balancing-topic        Partition: 9        Leader: 2        Replicas: 2        Isr: 2
        Topic: continuous-self-balancing-topic        Partition: 10        Leader: 1        Replicas: 1        Isr: 1
        Topic: continuous-self-balancing-topic        Partition: 11        Leader: 0        Replicas: 0        Isr: 0
        Topic: continuous-self-balancing-topic        Partition: 12        Leader: 3        Replicas: 3        Isr: 3
        Topic: continuous-self-balancing-topic        Partition: 13        Leader: 2        Replicas: 2        Isr: 2
        Topic: continuous-self-balancing-topic        Partition: 14        Leader: 4        Replicas: 4        Isr: 4
        Topic: continuous-self-balancing-topic        Partition: 15        Leader: 0        Replicas: 0        Isr: 0

Manually Reassign Partitions


echo '{
    "partitions": [
        {"topic": "continuous-self-balancing-topic", "partition": 0, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 1, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 2, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 3, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 4, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 5, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 6, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 7, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 8, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 9, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 10, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 11, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 12, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 13, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 14, "replicas": [2]},
        {"topic": "continuous-self-balancing-topic", "partition": 15, "replicas": [2]}
    ],
    "version": 1
}' > move.json && (CMD='docker run -v $(pwd)/move.json:/move.json automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092 --reassignment-json-file /move.json --execute"' ; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD) && rm move.json

The partition distribution after manual reassignment is as follows:


Topic: continuous-self-balancing-topic        TopicId: h_uuZ0WNSJ2guoCGLMMuIg        PartitionCount: 16        ReplicationFactor: 1        Configs: min.insync.replicas=1,segment.bytes=1073741824
    Topic: continuous-self-balancing-topic Partition: 0    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 1    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 2    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 3    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 4    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 5    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 6    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 7    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 8    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 9    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 10    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 11    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 12    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 13    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 14    Leader: 2   Replicas: 2 Isr: 2
    Topic: continuous-self-balancing-topic Partition: 15    Leader: 2   Replicas: 2 Isr: 2

Start the Producer


CMD='docker run  automqinc/automq:latest /bin/bash -c  "/opt/kafka/kafka/bin/kafka-producer-perf-test.sh --topic continuous-self-balancing-topic --num-records=1024000 --throughput 5120 --record-size 1024 --producer-props bootstrap.servers=192.168.0.1:9092,192.168.0.2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD

Start the Consumer


CMD='docker run  automqinc/automq:1.0.4 /bin/bash -c "/opt/kafka/kafka/bin/kafka-consumer-perf-test.sh --topic continuous-self-balancing-topic --show-detailed-stats --timeout 300000 --messages=1024000 --reporting-interval 1000 --bootstrap-server=192.168.0.1:9092,192.168.0.2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD

Recheck the Partition Distribution

After some time, you will observe the following logs generated by the producer.


[2024-05-14 19:32:53,392] 25549 records sent, 5108.8 records/sec (4.99 MB/sec), 2.9 ms avg latency, 19.0 ms max latency.
[2024-05-14 19:33:00,913] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 9455 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,917] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,917] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 9455 on topic-partition continuous-self-balancing-topic-5, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,918] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-5 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,921] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 9456 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,923] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,924] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 9456 on topic-partition continuous-self-balancing-topic-5, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,924] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-5 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,925] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 9457 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,925] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,925] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 9457 on topic-partition continuous-self-balancing-topic-5, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,925] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-5 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,928] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 9458 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,928] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,928] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 9458 on topic-partition continuous-self-balancing-topic-5, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,928] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-5 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,929] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 9459 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,929] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,930] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 9459 on topic-partition continuous-self-balancing-topic-5, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:33:00,930] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-5 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-14 19:32:58,394] 17129 records sent, 3425.1 records/sec (3.34 MB/sec), 3.4 ms avg latency, 43.0 ms max latency.

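These NOT_LEADER_OR_FOLLOWER warnings are expected while partitions are being reassigned: the producer refreshes its metadata and retries the affected requests. After a few seconds, production resumes normally. Then check the partition distribution again.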


Topic: continuous-self-balancing-topic        TopicId: h_uuZ0WNSJ2guoCGLMMuIg        PartitionCount: 16        ReplicationFactor: 1        Configs: min.insync.replicas=1,segment.bytes=1073741824
        Topic: continuous-self-balancing-topic        Partition: 0        Leader: 3        Replicas: 3        Isr: 3
        Topic: continuous-self-balancing-topic        Partition: 1        Leader: 4        Replicas: 4        Isr: 4
        Topic: continuous-self-balancing-topic        Partition: 2        Leader: 0        Replicas: 0        Isr: 0
        Topic: continuous-self-balancing-topic        Partition: 3        Leader: 2        Replicas: 2        Isr: 2
        Topic: continuous-self-balancing-topic        Partition: 4        Leader: 2        Replicas: 2        Isr: 2
        Topic: continuous-self-balancing-topic        Partition: 5        Leader: 4        Replicas: 4        Isr: 4
        Topic: continuous-self-balancing-topic        Partition: 6        Leader: 0        Replicas: 0        Isr: 0
        Topic: continuous-self-balancing-topic        Partition: 7        Leader: 1        Replicas: 1        Isr: 1
        Topic: continuous-self-balancing-topic        Partition: 8        Leader: 0        Replicas: 0        Isr: 0
        Topic: continuous-self-balancing-topic        Partition: 9        Leader: 3        Replicas: 3        Isr: 3
        Topic: continuous-self-balancing-topic        Partition: 10        Leader: 4        Replicas: 4        Isr: 4
        Topic: continuous-self-balancing-topic        Partition: 11        Leader: 0        Replicas: 0        Isr: 0
        Topic: continuous-self-balancing-topic        Partition: 12        Leader: 0        Replicas: 0        Isr: 0
        Topic: continuous-self-balancing-topic        Partition: 13        Leader: 3        Replicas: 3        Isr: 3
        Topic: continuous-self-balancing-topic        Partition: 14        Leader: 2        Replicas: 2        Isr: 2
        Topic: continuous-self-balancing-topic        Partition: 15        Leader: 2        Replicas: 2        Isr: 2

Because we reassigned all partitions to node2, all messages were sent to node2, creating a localized hotspot there. This triggered AutoMQ's self-balancing: as the output above shows, AutoMQ reassigned the partitions, which are now spread across Brokers 0 through 4.
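To make the distribution easier to read, the per-Broker leader counts can be tallied from the describe output. The following is a minimal sketch, not part of AutoMQ: `count_leaders` is a hypothetical helper name, and it assumes the `Leader: <id>` field layout shown in the output above.

```shell
# Hypothetical helper (not shipped with AutoMQ or Kafka): counts how many
# partitions each Broker leads, reading `kafka-topics.sh --describe`
# output from standard input.
count_leaders() {
  awk '
    # Only per-partition lines contain the literal field "Partition:".
    /Partition:/ {
      for (i = 1; i < NF; i++) {
        # The Broker ID is the field that follows "Leader:".
        if ($i == "Leader:") count[$(i + 1)]++
      }
    }
    END {
      for (broker in count) print "broker " broker ": " count[broker]
    }
  ' | sort -k2,2n
}

# Usage: pipe the output of the describe command into the helper, e.g.
#   <describe command> | count_leaders
```

Applied to the output above, this should report 5, 1, 4, 3, and 3 partitions for Brokers 0 through 4 respectively, which confirms that the load is no longer concentrated on a single node.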
