Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Nov 7, 2024
1 parent 3a217db commit 5d791b4
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 0 deletions.
3 changes: 3 additions & 0 deletions app/src/main/java/org/astraea/app/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.astraea.app.automation.Automation;
import org.astraea.app.benchmark.BalancerBenchmarkApp;
import org.astraea.app.performance.Performance;
import org.astraea.app.performance.Prepare;
import org.astraea.app.publisher.MetricPublisher;
import org.astraea.app.version.Version;
import org.astraea.app.web.WebService;
Expand All @@ -33,6 +34,8 @@ public class App {
Map.of(
"performance",
Performance.class,
"prepare",
Prepare.class,
"automation",
Automation.class,
"web",
Expand Down
120 changes: 120 additions & 0 deletions app/src/main/java/org/astraea/app/performance/Prepare.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.performance;

import com.beust.jcommander.Parameter;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.app.argument.StringListField;
import org.astraea.common.DataSize;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.Replica;
import org.astraea.common.admin.TopicPartitionPath;

public class Prepare {

public static void main(String[] args) {
execute(Argument.parse(new Argument(), args));
}

public static void execute(final Argument param) {
try (var admin = Admin.of(param.bootstrapServers())) {
if (param.topics == null || param.topics.isEmpty()) {
var cluster =
admin
.topicNames(true)
.thenComposeAsync(admin::clusterInfo)
.toCompletableFuture()
.join();
var brokerSize =
cluster.brokers().stream()
.collect(
Collectors.toMap(
Broker::id,
b ->
b.topicPartitionPaths().stream()
.mapToLong(TopicPartitionPath::size)
.sum()));
System.out.println("id,role,value");
brokerSize.forEach((id, size) -> System.out.println(id + ",broker," + size));

var partitionSize =
cluster.replicas().stream()
.filter(Replica::isLeader)
.collect(Collectors.toMap(Replica::topicPartition, Replica::size));
partitionSize.forEach((tp, size) -> System.out.println(tp + ",partition," + size));

var brokerAvg =
brokerSize.values().stream().mapToLong(i -> i).filter(i -> i > 0).average().orElse(0);
var avedevBroker =
brokerSize.values().stream()
.mapToDouble(i -> Math.abs(i - brokerAvg))
.average()
.orElse(0.0D);
System.out.println("avedev,broker," + DataSize.Byte.of((long) avedevBroker));

var partitionAvg =
partitionSize.values().stream()
.mapToLong(i -> i)
.filter(i -> i > 0)
.average()
.orElse(0);
var avedevPartition =
partitionSize.values().stream()
.mapToDouble(i -> Math.abs(i - partitionAvg))
.average()
.orElse(0.0D);
System.out.println("avedev,partition," + DataSize.Byte.of((long) avedevPartition));
return;
}
var brokerIds =
admin.brokers().toCompletableFuture().join().stream().map(Broker::id).toList();
System.out.println("brokerIds: " + brokerIds);
Function<Integer, List<Integer>> ids =
index -> {
if (index < brokerIds.size()) return List.of(brokerIds.get(index));
return List.of(brokerIds.get((int) (Math.random() * brokerIds.size())));
};
param.topics.forEach(
topic ->
admin
.creator()
.topic(topic)
.replicasAssignments(
IntStream.range(0, brokerIds.size() + 2)
.boxed()
.collect(Collectors.toMap(i -> i, ids)))
.run()
.toCompletableFuture()
.join());
System.out.println("succeed to create topics: " + param.topics);
}
}

public static class Argument extends org.astraea.app.argument.Argument {
@Parameter(
names = {"--topics"},
description = "List<String>: topic names which you subscribed",
validateWith = StringListField.class,
listConverter = StringListField.class,
required = false)
List<String> topics;
}
}

0 comments on commit 5d791b4

Please sign in to comment.