Skip to content

Commit

Permalink
[PARTITIONER] Take partition into consider (opensource4you#1654)
Browse files Browse the repository at this point in the history
  • Loading branch information
chinghongfang authored Aug 24, 2023
1 parent d4c3d4b commit e181abd
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,42 @@

import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
import org.astraea.common.admin.BrokerTopic;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.cost.BrokerCost;
import org.astraea.common.cost.CostFunction;
import org.astraea.common.cost.HasBrokerCost;
import org.astraea.common.cost.HasPartitionCost;
import org.astraea.common.cost.NoSufficientMetricsException;
import org.astraea.common.cost.NodeLatencyCost;
import org.astraea.common.cost.ReplicaLeaderSizeCost;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.collector.MetricStore;
import org.astraea.common.producer.ProducerConfigs;

/**
* this partitioner scores the nodes by multiples cost functions. Each function evaluate the target
* node by different metrics. The default cost function ranks nodes by replica leader. It means the
* node having lower replica leaders get higher score.
* node by different metrics. The default cost function ranks nodes by request latency. It means the
* node having lower request latency get higher score. After determining the node, this partitioner
* scores the partitions with partition cost function. The default partition cost function ranks
* partition by partition size. It means the node having lower size get higher score.
*
* <p>The important config is JMX port. Most cost functions need the JMX metrics to score nodes.
* Normally, all brokers use the same JMX port, so you can just define the `jmx.port=12345`. If one
Expand All @@ -67,7 +76,11 @@ public class StrictCostPartitioner extends Partitioner {
MetricStore metricStore = null;

private Duration roundRobinLease = Duration.ofSeconds(4);
HasBrokerCost costFunction = new NodeLatencyCost();
HasBrokerCost brokerCost = new NodeLatencyCost();
HasPartitionCost partitionCost = new ReplicaLeaderSizeCost();
// The minimum partition cost of every topic of every broker.
final Map<String, Map<Integer, Integer>> minPartition = new HashMap<>();
long partitionUpdateTime = 0L;
Function<Integer, Integer> jmxPortGetter =
(id) -> {
throw new NoSuchElementException("must define either broker.x.jmx.port or jmx.port");
Expand All @@ -86,7 +99,7 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster
try {
roundRobinKeeper.tryToUpdate(
clusterInfo,
() -> costToScore(costFunction.brokerCost(clusterInfo, metricStore.clusterBean())));
() -> costToScore(brokerCost.brokerCost(clusterInfo, metricStore.clusterBean())));
} catch (NoSufficientMetricsException e) {
// There is not enough metrics for the cost functions computing teh broker cost. We should not
// update the round-robin keeper. Reuse the weights that were kept in the round-robin keeper.
Expand All @@ -97,13 +110,69 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster

var target = roundRobinKeeper.next();

// Choose a preferred partition from candidate by partition cost function
var preferredPartition =
tryUpdateMinPartition(
topic,
target,
(tp, id) -> {
// Update the preferred partition according to the topic and target broker id
// The target broker id may be determined previously by broker cost
// The returned value may be a special value "-1" which represents no preferred
// partition.
// There are three conditions that the special value "-1" appears:
// 1. the target broker id is not valid
// 2. the target broker id has no partition leader
// 3. no partition cost in the target broker id
if (id == -1) return -1;
var candidate = clusterInfo.replicaLeaders(BrokerTopic.of(target, topic));
if (candidate.isEmpty()) return -1;
var candidateSet =
candidate.stream()
.map(Replica::topicPartition)
.collect(HashSet::new, HashSet::add, HashSet::addAll);
var preferred =
partitionCost
.partitionCost(clusterInfo, metricStore.clusterBean())
.value()
.entrySet()
.stream()
.filter(e -> candidateSet.contains(e.getKey()))
.min(Comparator.comparingDouble(Map.Entry::getValue));

return preferred.map(e -> e.getKey().partition()).orElse(-1);
});
// Check if we can get preferred partition from partition cost function
if (preferredPartition != -1) return preferredPartition;

// TODO: if the topic partitions are existent in fewer brokers, the target gets -1 in most cases
// Check "target valid" and the target "has partition leader".
var candidate =
target < 0 ? partitionLeaders : clusterInfo.replicaLeaders(BrokerTopic.of(target, topic));
candidate = candidate.isEmpty() ? partitionLeaders : candidate;
// Randomly choose from candidate.
return candidate.get((int) (Math.random() * candidate.size())).partition();
}

/**
* @param topic the topic we send record
* @param brokerId the broker id that has been determined by the broker cost function
* @param partition update function
* @return the cached partition if the update time is not expired; otherwise update the partition
* by the given supplier
*/
private int tryUpdateMinPartition(
String topic, int brokerId, BiFunction<String, Integer, Integer> partition) {
synchronized (minPartition) {
if (Utils.isExpired(partitionUpdateTime, roundRobinLease)) {
partitionUpdateTime = System.currentTimeMillis();
minPartition.clear();
}
return minPartition
.computeIfAbsent(topic, (tp) -> new HashMap<>())
.computeIfAbsent(brokerId, (id) -> partition.apply(topic, id));
}
}
/**
* The value of cost returned from cost function is conflict to score, since the higher cost
* represents lower score. This helper reverses the cost by subtracting the cost from "max cost".
Expand Down Expand Up @@ -131,8 +200,23 @@ static Map<Integer, Double> costToScore(BrokerCost cost) {
public void configure(Configuration config) {
var configuredFunctions =
Utils.costFunctions(
config.filteredPrefixConfigs(COST_PREFIX).raw(), HasBrokerCost.class, config);
if (!configuredFunctions.isEmpty()) this.costFunction = HasBrokerCost.of(configuredFunctions);
config.filteredPrefixConfigs(COST_PREFIX).raw(), CostFunction.class, config);
if (!configuredFunctions.isEmpty()) {
this.brokerCost =
HasBrokerCost.of(
configuredFunctions.entrySet().stream()
.filter(e -> e.getKey() instanceof HasBrokerCost)
.collect(
Collectors.toUnmodifiableMap(
e -> (HasBrokerCost) e.getKey(), Map.Entry::getValue)));
this.partitionCost =
HasPartitionCost.of(
configuredFunctions.entrySet().stream()
.filter(e -> e.getKey() instanceof HasPartitionCost)
.collect(
Collectors.toUnmodifiableMap(
e -> (HasPartitionCost) e.getKey(), Map.Entry::getValue)));
}
var customJmxPort = PartitionerUtils.parseIdJMXPort(config);
var defaultJmxPort = config.integer(JMX_PORT);
this.jmxPortGetter =
Expand Down Expand Up @@ -182,8 +266,14 @@ public void configure(Configuration config) {
};
metricStore =
MetricStore.builder()
.sensorsSupplier(
() ->
Map.of(
this.brokerCost.metricSensor(),
(brokerId, e) -> {},
this.partitionCost.metricSensor(),
(brokerId, e) -> {}))
.receivers(receivers)
.sensorsSupplier(() -> Map.of(this.costFunction.metricSensor(), (integer, e) -> {}))
.build();

this.roundRobinKeeper = RoundRobinKeeper.of(ROUND_ROBIN_LENGTH, roundRobinLease);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.ClusterInfoTest;
import org.astraea.common.admin.Replica;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.cost.BrokerCost;
import org.astraea.common.cost.BrokerInputCost;
import org.astraea.common.cost.HasBrokerCost;
import org.astraea.common.cost.HasPartitionCost;
import org.astraea.common.cost.NoSufficientMetricsException;
import org.astraea.common.cost.NodeThroughputCost;
import org.astraea.common.cost.PartitionCost;
import org.astraea.common.cost.ReplicaLeaderCost;
import org.astraea.common.metrics.ClusterBean;
import org.astraea.common.metrics.collector.MetricStore;
Expand Down Expand Up @@ -78,7 +81,7 @@ void testNegativeWeight() {
"2",
"jmx.port",
"1111")));
Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.costFunction);
Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.brokerCost);
}
}

Expand All @@ -94,7 +97,7 @@ void testConfigureCostFunctions() {
"2",
"jmx.port",
"1111")));
Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.costFunction);
Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.brokerCost);
}
}

Expand Down Expand Up @@ -134,6 +137,13 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
}
}

public static class DumbHasPartitionCost implements HasPartitionCost {
@Override
public PartitionCost partitionCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return Map::of;
}
}

@Test
void testCostFunctionWithoutSensor() {
var replicaInfo0 =
Expand All @@ -143,14 +153,16 @@ void testCostFunctionWithoutSensor() {
try (var partitioner = new StrictCostPartitioner()) {
partitioner.configure(
new Configuration(
(Map.of(Partitioner.COST_PREFIX + "." + DumbHasBrokerCost.class.getName(), "1"))));
(Map.of(
Partitioner.COST_PREFIX + "." + DumbHasBrokerCost.class.getName(), "1",
Partitioner.COST_PREFIX + "." + DumbHasPartitionCost.class.getName(), "1"))));
partitioner.partition(
"topic",
new byte[0],
new byte[0],
ClusterInfoTest.of(List.of(replicaInfo0, replicaInfo1)));
Utils.sleep(Duration.ofSeconds(1));
Assertions.assertEquals(1, partitioner.metricStore.sensors().size());
Assertions.assertEquals(2, partitioner.metricStore.sensors().size());
}
}

Expand All @@ -165,11 +177,16 @@ void testEmptyJmxPort() {
}
}

public static class MyFunction implements HasBrokerCost {
public static class MyFunction implements HasBrokerCost, HasPartitionCost {
@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return () -> Map.of(22, 10D);
}

@Override
public PartitionCost partitionCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return () -> Map.of(TopicPartition.of("topic", 123), 10D);
}
}

@Test
Expand All @@ -195,22 +212,33 @@ void testReturnedPartition() {
.path("/tmp/aa")
.brokerId(1111)
.buildLeader();
var replicaInfo2 =
Replica.builder()
.topic("topic")
.partition(2)
.path("/tmp/aa")
.brokerId(1111)
.buildLeader();

// MyFunction returns the partition "partitionId" only. So the partitioner will choose the
// only one.
Assertions.assertEquals(
partitionId,
partitioner.partition(
"topic",
new byte[0],
new byte[0],
ClusterInfoTest.of(List.of(replicaInfo0, replicaInfo1))));
ClusterInfoTest.of(List.of(replicaInfo0, replicaInfo1, replicaInfo2))));
}
}

@Test
void testDefaultFunction() {
try (var partitioner = new StrictCostPartitioner()) {
partitioner.configure(Configuration.EMPTY);
Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.costFunction);
Utils.waitFor(() -> partitioner.metricStore.sensors().size() == 1);
Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.brokerCost);
Assertions.assertNotEquals(HasPartitionCost.EMPTY, partitioner.partitionCost);
Utils.waitFor(() -> partitioner.metricStore.sensors().size() == 2);
}
}

Expand Down Expand Up @@ -262,7 +290,7 @@ void testCostNoSufficientMetricException() {
try (var partitioner = new StrictCostPartitioner()) {
partitioner.configure(Configuration.EMPTY);
// The cost function always throws exception
partitioner.costFunction =
partitioner.brokerCost =
new HasBrokerCost() {
@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
Expand Down
8 changes: 5 additions & 3 deletions docs/partitioner/strict_cost_partitioner.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Strict Cost Partitioner

Strict Cost Partitioner 是 [Astraea partitioner](./README.md) 之一,功能在於 "依據 broker cost" 來選擇發送的 partition 順序。"broker cost" 是使用者自定義的效能指標,目前使用 "request 平均延遲" 作為效能指標。(TODO: 允許使用者選擇效能指標)
Strict Cost Partitioner 是 [Astraea partitioner](./README.md) 之一,功能在於 "依據 cost function" 來選擇發送的 partition 順序。"cost function" 是使用者自定義的效能指標,目前使用 "request 平均延遲" 作為效能指標。

### 於本專案的 [performance tool](../performance_benchmark.md) 使用

Expand Down Expand Up @@ -28,7 +28,7 @@ var producer = new KafkaProducer<String, String>(props);

Strict Cost Partitioner 實做了 Apache Kafka 的 `org.apache.kafka.clients.producer.Partitioner` 介面,當 record 沒有指定要發送到哪個 partition 時,producer 便會呼叫 partitioner 來決定要把 record 發送到哪個 partition 上?

此 Partitioner (or called partitioner) 便藉著 Apache Kafka 提供的這項自由度,來選擇 "適合" 的 partition 發送。Strict Cost Partitioner 在選擇 partition 前,會
此 Partitioner 便藉著 Apache Kafka 提供的這項自由度,來選擇 "適合" 的 partition 發送。Strict Cost Partitioner 在選擇 partition 前,會

1. 獲取使用者定義的效能指標(預設是使用 producer 端的 request 平均延遲),下方範例示範指定以`record`大小為成本估計:
```java
Expand All @@ -41,8 +41,10 @@ var producer = new KafkaProducer<String, String>(props);
3. 加權各個效能指標計算出的分數
4. 利用分數建立 [Smooth Round Robin](../../common/src/main/java/org/astraea/common/partitioner/RoundRobin.java) 的排序
5. 紀錄前 `ROUND_ROBIN_LENGTH` 筆排序並重複使用
6. 套用各 cost function 計算 partition cost 並加權
7. 選出目標 broker 中 cost 最低的 partition

以上5個步驟每 `round.robin.lease` 時間會重新計算一次,預設的時間是4秒。可以在傳入的 `Properties` 中設定,
1~5 步驟每 `round.robin.lease` 時間會重新計算一次,預設的時間是4秒。可以在傳入的 `Properties` 中設定,

```bash
# 使用 performance tool 時,設定更新效能指標的時間
Expand Down

0 comments on commit e181abd

Please sign in to comment.