From 157f3db7a14e561c5f0f88ec80229b67a901e4fc Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 8 Nov 2024 00:28:46 +0800 Subject: [PATCH] tmp --- .../org/astraea/app/performance/Prepare.java | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 app/src/main/java/org/astraea/app/performance/Prepare.java diff --git a/app/src/main/java/org/astraea/app/performance/Prepare.java b/app/src/main/java/org/astraea/app/performance/Prepare.java new file mode 100644 index 0000000000..578ae74739 --- /dev/null +++ b/app/src/main/java/org/astraea/app/performance/Prepare.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.performance; + +import com.beust.jcommander.Parameter; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.astraea.app.argument.StringListField; +import org.astraea.common.admin.Admin; +import org.astraea.common.admin.Broker; +import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartitionPath; + +public class Prepare { + + public static void main(String[] args) { + execute(Argument.parse(new Argument(), args)); + } + + public static void execute(final Argument param) { + try (var admin = Admin.of(param.bootstrapServers())) { + if (param.topics == null || param.topics.isEmpty()) { + var cluster = + admin + .topicNames(true) + .thenComposeAsync(admin::clusterInfo) + .toCompletableFuture() + .join(); + var brokerSize = + cluster.brokers().stream() + .collect( + Collectors.toMap( + Broker::id, + b -> + b.topicPartitionPaths().stream() + .mapToLong(TopicPartitionPath::size) + .sum())); + System.out.println("id,role,value"); + brokerSize.forEach((id, size) -> System.out.println(id + ",broker," + size)); + + var partitionSize = + cluster.replicas().stream() + .filter(Replica::isLeader) + .collect(Collectors.toMap(Replica::topicPartition, Replica::size)); + partitionSize.forEach((tp, size) -> System.out.println(tp + ",partition," + size)); + + var brokerAvg = brokerSize.values().stream().mapToLong(i -> i).average().orElse(0); + var avedevBroker = + brokerSize.values().stream() + .mapToDouble(i -> Math.abs(i - brokerAvg)) + .average() + .orElse(0.0D); + System.out.println("avedev,broker," + avedevBroker); + + var partitionAvg = partitionSize.values().stream().mapToLong(i -> i).average().orElse(0); + var avedevPartition = + partitionSize.values().stream() + .mapToDouble(i -> Math.abs(i - partitionAvg)) + .average() + .orElse(0.0D); + System.out.println("avedev,partition," + avedevPartition); + return; + } + var brokerIds = + admin.brokers().toCompletableFuture().join().stream().map(Broker::id).toList(); + Function> ids = + index -> { + if (index <= brokerIds.size() / 2) return List.of(brokerIds.get(0)); + return List.of((int) (Math.random() * brokerIds.size())); + }; + param.topics.forEach( + topic -> + admin + .creator() + .topic(topic) + .replicasAssignments( + IntStream.range(0, brokerIds.size()) + .boxed() + .collect(Collectors.toMap(i -> i, ids))) + .run() + .toCompletableFuture() + .join()); + System.out.println("succeed to create topics: " + param.topics); + } + } + + public static class Argument extends org.astraea.app.argument.Argument { + @Parameter( + names = {"--topics"}, + description = "List: topic names which you subscribed", + validateWith = StringListField.class, + listConverter = StringListField.class, + required = false) + List topics; + } +}