From a599f2f3e99326c437cb99e15648def03ee60afd Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sun, 27 Oct 2024 02:14:53 +0800 Subject: [PATCH] [COMMON] display all nodes in pref log --- .../astraea/app/performance/Performance.java | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index dfb6eafa7f..6a218155ff 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -30,11 +30,11 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.LongStream; import org.astraea.app.argument.DataRateField; import org.astraea.app.argument.DataSizeField; import org.astraea.app.argument.DistributionTypeField; @@ -56,6 +56,7 @@ import org.astraea.common.DistributionType; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; +import org.astraea.common.admin.Broker; import org.astraea.common.admin.Partition; import org.astraea.common.admin.Replica; import org.astraea.common.admin.TopicPartition; @@ -106,18 +107,24 @@ public static List execute(final Argument param) { () -> consumerThreads.stream().allMatch(AbstractThread::closed)); try (var admin = Admin.of(param.bootstrapServers())) { - Supplier sizes = - () -> + var matchedBrokers = + admin.brokers().toCompletableFuture().join().stream() + .filter( + b -> + b.topicPartitionPaths().stream() + .anyMatch(p -> param.topics.contains(p.topic()))) + .map(Broker::host) + .collect(Collectors.toSet()); + Function size = + host -> admin.brokers().toCompletableFuture().join().stream() - .filter( - b -> - b.topicPartitionPaths().stream() - .anyMatch(p -> param.topics.contains(p.topic()))) + .filter(b -> b.host().equals(host)) .mapToLong( b -> b.topicPartitionPaths().stream() .mapToLong(TopicPartitionPath::size) - .sum()); + .sum()) + .sum(); var fileWriterTask = CompletableFuture.completedFuture( @@ -129,14 +136,12 @@ public static List execute(final Argument param) { () -> consumerThreads.stream().allMatch(AbstractThread::closed), () -> producerThreads.stream().allMatch(AbstractThread::closed), param.logInterval, - List.of( - ReportFormat.CSVContentElement.create( - "size", () -> String.valueOf(sizes.get().sum())), - ReportFormat.CSVContentElement.create( - "max size", () -> String.valueOf(sizes.get().max().getAsLong())), - ReportFormat.CSVContentElement.create( - "min size", - () -> String.valueOf(sizes.get().min().getAsLong()))))) + matchedBrokers.stream() + .map( + host -> + ReportFormat.CSVContentElement.create( + host, () -> String.valueOf(size.apply(host)))) + .toList())) .thenAcceptAsync(Runnable::run); var monkeys = MonkeyThread.play(consumerThreads, param);