diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java new file mode 100644 index 0000000000..834bdce8d8 --- /dev/null +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -0,0 +1,526 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.astraea.app.web; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.Duration; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.commons.math3.distribution.EnumeratedDistribution; +import org.apache.commons.math3.distribution.ParetoDistribution; +import org.apache.commons.math3.distribution.UniformIntegerDistribution; +import org.apache.commons.math3.distribution.UniformRealDistribution; +import org.apache.commons.math3.random.Well19937c; +import org.apache.commons.math3.util.Pair; +import org.astraea.common.Configuration; +import org.astraea.common.DataRate; +import org.astraea.common.DataSize; +import org.astraea.common.Utils; +import org.astraea.common.admin.Admin; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartition; + +/** + * This class build up an imbalance scenario that one of the topic has significant more produce load + * than the others. 
+ */ +public class BackboneImbalanceScenario implements Scenario { + + public static final String CONFIG_RANDOM_SEED = "seed"; + public static final String CONFIG_TOPIC_COUNT = "topicCount"; + public static final String CONFIG_TOPIC_DATA_RATE_PARETO_SCALE = "topicRateParetoScale"; + public static final String CONFIG_TOPIC_DATA_RATE_PARETO_SHAPE = "topicRateParetoShape"; + public static final String CONFIG_TOPIC_CONSUMER_FANOUT_SERIES = "consumerFanoutSeries"; + public static final String CONFIG_PARTITION_COUNT_MIN = "partitionCountMin"; + public static final String CONFIG_PARTITION_COUNT_MAX = "partitionCountMax"; + public static final String CONFIG_BACKBONE_DATA_RATE = "backboneDataRate"; + public static final String CONFIG_PERF_CLIENT_COUNT = "performanceClientCount"; + public static final String CONFIG_PERF_KEY_TABLE_SEED = "performanceKeyTableSeed"; + public static final String CONFIG_PERF_ZIPFIAN_EXPONENT = "performanceZipfianExponent"; + + private static final String backboneTopicName = "backbone"; + + @Override + public CompletionStage apply(Admin admin, Configuration scenarioConfig) { + final var config = new Config(scenarioConfig); + final var rng = new Well19937c(config.seed()); + final var topicDataRateDistribution = + new ParetoDistribution(rng, config.topicRateParetoScale(), config.topicRateParetoShape()); + final var backboneDataRateDistribution = + new UniformRealDistribution( + rng, config.backboneDataRate() * 0.8, config.backboneDataRate() * 1.2); + final var topicPartitionCountDistribution = + new UniformIntegerDistribution(rng, config.partitionMin(), config.partitionMax()); + final var topicConsumerFanoutDistribution = + new EnumeratedDistribution<>( + rng, + config.consumerFanoutSeries().stream() + .map(x -> Pair.create(x, 1.0)) + .collect(Collectors.toUnmodifiableList())); + + return CompletableFuture.supplyAsync( + () -> { + final var topicNames = + IntStream.range(0, config.topicCount()) + .mapToObj(index -> "topic_" + index) + 
.collect(Collectors.toUnmodifiableSet()); + + // create topics + var normalTopics = + topicNames.stream() + .map( + name -> + admin + .creator() + .topic(name) + .numberOfPartitions(topicPartitionCountDistribution.sample()) + .numberOfReplicas((short) 1) + .run()); + var backboneTopic = + Stream.generate( + () -> + admin + .creator() + .topic(backboneTopicName) + .numberOfPartitions(24) + .numberOfReplicas((short) 1) + .run()) + .limit(1); + + Stream.concat(normalTopics, backboneTopic) + .map(CompletionStage::toCompletableFuture) + .peek( + stage -> + stage.whenComplete( + (done, err) -> { + if (err != null) err.printStackTrace(); + })) + .forEach(CompletableFuture::join); + Utils.sleep(Duration.ofSeconds(1)); + + // gather info and generate necessary variables + var allTopics = + Stream.concat(topicNames.stream(), Stream.of(backboneTopicName)) + .collect(Collectors.toUnmodifiableSet()); + var clusterInfo = admin.clusterInfo(allTopics).toCompletableFuture().join(); + var topicDataRate = + allTopics.stream() + .collect( + Collectors.toUnmodifiableMap( + x -> x, + x -> + DataRate.Byte.of( + (long) + (x.equals(backboneTopicName) + ? 
backboneDataRateDistribution.sample() + : topicDataRateDistribution.sample())))); + var topicPartitionDataRate = + clusterInfo.topicNames().stream() + .filter(topic -> !topic.equals(backboneTopicName)) + .flatMap( + topic -> { + var partitionWeight = + clusterInfo.replicas(topic).stream() + .map(Replica::topicPartition) + .distinct() + .collect( + Collectors.toUnmodifiableMap(tp -> tp, tp -> rng.nextDouble())); + var totalDataRate = topicDataRate.get(topic).byteRate(); + var totalWeight = + partitionWeight.values().stream().mapToDouble(x -> x).sum(); + + return partitionWeight.entrySet().stream() + .map( + e -> + Map.entry( + e.getKey(), + DataRate.Byte.of( + (long) (totalDataRate * e.getValue() / totalWeight)))); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + var backboneTopicBandwidth = topicDataRate.get(backboneTopicName); + var nodeWeight = + IntStream.range(1, clusterInfo.nodes().size()) + .boxed() + .collect( + Collectors.toMap( + index -> clusterInfo.nodes().get(index).id(), index -> rng.nextInt(100))); + nodeWeight.put( + clusterInfo.nodes().get(0).id(), nodeWeight.values().stream().mapToInt(x -> x).sum()); + + clusterInfo.replicas(backboneTopicName).stream() + .collect(Collectors.groupingBy(x -> x.nodeInfo().id())) + .forEach( + (nodeId, replicas) -> { + var weight = nodeWeight.get(nodeId); + var weightSum = nodeWeight.values().stream().mapToInt(x -> x).sum(); + var nodeDataRate = backboneTopicBandwidth.byteRate() * weight / weightSum; + var replicaDataRate = nodeDataRate / replicas.size(); + replicas.forEach( + replica -> + topicPartitionDataRate.put( + replica.topicPartition(), + DataRate.Byte.of((long) replicaDataRate))); + }); + + var consumerFanoutMap = + allTopics.stream() + .collect( + Collectors.toUnmodifiableMap( + x -> x, + x -> + x.equals(backboneTopicName) + ? 
1 + : topicConsumerFanoutDistribution.sample())); + + return new Result( + config, + clusterInfo, + allTopics, + topicDataRate, + topicPartitionDataRate, + consumerFanoutMap); + }); + } + + public static class Result { + + @JsonIgnore private final Config config; + @JsonIgnore private final ClusterInfo clusterInfo; + @JsonIgnore private final Set topics; + @JsonIgnore private final Map topicDataRates; + @JsonIgnore private final Map topicPartitionDataRates; + @JsonIgnore private final Map topicConsumerFanout; + + public Result( + Config config, + ClusterInfo clusterInfo, + Set topics, + Map topicDataRates, + Map topicPartitionDataRates, + Map topicConsumerFanout) { + this.config = config; + this.clusterInfo = clusterInfo; + this.topics = topics; + this.topicDataRates = topicDataRates; + this.topicPartitionDataRates = topicPartitionDataRates; + this.topicConsumerFanout = topicConsumerFanout; + } + + @JsonProperty + public long totalTopics() { + return topics.size(); + } + + @JsonProperty + public long totalPartitions() { + return clusterInfo.replicaStream().filter(r -> topics.contains(r.topic())).count(); + } + + @JsonProperty + public String totalProduceRate() { + var sum = topicDataRates.values().stream().mapToDouble(DataRate::byteRate).sum(); + return DataRate.Byte.of((long) sum).toString(); + } + + @JsonProperty + public String totalConsumeRate() { + var sum = + topicDataRates.entrySet().stream() + .mapToDouble(e -> e.getValue().byteRate() * topicConsumerFanout.get(e.getKey())) + .sum(); + return DataRate.Byte.of((long) sum).toString(); + } + + @JsonProperty + public double consumerFanoutAverage() { + return config.consumerFanoutSeries().stream().mapToInt(x -> x).average().orElse(0); + } + + @JsonProperty + public Map topicDataRate() { + return topicDataRates.entrySet().stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, x -> x.getValue().toString())); + } + + @JsonProperty + public Map topicDataRateHistogram() { + var byteRates = + 
topicDataRates.values().stream() + .map(DataRate::byteRate) + .sorted(Double::compareTo) + .collect(Collectors.toUnmodifiableList()); + var totalRates = byteRates.size(); + // For all the data rates, we use 1/4 portion of the data rates as one histogram bin. And the + // rest of the 3/4 will be used for the rest of the other bins. This process continues + // recursively until no rate is left. Every bin holds at least one rate. + var histogramBins = + Stream.iterate( + Map.entry(totalRates, Math.max(1, totalRates / 4)), + e -> e.getKey() > 0, + (e) -> + Map.entry( + (e.getKey() - e.getValue()), + Math.max(1, (e.getKey() - e.getValue()) / 4))) + .map( + e -> { + var taken = totalRates - e.getKey(); + var takes = e.getValue(); + return byteRates.subList(taken, taken + takes); + }) + .collect(Collectors.toUnmodifiableList()); + var rendered = + histogramBins.stream() + .map( + binContent -> { + var first = DataRate.Byte.of(binContent.get(0).longValue()); + var last = DataRate.Byte.of(binContent.get(binContent.size() - 1).longValue()); + var key = String.format("[%s, %s]", first, last); + var value = Integer.toString(binContent.size()); + return Map.entry(key, value); + }) + .collect(Collectors.toUnmodifiableList()); + var orderMap = + IntStream.range(0, rendered.size()) + .boxed() + .collect( + Collectors.toUnmodifiableMap( + x -> rendered.get(x).getKey(), x -> x, Integer::sum)); + var sortedMap = new TreeMap(Comparator.comparingInt(orderMap::get)); + rendered.forEach(e -> sortedMap.put(e.getKey(), e.getValue())); + return sortedMap; + } + + @JsonProperty + public Map topicConsumerFanout() { + return topicConsumerFanout; + } + + @JsonProperty + public List> perfCommands() { + class PerfClient { + long consumeRate = 0; + long produceRate = 0; + final Set topics = new HashSet<>(); + String keyDistribution; + final Map keyDistributionConfig = new HashMap<>(); + } + var clientCount = config.performanceClientCount(); + if (clientCount < 3) + throw new IllegalArgumentException("At least three perf clients 
required"); + var clients = + IntStream.range(0, clientCount) + .mapToObj(i -> new PerfClient()) + .collect(Collectors.toUnmodifiableList()); + + // allocate topics to all the performance clients evenly + for (var topic : topics) { + var dataRate = (long) topicDataRates.get(topic).byteRate(); + var fanout = (int) topicConsumerFanout.get(topic); + for (int i = 0; i < fanout; i++) { + if (topic.equals(BackboneImbalanceScenario.backboneTopicName)) { + // separate the processing of produce/consume to two individual clients. + // see https://github.com/skiptests/astraea/issues/1567 + var produceClient = clients.get(0); + produceClient.produceRate += dataRate; + produceClient.topics.add(topic); + var consumeClient = clients.get(1); + consumeClient.consumeRate += dataRate; + consumeClient.topics.add(topic); + } else { + var nextClient = + clients.stream() + .skip(2) + .filter(x -> !x.topics.contains(topic)) + .min(Comparator.comparing(x -> x.consumeRate)) + .orElseThrow(); + nextClient.consumeRate += dataRate; + nextClient.produceRate += dataRate / fanout; + nextClient.topics.add(topic); + } + } + } + for (var client : clients) { + var zipfian = client.topics.equals(Set.of(BackboneImbalanceScenario.backboneTopicName)); + client.keyDistribution = zipfian ? "zipfian" : "uniform"; + if (zipfian) { + client.keyDistributionConfig.put("exponent", Double.toString(config.zipfianExponent())); + } + } + + // render the argument + return clients.stream() + .map( + client -> { + var isBackbone = client.topics.equals(Set.of(backboneTopicName)); + var consumeRate = DataRate.Byte.of(client.consumeRate); + var produceRate = DataRate.Byte.of(client.produceRate); + var throttle = + client.topics.stream() + .flatMap( + topic -> + clusterInfo + .replicaStream(topic) + .map(Replica::topicPartition) + .distinct()) + .flatMap( + tp -> { + // backbone partition bandwidth is unknown before performance start. 
+ if (!topicPartitionDataRates.containsKey(tp)) return Stream.of(); + var bytes = + topicPartitionDataRates + .get(tp) + .dataSize() + .divide(topicConsumerFanout.get(tp.topic())) + .bytes(); + // TopicPartitionDataRateMapField support only integer measurement + // and no space allowed. So we can't just toString the DataRate + // object :( + return Stream.of(String.format("%s:%sByte/second", tp, bytes)); + }) + .collect(Collectors.joining(",")); + var throughput = String.format("%dByte/second", (long) produceRate.byteRate()); + var keyDistConfigString = + client.keyDistributionConfig.entrySet().stream() + .map(e -> String.format("%s=%s", e.getKey(), e.getValue())) + .collect(Collectors.joining()); + return Map.ofEntries( + Map.entry("backbone", Boolean.toString(isBackbone)), + Map.entry("topics", String.join(",", client.topics)), + Map.entry("throughput", throughput), + Map.entry("throttle", throttle), + Map.entry("key_distribution", client.keyDistribution), + Map.entry("key_distribution_config", keyDistConfigString), + Map.entry("key_table_seed", Integer.toString(config.keyTableSeed())), + Map.entry("no_consumer", Boolean.toString(consumeRate.byteRate() == 0)), + Map.entry("no_producer", Boolean.toString(produceRate.byteRate() == 0)), + Map.entry("consume_rate", consumeRate.toString()), + Map.entry("produce_rate", produceRate.toString())); + }) + .collect(Collectors.toUnmodifiableList()); + } + + @JsonProperty + public int seed() { + return config.seed(); + } + } + + public static class Config { + + private final Configuration scenarioConfig; + private final int defaultRandomSeed = ThreadLocalRandom.current().nextInt(); + private final int defaultPerfKeyTableSeed = new Random(defaultRandomSeed).nextInt(); + + public Config(Configuration scenarioConfig) { + this.scenarioConfig = scenarioConfig; + + int maxFanout = consumerFanoutSeries().stream().mapToInt(x -> x).max().orElseThrow(); + if (maxFanout > performanceClientCount()) + throw new IllegalArgumentException( + 
"The number of client is less than the max topic fanout: " + + maxFanout + + " > " + + performanceClientCount()); + } + + int seed() { + return scenarioConfig + .string(CONFIG_RANDOM_SEED) + .map(Integer::parseInt) + .orElse(defaultRandomSeed); + } + + int topicCount() { + return scenarioConfig.string(CONFIG_TOPIC_COUNT).map(Integer::parseInt).orElse(1000); + } + + int partitionMin() { + return scenarioConfig.string(CONFIG_PARTITION_COUNT_MIN).map(Integer::parseInt).orElse(5); + } + + int partitionMax() { + return scenarioConfig.string(CONFIG_PARTITION_COUNT_MAX).map(Integer::parseInt).orElse(15); + } + + List consumerFanoutSeries() { + return scenarioConfig + .string(CONFIG_TOPIC_CONSUMER_FANOUT_SERIES) + .filter(s -> !s.isEmpty()) + .map( + seriesString -> + Arrays.stream(seriesString.split(",")) + .map(x -> Integer.parseInt(x.trim())) + .collect(Collectors.toUnmodifiableList())) + .orElse(List.of(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 5)); + } + + double topicRateParetoScale() { + return scenarioConfig + .string(CONFIG_TOPIC_DATA_RATE_PARETO_SCALE) + .map(Double::parseDouble) + .orElse(DataRate.MB.of(1).byteRate()); + } + + double topicRateParetoShape() { + return scenarioConfig + .string(CONFIG_TOPIC_DATA_RATE_PARETO_SHAPE) + .map(Double::parseDouble) + .orElse(3.0); + } + + long backboneDataRate() { + return scenarioConfig + .string(CONFIG_BACKBONE_DATA_RATE) + .map(Long::parseLong) + .orElse(DataSize.MB.of(950).bytes()); + } + + int performanceClientCount() { + return scenarioConfig.string(CONFIG_PERF_CLIENT_COUNT).map(Integer::parseInt).orElse(7); + } + + int keyTableSeed() { + return scenarioConfig + .string(CONFIG_PERF_KEY_TABLE_SEED) + .map(Integer::parseInt) + .orElse(defaultPerfKeyTableSeed); + } + + double zipfianExponent() { + return scenarioConfig + .string(CONFIG_PERF_ZIPFIAN_EXPONENT) + .map(Double::parseDouble) + .orElse(1.0); + } + } +} diff --git a/app/src/main/java/org/astraea/app/web/Scenario.java b/app/src/main/java/org/astraea/app/web/Scenario.java 
index 6949db3574..76cec403e0 100644 --- a/app/src/main/java/org/astraea/app/web/Scenario.java +++ b/app/src/main/java/org/astraea/app/web/Scenario.java @@ -16,13 +16,13 @@ */ package org.astraea.app.web; -import java.util.Map; import java.util.concurrent.CompletionStage; +import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; /** The subclass of this class should contain the logic to fulfill a scenario. */ -public interface Scenario { +public interface Scenario { static Builder builder() { return new Builder(); @@ -56,54 +56,12 @@ public Builder binomialProbability(double binomialProbability) { return this; } - public Scenario build() { + public Scenario build() { return new SkewedPartitionScenario( topicName, numberOfPartitions, numberOfReplicas, binomialProbability); } } /** Apply this scenario to the Kafka cluster */ - CompletionStage apply(Admin admin); - - class Result { - - private final String topicName; - private final int numberOfPartitions; - private final short numberOfReplicas; - private final Map leaderSum; - private final Map logSum; - - public Result( - String topicName, - int numberOfPartitions, - short numberOfReplicas, - Map leaderSum, - Map logSum) { - this.topicName = topicName; - this.numberOfPartitions = numberOfPartitions; - this.numberOfReplicas = numberOfReplicas; - this.leaderSum = leaderSum; - this.logSum = logSum; - } - - public String topicName() { - return topicName; - } - - public int numberOfPartitions() { - return numberOfPartitions; - } - - public short numberOfReplicas() { - return numberOfReplicas; - } - - public Map leaderSum() { - return leaderSum; - } - - public Map logSum() { - return logSum; - } - } + CompletionStage apply(Admin admin, Configuration scenarioConfig); } diff --git a/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java b/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java index 7e45b7700a..87f690ce7d 100644 --- 
a/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java +++ b/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java @@ -29,12 +29,13 @@ import org.apache.commons.math3.distribution.EnumeratedDistribution; import org.apache.commons.math3.distribution.IntegerDistribution; import org.apache.commons.math3.util.Pair; +import org.astraea.common.Configuration; import org.astraea.common.admin.Admin; import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.TopicPartition; import org.astraea.common.admin.TopicPartitionReplica; -public class SkewedPartitionScenario implements Scenario { +public class SkewedPartitionScenario implements Scenario { final String topicName; final int partitions; @@ -50,7 +51,7 @@ public SkewedPartitionScenario( } @Override - public CompletionStage apply(Admin admin) { + public CompletionStage apply(Admin admin, Configuration scenarioConfig) { return admin .creator() .topic(topicName) @@ -138,4 +139,46 @@ public static List sampledReplicaList( } return result; } + + public static class Result { + + private final String topicName; + private final int numberOfPartitions; + private final short numberOfReplicas; + private final Map leaderSum; + private final Map logSum; + + public Result( + String topicName, + int numberOfPartitions, + short numberOfReplicas, + Map leaderSum, + Map logSum) { + this.topicName = topicName; + this.numberOfPartitions = numberOfPartitions; + this.numberOfReplicas = numberOfReplicas; + this.leaderSum = leaderSum; + this.logSum = logSum; + } + + public String topicName() { + return topicName; + } + + public int numberOfPartitions() { + return numberOfPartitions; + } + + public short numberOfReplicas() { + return numberOfReplicas; + } + + public Map leaderSum() { + return leaderSum; + } + + public Map logSum() { + return logSum; + } + } } diff --git a/app/src/main/java/org/astraea/app/web/TopicHandler.java b/app/src/main/java/org/astraea/app/web/TopicHandler.java index 
d4320a3033..23c9371226 100644 --- a/app/src/main/java/org/astraea/app/web/TopicHandler.java +++ b/app/src/main/java/org/astraea/app/web/TopicHandler.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletionStage; import java.util.function.Predicate; import java.util.stream.Collectors; +import org.astraea.common.Configuration; import org.astraea.common.FutureUtils; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; @@ -175,7 +176,7 @@ public CompletionStage post(Channel channel) { .numberOfReplicas(numberOfReplicas) .binomialProbability(topic.probability.get()) .build() - .apply(admin) + .apply(admin, Configuration.EMPTY) .thenApply(ignored -> null) .toCompletableFuture(); } diff --git a/app/src/main/java/org/astraea/balancer/bench/CostProfilingImpl.java b/app/src/main/java/org/astraea/balancer/bench/CostProfilingImpl.java index 0c17e63b3f..a3100d10a2 100644 --- a/app/src/main/java/org/astraea/balancer/bench/CostProfilingImpl.java +++ b/app/src/main/java/org/astraea/balancer/bench/CostProfilingImpl.java @@ -17,10 +17,15 @@ package org.astraea.balancer.bench; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; +import java.util.List; import java.util.LongSummaryStatistics; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import org.astraea.common.admin.ClusterBean; @@ -28,9 +33,11 @@ import org.astraea.common.balancer.AlgorithmConfig; import org.astraea.common.balancer.Balancer; import org.astraea.common.cost.ClusterCost; +import org.astraea.common.cost.CompositeClusterCost; import org.astraea.common.cost.HasClusterCost; import org.astraea.common.cost.HasMoveCost; import org.astraea.common.cost.MoveCost; +import org.astraea.common.cost.ResourceUsageHint; import org.astraea.common.metrics.collector.MetricSensor; class CostProfilingImpl 
implements BalancerBenchmark.CostProfilingBuilder { @@ -85,7 +92,22 @@ public CompletableFuture start() { .clusterBean(clusterBean) .timeout(timeout) .clusterCost( - new HasClusterCost() { + new CompositeClusterCost() { + @Override + public Collection functions() { + var queue = new LinkedList(List.of(costFunction)); + var functions = new ArrayList(); + + while (!queue.isEmpty()) { + var next = queue.pop(); + if (next instanceof CompositeClusterCost) + functions.addAll(((CompositeClusterCost) next).functions()); + else functions.add(next); + } + + return List.copyOf(functions); + } + @Override public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { final var start = System.nanoTime(); @@ -124,6 +146,11 @@ public Optional metricSensor() { return moveCostFunction.metricSensor(); } + @Override + public Set resourceUsageHint() { + return moveCostFunction.resourceUsageHint(); + } + @Override public String toString() { return moveCostFunction.toString(); diff --git a/app/src/test/java/org/astraea/app/BackboneImbalanceApplyTest.java b/app/src/test/java/org/astraea/app/BackboneImbalanceApplyTest.java new file mode 100644 index 0000000000..e30e7c3797 --- /dev/null +++ b/app/src/test/java/org/astraea/app/BackboneImbalanceApplyTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.astraea.app.web.BackboneImbalanceScenario; +import org.astraea.common.Configuration; +import org.astraea.common.admin.Admin; +import org.astraea.common.json.JsonConverter; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +public class BackboneImbalanceApplyTest { + + public static final String realCluster = + "192.168.103.177:25655,192.168.103.178:25655,192.168.103.179:25655,192.168.103.180:25655,192.168.103.181:25655,192.168.103.182:25655"; + public static final List clients = + List.of( + "192.168.103.184", + "192.168.103.142", + "192.168.103.183", + "192.168.103.141", + "192.168.103.143", + "192.168.103.144", + "192.168.103.145"); + + @Test + @Disabled + void testBackbone() { + try (Admin admin = Admin.of(realCluster)) { + var scenario = new BackboneImbalanceScenario(); + var config = + Configuration.of( + Map.ofEntries( + Map.entry(BackboneImbalanceScenario.CONFIG_PERF_ZIPFIAN_EXPONENT, "1.6"), + Map.entry(BackboneImbalanceScenario.CONFIG_PERF_KEY_TABLE_SEED, "0"), + Map.entry( + BackboneImbalanceScenario.CONFIG_PERF_CLIENT_COUNT, + Integer.toString(clients.size())))); + var result = scenario.apply(admin, config).toCompletableFuture().join(); + // print summary + var converter = JsonConverter.defaultConverter(); + System.out.println(converter.toJson(result)); + // save result to json format + var ansibleInventory = converter.toJson(toAnsibleInventory(result)); + var ansibleInventoryFile = + "/home/garyparrot/Programming/ansible/backbone-imbalance-scenario-inventory.json"; + try (var stream = Files.newBufferedWriter(Path.of(ansibleInventoryFile))) { + stream.write(ansibleInventory); 
+ } catch (IOException e) { + e.printStackTrace(); + } + } + } + + Map toAnsibleInventory(BackboneImbalanceScenario.Result result) { + var hosts = + IntStream.range(0, clients.size()) + .boxed() + .collect( + Collectors.toUnmodifiableMap( + clients::get, + index -> { + var clientHostname = clients.get(index); + var clientPerf = result.perfCommands().get(index); + return Map.ofEntries( + Map.entry("ansible_host", clientHostname), + Map.entry("ansible_user", "kafka"), + Map.entry("expected_produce_rate", clientPerf.get("produce_rate")), + Map.entry("expected_consume_rate", clientPerf.get("consume_rate")), + Map.entry("key_table_seed", clientPerf.get("key_table_seed")), + Map.entry("key_distribution", clientPerf.get("key_distribution")), + Map.entry( + "key_distribution_config", clientPerf.get("key_distribution_config")), + Map.entry("throttle", clientPerf.get("throttle")), + Map.entry("throttle_enable", !clientPerf.get("backbone").equals("true")), + Map.entry("throughput", clientPerf.get("throughput")), + Map.entry("throughput_enable", clientPerf.get("backbone").equals("true")), + Map.entry("no_producer", clientPerf.get("no_producer")), + Map.entry("no_consumer", clientPerf.get("no_consumer")), + Map.entry("topics", clientPerf.get("topics"))); + })); + + return Map.of("backbone_imbalance_hosts", Map.of("hosts", hosts)); + } +} diff --git a/app/src/test/java/org/astraea/app/BalancerExperimentTest.java b/app/src/test/java/org/astraea/app/BalancerExperimentTest.java new file mode 100644 index 0000000000..5cb659683a --- /dev/null +++ b/app/src/test/java/org/astraea/app/BalancerExperimentTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Scanner; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.astraea.balancer.bench.BalancerBenchmark; +import org.astraea.common.Configuration; +import org.astraea.common.Utils; +import org.astraea.common.admin.Admin; +import org.astraea.common.admin.Broker; +import org.astraea.common.admin.ClusterBean; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.NodeInfo; +import org.astraea.common.balancer.AlgorithmConfig; +import org.astraea.common.balancer.Balancer; +import org.astraea.common.balancer.algorithms.GreedyBalancer; +import org.astraea.common.balancer.algorithms.ResourceBalancer; +import org.astraea.common.balancer.executor.StraightPlanExecutor; +import org.astraea.common.cost.HasClusterCost; +import org.astraea.common.cost.HasMoveCost; +import org.astraea.common.cost.NetworkEgressCost; +import org.astraea.common.cost.NetworkIngressCost; +import org.astraea.common.cost.NoSufficientMetricsException; +import org.astraea.common.cost.ReplicaLeaderCost; +import 
org.astraea.common.cost.ReplicaNumberCost; +import org.astraea.common.cost.ResourceUsage; +import org.astraea.common.metrics.ClusterBeanSerializer; +import org.astraea.common.metrics.ClusterInfoSerializer; +import org.astraea.common.metrics.MBeanClient; +import org.astraea.common.metrics.collector.MetricStore; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import scala.math.Equiv; +
/**
 * Manual experiments for the balancer. Every test is {@code @Disabled}: they read and write
 * hard-coded local files and talk to a hard-coded cluster, so they are meant to be run by hand.
 */
public class BalancerExperimentTest {

  /** Location of the serialized {@link ClusterInfo} snapshot. */
  public static final String fileName0 = "/home/garyparrot/cluster-file3.bin";

  /** Location of the serialized {@link ClusterBean} snapshot. */
  public static final String fileName1 = "/home/garyparrot/bean-file3.bin";

  /** Bootstrap servers of the cluster used by the experiments. */
  public static final String realCluster =
      "192.168.103.177:25655,192.168.103.178:25655,192.168.103.179:25655,192.168.103.180:25655,192.168.103.181:25655,192.168.103.182:25655";

  /** Runs {@link #testProfiling()} outside of the test framework. */
  public static void main(String[] args) {
    new BalancerExperimentTest().testProfiling();
  }

  /** Loads the saved snapshots and prints the number of resource usage hints of a leader cost. */
  @Disabled
  @Test
  void testRuntime() {
    try (var stream0 = new FileInputStream(fileName0);
        var stream1 = new FileInputStream(fileName1)) {
      System.out.println("Deserialize ClusterInfo");
      ClusterInfo clusterInfo = ClusterInfoSerializer.deserialize(stream0);
      System.out.println("Deserialize ClusterBean");
      ClusterBean clusterBean = ClusterBeanSerializer.deserialize(stream1);
      ReplicaLeaderCost replicaLeaderCost =
          new ReplicaLeaderCost(
              Configuration.of(Map.of(ReplicaLeaderCost.MAX_MIGRATE_LEADER_KEY, "30")));
      System.out.println(replicaLeaderCost.resourceUsageHint().size());
    } catch (Exception e) {
      // fail the test instead of printing the trace and passing vacuously
      throw new IllegalStateException("Failed to load the saved scenario", e);
    }
  }

  /**
   * Loads the saved snapshots, profiles {@link ResourceBalancer} against them, prints a summary,
   * writes the cost time series to a temporary CSV file and finally asks on stdin whether the
   * resulting plan should be executed against {@link #realCluster}.
   */
  @Disabled
  @Test
  void testProfiling() {
    // load
    try (var admin = Admin.of(realCluster);
        var stream0 = new FileInputStream(fileName0);
        var stream1 = new FileInputStream(fileName1)) {
      System.out.println("Deserialize ClusterInfo");
      ClusterInfo clusterInfo = ClusterInfoSerializer.deserialize(stream0);
      System.out.println("Deserialize ClusterBean");
      ClusterBean clusterBean = ClusterBeanSerializer.deserialize(stream1);
      System.out.println("Done!");

      Map<HasClusterCost, Double> costMap =
          Map.of(
              new NetworkIngressCost(Configuration.EMPTY), 3.0,
              new NetworkEgressCost(Configuration.EMPTY), 3.0);
      var costFunction = HasClusterCost.of(costMap);

      var balancer = new ResourceBalancer();
      var result =
          BalancerBenchmark.costProfiling()
              .setClusterInfo(clusterInfo)
              .setClusterBean(clusterBean)
              .setBalancer(balancer)
              .setExecutionTimeout(Duration.ofSeconds(180))
              .setAlgorithmConfig(
                  AlgorithmConfig.builder()
                      .clusterCost(costFunction)
                      .moveCost(
                          new ReplicaLeaderCost(
                              Configuration.of(
                                  Map.of(ReplicaLeaderCost.MAX_MIGRATE_LEADER_KEY, "60"))))
                      .build())
              .start()
              .toCompletableFuture()
              .join();

      var meanClusterCostTime =
          Duration.ofNanos((long) result.clusterCostProcessingTimeNs().getAverage());
      var meanMoveCostTime =
          Duration.ofNanos((long) result.moveCostProcessingTimeNs().getAverage());
      System.out.println("Total Run time: " + result.executionTime().toMillis() + " ms");
      System.out.println(
          "Total ClusterCost Evaluation: " + result.clusterCostProcessingTimeNs().getCount());
      System.out.println(
          "Average ClusterCost Processing: " + meanClusterCostTime.toMillis() + "ms");
      System.out.println("Average MoveCost Processing: " + meanMoveCostTime.toMillis() + "ms");
      System.out.println("Initial Cost: " + result.initial());
      System.out.println(
          "Final Cost: " + result.plan().map(Balancer.Plan::proposalClusterCost).orElse(null));
      var profilingFile = Utils.packException(() -> Files.createTempFile("profile-", ".csv"));
      System.out.println("Profiling File: " + profilingFile.toString());
      System.out.println(
          "Total affected partitions: "
              + ClusterInfo.findNonFulfilledAllocation(
                      clusterInfo, result.plan().orElseThrow().proposal())
                  .size());
      System.out.println();

      // Writing the CSV is best-effort: a failure here must not prevent the prompt below.
      try (var stream = Files.newBufferedWriter(profilingFile)) {
        var start = result.costTimeSeries().keySet().stream().mapToLong(x -> x).min().orElseThrow();
        Utils.packException(() -> stream.write("time, cost\n"));
        result.costTimeSeries().entrySet().stream()
            .sorted(Map.Entry.comparingByKey())
            .forEach(
                (e) -> {
                  var time = e.getKey();
                  var cost = e.getValue();
                  Utils.packException(
                      () -> stream.write(String.format("%d, %.7f%n", time - start, cost.value())));
                });
      } catch (IOException e) {
        e.printStackTrace();
      }

      System.out.println("Run the plan? (yes/no)");
      // One scanner for the whole dialogue: a fresh Scanner per iteration may drop input that the
      // previous instance already buffered. It is intentionally not closed, since closing it
      // would close System.in as well.
      var scanner = new Scanner(System.in);
      while (scanner.hasNext()) {
        String next = scanner.next();
        if (next.equals("yes")) {
          System.out.println("Run the Plan");
          new StraightPlanExecutor(Configuration.EMPTY)
              .run(admin, result.plan().orElseThrow().proposal(), Duration.ofHours(1))
              .toCompletableFuture()
              .join();
          return;
        } else if (next.equals("no")) {
          return;
        }
      }
    } catch (IOException e) {
      throw new java.io.UncheckedIOException(e);
    }
  }

  /**
   * Fetches the cluster state of {@link #realCluster}, collects metrics until {@link
   * GreedyBalancer} is able to offer a plan, then serializes the cluster info and the collected
   * beans to {@link #fileName0} / {@link #fileName1} and reads them back as a sanity check.
   */
  @Disabled
  @Test
  void testSaveScenario() {
    try (Admin admin = Admin.of(realCluster)) {
      var clusterInfo =
          admin.topicNames(false).thenCompose(admin::clusterInfo).toCompletableFuture().join();
      Map<HasClusterCost, Double> costMap =
          Map.of(
              new NetworkIngressCost(Configuration.EMPTY), 3.0,
              new NetworkEgressCost(Configuration.EMPTY), 3.0,
              new ReplicaNumberCost(Configuration.EMPTY), 1.0);

      try (var metricStore =
          MetricStore.builder()
              .beanExpiration(Duration.ofSeconds(180))
              .sensorsSupplier(
                  () ->
                      costMap.keySet().stream()
                          .filter(x -> x.metricSensor().isPresent())
                          .collect(
                              Collectors.toUnmodifiableMap(
                                  x -> x.metricSensor().orElseThrow(), x -> (i0, i1) -> {})))
              .localReceiver(
                  () ->
                      admin
                          .brokers()
                          .thenApply(
                              (brokers) ->
                                  brokers.stream()
                                      .collect(
                                          Collectors.toUnmodifiableMap(
                                              NodeInfo::id,
                                              (Broker b) -> MBeanClient.jndi(b.host(), 16926)))))
              .build()) {
        ClusterBean clusterBean = null;
        var balancer = new GreedyBalancer();

        // keep sampling until the balancer has enough metrics to produce a plan
        while (!Thread.currentThread().isInterrupted()) {
          clusterBean = metricStore.clusterBean();

          System.out.println(
              clusterBean.all().entrySet().stream()
                  .collect(
                      Collectors.toUnmodifiableMap(Map.Entry::getKey, x -> x.getValue().size())));
          try {
            var costFunction = HasClusterCost.of(costMap);
            Optional<Balancer.Plan> offer =
                balancer.offer(
                    AlgorithmConfig.builder()
                        .clusterInfo(clusterInfo)
                        .clusterBean(clusterBean)
                        .clusterCost(costFunction)
                        .timeout(Duration.ofSeconds(10))
                        .build());
            if (offer.isPresent()) {
              System.out.println("Find one");
              break;
            }
          } catch (NoSufficientMetricsException e) {
            System.out.println("No Plan, try later: " + e.getMessage());
            Utils.sleep(Duration.ofSeconds(3));
          }
        }

        // save
        try (var stream0 = new FileOutputStream(fileName0);
            var stream1 = new FileOutputStream(fileName1)) {
          System.out.println("Serialize ClusterInfo");
          ClusterInfoSerializer.serialize(clusterInfo, stream0);
          System.out.println("Serialize ClusterBean");
          ClusterBeanSerializer.serialize(clusterBean, stream1);
        } catch (IOException e) {
          // a failed save makes the read-back below meaningless, so stop here
          throw new java.io.UncheckedIOException(e);
        }

        // load
        try (var stream0 = new FileInputStream(fileName0);
            var stream1 = new FileInputStream(fileName1)) {
          System.out.println("Deserialize ClusterInfo");
          ClusterInfo a = ClusterInfoSerializer.deserialize(stream0);
          System.out.println("Deserialize ClusterBean");
          ClusterBean b = ClusterBeanSerializer.deserialize(stream1);
          System.out.println("Done!");
        } catch (IOException e) {
          throw new java.io.UncheckedIOException(e);
        }
      }
    }
  }

  /** Placeholder experiment: only generates random three-dimensional samples so far. */
  @Disabled
  @Test
  void testDominantSort() {
    int[] base = new int[] {1000, 300, 600};
    List<int[]> collect =
        IntStream.range(0, 10000)
            .mapToObj(
                i ->
                    new int[] {
                      ThreadLocalRandom.current().nextInt(0, 10000),
                      ThreadLocalRandom.current().nextInt(0, 500),
                      ThreadLocalRandom.current().nextInt(0, 900)
                    })
            .collect(Collectors.toUnmodifiableList());
  }
}
diff --git a/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java
b/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java new file mode 100644 index 0000000000..5243714238 --- /dev/null +++ b/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.astraea.app.web; + +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import org.astraea.common.Configuration; +import org.astraea.common.admin.Admin; +import org.astraea.it.Service; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class BackboneImbalanceScenarioTest { + + @Test + void testApply() { + try (var service = Service.builder().numberOfBrokers(3).build()) { + try (Admin admin = Admin.of(service.bootstrapServers())) { + var scenario = new BackboneImbalanceScenario(); + var result = + scenario + .apply( + admin, + Configuration.of( + Map.ofEntries( + Map.entry(BackboneImbalanceScenario.CONFIG_TOPIC_COUNT, "100")))) + .toCompletableFuture() + .join(); + + Assertions.assertEquals(101, result.totalTopics()); + } + } + } + + @Test + void testSeedWorks() { + var scenario = new BackboneImbalanceScenario(); + var seed = ThreadLocalRandom.current().nextInt(); + var config = + Configuration.of( + Map.ofEntries( + Map.entry(BackboneImbalanceScenario.CONFIG_RANDOM_SEED, Integer.toString(seed)), + Map.entry(BackboneImbalanceScenario.CONFIG_TOPIC_COUNT, "50"))); + try (var service0 = Service.builder().numberOfBrokers(3).build(); + var service1 = Service.builder().numberOfBrokers(3).build(); ) { + try (Admin admin0 = Admin.of(service0.bootstrapServers()); + Admin admin1 = Admin.of(service1.bootstrapServers())) { + var result0 = scenario.apply(admin0, config).toCompletableFuture().join(); + var result1 = scenario.apply(admin1, config).toCompletableFuture().join(); + + Assertions.assertEquals(result0.topicDataRate(), result1.topicDataRate()); + Assertions.assertEquals(result0.topicDataRateHistogram(), result1.topicDataRateHistogram()); + } + } + } +} diff --git a/app/src/test/java/org/astraea/app/web/SkewedPartitionScenarioTest.java b/app/src/test/java/org/astraea/app/web/SkewedPartitionScenarioTest.java index 9e3477b9d6..ea718758dc 100644 --- 
a/app/src/test/java/org/astraea/app/web/SkewedPartitionScenarioTest.java +++ b/app/src/test/java/org/astraea/app/web/SkewedPartitionScenarioTest.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Set; import org.apache.commons.math3.distribution.BinomialDistribution; +import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; import org.astraea.it.Service; @@ -58,7 +59,7 @@ void test(int partitions, short replicas) { var topicName = Utils.randomString(); var scenario = new SkewedPartitionScenario(topicName, partitions, replicas, 0.5); try (var admin = Admin.of(SERVICE.bootstrapServers())) { - var result = scenario.apply(admin).toCompletableFuture().join(); + var result = scenario.apply(admin, Configuration.EMPTY).toCompletableFuture().join(); Assertions.assertEquals(topicName, result.topicName()); Assertions.assertEquals(partitions, result.numberOfPartitions()); Assertions.assertEquals(replicas, result.numberOfReplicas()); diff --git a/common/src/main/java/org/astraea/common/backup/OldByteUtils.java b/common/src/main/java/org/astraea/common/backup/OldByteUtils.java new file mode 100644 index 0000000000..cc16cad756 --- /dev/null +++ b/common/src/main/java/org/astraea/common/backup/OldByteUtils.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.backup; + +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.charset.StandardCharsets; +import org.astraea.common.Utils; + +public final class OldByteUtils { + + public static byte[] toBytes(short value) { + return new byte[] {(byte) (value >>> 8), (byte) value}; + } + + public static byte[] toBytes(int value) { + return new byte[] { + (byte) (value >>> 24), (byte) (value >>> 16), (byte) (value >>> 8), (byte) value + }; + } + + public static byte[] toBytes(long value) { + return new byte[] { + (byte) (value >>> 56), + (byte) (value >>> 48), + (byte) (value >>> 40), + (byte) (value >>> 32), + (byte) (value >>> 24), + (byte) (value >>> 16), + (byte) (value >>> 8), + (byte) value + }; + } + + public static byte[] toBytes(String value) { + return value.getBytes(StandardCharsets.UTF_8); + } + + public static byte[] toBytes(char value) { + return String.valueOf(value).getBytes(StandardCharsets.UTF_8); + } + + public static byte[] toBytes(float value) { + int intBits = Float.floatToIntBits(value); + return new byte[] { + (byte) (intBits >> 24), (byte) (intBits >> 16), (byte) (intBits >> 8), (byte) intBits + }; + } + + public static byte[] toBytes(double value) { + long longBits = Double.doubleToLongBits(value); + return new byte[] { + (byte) (longBits >> 56), + (byte) (longBits >> 48), + (byte) (longBits >> 40), + (byte) (longBits >> 32), + (byte) (longBits >> 24), + (byte) (longBits >> 16), + (byte) (longBits >> 8), + (byte) longBits + }; + } + + public 
static byte[] toBytes(boolean value) { + if (value) return new byte[] {1}; + return new byte[] {0}; + } + + public static int readInt(ReadableByteChannel channel) { + return Utils.packException( + () -> { + var buf = ByteBuffer.allocate(Integer.BYTES); + var size = channel.read(buf); + if (size != Integer.BYTES) + throw new IllegalStateException( + "The remaining size is " + size + ", but expected is " + Integer.BYTES); + return buf.flip().getInt(); + }); + } + + public static int readInt(InputStream fs) { + return Utils.packException( + () -> { + var byteArray = new byte[Integer.BYTES]; + var size = fs.read(byteArray); + if (size != Integer.BYTES) + throw new IllegalStateException( + "The remaining size is " + size + ", but expected is " + Integer.BYTES); + return ByteBuffer.wrap(byteArray).getInt(); + }); + } + + public static short readShort(ReadableByteChannel channel) { + return Utils.packException( + () -> { + var buf = ByteBuffer.allocate(Short.BYTES); + var size = channel.read(buf); + if (size != Short.BYTES) + throw new IllegalStateException( + "The remaining size is " + size + ", but expected is " + Short.BYTES); + return buf.flip().getShort(); + }); + } + + public static short readShort(InputStream fs) { + return Utils.packException( + () -> { + var byteArray = new byte[Short.BYTES]; + var size = fs.read(byteArray); + if (size != Short.BYTES) + throw new IllegalStateException( + "The remaining size is " + size + ", but expected is " + Short.BYTES); + return ByteBuffer.wrap(byteArray).getShort(); + }); + } + + public static String readString(ByteBuffer buffer, int size) { + if (size < 0) return null; + var dst = new byte[size]; + buffer.get(dst); + return new String(dst, StandardCharsets.UTF_8); + } + + /** + * @return null if the size is smaller than zero + */ + public static byte[] readBytes(ByteBuffer buffer, int size) { + if (size < 0) return null; + var dst = new byte[size]; + buffer.get(dst); + return dst; + } + + public static ByteBuffer of(short 
value) { + var buf = ByteBuffer.allocate(Short.BYTES); + buf.putShort(value); + return buf.flip(); + } + + public static ByteBuffer of(int value) { + var buf = ByteBuffer.allocate(Integer.BYTES); + buf.putInt(value); + return buf.flip(); + } + + public static void putLengthBytes(ByteBuffer buffer, byte[] value) { + if (value == null) buffer.putInt(-1); + else { + buffer.putInt(value.length); + buffer.put(ByteBuffer.wrap(value)); + } + } + + public static void putLengthString(ByteBuffer buffer, String value) { + if (value == null) buffer.putShort((short) -1); + else { + var valueByte = value.getBytes(StandardCharsets.UTF_8); + buffer.putShort((short) valueByte.length); + buffer.put(ByteBuffer.wrap(valueByte)); + } + } +} diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/BalancerUtils.java b/common/src/main/java/org/astraea/common/balancer/algorithms/BalancerUtils.java new file mode 100644 index 0000000000..409c529c41 --- /dev/null +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/BalancerUtils.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.astraea.common.balancer.algorithms; + +import java.util.Collection; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.astraea.common.admin.Replica; + +public final class BalancerUtils { + + private BalancerUtils() {} + + public static boolean eligiblePartition(Collection replicas) { + return Stream.>>of( + // only one replica and it is offline + r -> r.size() == 1 && r.stream().findFirst().orElseThrow().isOffline(), + // no leader + r -> r.stream().noneMatch(Replica::isLeader)) + .noneMatch(p -> p.test(replicas)); + } +} diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/NetworkBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/NetworkBalancer.java new file mode 100644 index 0000000000..1c1646ce12 --- /dev/null +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/NetworkBalancer.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.astraea.common.balancer.algorithms; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import org.astraea.common.Configuration; +import org.astraea.common.admin.ClusterInfoBuilder; +import org.astraea.common.admin.NodeInfo; +import org.astraea.common.admin.TopicPartition; +import org.astraea.common.balancer.AlgorithmConfig; +import org.astraea.common.balancer.Balancer; +import org.astraea.common.cost.NetworkCost; +import org.astraea.common.cost.NetworkIngressCost; +
/**
 * A greedy balancer that redistributes single-replica partitions according to their network
 * ingress/egress. Partitions are grouped by their rounded egress/ingress ratio and every partition
 * is placed on the broker that has accumulated the least egress so far.
 */
public class NetworkBalancer implements Balancer {

  /**
   * Builds a new allocation for every partition of the cluster and wraps it into a plan.
   *
   * @param config the cluster state, metrics and cost functions to work with
   * @return a plan describing the new allocation; this implementation always returns a plan
   * @throws IllegalArgumentException if the number of replicas differs from the number of
   *     partitions, i.e. some partition has more than one replica
   */
  @Override
  public Optional<Plan> offer(AlgorithmConfig config) {
    var clusterInfo = config.clusterInfo();
    var clusterBean = config.clusterBean();
    var networkCost = new NetworkIngressCost(Configuration.EMPTY);

    if (clusterInfo.topicPartitions().size() != clusterInfo.replicas().size())
      throw new IllegalArgumentException("NetworkBalancer doesn't support replica");

    NetworkCost.NetworkClusterCost networkClusterCost =
        (NetworkCost.NetworkClusterCost) networkCost.clusterCost(clusterInfo, clusterBean);

    // the estimated bandwidth of every partition
    var bandwidths =
        clusterInfo.topicPartitions().stream()
            .collect(
                Collectors.toUnmodifiableMap(
                    tp -> tp,
                    tp ->
                        new Bandwidth(
                            networkClusterCost.partitionIngress.get(tp),
                            networkClusterCost.partitionEgress.get(tp))));

    // the bandwidth assigned to each broker so far; the map is fixed, the values are mutated
    var brokers =
        clusterInfo.brokers().stream()
            .collect(Collectors.toUnmodifiableMap(NodeInfo::id, broker -> new Bandwidth(0, 0)));
    var builder = ClusterInfoBuilder.builder(clusterInfo);

    bandwidths.entrySet().stream()
        .collect(
            Collectors.groupingBy(
                e -> Math.round((double) e.getValue().egress / e.getValue().ingress)))
        .entrySet()
        .stream()
        // groups are processed in ascending order of their egress/ingress ratio
        .sorted(Map.Entry.comparingByKey())
        .forEach(
            e -> {
              var partitions = e.getValue();

              partitions.stream()
                  // within a group, the partition with the largest egress goes first
                  .sorted(
                      Comparator.<Map.Entry<TopicPartition, Bandwidth>>comparingLong(
                              ee -> ee.getValue().egress)
                          .reversed())
                  .forEach(
                      ee -> {
                        var tp = ee.getKey();
                        var bandwidth = ee.getValue();

                        // pick the broker with the least egress assigned so far
                        brokers.entrySet().stream()
                            .min(Comparator.comparingLong(x -> x.getValue().egress))
                            .ifPresent(
                                eee -> {
                                  var toBroker = eee.getKey();
                                  var brokerBandwidth = eee.getValue();

                                  brokerBandwidth.ingress += bandwidth.ingress;
                                  brokerBandwidth.egress += bandwidth.egress;

                                  var replica =
                                      clusterInfo.replicas(tp).get(0).topicPartitionReplica();
                                  var folders =
                                      List.copyOf(clusterInfo.brokerFolders().get(toBroker));
                                  // the data folder on the target broker is chosen at random
                                  var toFolder =
                                      folders.get(
                                          ThreadLocalRandom.current().nextInt(folders.size()));
                                  builder.reassignReplica(replica, toBroker, toFolder);
                                });
                      });
            });

    var sourceCost = config.clusterCostFunction().clusterCost(clusterInfo, clusterBean);
    var targetClusterInfo = builder.build();
    var targetCost = config.clusterCostFunction().clusterCost(targetClusterInfo, clusterBean);
    var moveCost = config.moveCostFunction().moveCost(clusterInfo, targetClusterInfo, clusterBean);

    System.out.println("NetworkBalancer Initial: " + sourceCost.value());
    System.out.println("NetworkBalancer Final: " + targetCost.value());
    return Optional.of(new Plan(clusterInfo, sourceCost, targetClusterInfo, targetCost, moveCost));
  }

  /** Mutable pair of ingress and egress values, used as an accumulator per broker. */
  private static class Bandwidth {
    long ingress;
    long egress;

    public Bandwidth(long ingress, long egress) {
      this.ingress = ingress;
      this.egress = egress;
    }

    @Override
    public String toString() {
      return "Bandwidth{" + "ingress=" + ingress + ", egress=" + egress + '}';
    }
  }
}
diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/ResourceBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/ResourceBalancer.java new file mode 100644 index 0000000000..85bf333c24 --- /dev/null +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/ResourceBalancer.java @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.balancer.algorithms; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.astraea.common.admin.ClusterBean; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartition; +import org.astraea.common.balancer.AlgorithmConfig; +import org.astraea.common.balancer.Balancer; +import org.astraea.common.cost.CompositeClusterCost; +import org.astraea.common.cost.ResourceCapacity; +import org.astraea.common.cost.ResourceUsage; +import org.astraea.common.cost.ResourceUsageHint; + +public class ResourceBalancer implements Balancer { + + @Override + public Optional offer(AlgorithmConfig config) { + var runtime = config.timeout().toMillis() + System.currentTimeMillis(); + var initialClusterInfo = config.clusterInfo(); + var initialCost = + 
config.clusterCostFunction().clusterCost(config.clusterInfo(), config.clusterBean()); + + var algorithm = new AlgorithmContext(config, runtime); + var proposalClusterInfo = algorithm.execute(); + var proposalCost = + config.clusterCostFunction().clusterCost(proposalClusterInfo, config.clusterBean()); + var moveCost = + config + .moveCostFunction() + .moveCost(initialClusterInfo, proposalClusterInfo, config.clusterBean()); + + if (proposalCost.value() > initialCost.value() || moveCost.overflow()) return Optional.empty(); + else + return Optional.of( + new Plan(initialClusterInfo, initialCost, proposalClusterInfo, proposalCost, moveCost)); + } + + static class AlgorithmContext { + + private final AlgorithmConfig config; + private final ClusterInfo sourceCluster; + private final ClusterBean clusterBean; + + private final Set usageHints; + private final List resourceCapacities; + private final List orderedReplicas; + private final Predicate feasibleUsage; + + private final long deadline; + + private AlgorithmContext(AlgorithmConfig config, long deadline) { + this.config = config; + this.sourceCluster = config.clusterInfo(); + this.clusterBean = config.clusterBean(); + this.deadline = deadline; + + // hints to estimate the resource usage of replicas + this.usageHints = + Stream.concat( + CompositeClusterCost.decompose(config.clusterCostFunction()).stream(), + config.moveCostFunction().resourceUsageHint().stream()) + .filter(func -> func instanceof ResourceUsageHint) + .map(func -> (ResourceUsageHint) func) + .collect(Collectors.toUnmodifiableSet()); + + + // hints for the capacity of each cluster resource + this.resourceCapacities = + usageHints.stream() + .map(hint -> hint.evaluateClusterResourceCapacity(sourceCluster, clusterBean)) + .flatMap(Collection::stream) + .collect(Collectors.toUnmodifiableList()); + + // replicas are ordered by their resource usage, we tweak the most heavy resource first + // TODO: add support for balancer.allowed.topics.regex + // TODO: add 
support for balancer.allowed.brokers.regex + this.orderedReplicas = + sourceCluster.topicPartitions().stream() + .filter(tp -> BalancerUtils.eligiblePartition(sourceCluster.replicas(tp))) + .flatMap(tp -> sourceCluster.replicas(tp).stream()) + .sorted( + usageDominationComparator( + resourceCapacities, + (r) -> { + var resource = new ResourceUsage(); + usageHints.stream() + .map( + hint -> + hint.evaluateReplicaResourceUsage(sourceCluster, clusterBean, r)) + .forEach(rrr -> resource.mergeUsage(rrr)); + return resource; + })) + .collect(Collectors.toUnmodifiableList()); + + this.feasibleUsage = + this.resourceCapacities.stream() + .map(ResourceCapacity::usageValidnessPredicate) + .reduce(Predicate::and) + .orElse((u) -> true); + } + + ClusterInfo execute() { + var clusterResourceUsage = new ResourceUsage(); + sourceCluster.replicas().stream() + .flatMap(this::evaluateReplicaUsage) + .forEach(clusterResourceUsage::mergeUsage); + + var bestAllocation = new AtomicReference(); + var bestAllocationScore = new AtomicReference(); + Consumer> updateAnswer = + (replicas) -> { + var newCluster = + ClusterInfo.of( + sourceCluster.clusterId(), + sourceCluster.nodes(), + sourceCluster.topics(), + replicas); + var clusterCost = config.clusterCostFunction().clusterCost(newCluster, clusterBean); + var moveCost = + config.moveCostFunction().moveCost(sourceCluster, newCluster, clusterBean); + + // if movement constraint failed, reject answer + if (moveCost.overflow()) { + System.out.println("Overflow Score: " + clusterCost.value()); + return; + } + // if cluster cost is better, accept answer + if (bestAllocationScore.get() == null || clusterCost.value() < bestAllocationScore.get()) { + bestAllocation.set(newCluster); + bestAllocationScore.set(clusterCost.value()); + System.out.println("New Best Score: " + bestAllocationScore.get()); + System.out.println("New Best Cost: " + clusterCost); + } else { + System.out.println("New Score: " + clusterCost.value()); + 
System.out.println(clusterCost); + } + }; + + // TODO: the recursion might overflow the stack under large number of replicas. use stack + // instead. + var currentAllocation = sourceCluster.topicPartitions() + .stream() + .collect(Collectors.toUnmodifiableMap( + tp -> tp, + tp -> (List) new ArrayList<>(sourceCluster.replicas(tp)))); + search(updateAnswer, 0, orderedReplicas, currentAllocation, clusterResourceUsage); + + return bestAllocation.get(); + } + + private int trials(int level) { + // TODO: customize this + if (0 <= level && level < 3) return 8; + if (level < 6) return 2; + else return 1; + } + + private void search( + Consumer> updateAnswer, + int next, + List originalReplicas, + Map> currentAllocation, + ResourceUsage currentResourceUsage) { + if (System.currentTimeMillis() > deadline) + return; + if (originalReplicas.size() == next) { + // if this is a complete answer, call update function and return + updateAnswer.accept( + currentAllocation.entrySet().stream() + .flatMap(x -> x.getValue().stream()) + .collect(Collectors.toUnmodifiableList())); + } else { + var nextReplica = originalReplicas.get(next); + + List> possibleTweaks = + tweaks(currentAllocation, nextReplica).stream() + .map( + tweaks -> { + var usageAfterTweaked = new ResourceUsage(currentResourceUsage.usage()); + tweaks.toRemove.stream() + .flatMap(this::evaluateReplicaUsage) + .forEach(usageAfterTweaked::removeUsage); + tweaks.toReplace.stream() + .flatMap(this::evaluateReplicaUsage) + .forEach(usageAfterTweaked::mergeUsage); + + return Map.entry(usageAfterTweaked, tweaks); + }) + .filter(e -> feasibleUsage.test(e.getKey())) + .sorted(Map.Entry.comparingByKey(usageIdealnessDominationComparator(currentResourceUsage, this.resourceCapacities))) + // TODO: maybe change to probability style + .limit(trials(next)) + .collect(Collectors.toUnmodifiableList()); + + for (Map.Entry entry : possibleTweaks) { + // the tweak we are going to use + var newResourceUsage = entry.getKey(); + var tweaks = 
entry.getValue(); + + // replace the replicas + tweaks.toRemove.stream() + .filter(replica -> !currentAllocation.get(replica.topicPartition()).remove(replica)) + .forEach( + nonexistReplica -> { + throw new IllegalStateException( + "Attempt to remove " + + nonexistReplica.topicPartitionReplica() + + " but it does not exists"); + }); + tweaks.toReplace.forEach( + replica -> currentAllocation.get(replica.topicPartition()).add(replica)); + + // start next search stage + search(updateAnswer, next + 1, originalReplicas, currentAllocation, newResourceUsage); + + // undo the tweak, restore the previous state + tweaks.toReplace.stream() + .filter(replica -> !currentAllocation.get(replica.topicPartition()).remove(replica)) + .forEach( + nonexistReplica -> { + throw new IllegalStateException( + "Attempt to remove " + + nonexistReplica.topicPartitionReplica() + + " but it does not exists"); + }); + tweaks.toRemove.forEach( + replica -> currentAllocation.get(replica.topicPartition()).add(replica)); + } + } + } + + private List tweaks( + Map> currentAllocation, Replica replica) { + // 1. no change + var noMovement = List.of(new Tweak(List.of(), List.of())); + + // 2. leadership change + var leadership = + currentAllocation.get(replica.topicPartition()).stream() + .filter(r -> r.isPreferredLeader() != replica.isPreferredLeader()) + .map( + switchTarget -> { + var toRemove = List.of(replica, switchTarget); + var toReplace = + List.of( + Replica.builder(replica) + .isLeader(!replica.isPreferredLeader()) + .isPreferredLeader(!replica.isPreferredLeader()) + .build(), + Replica.builder(switchTarget) + .isLeader(replica.isPreferredLeader()) + .isPreferredLeader(replica.isPreferredLeader()) + .build()); + + return new Tweak(toRemove, toReplace); + }) + .collect(Collectors.toUnmodifiableList()); + + // 3. 
move to other data-dir at the same broker + var dataFolderMovement = + this.sourceCluster.brokerFolders().get(replica.nodeInfo().id()).stream() + .filter(folder -> !folder.equals(replica.path())) + .map( + newFolder -> + new Tweak( + List.of(replica), + List.of(Replica.builder(replica).path(newFolder).build()))) + .collect(Collectors.toUnmodifiableList()); + + // 4. move to other brokers/data-dirs + var interBrokerMovement = + this.sourceCluster.brokers().stream() + .filter(b -> b.id() != replica.nodeInfo().id()) + .flatMap( + b -> + b.dataFolders().stream() + // TODO: add data folder back once the framework is ready to deduplicate the similar resource usage among tweaks + .limit(1) + .map( + folder -> + new Tweak( + List.of(replica), + List.of( + Replica.builder(replica) + .nodeInfo(b) + .path(folder.path()) + .build())))) + .collect(Collectors.toUnmodifiableList()); + + // TODO: add data folder back once the framework is ready to deduplicate the similar resource usage among tweaks + return Stream.of(noMovement, leadership, interBrokerMovement) + .flatMap(Collection::stream) + .collect(Collectors.toUnmodifiableList()); + } + + private Stream evaluateReplicaUsage(Replica replica) { + return this.usageHints.stream() + .map( + hint -> + hint.evaluateClusterResourceUsage(sourceCluster, clusterBean, replica)); + } + + // `static Comparator usageDominationComparator( + // ` Function usageHints) { + // ` // TODO: implement the actual dominant sort + // ` return (lhs, rhs) -> { + // ` // var resourceL = usageHints.apply(lhs); + // ` // var resourceR = usageHints.apply(rhs); + + // ` // var dominatedByL = + // ` // resourceL.usage().entrySet().stream() + // ` // .filter(e -> e.getValue() > resourceR.usage().getOrDefault(e.getKey(), 0.0)) + // ` // .count(); + // ` // var dominatedByR = + // ` // resourceR.usage().entrySet().stream() + // ` // .filter(e -> e.getValue() > resourceL.usage().getOrDefault(e.getKey(), 0.0)) + // ` // .count(); + + // ` // // reverse the order 
intentionally, we want the most dominated replica at the beginning of + // ` // // list. + // ` // int compare = Long.compare(dominatedByL, dominatedByR); + // ` // return -compare; + + // ` double lsum = usageHints.apply(lhs).usage().values().stream().mapToDouble(x -> x).sum(); + // ` double rsum = usageHints.apply(rhs).usage().values().stream().mapToDouble(x -> x).sum(); + // ` return -Double.compare(lsum, rsum); + // ` }; + // `} + + static Comparator usageDominationComparator( + List resourceCapacities, Function usageHints) { + var cmp = Comparator.comparingDouble(u -> resourceCapacities.stream() + .mapToDouble(c -> c.idealness(u)) + .average() + .orElseThrow()); + + return Comparator.comparing(usageHints, cmp).reversed(); + } + + static Comparator usageIdealnessDominationComparator(List resourceCapacities) { + var comparators = + resourceCapacities.stream() + .map(ResourceCapacity::usageIdealnessComparator) + .collect(Collectors.toUnmodifiableSet()); + + return (lhs, rhs) -> { + // TODO: change this logic + var dominatedByL = comparators.stream().filter(e -> e.compare(lhs, rhs) < 0).count(); + var dominatedByR = comparators.stream().filter(e -> e.compare(rhs, lhs) < 0).count(); + + return -Long.compare(dominatedByL, dominatedByR); + }; + } + + static Comparator usageIdealnessDominationComparator(ResourceUsage base, List resourceCapacities) { + var comparators = + resourceCapacities.stream() + .map(ResourceCapacity::usageIdealnessComparator) + .collect(Collectors.toUnmodifiableSet()); + + Comparator dominatedCmp = (lhs, rhs) -> { + var dominatedByL = comparators.stream().filter(e -> e.compare(lhs, rhs) <= 0).count(); + var dominatedByR = comparators.stream().filter(e -> e.compare(rhs, lhs) <= 0).count(); + + return -Long.compare(dominatedByL, dominatedByR); + }; + + // return usageIdealnessDominationComparator(resourceCapacities) + // .thenComparingDouble(usage -> resourceCapacities.stream() + // .mapToDouble(ca -> ca.idealness(usage)) + // .average() + // 
.orElseThrow()); + return dominatedCmp.thenComparingDouble(usage -> resourceCapacities.stream() + .mapToDouble(ca -> ca.idealness(usage)) + .average() + .orElseThrow()); + } + + + // static Comparator usageIdealnessDominationComparator(ResourceUsage baseUsage, List resourceCapacities) { + // var comparators = + // resourceCapacities.stream() + // .map(ResourceCapacity::usageIdealnessComparator) + // .collect(Collectors.toUnmodifiableSet()); + // var baseIdealness = resourceCapacities.stream() + // .collect(Collectors.toUnmodifiableMap( + // ResourceCapacity::resourceName, + // c -> c.idealness(baseUsage))); + + // return (lhs, rhs) -> { + // var idealnessVectorL = resourceCapacities.stream() + // .collect(Collectors.toUnmodifiableMap( + // ResourceCapacity::resourceName, + // c -> c.idealness(lhs) - baseIdealness.get(c.resourceName()))); + // var idealnessVectorR = resourceCapacities.stream() + // .collect(Collectors.toUnmodifiableMap( + // ResourceCapacity::resourceName, + // c -> c.idealness(rhs) - baseIdealness.get(c.resourceName()))); + + // var sumL = idealnessVectorL.entrySet() + // .stream() + // .mapToDouble(e -> e.getKey().startsWith("Network") ? e.getValue() / 6.0 : e.getValue()) + // .sum(); + // var sumR = idealnessVectorR.entrySet() + // .stream() + // .mapToDouble(e -> e.getKey().startsWith("Network") ? 
e.getValue() / 6.0 : e.getValue()) + // .sum(); + + // return Double.compare(sumL, sumR); + // }; + // } + } + + private static class Tweak { + private final List toRemove; + private final List toReplace; + + private Tweak(List toRemove, List toReplace) { + this.toRemove = toRemove; + this.toReplace = toReplace; + } + } +} diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java index 1029da896e..2052d3a120 100644 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java +++ b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java @@ -30,6 +30,7 @@ import org.astraea.common.admin.ClusterInfoBuilder; import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; +import org.astraea.common.balancer.algorithms.BalancerUtils; /** * The {@link ShuffleTweaker} proposes a new log placement based on the current log placement, but @@ -84,7 +85,7 @@ public Stream generate(ClusterInfo baseAllocation) { final var finalCluster = ClusterInfoBuilder.builder(baseAllocation); for (int i = 0, shuffled = 0; i < partitionOrder.size() && shuffled < shuffleCount; i++) { final var tp = partitionOrder.get(i); - if (!eligiblePartition(baseAllocation.replicas(tp))) continue; + if (!BalancerUtils.eligiblePartition(baseAllocation.replicas(tp))) continue; switch (Operation.random()) { case LEADERSHIP_CHANGE: { @@ -143,15 +144,6 @@ private static T randomElement(Collection collection) { .orElseThrow(); } - private static boolean eligiblePartition(Collection replicas) { - return Stream.>>of( - // only one replica and it is offline - r -> r.size() == 1 && r.stream().findFirst().orElseThrow().isOffline(), - // no leader - r -> r.stream().noneMatch(Replica::isLeader)) - .noneMatch(p -> p.test(replicas)); - } - enum Operation implements EnumInfo { LEADERSHIP_CHANGE, REPLICA_LIST_CHANGE; diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.astraea.common.consumer;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.astraea.common.Header;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.Config;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.admin.Topic;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.backup.OldByteUtils;
import org.astraea.common.json.JsonConverter;
import org.astraea.common.json.TypeRef;
import org.astraea.common.metrics.BeanObject;

/**
 * Converts the raw bytes of a record into a typed value.
 *
 * <p>Besides the single abstract method, this interface offers ready-made instances for primitive
 * types, Base64 text, JSON, and the project's own binary encodings of {@link BeanObject}, {@link
 * NodeInfo}, {@link Topic}, {@link Replica} and {@link ClusterInfo}. Every built-in instance
 * returns {@code null} when it is handed {@code null} data.
 *
 * @param <T> the type produced by this deserializer
 */
@FunctionalInterface
public interface DDeserializer<T> {

  /**
   * Deserialize a record value from a byte array into a value or object.
   *
   * @param topic topic associated with the data
   * @param headers headers associated with the record; may be empty.
   * @param data serialized bytes; may be null; implementations are recommended to handle null by
   *     returning a value or null rather than throwing an exception.
   * @return deserialized typed data; may be null
   */
  T deserialize(String topic, List<Header> headers, byte[] data);

  /**
   * Adapts a {@link DDeserializer} to Kafka's deserializer interface.
   *
   * @param deserializer the deserializer to delegate to
   * @return a Kafka deserializer; the header-less overload passes an empty header list, the other
   *     one converts the Kafka headers with {@link Header#of}
   * @param <T> the type produced by the deserializer
   */
  static <T> org.apache.kafka.common.serialization.Deserializer<T> of(
      DDeserializer<T> deserializer) {
    return new org.apache.kafka.common.serialization.Deserializer<>() {

      @Override
      public T deserialize(String topic, byte[] data) {
        return deserializer.deserialize(topic, List.of(), data);
      }

      @Override
      public T deserialize(String topic, Headers headers, byte[] data) {
        return deserializer.deserialize(topic, Header.of(headers), data);
      }
    };
  }

  /** Wraps a Kafka deserializer; the record headers are dropped before delegating. */
  private static <T> DDeserializer<T> of(
      org.apache.kafka.common.serialization.Deserializer<T> deserializer) {
    // the headers are not used by primitive type deserializer
    return (topic, headers, data) -> deserializer.deserialize(topic, data);
  }

  /** Encodes the raw bytes as a Base64 string; {@code null} data yields {@code null}. */
  DDeserializer<String> BASE64 =
      (topic, headers, data) -> data == null ? null : Base64.getEncoder().encodeToString(data);

  DDeserializer<byte[]> BYTE_ARRAY = of(new ByteArrayDeserializer());
  DDeserializer<String> STRING = of(new StringDeserializer());
  DDeserializer<Integer> INTEGER = of(new IntegerDeserializer());
  DDeserializer<Long> LONG = of(new LongDeserializer());
  DDeserializer<Float> FLOAT = of(new FloatDeserializer());
  DDeserializer<Double> DOUBLE = of(new DoubleDeserializer());
  DDeserializer<BeanObject> BEAN_OBJECT = new BeanDeserializer();
  DDeserializer<NodeInfo> NODE_INFO = new NodeInfoDeserializer();
  DDeserializer<Topic> TOPIC = new TopicDeserializer();
  DDeserializer<Replica> REPLICA = new ReplicaDeserializer();
  DDeserializer<ClusterInfo> CLUSTER_INFO = new ClusterInfoDeserializer();

  /**
   * create Custom JsonDeserializer
   *
   * @param typeRef The typeRef of message being output by the Deserializer
   * @return Custom JsonDeserializer
   * @param <T> The type of message being output by the Deserializer
   */
  static <T> DDeserializer<T> of(TypeRef<T> typeRef) {
    return new JsonDeserializer<>(typeRef);
  }

  /** Decodes the bytes with {@link #STRING} and parses the resulting text as JSON. */
  class JsonDeserializer<T> implements DDeserializer<T> {
    private final TypeRef<T> typeRef;
    private final JsonConverter jackson = JsonConverter.jackson();

    private JsonDeserializer(TypeRef<T> typeRef) {
      this.typeRef = typeRef;
    }

    @Override
    public T deserialize(String topic, List<Header> headers, byte[] data) {
      if (data == null) return null;
      return jackson.fromJson(DDeserializer.STRING.deserialize(topic, headers, data), typeRef);
    }
  }

  /**
   * Deserialize byte arrays to string and then parse the string to `BeanObject`. It is inverse of
   * BeanObject.toString().getBytes(). TODO: Should be replaced by protoBuf
   */
  class BeanDeserializer implements DDeserializer<BeanObject> {
    // Expected text: "[<domain>:<properties>]\n{<attributes>}". Compiled once instead of per call.
    private static final Pattern BEAN_PATTERN =
        Pattern.compile("\\[(?<domain>[^:]*):(?<properties>[^]]*)]\n\\{(?<attributes>[^}]*)}");

    /**
     * @return the parsed bean, or {@code null} when {@code data} is null or the text does not
     *     match the expected layout
     */
    @Override
    public BeanObject deserialize(String topic, List<Header> headers, byte[] data) {
      if (data == null) return null;
      // NOTE(review): decoding uses the platform charset to mirror the documented
      // `toString().getBytes()` counterpart - confirm both sides before pinning a charset.
      Matcher m = BEAN_PATTERN.matcher(new String(data));
      if (!m.matches()) return null;
      var domain = m.group("domain");
      // Pairs are separated by a comma and/or a space; tokens without '=' are discarded below.
      var propertiesPairs = m.group("properties").split("[, ]");
      var attributesPairs = m.group("attributes").split("[, ]");
      var properties =
          Arrays.stream(propertiesPairs)
              .map(kv -> kv.split("="))
              .filter(kv -> kv.length >= 2)
              .collect(Collectors.toUnmodifiableMap(kv -> kv[0], kv -> kv[1]));
      var attributes =
          Arrays.stream(attributesPairs)
              .map(kv -> kv.split("="))
              .filter(kv -> kv.length >= 2)
              .collect(Collectors.toUnmodifiableMap(kv -> kv[0], kv -> (Object) kv[1]));
      return new BeanObject(domain, properties, attributes);
    }
  }

  /**
   * Reads, in order: an int id, a host string whose length is given by a preceding short, and an
   * int port.
   */
  class NodeInfoDeserializer implements DDeserializer<NodeInfo> {
    @Override
    public NodeInfo deserialize(String topic, List<Header> headers, byte[] data) {
      if (data == null) return null;
      var buffer = ByteBuffer.wrap(data);
      var id = buffer.getInt();
      var host = OldByteUtils.readString(buffer, buffer.getShort());
      var port = buffer.getInt();
      return NodeInfo.of(id, host, port);
    }
  }

  /**
   * Reads, in order: the topic name, an int count of config entries followed by that many
   * key/value strings, one byte for the internal flag (non-zero means internal), and an int count
   * of partitions followed by that many int partition ids. Every string is preceded by a short
   * holding its length.
   */
  class TopicDeserializer implements DDeserializer<Topic> {

    @Override
    public Topic deserialize(String topic, List<Header> headers, byte[] data) {
      if (data == null) return null;
      var buffer = ByteBuffer.wrap(data);
      var name = OldByteUtils.readString(buffer, buffer.getShort());
      // Read key then value explicitly: both reads advance the same buffer, so their relative
      // order must not be left to how a collector happens to invoke its mappers.
      var configSize = buffer.getInt();
      var config = new HashMap<String, String>();
      for (int i = 0; i < configSize; i++) {
        var key = OldByteUtils.readString(buffer, buffer.getShort());
        var value = OldByteUtils.readString(buffer, buffer.getShort());
        if (config.putIfAbsent(key, value) != null)
          throw new IllegalStateException("Duplicate key " + key);
      }
      var internal = buffer.get() != 0;
      var topicPartitions =
          IntStream.range(0, buffer.getInt())
              .mapToObj(i -> TopicPartition.of(name, buffer.getInt()))
              .collect(Collectors.toSet());
      return new Topic() {
        @Override
        public String name() {
          return name;
        }

        @Override
        public Config config() {
          return Config.of(config);
        }

        @Override
        public boolean internal() {
          return internal;
        }

        @Override
        public Set<TopicPartition> topicPartitions() {
          return topicPartitions;
        }
      };
    }
  }

  /**
   * Reads, in order: topic name, int partition, an int length followed by that many bytes decoded
   * with {@link #NODE_INFO}, long lag, long size, five single-byte flags (leader, sync, future,
   * offline, preferred leader; non-zero means true) and the data path string. Every string is
   * preceded by a short holding its length.
   */
  class ReplicaDeserializer implements DDeserializer<Replica> {

    @Override
    public Replica deserialize(String topic, List<Header> headers, byte[] data) {
      if (data == null) return null;
      var buffer = ByteBuffer.wrap(data);
      var topicName = OldByteUtils.readString(buffer, buffer.getShort());
      var partition = buffer.getInt();
      var nodeInfoData = new byte[buffer.getInt()];
      buffer.get(nodeInfoData);
      var nodeInfo = DDeserializer.NODE_INFO.deserialize(topic, headers, nodeInfoData);
      var lag = buffer.getLong();
      var size = buffer.getLong();
      var isLeader = buffer.get() != 0;
      var isSync = buffer.get() != 0;
      var isFuture = buffer.get() != 0;
      var isOffline = buffer.get() != 0;
      var isPreferredLeader = buffer.get() != 0;
      var path = OldByteUtils.readString(buffer, buffer.getShort());
      return Replica.builder()
          .topic(topicName)
          .partition(partition)
          .nodeInfo(nodeInfo)
          .lag(lag)
          .size(size)
          .isLeader(isLeader)
          .isSync(isSync)
          .isFuture(isFuture)
          .isOffline(isOffline)
          .isPreferredLeader(isPreferredLeader)
          .path(path)
          .build();
    }
  }

  /**
   * Reads, in order: the cluster id string, then three length-prefixed sections (nodes, topics,
   * replicas). Each section starts with an int element count; each element is an int byte length
   * followed by a payload decoded with {@link #NODE_INFO}, {@link #TOPIC} or {@link #REPLICA}.
   *
   * <p>The serialized form carries only id/host/port for a node, so every node is exposed as a
   * placeholder {@link Broker}: it reports three hard-coded data folders, is never the controller,
   * and returns {@code null} for all other broker details.
   */
  class ClusterInfoDeserializer implements DDeserializer<ClusterInfo> {

    @Override
    public ClusterInfo deserialize(String topic, List<Header> headers, byte[] data) {
      if (data == null) return null;
      var buffer = ByteBuffer.wrap(data);
      var clusterId = OldByteUtils.readString(buffer, buffer.getShort());
      var nodes =
          IntStream.range(0, buffer.getInt())
              .mapToObj(
                  i -> {
                    var nodeInfoData = new byte[buffer.getInt()];
                    buffer.get(nodeInfoData);
                    var node = DDeserializer.NODE_INFO.deserialize(topic, headers, nodeInfoData);

                    // NOTE(review): the generic return types below were reconstructed from
                    // usage - verify them against Broker and Broker.DataFolder.
                    return new Broker() {
                      @Override
                      public boolean isController() {
                        return false;
                      }

                      @Override
                      public Config config() {
                        return null;
                      }

                      @Override
                      public List<DataFolder> dataFolders() {
                        return Stream.of(
                                "/tmp/log-folder-0", "/tmp/log-folder-1", "/tmp/log-folder-2")
                            .map(
                                path ->
                                    new DataFolder() {
                                      @Override
                                      public String path() {
                                        return path;
                                      }

                                      @Override
                                      public Map<TopicPartition, Long> partitionSizes() {
                                        return null;
                                      }

                                      @Override
                                      public Map<TopicPartition, Long> orphanPartitionSizes() {
                                        return null;
                                      }
                                    })
                            .collect(Collectors.toUnmodifiableList());
                      }

                      @Override
                      public Set<TopicPartition> topicPartitions() {
                        return null;
                      }

                      @Override
                      public Set<TopicPartition> topicPartitionLeaders() {
                        return null;
                      }

                      @Override
                      public String host() {
                        return node.host();
                      }

                      @Override
                      public int port() {
                        return node.port();
                      }

                      @Override
                      public int id() {
                        return node.id();
                      }
                    };
                  })
              .map(broker -> (NodeInfo) broker)
              .collect(Collectors.toUnmodifiableList());
      var topics =
          IntStream.range(0, buffer.getInt())
              .mapToObj(
                  i -> {
                    var topicData = new byte[buffer.getInt()];
                    buffer.get(topicData);
                    return DDeserializer.TOPIC.deserialize(topic, headers, topicData);
                  })
              .collect(Collectors.toMap(Topic::name, s -> s));
      var replicas =
          IntStream.range(0, buffer.getInt())
              .mapToObj(
                  i -> {
                    var replicaData = new byte[buffer.getInt()];
                    buffer.get(replicaData);
                    return DDeserializer.REPLICA.deserialize(topic, headers, replicaData);
                  })
              .collect(Collectors.toList());
      return ClusterInfo.of(clusterId, nodes, topics, replicas);
    }
  }
}
a/common/src/main/java/org/astraea/common/cost/CompositeClusterCost.java b/common/src/main/java/org/astraea/common/cost/CompositeClusterCost.java new file mode 100644 index 0000000000..8068cf8c2e --- /dev/null +++ b/common/src/main/java/org/astraea/common/cost/CompositeClusterCost.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.astraea.common.cost; + +import java.util.Collection; +import java.util.Collections; + +public interface CompositeClusterCost extends HasClusterCost { + + static Collection decompose(HasClusterCost costFunction) { + if (costFunction instanceof CompositeClusterCost) + return ((CompositeClusterCost) costFunction).functions(); + else return Collections.singleton(costFunction); + } + + Collection functions(); +} diff --git a/common/src/main/java/org/astraea/common/cost/HasClusterCost.java b/common/src/main/java/org/astraea/common/cost/HasClusterCost.java index 256e39651e..4e028c7b91 100644 --- a/common/src/main/java/org/astraea/common/cost/HasClusterCost.java +++ b/common/src/main/java/org/astraea/common/cost/HasClusterCost.java @@ -16,6 +16,10 @@ */ package org.astraea.common.cost; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -27,7 +31,7 @@ @FunctionalInterface public interface HasClusterCost extends CostFunction { - static HasClusterCost of(Map costAndWeight) { + static CompositeClusterCost of(Map costAndWeight) { var sensor = MetricSensor.of( costAndWeight.keySet().stream() @@ -36,7 +40,8 @@ static HasClusterCost of(Map costAndWeight) { .map(Optional::get) .collect(Collectors.toUnmodifiableList())); - return new HasClusterCost() { + return new CompositeClusterCost() { + @Override public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { var scores = @@ -78,6 +83,21 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) }); } + @Override + public Collection functions() { + var queue = new LinkedList(costAndWeight.keySet()); + var functions = new ArrayList(); + + while (!queue.isEmpty()) { + var next = queue.pop(); + if (next instanceof CompositeClusterCost) + functions.addAll(((CompositeClusterCost) next).functions()); + else 
functions.add(next); + } + + return List.copyOf(functions); + } + @Override public Optional metricSensor() { return sensor; diff --git a/common/src/main/java/org/astraea/common/cost/HasMoveCost.java b/common/src/main/java/org/astraea/common/cost/HasMoveCost.java index 8c3981ec61..dcce43a3b1 100644 --- a/common/src/main/java/org/astraea/common/cost/HasMoveCost.java +++ b/common/src/main/java/org/astraea/common/cost/HasMoveCost.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.astraea.common.admin.ClusterBean; import org.astraea.common.admin.ClusterInfo; @@ -53,6 +54,13 @@ public Optional metricSensor() { return sensor; } + @Override + public Set resourceUsageHint() { + return hasMoveCosts.stream() + .flatMap(x -> x.resourceUsageHint().stream()) + .collect(Collectors.toUnmodifiableSet()); + } + @Override public String toString() { return "MoveCosts[" @@ -73,4 +81,11 @@ public String toString() { * @return the score of migrate cost */ MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean); + + default Set resourceUsageHint() { + if(this instanceof ResourceUsageHint) + return Set.of((ResourceUsageHint) this); + else + return Set.of(); + } } diff --git a/common/src/main/java/org/astraea/common/cost/NetworkCost.java b/common/src/main/java/org/astraea/common/cost/NetworkCost.java index d5213ecd15..3984bbd5ab 100644 --- a/common/src/main/java/org/astraea/common/cost/NetworkCost.java +++ b/common/src/main/java/org/astraea/common/cost/NetworkCost.java @@ -22,8 +22,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.astraea.common.Configuration; @@ -35,6 +37,7 @@ import org.astraea.common.admin.NodeInfo; 
import org.astraea.common.admin.Replica; import org.astraea.common.admin.TopicPartition; +import org.astraea.common.admin.TopicPartitionReplica; import org.astraea.common.cost.utils.ClusterInfoSensor; import org.astraea.common.metrics.HasBeanObject; import org.astraea.common.metrics.broker.LogMetrics; @@ -69,9 +72,13 @@ * this cost with metrics from the servers in steady state. * */ -public abstract class NetworkCost implements HasClusterCost { +public abstract class NetworkCost implements HasClusterCost, ResourceUsageHint { public static final String NETWORK_COST_ESTIMATION_METHOD = "network.cost.estimation.method"; + public static final String NETWORK_COST_REPLICA_RESOURCE_PREFIX_INGRESS = "NetworkIngress"; + public static final String NETWORK_COST_REPLICA_RESOURCE_PREFIX_EGRESS = "NetworkEgress"; + public static final String NETWORK_COST_BROKER_RESOURCE_PREFIX_INGRESS = "NetworkIngress_Broker_"; + public static final String NETWORK_COST_BROKER_RESOURCE_PREFIX_EGRESS = "NetworkEgress_Broker_"; private final EstimationMethod estimationMethod; private final BandwidthType bandwidthType; @@ -187,7 +194,11 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) // reason to do this. 
double score = (summary.getMax() - summary.getMin()) / (maxRate); - return new NetworkClusterCost(score, brokerRate); + return new NetworkClusterCost( + score, + brokerRate, + cachedCalculation.partitionIngressRate, + cachedCalculation.partitionEgressRate); } @Override @@ -207,6 +218,56 @@ public Optional metricSensor() { .collect(Collectors.toUnmodifiableList())); } + double evaluateIngressResourceUsage(ClusterBean clusterBean, TopicPartitionReplica target) { + final var cachedCalculation = + calculationCache.computeIfAbsent(clusterBean, CachedCalculation::new); + return cachedCalculation.partitionIngressRate.get(target.topicPartition()).doubleValue(); + } + + double evaluateEgressResourceUsage(ClusterBean clusterBean, TopicPartitionReplica target) { + final var cachedCalculation = + calculationCache.computeIfAbsent(clusterBean, CachedCalculation::new); + return cachedCalculation.partitionEgressRate.get(target.topicPartition()).doubleValue(); + } + + Collection evaluateIngressResourceCapacity( + ClusterInfo clusterInfo, ClusterBean clusterBean) { + final var cachedCalculation = + calculationCache.computeIfAbsent(clusterBean, CachedCalculation::new); + final var sumIngress = + cachedCalculation.partitionIngressRate.values().stream().mapToDouble(x -> x).sum(); + final var avgIngressPerBroker = sumIngress / clusterInfo.brokers().size(); + + Set a = clusterInfo.brokers().stream() + .map( + broker -> + new NetworkResourceCapacity( + NETWORK_COST_BROKER_RESOURCE_PREFIX_INGRESS + broker.id(), avgIngressPerBroker)) + .collect(Collectors.toSet()); + + a.add(new NetworkResourceCapacity(NETWORK_COST_REPLICA_RESOURCE_PREFIX_INGRESS, 0)); + return a; + } + + Collection evaluateEgressResourceCapacity( + ClusterInfo clusterInfo, ClusterBean clusterBean) { + final var cachedCalculation = + calculationCache.computeIfAbsent(clusterBean, CachedCalculation::new); + final var sumEgress = + cachedCalculation.partitionEgressRate.values().stream().mapToDouble(x -> x).sum(); + final var 
avgEgressPerBroker = sumEgress / clusterInfo.brokers().size(); + + Set a = clusterInfo.brokers().stream() + .map( + broker -> + new NetworkResourceCapacity( + NETWORK_COST_BROKER_RESOURCE_PREFIX_EGRESS + broker.id(), avgEgressPerBroker)) + .collect(Collectors.toSet()); + + a.add(new NetworkResourceCapacity(NETWORK_COST_REPLICA_RESOURCE_PREFIX_EGRESS, 0)); + return a; + } + private Map> mapLeaderAllocation(ClusterInfo clusterInfo) { return clusterInfo .replicaStream() @@ -347,16 +408,38 @@ private CachedCalculation(ClusterBean sourceMetric) { estimateRate(metricViewCluster, sourceMetric, ServerMetrics.Topic.BYTES_IN_PER_SEC); this.partitionEgressRate = estimateRate(metricViewCluster, sourceMetric, ServerMetrics.Topic.BYTES_OUT_PER_SEC); + // try (var stream = new FileOutputStream("/home/garyparrot/bandwidth.cvs")) { + // metricViewCluster.topicPartitions().forEach(tp -> { + // var ingress = partitionIngressRate.getOrDefault(tp, 0L); + // var egress = partitionEgressRate.getOrDefault(tp, 0L); + // try { + // stream.write(String.format("%s %d %d%n", tp.toString(), ingress, egress).getBytes()); + // } catch (IOException e) { + // throw new RuntimeException(e); + // } + // }); + // } catch (IOException e) { + // e.printStackTrace(); + // } + // Runtime.getRuntime().exit(0); } } - static class NetworkClusterCost implements ClusterCost { - final double score; - final Map brokerRate; + public static class NetworkClusterCost implements ClusterCost { + public final double score; + public final Map brokerRate; + public final Map partitionIngress; + public final Map partitionEgress; - NetworkClusterCost(double score, Map brokerRate) { + NetworkClusterCost( + double score, + Map brokerRate, + Map partitionIngress, + Map partitionEgress) { this.score = score; this.brokerRate = brokerRate; + this.partitionIngress = partitionIngress; + this.partitionEgress = partitionEgress; } public double value() { @@ -371,4 +454,40 @@ public String toString() { .collect(Collectors.joining(", ", 
"{", "}")); } } + + static class NetworkResourceCapacity implements ResourceCapacity { + + private final String resourceName; + private final double optimal; + + NetworkResourceCapacity(String resourceName, double optimal) { + this.resourceName = resourceName; + this.optimal = optimal; + } + + @Override + public String resourceName() { + return resourceName; + } + + @Override + public double optimalUsage() { + return optimal; + } + + @Override + public double idealness(ResourceUsage usage) { + return Math.abs(usage.usage().getOrDefault(resourceName, 0.0) - optimal) / 1e12; + } + + @Override + public Comparator usageIdealnessComparator() { + return Comparator.comparingDouble(ru -> Math.abs(ru.usage().getOrDefault(resourceName, 0.0) - optimal)); + } + + @Override + public Predicate usageValidnessPredicate() { + return ignore -> true; + } + } } diff --git a/common/src/main/java/org/astraea/common/cost/NetworkEgressCost.java b/common/src/main/java/org/astraea/common/cost/NetworkEgressCost.java index dfa27d86c9..240c9fb489 100644 --- a/common/src/main/java/org/astraea/common/cost/NetworkEgressCost.java +++ b/common/src/main/java/org/astraea/common/cost/NetworkEgressCost.java @@ -16,7 +16,13 @@ */ package org.astraea.common.cost; +import java.util.Collection; +import java.util.Map; import org.astraea.common.Configuration; +import org.astraea.common.admin.ClusterBean; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartitionReplica; /** * A cost function to evaluate cluster load balance score in terms of message egress data rate. 
See @@ -31,4 +37,26 @@ public NetworkEgressCost(Configuration config) { public String toString() { return this.getClass().getSimpleName(); } + + @Override + public ResourceUsage evaluateClusterResourceUsage( + ClusterInfo clusterInfo, ClusterBean clusterBean, Replica target) { + double value = this.evaluateEgressResourceUsage(clusterBean, target.topicPartitionReplica()); + return new ResourceUsage( + Map.of(NetworkCost.NETWORK_COST_BROKER_RESOURCE_PREFIX_EGRESS + target.nodeInfo().id(), value)); + } + + @Override + public ResourceUsage evaluateReplicaResourceUsage( + ClusterInfo clusterInfo, ClusterBean clusterBean, Replica target) { + double value = this.evaluateEgressResourceUsage(clusterBean, target.topicPartitionReplica()); + return new ResourceUsage( + Map.of(NetworkCost.NETWORK_COST_REPLICA_RESOURCE_PREFIX_EGRESS, value)); + } + + @Override + public Collection evaluateClusterResourceCapacity( + ClusterInfo clusterInfo, ClusterBean clusterBean) { + return this.evaluateEgressResourceCapacity(clusterInfo, clusterBean); + } } diff --git a/common/src/main/java/org/astraea/common/cost/NetworkIngressCost.java b/common/src/main/java/org/astraea/common/cost/NetworkIngressCost.java index 8d9145ce5c..d9e2d2aaf0 100644 --- a/common/src/main/java/org/astraea/common/cost/NetworkIngressCost.java +++ b/common/src/main/java/org/astraea/common/cost/NetworkIngressCost.java @@ -16,6 +16,7 @@ */ package org.astraea.common.cost; +import java.util.Collection; import java.util.Comparator; import java.util.Map; import java.util.Set; @@ -26,6 +27,7 @@ import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.Replica; import org.astraea.common.admin.TopicPartition; +import org.astraea.common.admin.TopicPartitionReplica; import org.astraea.common.metrics.broker.ServerMetrics; /** @@ -138,4 +140,26 @@ public Map> incompatibility() { public String toString() { return this.getClass().getSimpleName(); } + + @Override + public ResourceUsage evaluateClusterResourceUsage( + 
ClusterInfo clusterInfo, ClusterBean clusterBean, Replica target) { + double value = this.evaluateIngressResourceUsage(clusterBean, target.topicPartitionReplica()); + return new ResourceUsage( + Map.of(NetworkCost.NETWORK_COST_BROKER_RESOURCE_PREFIX_INGRESS + target.nodeInfo().id(), value)); + } + + @Override + public ResourceUsage evaluateReplicaResourceUsage( + ClusterInfo clusterInfo, ClusterBean clusterBean, Replica target) { + double value = this.evaluateIngressResourceUsage(clusterBean, target.topicPartitionReplica()); + return new ResourceUsage( + Map.of(NetworkCost.NETWORK_COST_REPLICA_RESOURCE_PREFIX_INGRESS, value)); + } + + @Override + public Collection evaluateClusterResourceCapacity( + ClusterInfo clusterInfo, ClusterBean clusterBean) { + return this.evaluateIngressResourceCapacity(clusterInfo, clusterBean); + } } diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java index f755f73a97..9ada56b787 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java @@ -18,18 +18,23 @@ import static org.astraea.common.cost.MigrationCost.replicaLeaderChanged; +import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.astraea.common.Configuration; import org.astraea.common.admin.ClusterBean; import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartitionReplica; import org.astraea.common.metrics.broker.ServerMetrics; import org.astraea.common.metrics.collector.MetricSensor; /** more replica leaders -> higher cost */ -public class ReplicaLeaderCost implements HasBrokerCost, HasClusterCost, HasMoveCost { +public class ReplicaLeaderCost implements 
HasBrokerCost, HasClusterCost, HasMoveCost, ResourceUsageHint { private final Dispersion dispersion = Dispersion.cov(); private final Configuration config; public static final String MAX_MIGRATE_LEADER_KEY = "max.migrated.leader.number"; @@ -80,16 +85,81 @@ public Configuration config() { @Override public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) { - var moveCost = replicaLeaderChanged(before, after); + // var moveCost = replicaLeaderChanged(before, after); var maxMigratedLeader = - config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE); - var overflow = - maxMigratedLeader < moveCost.values().stream().map(Math::abs).mapToLong(s -> s).sum(); - return () -> overflow; + config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE); + // var overflow = + // maxMigratedLeader < moveCost.values().stream().map(Math::abs).mapToLong(s -> s).sum(); + long count = before.topicPartitions() + .stream() + .filter(tp -> { + var a = before.replicaLeader(tp).orElseThrow(); + var b = after.replicaLeader(tp).orElseThrow(); + return b.nodeInfo().id() != a.nodeInfo().id(); + }) + .count(); + return () -> count >= maxMigratedLeader; } @Override public String toString() { return this.getClass().getSimpleName(); } + + @Override + public ResourceUsage evaluateClusterResourceUsage(ClusterInfo clusterInfo, ClusterBean clusterBean, Replica target) { + return new ResourceUsage(Map.of( + "migrated_leaders", + clusterInfo.replicaLeader(target.topicPartition()) + .map(originLeader -> + target.isPreferredLeader() && + originLeader.nodeInfo().id() != target.nodeInfo().id() ? 
1.0 : 0.0) + .orElseThrow())); + } + + @Override + public ResourceUsage evaluateReplicaResourceUsage(ClusterInfo clusterInfo, ClusterBean clusterBean, Replica target) { + return new ResourceUsage(); + } + + @Override + public Collection evaluateClusterResourceCapacity(ClusterInfo clusterInfo, ClusterBean clusterBean) { + return List.of(new ResourceCapacity() { + @Override + public String resourceName() { + return "migrated_leaders"; + } + + @Override + public double optimalUsage() { + return config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE); + } + + @Override + public double idealness(ResourceUsage usage) { + var a = usage.usage().getOrDefault(resourceName(), 0.0) / optimalUsage(); + return a; + } + + @Override + public Comparator usageIdealnessComparator() { + return Comparator.comparingDouble(this::idealness); + // return (a,b) -> { + // var aa = a.usage().getOrDefault(resourceName(), 0.0); + // var bb = b.usage().getOrDefault(resourceName(), 0.0); + + // if(aa >= optimalUsage() && bb >= optimalUsage()) return 0; + // if(aa >= optimalUsage()) return 1; + // if(bb >= optimalUsage()) return -1; + // return 0; + // }; + // return Comparator.comparingDouble(ru -> ru.usage().getOrDefault(resourceName(), 0.0)); + } + + @Override + public Predicate usageValidnessPredicate() { + return (ru) -> ru.usage().getOrDefault(resourceName(), 0.0) < optimalUsage(); + } + }); + } } diff --git a/common/src/main/java/org/astraea/common/cost/ResourceCapacity.java b/common/src/main/java/org/astraea/common/cost/ResourceCapacity.java new file mode 100644 index 0000000000..b5b5d85662 --- /dev/null +++ b/common/src/main/java/org/astraea/common/cost/ResourceCapacity.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.cost; + +import java.util.Comparator; +import java.util.function.Predicate; + +public interface ResourceCapacity { + + String resourceName(); + + double optimalUsage(); + + double idealness(ResourceUsage usage); + + /** + * @return a {@link Comparator} that can rank the idealness of given usages in terms of specific + * resource usage. + */ + Comparator usageIdealnessComparator(); + + /** + * @return a {@link Predicate} that can test if the given resource usage is valid. + */ + Predicate usageValidnessPredicate(); +} diff --git a/common/src/main/java/org/astraea/common/cost/ResourceUsage.java b/common/src/main/java/org/astraea/common/cost/ResourceUsage.java new file mode 100644 index 0000000000..e2007fd69a --- /dev/null +++ b/common/src/main/java/org/astraea/common/cost/ResourceUsage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.cost; + +import java.util.HashMap; +import java.util.Map; + +/** A collection of resource usage stat. */ +public class ResourceUsage { + + private final Map usage; + + public ResourceUsage() { + this.usage = new HashMap<>(); + } + + public ResourceUsage(Map usage) { + this.usage = new HashMap<>(usage); + } + + public Map usage() { + return usage; + } + + public void mergeUsage(ResourceUsage resourceUsage) { + resourceUsage.usage.forEach( + (resource, usage) -> + this.usage.put(resource, this.usage.getOrDefault(resource, 0.0) + usage)); + } + + public void removeUsage(ResourceUsage resourceUsage) { + resourceUsage.usage.forEach( + (resource, usage) -> + this.usage.put(resource, this.usage.getOrDefault(resource, 0.0) - usage)); + } + + @Override + public String toString() { + return "ResourceUsage{" + "usage=" + usage + '}'; + } +} diff --git a/common/src/main/java/org/astraea/common/cost/ResourceUsageHint.java b/common/src/main/java/org/astraea/common/cost/ResourceUsageHint.java new file mode 100644 index 0000000000..ad9fcfc35a --- /dev/null +++ b/common/src/main/java/org/astraea/common/cost/ResourceUsageHint.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.cost; + +import java.util.Collection; +import org.astraea.common.admin.ClusterBean; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartitionReplica; + +public interface ResourceUsageHint { + + ResourceUsage evaluateClusterResourceUsage( + ClusterInfo clusterInfo, ClusterBean clusterBean, Replica target); + + ResourceUsage evaluateReplicaResourceUsage( + ClusterInfo clusterInfo, ClusterBean clusterBean, Replica target); + + Collection evaluateClusterResourceCapacity( + ClusterInfo clusterInfo, ClusterBean clusterBean); +} diff --git a/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java b/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java index ef6c408230..e8f9850ce4 100644 --- a/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java +++ b/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java @@ -16,10 +16,14 @@ */ package org.astraea.common.cost.utils; +import java.time.Duration; import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; import org.astraea.common.admin.ClusterBean; @@ -37,6 +41,9 @@ /** This MetricSensor attempts to reconstruct a ClusterInfo of the kafka cluster via JMX metrics. 
*/ public class ClusterInfoSensor implements MetricSensor { + private static final Map> cache = + new ConcurrentHashMap<>(); + @Override public List fetch(MBeanClient client, ClusterBean bean) { return Stream.of( @@ -56,11 +63,21 @@ public List fetch(MBeanClient client, ClusterBean bean) * @return a {@link ClusterInfo}. */ public static ClusterInfo metricViewCluster(ClusterBean clusterBean) { + return cache + .computeIfAbsent( + clusterBean, + (beans) -> CompletableFuture.supplyAsync(() -> calculateMetricViewCluster(beans))) + .toCompletableFuture() + .join(); + } + + private static ClusterInfo calculateMetricViewCluster(ClusterBean clusterBean) { var nodes = clusterBean.brokerIds().stream() .filter(id -> id != -1) .map(id -> NodeInfo.of(id, "", -1)) .collect(Collectors.toUnmodifiableMap(NodeInfo::id, x -> x)); + long l = System.nanoTime(); var replicas = clusterBean.brokerTopics().stream() .filter(bt -> bt.broker() != -1) @@ -114,6 +131,9 @@ public static ClusterInfo metricViewCluster(ClusterBean clusterBean) { return partitions.values().stream(); }) .collect(Collectors.toUnmodifiableList()); + long t = System.nanoTime(); + System.out.println( + "ClusterInfoSensor#metricClusterView " + Duration.ofNanos(t - l).toMillis() + "ms"); var clusterId = clusterBean.all().entrySet().stream() .filter(e -> e.getKey() != -1) diff --git a/common/src/main/java/org/astraea/common/metrics/ClusterBeanSerializer.java b/common/src/main/java/org/astraea/common/metrics/ClusterBeanSerializer.java new file mode 100644 index 0000000000..fdd5eda0c1 --- /dev/null +++ b/common/src/main/java/org/astraea/common/metrics/ClusterBeanSerializer.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.astraea.common.Utils; +import org.astraea.common.admin.ClusterBean; + +public class ClusterBeanSerializer { + + public static void serialize(ClusterBean clusterBean, OutputStream stream) { + Utils.packException( + () -> { + var buffer = ByteBuffer.allocate(64 << 20); // 64 MB buffer + var rawMap = clusterBean.all(); + + // put size + buffer.putInt(rawMap.size()); + rawMap.forEach( + (broker, metrics) -> { + // put id + buffer.putInt(broker); + + // put metrics size + buffer.putInt(metrics.size()); + + // put metrics + metrics.forEach( + beanObject -> { + var object = beanObject.beanObject(); + // put type name + buffer.putInt(beanObject.getClass().getName().length()); + buffer.put(beanObject.getClass().getName().getBytes()); + + // put domain name size + buffer.putInt(object.domainName().length()); + // put domain name + buffer.put(object.domainName().getBytes()); + + // put property size + buffer.putInt(object.properties().size()); + // put properties + object + .properties() + .forEach( + (k, v) -> { + buffer.putInt(k.length()); + buffer.put(k.getBytes()); + buffer.putInt(v.length()); + buffer.put(v.getBytes()); + }); + + // put attribute size + 
buffer.putInt(object.attributes().size()); + // put attributes + object + .attributes() + .forEach( + (k, v) -> { + // put attribute key + buffer.putInt(k.length()); + buffer.put(k.getBytes()); + + // put attribute + AttributeType type = AttributeType.from(v); + buffer.putInt(type.value); + switch (type) { + case Integer: + buffer.putInt((int) v); + break; + case Long: + buffer.putLong((long) v); + break; + case Double: + buffer.putDouble((double) v); + break; + case String: + buffer.putInt(((String) v).length()); + buffer.put(((String) v).getBytes()); + break; + case Boolean: + buffer.putInt(((Boolean) v ? 1 : 0)); + break; + default: + System.out.println( + "Doesn't support this type " + + k + + "=" + + v.getClass().getName()); + break; + } + }); + }); + }); + stream.write(buffer.array(), 0, buffer.position()); + }); + } + + public static ClusterBean deserialize(InputStream stream) { + var metrics = new HashMap>(); + var buffer = ByteBuffer.wrap(Utils.packException(stream::readAllBytes)); + + for (int i = 0, len = buffer.getInt(); i < len; i++) { + // get broker id + var brokerId = buffer.getInt(); + // get metric size + var metricSize = buffer.getInt(); + + metrics.put(brokerId, new ArrayList<>()); + + for (int j = 0; j < metricSize; j++) { + // get type name + var typeName = strFromBuffer(buffer); + // get domain name + var domain = strFromBuffer(buffer); + var properties = new HashMap(); + var attributes = new HashMap(); + // get property size + var propertySize = buffer.getInt(); + for (int k = 0; k < propertySize; k++) { + var key = strFromBuffer(buffer); + var value = strFromBuffer(buffer); + properties.put(key, value); + } + + // get attribute size + var attributeSize = buffer.getInt(); + for (int k = 0; k < attributeSize; k++) { + var key = strFromBuffer(buffer); + var attributeType = AttributeType.from(buffer.getInt()); + var value = (Object) null; + + switch (attributeType) { + case Integer: + value = buffer.getInt(); + break; + case Long: + value = 
buffer.getLong(); + break; + case Double: + value = buffer.getDouble(); + break; + case String: + value = strFromBuffer(buffer); + break; + case Boolean: + value = buffer.getInt() != 0; + break; + default: + value = null; + break; + } + + attributes.put(key, value); + } + + // construct metric + var metric = + (HasBeanObject) + Utils.packException( + () -> + Class.forName(typeName) + .getConstructor(BeanObject.class) + .newInstance(new BeanObject(domain, properties, attributes))); + metrics.get(brokerId).add(metric); + } + } + + return ClusterBean.of(metrics); + } + + private static String strFromBuffer(ByteBuffer buffer) { + var size = buffer.getInt(); + var bytes = new byte[size]; + for (int i = 0; i < size; i++) bytes[i] = buffer.get(); + return new String(bytes); + } + + enum AttributeType { + Unknown(-1), + Integer(1), + Long(2), + Double(3), + String(4), + Boolean(5); + + private static final List attributes = List.of(AttributeType.values()); + + public final int value; + + AttributeType(int value) { + this.value = value; + } + + static AttributeType from(Object object) { + if (object instanceof Integer) return Integer; + else if (object instanceof Long) return Long; + else if (object instanceof Double) return Double; + else if (object instanceof String) return String; + else if (object instanceof Boolean) return Boolean; + else return Unknown; + } + + static AttributeType from(int i) { + return attributes.stream().filter(x -> x.value == i).findFirst().orElseThrow(); + } + } +} diff --git a/common/src/main/java/org/astraea/common/metrics/ClusterInfoSerializer.java b/common/src/main/java/org/astraea/common/metrics/ClusterInfoSerializer.java new file mode 100644 index 0000000000..0a37cf5b95 --- /dev/null +++ b/common/src/main/java/org/astraea/common/metrics/ClusterInfoSerializer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import org.astraea.common.Utils; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.consumer.DDeserializer; + +public class ClusterInfoSerializer { + + public static void serialize(ClusterInfo clusterBean, OutputStream stream) { + throw new UnsupportedOperationException(); + // byte[] serialize = Serializer.CLUSTER_INFO.serialize("", List.of(), clusterBean); + // Utils.packException(() -> stream.write(serialize)); + } + + public static ClusterInfo deserialize(InputStream stream) { + byte[] bytes = Utils.packException(stream::readAllBytes); + return DDeserializer.CLUSTER_INFO.deserialize("", List.of(), bytes); + } +} diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java index f4d7816e6a..6d5fcd8d35 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java @@ -266,10 +266,18 @@ private void updateData(DelayedIdentity identity) { lock.readLock().lock(); Collection beans; try { + long s = System.nanoTime(); beans 
= QUERIES.stream() .flatMap(q -> clients.get(identity.id).beans(q, e -> {}).stream()) .collect(Collectors.toUnmodifiableList()); + long t = System.nanoTime(); + System.out.println( + "[Sample " + + identity.id + + "] time spent " + + Duration.ofNanos(t - s).toMillis() + + " ms"); } finally { lock.readLock().unlock(); } diff --git a/common/src/test/java/org/astraea/common/balancer/algorithms/ResourceBalancerTest.java b/common/src/test/java/org/astraea/common/balancer/algorithms/ResourceBalancerTest.java new file mode 100644 index 0000000000..4a1000e178 --- /dev/null +++ b/common/src/test/java/org/astraea/common/balancer/algorithms/ResourceBalancerTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/** Placeholder: no test cases are defined in this class yet. */
class ResourceBalancerTest {

}
System.out.println("Do serialize"); + ClusterBeanSerializer.serialize(testcase.clusterBean, stream); + System.out.println("Do serialize done"); + } catch (IOException e) { + throw new RuntimeException(e); + } + + try (InputStream stream = new FileInputStream(beanFile.toString())) { + System.out.println("Do deserialize"); + ClusterBean deserialize = ClusterBeanSerializer.deserialize(stream); + System.out.println("Do deserialize done"); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java index b16629d59d..f2dd03136a 100644 --- a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java @@ -18,8 +18,13 @@ import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; + +import org.astraea.common.Configuration; +import org.astraea.common.admin.ClusterBean; import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.ClusterInfoBuilder; import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; import org.astraea.common.metrics.BeanObject; @@ -31,6 +36,29 @@ public class ReplicaLeaderCostTest { private final Dispersion dispersion = Dispersion.cov(); + @Test + void testLeaderCount() { + var baseCluster = ClusterInfoBuilder.builder() + .addNode(Set.of(1, 2)) + .addFolders(Map.of(1, Set.of("/folder"))) + .addFolders(Map.of(2, Set.of("/folder"))) + .build(); + var sourceCluster = ClusterInfoBuilder.builder(baseCluster) + .addTopic("topic1", 3, (short) 1, r -> Replica.builder(r).nodeInfo(baseCluster.node(1)).build()) + .addTopic("topic2", 3, (short) 1, r -> Replica.builder(r).nodeInfo(baseCluster.node(2)).build()) + .build(); + var targetCluster = ClusterInfoBuilder.builder(baseCluster) + .addTopic("topic1", 3, (short) 1, r -> 
Replica.builder(r).nodeInfo(baseCluster.node(2)).build()) + .addTopic("topic2", 3, (short) 1, r -> Replica.builder(r).nodeInfo(baseCluster.node(1)).build()) + .build(); + + MoveCost moveCost = new ReplicaLeaderCost(Configuration.of(Map.of( + ReplicaLeaderCost.MAX_MIGRATE_LEADER_KEY, "1" + ))).moveCost(sourceCluster, targetCluster, ClusterBean.EMPTY); + + System.out.println(moveCost.overflow()); + } + @Test void testNoMetrics() { var replicas =