Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BALANCER] Demonstration of ResourceBalancer & ResourceUsageHint #1674

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
b210fe9
Implementation of `BackboneImbalanceScenario`
garyparrot Feb 2, 2023
b2449f3
Revise
garyparrot Feb 5, 2023
e901380
fix naming
garyparrot Feb 5, 2023
fcc9bb5
less threads
garyparrot Feb 6, 2023
52c53ff
remove `key.distribution` from default perf arg
garyparrot Feb 7, 2023
8128822
Merge branch 'main' into backbone-imbalance-scenario
garyparrot Feb 7, 2023
9369582
Revise
garyparrot Feb 8, 2023
ff69e1b
allocate one dedicate perf client for backbone topic
garyparrot Feb 8, 2023
96b1ed4
Revise
garyparrot Feb 9, 2023
c6b7f80
fix test
garyparrot Feb 9, 2023
59b66d0
Change scenario config
garyparrot Feb 11, 2023
43d2698
Merge branch 'main' into backbone-imbalance-scenario
garyparrot Feb 21, 2023
6bbab45
fix merge
garyparrot Feb 22, 2023
dd36d6b
fix key conflict
garyparrot Feb 22, 2023
062daac
Merge branch 'main' into backbone-imbalance-scenario
garyparrot Mar 8, 2023
c236968
Support newest performance config
garyparrot Mar 8, 2023
af3f54e
Fix fanout series config
garyparrot Mar 9, 2023
49337e9
Merge branch 'main' into backbone-imbalance-scenario
garyparrot Mar 10, 2023
10851a2
Fixed bandwidth
garyparrot Mar 13, 2023
b17d339
Update
garyparrot Mar 25, 2023
e4feeca
Merge branch 'main' into backbone-imbalance-scenario
garyparrot Mar 27, 2023
ec22f17
Update
garyparrot Mar 27, 2023
6c66519
Merge branch 'main' into backbone-imbalance-scenario
garyparrot Mar 28, 2023
3af83e4
Merge branch 'main' into backbone-imbalance-scenario
garyparrot Mar 31, 2023
8a609b6
Merge branch 'main' into backbone-imbalance-scenario
garyparrot Mar 31, 2023
eb45939
Remove unnecessary output fields
garyparrot Apr 1, 2023
4db9495
Merge branch 'main' into checkpoint
garyparrot Apr 4, 2023
7448535
Update
garyparrot Apr 5, 2023
9f303a3
Merge branch 'main' into checkpoint
garyparrot Apr 6, 2023
bfec6eb
Update
garyparrot Apr 6, 2023
a5d64d8
Cache
garyparrot Apr 7, 2023
21057b3
Enable to tune the estimation method for `NetworkCost`
garyparrot Apr 7, 2023
a929437
Rename `EstimationMethod` name
garyparrot Apr 7, 2023
1ca4e76
Move `EstimationMethod` to upper layer
garyparrot Apr 7, 2023
94c81c5
Fix test
garyparrot Apr 7, 2023
4b386b2
Merge branch 'network-cost-tunable-estimation-method' into checkpoint
garyparrot Apr 7, 2023
7c4abf8
Amend
garyparrot Apr 8, 2023
c7fdcfb
Merge branch 'main' into checkpoint
garyparrot Apr 9, 2023
3aba486
Temp
garyparrot Apr 10, 2023
258cd71
Update
garyparrot Apr 11, 2023
847f75f
Update
garyparrot Apr 12, 2023
0e6105a
spotless
garyparrot Apr 21, 2023
05d6b40
Prepare backport
garyparrot Apr 21, 2023
468ee35
Merge branch 'main' into checkpoint
garyparrot Apr 21, 2023
202a7e3
Implementation of basic resource class
garyparrot Apr 19, 2023
873c4de
Implement `ResourceUsageHint` for `NetworkCost`
garyparrot Apr 20, 2023
dd08b1b
Enable to list composite cost from `HasClusterCost#of`
garyparrot Apr 20, 2023
cb3d2d6
Implementation of `ResourceBalancer`
garyparrot Apr 21, 2023
a317b91
Merge branch 'checkpoint' into new-balancing-framework
garyparrot Apr 21, 2023
7e7a957
Can find local minimum now
garyparrot Apr 22, 2023
7d66543
Preserved
garyparrot Apr 25, 2023
f63ec66
Update
garyparrot Apr 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
526 changes: 526 additions & 0 deletions app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java

Large diffs are not rendered by default.

50 changes: 4 additions & 46 deletions app/src/main/java/org/astraea/app/web/Scenario.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
*/
package org.astraea.app.web;

import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;

/** The subclass of this class should contain the logic to fulfill a scenario. */
public interface Scenario {
public interface Scenario<Result> {

static Builder builder() {
return new Builder();
Expand Down Expand Up @@ -56,54 +56,12 @@ public Builder binomialProbability(double binomialProbability) {
return this;
}

public Scenario build() {
public Scenario<SkewedPartitionScenario.Result> build() {
return new SkewedPartitionScenario(
topicName, numberOfPartitions, numberOfReplicas, binomialProbability);
}
}

/** Apply this scenario to the Kafka cluster */
CompletionStage<Result> apply(Admin admin);

class Result {

private final String topicName;
private final int numberOfPartitions;
private final short numberOfReplicas;
private final Map<Integer, Long> leaderSum;
private final Map<Integer, Long> logSum;

public Result(
String topicName,
int numberOfPartitions,
short numberOfReplicas,
Map<Integer, Long> leaderSum,
Map<Integer, Long> logSum) {
this.topicName = topicName;
this.numberOfPartitions = numberOfPartitions;
this.numberOfReplicas = numberOfReplicas;
this.leaderSum = leaderSum;
this.logSum = logSum;
}

public String topicName() {
return topicName;
}

public int numberOfPartitions() {
return numberOfPartitions;
}

public short numberOfReplicas() {
return numberOfReplicas;
}

public Map<Integer, Long> leaderSum() {
return leaderSum;
}

public Map<Integer, Long> logSum() {
return logSum;
}
}
CompletionStage<Result> apply(Admin admin, Configuration scenarioConfig);
}
47 changes: 45 additions & 2 deletions app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.distribution.IntegerDistribution;
import org.apache.commons.math3.util.Pair;
import org.astraea.common.Configuration;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.admin.TopicPartitionReplica;

public class SkewedPartitionScenario implements Scenario {
public class SkewedPartitionScenario implements Scenario<SkewedPartitionScenario.Result> {

final String topicName;
final int partitions;
Expand All @@ -50,7 +51,7 @@ public SkewedPartitionScenario(
}

@Override
public CompletionStage<Result> apply(Admin admin) {
public CompletionStage<Result> apply(Admin admin, Configuration scenarioConfig) {
return admin
.creator()
.topic(topicName)
Expand Down Expand Up @@ -138,4 +139,46 @@ public static List<Integer> sampledReplicaList(
}
return result;
}

public static class Result {

private final String topicName;
private final int numberOfPartitions;
private final short numberOfReplicas;
private final Map<Integer, Long> leaderSum;
private final Map<Integer, Long> logSum;

public Result(
String topicName,
int numberOfPartitions,
short numberOfReplicas,
Map<Integer, Long> leaderSum,
Map<Integer, Long> logSum) {
this.topicName = topicName;
this.numberOfPartitions = numberOfPartitions;
this.numberOfReplicas = numberOfReplicas;
this.leaderSum = leaderSum;
this.logSum = logSum;
}

public String topicName() {
return topicName;
}

public int numberOfPartitions() {
return numberOfPartitions;
}

public short numberOfReplicas() {
return numberOfReplicas;
}

public Map<Integer, Long> leaderSum() {
return leaderSum;
}

public Map<Integer, Long> logSum() {
return logSum;
}
}
}
3 changes: 2 additions & 1 deletion app/src/main/java/org/astraea/app/web/TopicHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.astraea.common.Configuration;
import org.astraea.common.FutureUtils;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
Expand Down Expand Up @@ -175,7 +176,7 @@ public CompletionStage<Topics> post(Channel channel) {
.numberOfReplicas(numberOfReplicas)
.binomialProbability(topic.probability.get())
.build()
.apply(admin)
.apply(admin, Configuration.EMPTY)
.thenApply(ignored -> null)
.toCompletableFuture();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,27 @@
package org.astraea.balancer.bench;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.balancer.AlgorithmConfig;
import org.astraea.common.balancer.Balancer;
import org.astraea.common.cost.ClusterCost;
import org.astraea.common.cost.CompositeClusterCost;
import org.astraea.common.cost.HasClusterCost;
import org.astraea.common.cost.HasMoveCost;
import org.astraea.common.cost.MoveCost;
import org.astraea.common.cost.ResourceUsageHint;
import org.astraea.common.metrics.collector.MetricSensor;

class CostProfilingImpl implements BalancerBenchmark.CostProfilingBuilder {
Expand Down Expand Up @@ -85,7 +92,22 @@ public CompletableFuture<BalancerBenchmark.CostProfilingResult> start() {
.clusterBean(clusterBean)
.timeout(timeout)
.clusterCost(
new HasClusterCost() {
new CompositeClusterCost() {
@Override
public Collection<? extends HasClusterCost> functions() {
var queue = new LinkedList<HasClusterCost>(List.of(costFunction));
var functions = new ArrayList<HasClusterCost>();

while (!queue.isEmpty()) {
var next = queue.pop();
if (next instanceof CompositeClusterCost)
functions.addAll(((CompositeClusterCost) next).functions());
else functions.add(next);
}

return List.copyOf(functions);
}

@Override
public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
final var start = System.nanoTime();
Expand Down Expand Up @@ -124,6 +146,11 @@ public Optional<MetricSensor> metricSensor() {
return moveCostFunction.metricSensor();
}

@Override
public Set<? extends ResourceUsageHint> resourceUsageHint() {
return moveCostFunction.resourceUsageHint();
}

@Override
public String toString() {
return moveCostFunction.toString();
Expand Down
106 changes: 106 additions & 0 deletions app/src/test/java/org/astraea/app/BackboneImbalanceApplyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.app.web.BackboneImbalanceScenario;
import org.astraea.common.Configuration;
import org.astraea.common.admin.Admin;
import org.astraea.common.json.JsonConverter;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class BackboneImbalanceApplyTest {

public static final String realCluster =
"192.168.103.177:25655,192.168.103.178:25655,192.168.103.179:25655,192.168.103.180:25655,192.168.103.181:25655,192.168.103.182:25655";
public static final List<String> clients =
List.of(
"192.168.103.184",
"192.168.103.142",
"192.168.103.183",
"192.168.103.141",
"192.168.103.143",
"192.168.103.144",
"192.168.103.145");

@Test
@Disabled
void testBackbone() {
try (Admin admin = Admin.of(realCluster)) {
var scenario = new BackboneImbalanceScenario();
var config =
Configuration.of(
Map.ofEntries(
Map.entry(BackboneImbalanceScenario.CONFIG_PERF_ZIPFIAN_EXPONENT, "1.6"),
Map.entry(BackboneImbalanceScenario.CONFIG_PERF_KEY_TABLE_SEED, "0"),
Map.entry(
BackboneImbalanceScenario.CONFIG_PERF_CLIENT_COUNT,
Integer.toString(clients.size()))));
var result = scenario.apply(admin, config).toCompletableFuture().join();
// print summary
var converter = JsonConverter.defaultConverter();
System.out.println(converter.toJson(result));
// save result to json format
var ansibleInventory = converter.toJson(toAnsibleInventory(result));
var ansibleInventoryFile =
"/home/garyparrot/Programming/ansible/backbone-imbalance-scenario-inventory.json";
try (var stream = Files.newBufferedWriter(Path.of(ansibleInventoryFile))) {
stream.write(ansibleInventory);
} catch (IOException e) {
e.printStackTrace();
}
}
}

Map<?, ?> toAnsibleInventory(BackboneImbalanceScenario.Result result) {
var hosts =
IntStream.range(0, clients.size())
.boxed()
.collect(
Collectors.toUnmodifiableMap(
clients::get,
index -> {
var clientHostname = clients.get(index);
var clientPerf = result.perfCommands().get(index);
return Map.ofEntries(
Map.entry("ansible_host", clientHostname),
Map.entry("ansible_user", "kafka"),
Map.entry("expected_produce_rate", clientPerf.get("produce_rate")),
Map.entry("expected_consume_rate", clientPerf.get("consume_rate")),
Map.entry("key_table_seed", clientPerf.get("key_table_seed")),
Map.entry("key_distribution", clientPerf.get("key_distribution")),
Map.entry(
"key_distribution_config", clientPerf.get("key_distribution_config")),
Map.entry("throttle", clientPerf.get("throttle")),
Map.entry("throttle_enable", !clientPerf.get("backbone").equals("true")),
Map.entry("throughput", clientPerf.get("throughput")),
Map.entry("throughput_enable", clientPerf.get("backbone").equals("true")),
Map.entry("no_producer", clientPerf.get("no_producer")),
Map.entry("no_consumer", clientPerf.get("no_consumer")),
Map.entry("topics", clientPerf.get("topics")));
}));

return Map.of("backbone_imbalance_hosts", Map.of("hosts", hosts));
}
}
Loading