From 7fa1a22a554c4f9eecb11a1da9e69aa506a08618 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 2 May 2023 11:39:51 +0800 Subject: [PATCH] [BALANCER] Implement balancer config `balancer.allowed.brokers.regex` (#1656) --- .../common/balancer/BalancerConfigs.java | 7 + .../balancer/algorithms/GreedyBalancer.java | 10 ++ .../algorithms/SingleStepBalancer.java | 10 ++ .../balancer/tweakers/ShuffleTweaker.java | 159 ++++++++++++------ .../balancer/BalancerConfigTestSuite.java | 152 +++++++++++++---- .../algorithms/GreedyBalancerTest.java | 2 +- .../algorithms/SingleStepBalancerTest.java | 9 +- .../balancer/tweakers/ShuffleTweakerTest.java | 6 +- .../astraea/common/cost/NetworkCostTest.java | 6 +- docs/web_server/web_api_balancer_chinese.md | 7 +- 10 files changed, 279 insertions(+), 89 deletions(-) diff --git a/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java b/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java index e764e8c137..01ebcd26ff 100644 --- a/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java +++ b/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java @@ -33,4 +33,11 @@ private BalancerConfigs() {} * position. */ public static final String BALANCER_ALLOWED_TOPICS_REGEX = "balancer.allowed.topics.regex"; + + /** + * A regular expression indicates which brokers are eligible for moving loading. When specified, a + * broker with an id that doesn't match this expression cannot accept a partition from the other + * broker or move its partition to other brokers. + */ + public static final String BALANCER_ALLOWED_BROKERS_REGEX = "balancer.allowed.brokers.regex"; } diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java index dfded52b0c..7875b0fcb2 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.DoubleAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiFunction; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Pattern; import org.astraea.common.Utils; @@ -139,6 +140,14 @@ public Optional offer(AlgorithmConfig config) { .regexString(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX) .map(Pattern::asMatchPredicate) .orElse((ignore) -> true); + final var allowedBrokers = + config + .balancerConfig() + .regexString(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX) + .map(Pattern::asMatchPredicate) + .>map( + predicate -> (brokerId) -> predicate.test(Integer.toString(brokerId))) + .orElse((ignore) -> true); final var currentClusterInfo = config.clusterInfo(); final var clusterBean = config.clusterBean(); @@ -146,6 +155,7 @@ public Optional offer(AlgorithmConfig config) { ShuffleTweaker.builder() .numberOfShuffle(() -> ThreadLocalRandom.current().nextInt(minStep, maxStep)) .allowedTopics(allowedTopics) + .allowedBrokers(allowedBrokers) .build(); final var clusterCostFunction = config.clusterCostFunction(); final var moveCostFunction = config.moveCostFunction(); diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java index b17f259489..531ae536ac 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Predicate; import java.util.regex.Pattern; import org.astraea.common.Utils; import org.astraea.common.balancer.AlgorithmConfig; @@ -67,6 +68,14 @@ public Optional offer(AlgorithmConfig config) { .regexString(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX) .map(Pattern::asMatchPredicate) .orElse((ignore) -> true); + final var allowedBrokers = + config + .balancerConfig() + .regexString(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX) + .map(Pattern::asMatchPredicate) + .>map( + predicate -> (brokerId) -> predicate.test(Integer.toString(brokerId))) + .orElse((ignore) -> true); final var currentClusterInfo = config.clusterInfo(); final var clusterBean = config.clusterBean(); @@ -74,6 +83,7 @@ public Optional offer(AlgorithmConfig config) { ShuffleTweaker.builder() .numberOfShuffle(() -> ThreadLocalRandom.current().nextInt(minStep, maxStep)) .allowedTopics(allowedTopics) + .allowedBrokers(allowedBrokers) .build(); final var clusterCostFunction = config.clusterCostFunction(); final var moveCostFunction = config.moveCostFunction(); diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java index c32cb2cc4f..7e48e5424d 100644 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java +++ b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java @@ -18,6 +18,7 @@ import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -27,8 +28,8 @@ import java.util.stream.Stream; import org.astraea.common.EnumInfo; import org.astraea.common.admin.ClusterInfo; -import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartitionReplica; /** * The {@link ShuffleTweaker} proposes a new log placement based on the current log placement, but @@ -47,10 +48,15 @@ public class ShuffleTweaker { private final Supplier numberOfShuffle; private final Predicate allowedTopics; + private final Predicate allowedBrokers; - public ShuffleTweaker(Supplier numberOfShuffle, Predicate allowedTopics) { + public ShuffleTweaker( + Supplier numberOfShuffle, + Predicate allowedTopics, + Predicate allowedBrokers) { this.numberOfShuffle = numberOfShuffle; this.allowedTopics = allowedTopics; + this.allowedBrokers = allowedBrokers; } public static Builder builder() { @@ -61,74 +67,120 @@ public Stream generate(ClusterInfo baseAllocation) { // There is no broker if (baseAllocation.nodes().isEmpty()) return Stream.of(); - // No non-ignored topic to working on. - if (baseAllocation.topicPartitions().isEmpty()) return Stream.of(); + // No replica to working on. + if (baseAllocation.replicas().size() == 0) return Stream.of(); - // Only one broker & one folder exists, unable to do any log migration + // Only one broker & one folder exists, unable to do any meaningful log migration if (baseAllocation.nodes().size() == 1 && baseAllocation.brokerFolders().values().stream().findFirst().orElseThrow().size() == 1) return Stream.of(); + final var legalReplicas = + baseAllocation.topicPartitions().stream() + .filter(tp -> this.allowedTopics.test(tp.topic())) + .filter(tp -> eligiblePartition(baseAllocation.replicas(tp))) + .flatMap(baseAllocation::replicaStream) + .filter(r -> this.allowedBrokers.test(r.nodeInfo().id())) + .collect(Collectors.toUnmodifiableList()); + return Stream.generate( () -> { final var shuffleCount = numberOfShuffle.get(); - final var partitionOrder = - baseAllocation.topicPartitions().stream() - .filter(tp -> this.allowedTopics.test(tp.topic())) - .map(tp -> Map.entry(tp, ThreadLocalRandom.current().nextInt())) + final var replicaOrder = + legalReplicas.stream() + .map(r -> Map.entry(r, ThreadLocalRandom.current().nextInt())) .sorted(Map.Entry.comparingByValue()) .map(Map.Entry::getKey) .collect(Collectors.toUnmodifiableList()); + final var forbiddenReplica = new HashSet(); final var finalCluster = ClusterInfo.builder(baseAllocation); - for (int i = 0, shuffled = 0; i < partitionOrder.size() && shuffled < shuffleCount; i++) { - final var tp = partitionOrder.get(i); - if (!eligiblePartition(baseAllocation.replicas(tp))) continue; - switch (Operation.random()) { - case LEADERSHIP_CHANGE: - { - // change leader/follower identity - var replica = + for (int i = 0, shuffled = 0; i < replicaOrder.size() && shuffled < shuffleCount; i++) { + final var sourceReplica = replicaOrder.get(i); + + // the leadership change operation will not only affect source target but also the + // target replica. To prevent mutating one replica twice in the tweaking loop. We have + // to mandatory exclude the target replica since it has been touched in this tweak. + // Tweaking a replica twice is meaningless and incompatible with the design of + // ClusterInfoBuilder. + if (forbiddenReplica.contains(sourceReplica.topicPartitionReplica())) continue; + + Supplier leadershipChange = + () -> { + var targetReplica = baseAllocation - .replicaStream(tp) - .filter(Replica::isFollower) + .replicaStream(sourceReplica.topicPartition()) + // leader pair follower, follower pair leader + .filter(r -> r.isFollower() != sourceReplica.isFollower()) + // this follower is located at allowed broker + .filter(r -> this.allowedBrokers.test(r.nodeInfo().id())) + // not forbidden + .filter(r -> !forbiddenReplica.contains(r.topicPartitionReplica())) .map(r -> Map.entry(r, ThreadLocalRandom.current().nextInt())) .min(Map.Entry.comparingByValue()) .map(Map.Entry::getKey); - if (replica.isPresent()) { - finalCluster.setPreferredLeader(replica.get().topicPartitionReplica()); - shuffled++; + + // allowed broker filter might cause no legal exchange target + if (targetReplica.isPresent()) { + var theFollower = + sourceReplica.isFollower() ? sourceReplica : targetReplica.orElseThrow(); + finalCluster.setPreferredLeader(theFollower.topicPartitionReplica()); + + forbiddenReplica.add(sourceReplica.topicPartitionReplica()); + forbiddenReplica.add(targetReplica.orElseThrow().topicPartitionReplica()); + + return true; + } else { + return false; } - break; - } - case REPLICA_LIST_CHANGE: - { - // change replica list - var replicaList = baseAllocation.replicas(tp); - var currentIds = - replicaList.stream() - .map(Replica::nodeInfo) - .map(NodeInfo::id) - .collect(Collectors.toUnmodifiableSet()); - var broker = + }; + Supplier replicaListChange = + () -> { + var replicaList = baseAllocation.replicas(sourceReplica.topicPartition()); + var targetBroker = baseAllocation.brokers().stream() - .filter(b -> !currentIds.contains(b.id())) + // the candidate should not be part of the replica list + .filter( + b -> replicaList.stream().noneMatch(r -> r.nodeInfo().id() == b.id())) + // should be an allowed broker + .filter(b -> this.allowedBrokers.test(b.id())) .map(b -> Map.entry(b, ThreadLocalRandom.current().nextInt())) .min(Map.Entry.comparingByValue()) .map(Map.Entry::getKey); - if (broker.isPresent()) { - var replica = randomElement(replicaList); + + if (targetBroker.isPresent()) { finalCluster.reassignReplica( - replica.topicPartitionReplica(), - broker.get().id(), - randomElement(baseAllocation.brokerFolders().get(broker.get().id()))); - shuffled++; + sourceReplica.topicPartitionReplica(), + targetBroker.orElseThrow().id(), + randomElement( + baseAllocation.brokerFolders().get(targetBroker.orElseThrow().id()))); + + forbiddenReplica.add(sourceReplica.topicPartitionReplica()); + return true; + } else { + return false; } - break; - } - default: - throw new RuntimeException("Unexpected Condition"); - } + }; + + final var isFinished = + Operation.randomStream() + .sequential() + .map( + operation -> { + switch (operation) { + case LEADERSHIP_CHANGE: + return leadershipChange.get(); + case REPLICA_LIST_CHANGE: + return replicaListChange.get(); + default: + throw new RuntimeException("Unexpected Condition: " + operation); + } + }) + .filter(finished -> finished) + .findFirst() + .orElse(false); + + shuffled += isFinished ? 1 : 0; } return finalCluster.build(); @@ -162,6 +214,13 @@ public static Operation random() { return OPERATIONS.get(ThreadLocalRandom.current().nextInt(OPERATIONS.size())); } + public static Stream randomStream() { + return OPERATIONS.stream() + .map(x -> Map.entry(x, ThreadLocalRandom.current().nextInt())) + .sorted(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey); + } + public static Operation ofAlias(String alias) { return EnumInfo.ignoreCaseEnum(Operation.class, alias); } @@ -181,6 +240,7 @@ public static class Builder { private Supplier numberOfShuffle = () -> ThreadLocalRandom.current().nextInt(1, 5); private Predicate allowedTopics = (name) -> true; + private Predicate allowedBrokers = (name) -> true; private Builder() {} @@ -194,8 +254,13 @@ public Builder allowedTopics(Predicate allowedTopics) { return this; } + public Builder allowedBrokers(Predicate allowedBrokers) { + this.allowedBrokers = allowedBrokers; + return this; + } + public ShuffleTweaker build() { - return new ShuffleTweaker(numberOfShuffle, allowedTopics); + return new ShuffleTweaker(numberOfShuffle, allowedTopics, allowedBrokers); } } } diff --git a/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java b/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java index 0b0e30419a..693c2be146 100644 --- a/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java +++ b/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java @@ -36,42 +36,17 @@ public abstract class BalancerConfigTestSuite { private final Class balancerClass; + private final Configuration customConfig; - public BalancerConfigTestSuite(Class balancerClass) { + public BalancerConfigTestSuite(Class balancerClass, Configuration custom) { this.balancerClass = balancerClass; + this.customConfig = custom; } @Test - public void testBalancerAllowedTopicRegex() { + public void testBalancerAllowedTopicsRegex() { final var balancer = Utils.construct(balancerClass, Configuration.EMPTY); - final var cluster = cluster(10, 10, 10, (short) 5); - final var AssertionsHelper = - new Object() { - void assertSomeMovement(ClusterInfo source, ClusterInfo target, String name) { - Assertions.assertNotEquals( - Set.of(), - ClusterInfo.findNonFulfilledAllocation(source, target), - name + ": Should have movements"); - } - - void assertOnlyAllowedMovement( - ClusterInfo source, ClusterInfo target, Pattern allowed, String name) { - assertSomeMovement(source, target, name); - Assertions.assertEquals( - Set.of(), - ClusterInfo.findNonFulfilledAllocation(source, target).stream() - .filter(Predicate.not((tp) -> allowed.asMatchPredicate().test(tp.topic()))) - .collect(Collectors.toUnmodifiableSet()), - name + ": Only allowed topics been altered."); - } - - void assertNoMovement(ClusterInfo source, ClusterInfo target, String name) { - Assertions.assertEquals( - Set.of(), - ClusterInfo.findNonFulfilledAllocation(source, target), - name + ": Should have no movement"); - } - }; + final var cluster = cluster(20, 10, 10, (short) 5); { var testName = "[test no limit]"; @@ -81,8 +56,9 @@ void assertNoMovement(ClusterInfo source, ClusterInfo target, String name) { .clusterInfo(cluster) .clusterCost(decreasingCost()) .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) // This argument is not applied - // .config(BalancerCapabilities.BALANCER_ALLOWED_TOPIC_REGEX, regexRaw) + // .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, regexRaw) .build()); AssertionsHelper.assertSomeMovement(cluster, plan.orElseThrow().proposal(), testName); } @@ -101,9 +77,10 @@ void assertNoMovement(ClusterInfo source, ClusterInfo target, String name) { .clusterInfo(cluster) .clusterCost(decreasingCost()) .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, regexRaw) .build()); - AssertionsHelper.assertOnlyAllowedMovement( + AssertionsHelper.assertOnlyAllowedTopicMovement( cluster, plan.orElseThrow().proposal(), regex, testName); } @@ -121,6 +98,7 @@ void assertNoMovement(ClusterInfo source, ClusterInfo target, String name) { .clusterInfo(cluster) .clusterCost(decreasingCost()) .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, regexRaw) .build()); AssertionsHelper.assertNoMovement(cluster, plan.orElseThrow().proposal(), testName); @@ -135,12 +113,69 @@ void assertNoMovement(ClusterInfo source, ClusterInfo target, String name) { .clusterInfo(cluster) .clusterCost(decreasingCost()) .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, regexRaw) .build()); AssertionsHelper.assertNoMovement(cluster, plan.orElseThrow().proposal(), testName); } } + @Test + public void testBalancerAllowedBrokersRegex() { + final var balancer = Utils.construct(balancerClass, Configuration.EMPTY); + final var cluster = cluster(10, 10, 10, (short) 5); + + { + var testName = "[test all match]"; + var plan = + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX, "[0-9]*") + .build()); + AssertionsHelper.assertSomeMovement(cluster, plan.orElseThrow().proposal(), testName); + } + + { + var testName = "[test no match]"; + var plan = + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX, "NoMatch") + .build()); + // since nothing can be moved. It is ok to return no plan. + if (plan.isPresent()) { + // But if we have a plan here. It must contain no movement. + AssertionsHelper.assertNoMovement(cluster, plan.orElseThrow().proposal(), testName); + } + } + + { + var testName = "[test some match]"; + var allowedBrokers = IntStream.range(1, 6).boxed().collect(Collectors.toUnmodifiableSet()); + var rawRegex = + allowedBrokers.stream().map(Object::toString).collect(Collectors.joining("|", "(", ")")); + var plan = + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX, rawRegex) + .build()); + AssertionsHelper.assertOnlyAllowedBrokerMovement( + cluster, plan.orElseThrow().proposal(), allowedBrokers::contains, testName); + } + } + private static ClusterInfo cluster(int nodes, int topics, int partitions, short replicas) { var builder = ClusterInfo.builder() @@ -171,4 +206,57 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) } }; } + + private static class AssertionsHelper { + static void assertSomeMovement(ClusterInfo source, ClusterInfo target, String name) { + Assertions.assertNotEquals( + Set.of(), + ClusterInfo.findNonFulfilledAllocation(source, target), + name + ": Should have movements"); + } + + static void assertNoMovement(ClusterInfo source, ClusterInfo target, String name) { + Assertions.assertEquals( + Set.of(), + ClusterInfo.findNonFulfilledAllocation(source, target), + name + ": Should have no movement"); + } + + static void assertOnlyAllowedTopicMovement( + ClusterInfo source, ClusterInfo target, Pattern allowedTopic, String name) { + assertSomeMovement(source, target, name); + Assertions.assertEquals( + Set.of(), + ClusterInfo.findNonFulfilledAllocation(source, target).stream() + .filter(Predicate.not((tp) -> allowedTopic.asMatchPredicate().test(tp.topic()))) + .collect(Collectors.toUnmodifiableSet()), + name + ": Only allowed topics been altered."); + } + + static void assertOnlyAllowedBrokerMovement( + ClusterInfo source, ClusterInfo target, Predicate allowedBroker, String name) { + assertSomeMovement(source, target, name); + source + .replicaStream() + // for those replicas that are not allowed to move + .filter(r -> !allowedBroker.test(r.nodeInfo().id())) + // they should exist as-is in the target allocation + .forEach( + fixedReplica -> { + target + .replicaStream() + .filter(targetReplica -> targetReplica.equals(fixedReplica)) + .findFirst() + .ifPresentOrElse( + (r) -> {}, + () -> { + Assertions.fail( + name + + ": Expect replica " + + fixedReplica + + " not moved, but it appears to disappear from the target allocation"); + }); + }); + } + } } diff --git a/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java b/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java index 90affa56c9..3f9b617391 100644 --- a/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java @@ -35,7 +35,7 @@ class GreedyBalancerTest extends BalancerConfigTestSuite { public GreedyBalancerTest() { - super(GreedyBalancer.class); + super(GreedyBalancer.class, Configuration.EMPTY); } @Test diff --git a/common/src/test/java/org/astraea/common/balancer/algorithms/SingleStepBalancerTest.java b/common/src/test/java/org/astraea/common/balancer/algorithms/SingleStepBalancerTest.java index 966beccdae..74e70acd87 100644 --- a/common/src/test/java/org/astraea/common/balancer/algorithms/SingleStepBalancerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/algorithms/SingleStepBalancerTest.java @@ -16,6 +16,8 @@ */ package org.astraea.common.balancer.algorithms; +import java.util.Map; +import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.balancer.BalancerConfigTestSuite; import org.junit.jupiter.api.Assertions; @@ -24,7 +26,12 @@ class SingleStepBalancerTest extends BalancerConfigTestSuite { public SingleStepBalancerTest() { - super(SingleStepBalancer.class); + super( + SingleStepBalancer.class, + Configuration.of( + Map.of( + "shuffle.tweaker.min.step", "1000", + "shuffle.tweaker.max.step", "2000"))); } @Test diff --git a/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java b/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java index 8f80e3aa53..baef67f81a 100644 --- a/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java @@ -83,7 +83,7 @@ void testNoNodes() { final var shuffleTweaker = ShuffleTweaker.builder().numberOfShuffle(() -> 3).build(); Assertions.assertEquals( - 0, (int) shuffleTweaker.generate(fakeCluster).count(), "No possible tweak"); + 0, (int) shuffleTweaker.generate(fakeCluster).limit(100).count(), "No possible tweak"); } @Test @@ -92,7 +92,7 @@ void testOneNode() { final var shuffleTweaker = ShuffleTweaker.builder().numberOfShuffle(() -> 3).build(); Assertions.assertEquals( - 0, (int) shuffleTweaker.generate(fakeCluster).count(), "No possible tweak"); + 0, (int) shuffleTweaker.generate(fakeCluster).limit(100).count(), "No possible tweak"); } @Test @@ -101,7 +101,7 @@ void testNoTopic() { final var shuffleTweaker = ShuffleTweaker.builder().numberOfShuffle(() -> 3).build(); Assertions.assertEquals( - 0, (int) shuffleTweaker.generate(fakeCluster).count(), "No possible tweak"); + 0, (int) shuffleTweaker.generate(fakeCluster).limit(100).count(), "No possible tweak"); } @Test diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index f963e0db49..cba78a7ad9 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -336,9 +336,11 @@ void testExpectedImprovement(int seed) { var clusterInfo = testCase.clusterInfo(); var clusterBean = testCase.clusterBean(); var smallShuffle = - new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 6), (x) -> true); + new ShuffleTweaker( + () -> ThreadLocalRandom.current().nextInt(1, 6), (x) -> true, (x) -> true); var largeShuffle = - new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 31), (x) -> true); + new ShuffleTweaker( + () -> ThreadLocalRandom.current().nextInt(1, 31), (x) -> true, (x) -> true); var costFunction = HasClusterCost.of( Map.of( diff --git a/docs/web_server/web_api_balancer_chinese.md b/docs/web_server/web_api_balancer_chinese.md index dc13d978c8..332f2c7215 100644 --- a/docs/web_server/web_api_balancer_chinese.md +++ b/docs/web_server/web_api_balancer_chinese.md @@ -26,9 +26,10 @@ POST /balancer `balancerConfig` 是 balancer 實作開放給使用者設定的內部演算法行為參數,我們有針對常用情境的 balancer config 規範出一些固定的名稱, 參數是否支援要看 Balancer 實作本身。當指定的參數不被 balancer 實作支援時,該實作可能會丟出錯誤提示使用者。 -| config key | config value | -|-------------------------------|----------------------------------------------------------------| -| balancer.allowed.topics.regex | 一個正則表達式,表達允許進行搬移操作的 topic 名稱,當沒有指定的時候,代表沒有任何限制,所有 topic 都可以做搬移 | +| config key | config value | +|--------------------------------|-------------------------------------------------------------------------------------------------------------------| +| balancer.allowed.topics.regex | 一個正則表達式,表達允許進行搬移操作的 topic 名稱,當沒有指定的時候,代表沒有任何限制,所有 topic 都可以做搬移 | +| balancer.allowed.brokers.regex | 一個正則表達式,表達允許進行搬移操作的 broker 編號,當沒有指定的時候,代表沒有任何限制,所有 broker 都可以做負載更動。當有指定時,只有那些 broker 編號有匹配此正則表達式的 broker 能進行負載的更動 | costConfig: