How to Implement Self Balancing for Kafka on S3

lyx edited this page Jan 17, 2025 · 1 revision

Introduction

In an online Apache Kafka® cluster, fluctuations in traffic, creation and deletion of Topics, and the disappearance and startup of Brokers can occur at any time. These changes may lead to uneven distribution of traffic across cluster nodes, resulting in resource wastage and impacting business stability. To address this, it is necessary to proactively reassign different partitions of a Topic across the nodes to balance traffic and data.

Currently, Apache Kafka® only offers tools for executing partition reassignment; the reassignment plans themselves must be determined by the operations staff. For Kafka clusters that often scale to hundreds or even thousands of nodes, it is nearly impossible for humans to monitor the cluster status and devise a comprehensive partition reassignment plan. As a result, the community has developed third-party plugins like Cruise Control for Apache Kafka[1] to assist in creating reassignment plans. However, self-balancing in Apache Kafka® involves many decision factors (replica distribution, leader traffic distribution, node resource utilization, etc.), and the data synchronization it triggers causes resource contention and can take hours or even days. Existing solutions are therefore complex, and their decisions are often not timely. In practice, executing a self-balancing strategy still relies on review and continuous monitoring by operations staff, which does not truly resolve the problems that data self-balancing brings in Apache Kafka®.

Shared storage simplifies the implementation of self-balancing

The key reason our project, AutoMQ[7], can achieve Kafka cluster self-balancing in a more elegant manner is its S3-based shared storage architecture. AutoMQ has completely re-implemented the underlying storage of Apache Kafka based on cloud object storage [2], resulting in the following advantages:

  • Storage and compute are completely separated: the Broker only needs to retain a minimal amount of Block storage for Delta WAL[3], while all remaining data is stored in Object storage and is visible across the cluster.

  • High availability is ensured with EBS and Object storage, requiring only a single replica per partition.

Based on these advantages, the decision-making factors for partition reassignment have been greatly simplified:

  • There is no need to consider the disk resources of the nodes.

  • There is no need to consider the distribution of the partition's Leader and replicas.

  • Partition reassignment does not involve data synchronization or copying.

Hence, we have the opportunity to implement a built-in, lightweight auto balancing component within AutoMQ, continuously monitoring the cluster status and automatically executing partition reassignments.

Self-balancing Component

Overall Architecture

[Figure: overall architecture of the AutoBalancer component]

The implementation of the Continuous Self-Balancing Component (AutoBalancer) mainly consists of the following three parts:

  1. Metrics Collection

  2. State Maintenance

  3. Decision Scheduling

Metrics collection is completed on the Broker side, while state maintenance and decision scheduling are handled by the Active Controller, so the availability of the scheduling components is consistent with that of the Controller KRaft Group.

Metrics Collection

Apache Kafka® natively provides a metrics collection system implemented based on Yammer Metrics[4] and KafkaMetrics[5], and it is possible to monitor these two types of metrics through the MetricsRegistry and MetricsReporter interfaces. We have implemented AutoBalancerMetricsReporter based on these interfaces; it periodically collects preset metrics such as network traffic throughput, as well as other custom metrics or parameters.

Similarly to most Kafka monitoring implementations in the industry, we utilize an internal Topic to transmit metrics between the Broker and Controller. After completing a metric collection, the AutoBalancerMetricsReporter compiles all metrics into multiple messages and sends them to the internal Topic, thereby accomplishing metric reporting on the Broker side.

State Maintenance

On the Controller side, we maintain a ClusterModel, a model representing the current state of the cluster’s Brokers, as well as the distribution and load of partitions across them. Structural changes to the ClusterModel, such as Broker additions and removals, or the reassignment and deletion of partitions, are applied by monitoring changes to KRaft records, ensuring that the ClusterModel structure is consistent with the metadata.

Meanwhile, the Controller continuously consumes from the internal Topic, preprocesses the extracted metrics, and updates the ClusterModel, thus obtaining a model that accurately reflects the current state of the cluster.
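As a rough illustration of the state maintenance described above, here is a minimal Python sketch of a cluster model with structural updates, load updates, and a snapshot. All class, method, and field names are hypothetical and chosen for readability; this is not AutoMQ's actual ClusterModel, only one way the idea could be expressed.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class BrokerState:
    """Hypothetical per-broker state: partition name -> traffic."""
    partition_load: dict = field(default_factory=dict)

    @property
    def load(self):
        # Broker traffic is the sum of the traffic of its partitions.
        return sum(self.partition_load.values())


class ClusterModel:
    """Illustrative model only; names are assumptions, not AutoMQ's API."""

    def __init__(self):
        self.brokers = {}  # broker id -> BrokerState

    # Structural changes (driven by metadata records in the article).
    def add_broker(self, broker_id):
        self.brokers.setdefault(broker_id, BrokerState())

    def remove_broker(self, broker_id):
        self.brokers.pop(broker_id, None)

    def assign_partition(self, partition, broker_id):
        # Remove the partition from its current broker (if any) and keep
        # its last known load when placing it on the new broker.
        load = 0.0
        for state in self.brokers.values():
            load = state.partition_load.pop(partition, load)
        self.brokers[broker_id].partition_load[partition] = load

    # Load changes (driven by reported metrics in the article).
    def update_load(self, partition, broker_id, load):
        state = self.brokers.get(broker_id)
        # Ignore metrics that refer to a stale placement.
        if state is not None and partition in state.partition_load:
            state.partition_load[partition] = load

    def snapshot(self):
        # Decisions run on a frozen copy while the live model keeps updating.
        return copy.deepcopy(self)


model = ClusterModel()
model.add_broker(0)
model.add_broker(1)
model.assign_partition("topic-0", 0)
model.update_load("topic-0", 0, 100.0)
snap = model.snapshot()
model.assign_partition("topic-0", 1)  # the snapshot is unaffected
```

Taking a deep copy is the simplest way to get an isolated snapshot; a production implementation would likely use something cheaper.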

Decision Scheduling

Each Controller in AutoMQ maintains its own ClusterModel, but only the Active Controller engages in decision-making and scheduling. When the Active Controller changes, the responsibility for decision-making and scheduling also shifts to the new Active Controller. Before starting the decision-making process, AutoMQ takes a snapshot of the ClusterModel and uses this snapshot state for subsequent scheduling. Once the snapshot is complete, the ClusterModel can continue to be updated. AutoMQ’s decision-making process employs a heuristic scheduling algorithm similar to Cruise Control, as illustrated below:

[Figure: heuristic scheduling process of AutoBalancer]

The focus of the decision-making lies in defining a reasonable target. A target is a desired outcome to be achieved through scheduling, such as achieving traffic balance, limiting the number of partitions per Broker, or capping the traffic on a single Broker. Once the target is set, two key issues need to be addressed:

  1. How to determine whether a Broker meets the current target

  2. How to determine if a partition reassignment is feasible

Assessing whether the Broker meets the target

We can represent whether a Broker's current state meets the target using a simple mathematical model:

$$ f(broker) \in \{true,\: false\} $$

Here, the input is the Broker State Model (which includes the partitions and traffic on the Broker), and the output is either true or false, indicating whether the current target is met.

Taking traffic balance as an example, first calculate the average traffic $load_{avg}$ based on the total traffic of the cluster and the number of brokers, then calculate the desired traffic distribution range using a preset deviation coefficient $ratio$ (acceptable traffic deviation from the average):

$$ [load_{lo},\: load_{hi}] = [load_{avg} \cdot (1 - ratio),\: load_{avg} \cdot (1 + ratio)] $$

If the Broker's current traffic falls within this range, the Broker is considered to meet the target. In practice, when cluster traffic is low, the scheduling error is significant and traffic balancing is less meaningful, so an additional traffic scheduling threshold is set. If the Broker’s traffic falls below this threshold, the target is also considered met. Overall, the model representing whether the traffic balancing target is met can be described as follows:

$$ f(broker) = \begin{cases} true & \text{if } load \le threshold \\ true & \text{if } load_{lo} \le load \le load_{hi} \\ false & \text{otherwise} \end{cases} $$
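The traffic-balance check above can be sketched in a few lines of Python. The function and parameter names are illustrative assumptions that mirror the symbols in the formulas; the numbers in the usage lines are made up.

```python
def load_range(load_avg, ratio):
    """Return (load_lo, load_hi) around the cluster average."""
    return load_avg * (1 - ratio), load_avg * (1 + ratio)


def meets_traffic_goal(load, load_avg, ratio, threshold):
    """f(broker): True if the broker meets the traffic-balance target."""
    # Traffic below the scheduling threshold is always considered balanced.
    if load <= threshold:
        return True
    load_lo, load_hi = load_range(load_avg, ratio)
    return load_lo <= load <= load_hi


# Hypothetical three-broker cluster with an average load of 100.
loads = [120, 100, 80]
load_avg = sum(loads) / len(loads)
results = [meets_traffic_goal(l, load_avg, ratio=0.1, threshold=10) for l in loads]
# Only the middle broker lies within [90, 110].
```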

Assessing the feasibility of a partition reassignment

A partition reassignment (hereafter referred to as Action) includes three parts: the partition to be moved, the source Broker, and the target Broker. Using the previously defined model, the change in the state of the two Brokers before and after the Action can be represented as follows:

$$ \{f(src_{before}),\: f(dest_{before})\} \Rightarrow \{f(src_{after}),\: f(dest_{after})\} $$

Each $f(broker)$ function has two possible values, resulting in 16 possible state transitions. Among these, only a subset allows for a definite conclusion about the feasibility of an Action, while the rest do not provide enough information to make a judgment. Due to the length constraints of this article, we only provide some examples here for illustration.

  1. $\{false,\: false\} \Rightarrow \{true,\: true\}$: The movement causes both brokers to shift from not meeting the target to meeting the target, indicating that this Action has a positive effect and is acceptable.

  2. $\{true,\: true\} \Rightarrow \{false,\: false\}$: The movement causes both brokers to shift from meeting the target to not meeting the target, indicating that this Action has a negative effect and should be rejected.

  3. $\{false,\: false\} \Rightarrow \{false,\: false\}$: It is difficult to judge; although the assessment of the brokers in terms of meeting the target remains unchanged before and after the move, it's unclear whether this action brings a positive effect, making the action unacceptable.
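To make the 16 transitions concrete, the sketch below enumerates them and sorts them into three groups. The classification rule used here is an assumption made for illustration (accept when at least one broker starts meeting the target and none stops; reject in the mirrored case; otherwise undecided); the article itself only gives the three examples above.

```python
from itertools import product


def classify(src_before, dest_before, src_after, dest_after):
    """Classify one transition using an assumed, illustrative rule."""
    changes = [(src_before, src_after), (dest_before, dest_after)]
    improved = any(not before and after for before, after in changes)
    worsened = any(before and not after for before, after in changes)
    if improved and not worsened:
        return "accept"
    if worsened and not improved:
        return "reject"
    # Nothing changed, or one broker improved while the other worsened.
    return "undecided"


counts = {"accept": 0, "reject": 0, "undecided": 0}
for states in product([False, True], repeat=4):
    counts[classify(*states)] += 1

print(counts)
```

Even under this simple rule, several transitions stay undecided, which is the gap the scoring model in the next paragraphs is meant to close.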

From the above, it is clear that representing the state of brokers with a binary output function does not yield clear scheduling decisions in all scenarios. To address this issue, we need to define a more flexible mathematical model that produces consistent and idempotent decision results when the input cluster state is consistent, achieving a stable, interpretable multi-objective scheduling mechanism. We define the function $f_{score}(action) \in [0.0,\: 1.0]$ to represent the score of an Action under a single objective, with scoring as follows:

  • score < 0.5: Indicates that the Action negatively impacts the current objective. If the current objective is a mandatory goal (Hard Goal), such as limiting broker partition numbers or capping broker traffic, this Action is outright rejected. Non-mandatory goals include traffic balancing, QPS balancing, etc.

  • score = 0.5: Indicates that the Action does not impact the current objective. For example, the number of partitions and traffic on the Broker are within limits, or the load balancing of the Broker remains unchanged.

  • score > 0.5: Indicates that the Action positively impacts the current objective, such as bringing the Broker's traffic from exceeding the limit back within bounds, or making the distribution of Broker traffic more balanced.

Action scores for a single objective

To calculate this function, we first define the score of a Broker under the current objective as:

$$ f_{score}(broker) \in [0.0,\: 1.0] $$

Then, the change in the minimum Broker score caused by one partition reassignment can be calculated as follows:

$$ diff = \min(f_{score}(src_{after}), f_{score}(dest_{after})) - \min(f_{score}(src_{before}), f_{score}(dest_{before})) \in [-1.0, 1.0] $$

After normalizing the score differences, we obtain:

$$ f_{score}(action) = \frac{diff + 1}{2} \in [0.0, 1.0] $$
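A minimal Python sketch of the two formulas above follows. The function name and the sample broker scores are illustrative assumptions; the inputs are the Broker scores before and after the Action, each in the range 0.0 to 1.0.

```python
def action_score(src_before, dest_before, src_after, dest_after):
    """Score of an Action under a single objective, in [0.0, 1.0]."""
    # Change in the worse of the two broker scores, in [-1.0, 1.0].
    diff = min(src_after, dest_after) - min(src_before, dest_before)
    # Normalize so that 0.5 means "no impact".
    return (diff + 1) / 2


unchanged = action_score(0.25, 0.25, 0.25, 0.25)  # worst score stays the same
positive = action_score(0.25, 1.0, 0.75, 0.75)    # worst score rises by 0.5
negative = action_score(1.0, 1.0, 0.5, 1.0)       # worst score drops by 0.5
```

Using the minimum of the two broker scores means an Action is judged by the broker that is worse off, so improving one broker at the cost of pushing the other below the previous minimum yields a score under 0.5.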

Comprehensive score of Action across multiple objectives

Following the calculations above, we can now determine the score of an Action for each objective and combine them into a comprehensive score across multiple objectives. Because the scores are normalized to the same scale, we can directly compute a weighted sum of the scores for the different objectives:

$$ score_{multi} = \sum_{0 \le i \lt n}{w_i * f_{score_i}} $$

When it's necessary to choose the best Action among multiple options, simply select the one with the highest score among all Actions with scores greater than 0.5.
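The selection rule can be sketched as follows. The sketch combines the weighted sum with the Hard Goal rejection described earlier; it assumes that the weights sum to 1 (so that 0.5 remains the neutral point of the combined score), which the article does not state explicitly. All names and sample values are hypothetical.

```python
def combined_score(scores, weights, hard):
    """Weighted sum of per-objective scores, or None if a Hard Goal rejects."""
    for score, is_hard in zip(scores, hard):
        if is_hard and score < 0.5:
            return None  # a Hard Goal is violated: reject outright
    return sum(w * s for w, s in zip(weights, scores))


def pick_best(candidates, weights, hard):
    """Return the name of the best Action with a score above 0.5, if any."""
    best_name, best_score = None, 0.5
    for name, scores in candidates.items():
        total = combined_score(scores, weights, hard)
        if total is not None and total > best_score:
            best_name, best_score = name, total
    return best_name


# Two objectives: a Hard Goal (e.g. a partition limit) and a Soft Goal
# (e.g. traffic balance). Weights are assumed to sum to 1.
weights = [0.5, 0.5]
hard = [True, False]
candidates = {
    "move-a": [0.25, 1.0],  # violates the Hard Goal, rejected
    "move-b": [0.5, 0.75],  # neutral on the Hard Goal, improves balance
    "move-c": [0.5, 0.5],   # no impact on either objective
}
best = pick_best(candidates, weights, hard)
```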

Broker scores for a single objective

Now, we only need to determine the scoring model for a broker under a single objective, $f_{score}(broker)$, to calculate the composite score of an Action. According to the conventions above, this model must meet the following conditions:

  1. The score range should be normalized to $[0.0,\: 1.0]$; otherwise, the scales of scores from different targets are inconsistent, making the weighted sum meaningless.

  2. When an Action is applied to a broker, if it is believed that the Action has no impact on the broker, then the score before and after application should remain the same.

  3. When an Action is applied to a broker, if it is believed that the Action has a negative impact on the broker, then the score before application should be greater than after.

  4. When an Action is applied to a broker, if it is believed that the Action has a positive impact on the broker, then the score before application should be less than after.

Taking the auto balancing goal currently built into AutoMQ as an example, the defined Broker scoring model is:

$$ ua = \lvert usage_{avg} - usage \rvert \in [0.0,\: +\infty) $$

$$ f(ua) = \begin{cases} 1.0 & \text{if } ua \le bound \\ 1.0 - 0.9 \cdot \dfrac{ua - bound}{var - bound} & \text{if } bound \le ua \le var \\ \dfrac{0.1}{\log_{var} ua} & \text{if } ua > var \end{cases} \in [0.0,\: 1.0] $$

Where:

$ua$: Represents the absolute value of the difference between current traffic and the average traffic.

$bound$: The tolerated deviation; when $ua$ is within this bound, the current traffic is considered to be within the average range.

$var$: The base of the logarithmic function; this parameter determines where the piecewise function transitions from a linear to a logarithmic decline.

The larger the $ua$ value, the greater the deviation from the expected value, resulting in a lower score. When the deviation is within $bound$, the score remains unchanged, indicating that reassignment of partitions within this range has no impact on the target. When the deviation is greater than $bound$ but less than $var$, the score decreases linearly with the deviation. When the deviation exceeds $var$, the score declines logarithmically and tends towards zero as the deviation approaches infinity. The function curve is as follows (axes are scaled for readability):

[Figure: curve of the Broker scoring function $f(ua)$]

The semantics of this function model are:

  1. When an Action keeps the Broker's traffic within the balanced range, it is considered to have no impact on the cluster.

  2. When an Action reduces the deviation of the Broker's traffic from the expected value, it is considered to have a positive impact.

  3. When an Action increases the deviation of the Broker's utilization from the expected value, it is considered to have a negative impact. Since the balancing target is a non-mandatory goal (Soft goal), even if the Action has a negative impact on the current target, whether it is adopted still depends on the comprehensive score of the Action across all targets. As long as the comprehensive score is greater than 0.5, the Action is still considered executable.
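The piecewise scoring model and its semantics can be checked with a short Python sketch. The function name and the sample values of `bound` and `var` are assumptions for illustration; the sketch also assumes `var > 1` and `var > bound`, which the formula needs for the logarithm and the linear segment to be well defined.

```python
import math


def broker_score(usage, usage_avg, bound, var):
    """Broker score for the balance objective, following f(ua)."""
    ua = abs(usage_avg - usage)  # absolute deviation from the average
    if ua <= bound:
        return 1.0  # within the balanced range: full score
    if ua <= var:
        # Linear decline from 1.0 at ua == bound to 0.1 at ua == var.
        return 1.0 - 0.9 * (ua - bound) / (var - bound)
    # Logarithmic decline beyond var, tending towards 0.
    return 0.1 / math.log(ua, var)


# Hypothetical parameters: average 100, bound 10, var 50.
balanced = broker_score(105, 100, bound=10, var=50)   # deviation 5
midway = broker_score(130, 100, bound=10, var=50)     # deviation 30
far_off = broker_score(2600, 100, bound=10, var=50)   # deviation 2500
```

With these values, a move that takes a broker from a deviation of 30 to a deviation of 5 raises its score, while a move in the opposite direction lowers it, matching the three semantics listed above.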

The following figure demonstrates the traffic self-balancing effects using the above mathematical model (Image source: AutoMQ internal LongRunning monitoring dashboard):

[Figure: traffic self-balancing effect, from the AutoMQ internal LongRunning monitoring dashboard]

Conclusion

In this article, we have extensively explored how AutoMQ, through its built-in auto-balancing component, AutoBalancer, achieves continuous self-balancing of partitions. We also discussed how mathematical models are utilized to produce explainable and observable scheduling decisions. Going forward, we will continue to refine our scheduling models to better suit complex production environments and contribute advanced scheduling capabilities such as cold-read identification and traffic prediction to the community. We also welcome community developers to collaborate with us in developing more efficient and diverse self-balancing capabilities.

References

[1] Cruise Control for Apache Kafka: https://github.com/linkedin/cruise-control

[2] A Distinctive Cloud-Native Architecture of AutoMQ: https://docs.automq.com/docs/automq-s3kafka/X1DBwDdzWiCMmYkglGHcKdjqn9f

[3] Analysis of Principles: How AutoMQ Achieves High Performance WAL on Bare Devices

[4] Yammer metrics maven dependency: https://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0

[5] KafkaMetric.java: https://github.com/AutoMQ/automq/blob/main/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java

[6] Introduction to Kafka Cruise Control (Slides 37/47): https://www.slideshare.net/slideshow/introduction-to-kafka-cruise-control-68180931/68180931

[7] AutoMQ: A Cloud-Native fork of Kafka by separating storage to S3: https://github.com/AutoMQ/automq
