diff --git a/common/src/main/java/org/astraea/common/partitioner/StrictCostPartitioner.java b/common/src/main/java/org/astraea/common/partitioner/StrictCostPartitioner.java
index e06a2983fe..c1e90e873d 100644
--- a/common/src/main/java/org/astraea/common/partitioner/StrictCostPartitioner.java
+++ b/common/src/main/java/org/astraea/common/partitioner/StrictCostPartitioner.java
@@ -18,13 +18,16 @@
import java.time.Duration;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -32,10 +35,14 @@
import org.astraea.common.Utils;
import org.astraea.common.admin.BrokerTopic;
import org.astraea.common.admin.ClusterInfo;
+import org.astraea.common.admin.Replica;
import org.astraea.common.cost.BrokerCost;
+import org.astraea.common.cost.CostFunction;
import org.astraea.common.cost.HasBrokerCost;
+import org.astraea.common.cost.HasPartitionCost;
import org.astraea.common.cost.NoSufficientMetricsException;
import org.astraea.common.cost.NodeLatencyCost;
+import org.astraea.common.cost.ReplicaLeaderSizeCost;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.collector.MetricStore;
@@ -43,8 +50,10 @@
/**
* this partitioner scores the nodes by multiples cost functions. Each function evaluate the target
- * node by different metrics. The default cost function ranks nodes by replica leader. It means the
- * node having lower replica leaders get higher score.
+ * node by different metrics. The default cost function ranks nodes by request latency. It means the
+ * node having lower request latency get higher score. After determining the node, this partitioner
+ * scores the partitions with partition cost function. The default partition cost function ranks
+ * partition by partition size. It means the partition having lower size gets higher score.
*
*
The important config is JMX port. Most cost functions need the JMX metrics to score nodes.
* Normally, all brokers use the same JMX port, so you can just define the `jmx.port=12345`. If one
@@ -67,7 +76,11 @@ public class StrictCostPartitioner extends Partitioner {
MetricStore metricStore = null;
private Duration roundRobinLease = Duration.ofSeconds(4);
- HasBrokerCost costFunction = new NodeLatencyCost();
+ HasBrokerCost brokerCost = new NodeLatencyCost();
+ HasPartitionCost partitionCost = new ReplicaLeaderSizeCost();
+ // The partition with the minimum cost (or -1 if none), cached per topic and per broker id.
+ final Map> minPartition = new HashMap<>();
+ long partitionUpdateTime = 0L;
Function jmxPortGetter =
(id) -> {
throw new NoSuchElementException("must define either broker.x.jmx.port or jmx.port");
@@ -86,7 +99,7 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster
try {
roundRobinKeeper.tryToUpdate(
clusterInfo,
- () -> costToScore(costFunction.brokerCost(clusterInfo, metricStore.clusterBean())));
+ () -> costToScore(brokerCost.brokerCost(clusterInfo, metricStore.clusterBean())));
} catch (NoSufficientMetricsException e) {
// There is not enough metrics for the cost functions computing teh broker cost. We should not
// update the round-robin keeper. Reuse the weights that were kept in the round-robin keeper.
@@ -97,13 +110,69 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster
var target = roundRobinKeeper.next();
+ // Choose a preferred partition from candidate by partition cost function
+ var preferredPartition =
+ tryUpdateMinPartition(
+ topic,
+ target,
+ (tp, id) -> {
+ // Compute the preferred partition for the topic on the target broker id.
+ // The target broker id has been determined previously by the broker cost.
+ // The returned value may be the special value "-1", which represents no
+ // preferred partition.
+ // The special value "-1" is returned in three cases:
+ // 1. the target broker id is not valid
+ // 2. the target broker has no partition leader of the topic
+ // 3. the partition cost reports no value for the leaders on the target broker
+ if (id == -1) return -1;
+ var candidate = clusterInfo.replicaLeaders(BrokerTopic.of(target, topic));
+ if (candidate.isEmpty()) return -1;
+ var candidateSet =
+ candidate.stream()
+ .map(Replica::topicPartition)
+ .collect(HashSet::new, HashSet::add, HashSet::addAll);
+ var preferred =
+ partitionCost
+ .partitionCost(clusterInfo, metricStore.clusterBean())
+ .value()
+ .entrySet()
+ .stream()
+ .filter(e -> candidateSet.contains(e.getKey()))
+ .min(Comparator.comparingDouble(Map.Entry::getValue));
+
+ return preferred.map(e -> e.getKey().partition()).orElse(-1);
+ });
+ // Check if we can get preferred partition from partition cost function
+ if (preferredPartition != -1) return preferredPartition;
+
// TODO: if the topic partitions are existent in fewer brokers, the target gets -1 in most cases
+ // Check "target valid" and the target "has partition leader".
var candidate =
target < 0 ? partitionLeaders : clusterInfo.replicaLeaders(BrokerTopic.of(target, topic));
candidate = candidate.isEmpty() ? partitionLeaders : candidate;
+ // Randomly choose from candidate.
return candidate.get((int) (Math.random() * candidate.size())).partition();
}
+ /**
+ * @param topic the topic the record is sent to
+ * @param brokerId the broker id that has been determined by the broker cost function
+ * @param partition the update function; it takes the topic and the broker id
+ * @return the cached partition of the topic and broker id if present and not expired; otherwise
+ * the partition computed (and then cached) by the given update function
+ */
+ private int tryUpdateMinPartition(
+ String topic, int brokerId, BiFunction partition) {
+ synchronized (minPartition) {
+ if (Utils.isExpired(partitionUpdateTime, roundRobinLease)) {
+ partitionUpdateTime = System.currentTimeMillis();
+ minPartition.clear();
+ }
+ return minPartition
+ .computeIfAbsent(topic, (tp) -> new HashMap<>())
+ .computeIfAbsent(brokerId, (id) -> partition.apply(topic, id));
+ }
+ }
/**
* The value of cost returned from cost function is conflict to score, since the higher cost
* represents lower score. This helper reverses the cost by subtracting the cost from "max cost".
@@ -131,8 +200,23 @@ static Map costToScore(BrokerCost cost) {
public void configure(Configuration config) {
var configuredFunctions =
Utils.costFunctions(
- config.filteredPrefixConfigs(COST_PREFIX).raw(), HasBrokerCost.class, config);
- if (!configuredFunctions.isEmpty()) this.costFunction = HasBrokerCost.of(configuredFunctions);
+ config.filteredPrefixConfigs(COST_PREFIX).raw(), CostFunction.class, config);
+ if (!configuredFunctions.isEmpty()) {
+ this.brokerCost =
+ HasBrokerCost.of(
+ configuredFunctions.entrySet().stream()
+ .filter(e -> e.getKey() instanceof HasBrokerCost)
+ .collect(
+ Collectors.toUnmodifiableMap(
+ e -> (HasBrokerCost) e.getKey(), Map.Entry::getValue)));
+ this.partitionCost =
+ HasPartitionCost.of(
+ configuredFunctions.entrySet().stream()
+ .filter(e -> e.getKey() instanceof HasPartitionCost)
+ .collect(
+ Collectors.toUnmodifiableMap(
+ e -> (HasPartitionCost) e.getKey(), Map.Entry::getValue)));
+ }
var customJmxPort = PartitionerUtils.parseIdJMXPort(config);
var defaultJmxPort = config.integer(JMX_PORT);
this.jmxPortGetter =
@@ -182,8 +266,14 @@ public void configure(Configuration config) {
};
metricStore =
MetricStore.builder()
+ .sensorsSupplier(
+ () ->
+ Map.of(
+ this.brokerCost.metricSensor(),
+ (brokerId, e) -> {},
+ this.partitionCost.metricSensor(),
+ (brokerId, e) -> {}))
.receivers(receivers)
- .sensorsSupplier(() -> Map.of(this.costFunction.metricSensor(), (integer, e) -> {}))
.build();
this.roundRobinKeeper = RoundRobinKeeper.of(ROUND_ROBIN_LENGTH, roundRobinLease);
diff --git a/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java b/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java
index 3535206515..f1f66792ae 100644
--- a/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java
+++ b/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java
@@ -29,11 +29,14 @@
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.ClusterInfoTest;
import org.astraea.common.admin.Replica;
+import org.astraea.common.admin.TopicPartition;
import org.astraea.common.cost.BrokerCost;
import org.astraea.common.cost.BrokerInputCost;
import org.astraea.common.cost.HasBrokerCost;
+import org.astraea.common.cost.HasPartitionCost;
import org.astraea.common.cost.NoSufficientMetricsException;
import org.astraea.common.cost.NodeThroughputCost;
+import org.astraea.common.cost.PartitionCost;
import org.astraea.common.cost.ReplicaLeaderCost;
import org.astraea.common.metrics.ClusterBean;
import org.astraea.common.metrics.collector.MetricStore;
@@ -78,7 +81,7 @@ void testNegativeWeight() {
"2",
"jmx.port",
"1111")));
- Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.costFunction);
+ Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.brokerCost);
}
}
@@ -94,7 +97,7 @@ void testConfigureCostFunctions() {
"2",
"jmx.port",
"1111")));
- Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.costFunction);
+ Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.brokerCost);
}
}
@@ -134,6 +137,13 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
}
}
+ public static class DumbHasPartitionCost implements HasPartitionCost {
+ @Override
+ public PartitionCost partitionCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
+ return Map::of; // every call yields an empty cost map
+ }
+ }
+
@Test
void testCostFunctionWithoutSensor() {
var replicaInfo0 =
@@ -143,14 +153,16 @@ void testCostFunctionWithoutSensor() {
try (var partitioner = new StrictCostPartitioner()) {
partitioner.configure(
new Configuration(
- (Map.of(Partitioner.COST_PREFIX + "." + DumbHasBrokerCost.class.getName(), "1"))));
+ (Map.of(
+ Partitioner.COST_PREFIX + "." + DumbHasBrokerCost.class.getName(), "1",
+ Partitioner.COST_PREFIX + "." + DumbHasPartitionCost.class.getName(), "1"))));
partitioner.partition(
"topic",
new byte[0],
new byte[0],
ClusterInfoTest.of(List.of(replicaInfo0, replicaInfo1)));
Utils.sleep(Duration.ofSeconds(1));
- Assertions.assertEquals(1, partitioner.metricStore.sensors().size());
+ Assertions.assertEquals(2, partitioner.metricStore.sensors().size());
}
}
@@ -165,11 +177,16 @@ void testEmptyJmxPort() {
}
}
- public static class MyFunction implements HasBrokerCost {
+ public static class MyFunction implements HasBrokerCost, HasPartitionCost {
@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return () -> Map.of(22, 10D);
}
+
+ @Override
+ public PartitionCost partitionCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
+ return () -> Map.of(TopicPartition.of("topic", 123), 10D);
+ }
}
@Test
@@ -195,13 +212,23 @@ void testReturnedPartition() {
.path("/tmp/aa")
.brokerId(1111)
.buildLeader();
+ var replicaInfo2 =
+ Replica.builder()
+ .topic("topic")
+ .partition(2)
+ .path("/tmp/aa")
+ .brokerId(1111)
+ .buildLeader();
+
+ // MyFunction returns the partition "partitionId" only. So the partitioner will choose the
+ // only one.
Assertions.assertEquals(
partitionId,
partitioner.partition(
"topic",
new byte[0],
new byte[0],
- ClusterInfoTest.of(List.of(replicaInfo0, replicaInfo1))));
+ ClusterInfoTest.of(List.of(replicaInfo0, replicaInfo1, replicaInfo2))));
}
}
@@ -209,8 +236,9 @@ void testReturnedPartition() {
void testDefaultFunction() {
try (var partitioner = new StrictCostPartitioner()) {
partitioner.configure(Configuration.EMPTY);
- Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.costFunction);
- Utils.waitFor(() -> partitioner.metricStore.sensors().size() == 1);
+ Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.brokerCost);
+ Assertions.assertNotEquals(HasPartitionCost.EMPTY, partitioner.partitionCost);
+ Utils.waitFor(() -> partitioner.metricStore.sensors().size() == 2);
}
}
@@ -262,7 +290,7 @@ void testCostNoSufficientMetricException() {
try (var partitioner = new StrictCostPartitioner()) {
partitioner.configure(Configuration.EMPTY);
// The cost function always throws exception
- partitioner.costFunction =
+ partitioner.brokerCost =
new HasBrokerCost() {
@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
diff --git a/docs/partitioner/strict_cost_partitioner.md b/docs/partitioner/strict_cost_partitioner.md
index 6f9d0888d3..16d512809c 100644
--- a/docs/partitioner/strict_cost_partitioner.md
+++ b/docs/partitioner/strict_cost_partitioner.md
@@ -1,6 +1,6 @@
# Strict Cost Partitioner
-Strict Cost Partitioner 是 [Astraea partitioner](./README.md) 之一,功能在於 "依據 broker cost" 來選擇發送的 partition 順序。"broker cost" 是使用者自定義的效能指標,目前使用 "request 平均延遲" 作為效能指標。(TODO: 允許使用者選擇效能指標)
+Strict Cost Partitioner 是 [Astraea partitioner](./README.md) 之一,功能在於 "依據 cost function" 來選擇發送的 partition 順序。"cost function" 是使用者自定義的效能指標,目前使用 "request 平均延遲" 作為效能指標。
### 於本專案的 [performance tool](../performance_benchmark.md) 使用
@@ -28,7 +28,7 @@ var producer = new KafkaProducer(props);
Strict Cost Partitioner 實做了 Apache Kafka 的 `org.apache.kafka.clients.producer.Partitioner` 介面,當 record 沒有指定要發送到哪個 partition 時,producer 便會呼叫 partitioner 來決定要把 record 發送到哪個 partition 上?
-此 Partitioner (or called partitioner) 便藉著 Apache Kafka 提供的這項自由度,來選擇 "適合" 的 partition 發送。Strict Cost Partitioner 在選擇 partition 前,會
+此 Partitioner 便藉著 Apache Kafka 提供的這項自由度,來選擇 "適合" 的 partition 發送。Strict Cost Partitioner 在選擇 partition 前,會
1. 獲取使用者定義的效能指標(預設是使用 producer 端的 request 平均延遲),下方範例示範指定以`record`大小為成本估計:
```java
@@ -41,8 +41,10 @@ var producer = new KafkaProducer(props);
3. 加權各個效能指標計算出的分數
4. 利用分數建立 [Smooth Round Robin](../../common/src/main/java/org/astraea/common/partitioner/RoundRobin.java) 的排序
5. 紀錄前 `ROUND_ROBIN_LENGTH` 筆排序並重複使用
+6. 套用各 cost function 計算 partition cost 並加權
+7. 選出目標 broker 中 cost 最低的 partition
-以上5個步驟每 `round.robin.lease` 時間會重新計算一次,預設的時間是4秒。可以在傳入的 `Properties` 中設定,
+以上步驟每 `round.robin.lease` 時間會重新計算一次(步驟 6、7 選出的 partition 也以相同的時間快取),預設的時間是4秒。可以在傳入的 `Properties` 中設定,
```bash
# 使用 performance tool 時,設定更新效能指標的時間