Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Oct 26, 2024
1 parent 7276b28 commit 3a19894
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 50 deletions.
121 changes: 76 additions & 45 deletions app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.astraea.app.argument.DataRateField;
import org.astraea.app.argument.DataSizeField;
import org.astraea.app.argument.DistributionTypeField;
Expand All @@ -58,6 +59,7 @@
import org.astraea.common.admin.Partition;
import org.astraea.common.admin.Replica;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.admin.TopicPartitionPath;
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.partitioner.Partitioner;
Expand Down Expand Up @@ -103,52 +105,75 @@ public static List<String> execute(final Argument param) {
() -> producerThreads.stream().allMatch(AbstractThread::closed),
() -> consumerThreads.stream().allMatch(AbstractThread::closed));

var fileWriterTask =
CompletableFuture.completedFuture(
param.CSVPath == null
? (Runnable) (() -> {})
: ReportFormat.createFileWriter(
param.reportFormat,
param.CSVPath,
() -> consumerThreads.stream().allMatch(AbstractThread::closed),
() -> producerThreads.stream().allMatch(AbstractThread::closed)))
.thenAcceptAsync(Runnable::run);

var monkeys = MonkeyThread.play(consumerThreads, param);

CompletableFuture.runAsync(
() -> {
dataGenerator.waitForDone();
var last = 0L;
var lastChange = System.currentTimeMillis();
while (true) {
var current = Report.recordsConsumedTotal();

if (blockingQueues.stream().allMatch(Collection::isEmpty)) {
var unfinishedProducers = producerThreads.stream().filter(p -> !p.closed()).toList();
unfinishedProducers.forEach(AbstractThread::close);
try (var admin = Admin.of(param.bootstrapServers())) {
Supplier<LongStream> sizes =
() ->
admin.brokers().toCompletableFuture().join().stream()
.filter(
b ->
b.topicPartitionPaths().stream()
.anyMatch(p -> param.topics.contains(p.topic())))
.mapToLong(
b ->
b.topicPartitionPaths().stream()
.mapToLong(TopicPartitionPath::size)
.sum());

var fileWriterTask =
CompletableFuture.completedFuture(
param.CSVPath == null
? (Runnable) (() -> {})
: ReportFormat.createFileWriter(
param.reportFormat,
param.CSVPath,
() -> consumerThreads.stream().allMatch(AbstractThread::closed),
() -> producerThreads.stream().allMatch(AbstractThread::closed),
param.logInterval,
List.of(
ReportFormat.CSVContentElement.create(
"max size", () -> String.valueOf(sizes.get().max().getAsLong())),
ReportFormat.CSVContentElement.create(
"min size",
() -> String.valueOf(sizes.get().min().getAsLong())))))
.thenAcceptAsync(Runnable::run);

var monkeys = MonkeyThread.play(consumerThreads, param);

CompletableFuture.runAsync(
() -> {
dataGenerator.waitForDone();
var last = 0L;
var lastChange = System.currentTimeMillis();
while (true) {
var current = Report.recordsConsumedTotal();

if (blockingQueues.stream().allMatch(Collection::isEmpty)) {
var unfinishedProducers =
producerThreads.stream().filter(p -> !p.closed()).toList();
unfinishedProducers.forEach(AbstractThread::close);
}

if (current != last) {
last = current;
lastChange = System.currentTimeMillis();
}
if (System.currentTimeMillis() - lastChange >= param.readIdle.toMillis()) {
consumerThreads.forEach(AbstractThread::close);
monkeys.forEach(AbstractThread::close);
}
if (consumerThreads.stream().allMatch(AbstractThread::closed)
&& monkeys.stream().allMatch(AbstractThread::closed)
&& producerThreads.stream().allMatch(AbstractThread::closed)) return;
Utils.sleep(Duration.ofSeconds(1));
}

if (current != last) {
last = current;
lastChange = System.currentTimeMillis();
}
if (System.currentTimeMillis() - lastChange >= param.readIdle.toMillis()) {
consumerThreads.forEach(AbstractThread::close);
monkeys.forEach(AbstractThread::close);
}
if (consumerThreads.stream().allMatch(AbstractThread::closed)
&& monkeys.stream().allMatch(AbstractThread::closed)
&& producerThreads.stream().allMatch(AbstractThread::closed)) return;
Utils.sleep(Duration.ofSeconds(1));
}
});
producerThreads.forEach(AbstractThread::waitForDone);
monkeys.forEach(AbstractThread::waitForDone);
consumerThreads.forEach(AbstractThread::waitForDone);
tracker.waitForDone();
fileWriterTask.join();
return param.topics;
});
producerThreads.forEach(AbstractThread::waitForDone);
monkeys.forEach(AbstractThread::waitForDone);
consumerThreads.forEach(AbstractThread::waitForDone);
tracker.waitForDone();
fileWriterTask.join();
return param.topics;
}
}

static List<ConsumerThread> consumers(Argument param, Map<TopicPartition, Long> latestOffsets) {
Expand Down Expand Up @@ -263,6 +288,12 @@ String partitioner() {
return this.partitioner;
}

@Parameter(
names = {"--log.interval"},
description = "integer: seconds to log csv output",
validateWith = PositiveLongField.class)
Duration logInterval = Duration.ofSeconds(2);

@Parameter(
names = {"--transaction.size"},
description =
Expand Down
27 changes: 22 additions & 5 deletions app/src/main/java/org/astraea/app/performance/ReportFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,18 @@ public static Runnable createFileWriter(
ReportFormat reportFormat,
Path path,
Supplier<Boolean> consumerDone,
Supplier<Boolean> producerDone) {
Supplier<Boolean> producerDone,
Duration interval) {
return createFileWriter(reportFormat, path, consumerDone, producerDone, interval, List.of());
}

public static Runnable createFileWriter(
ReportFormat reportFormat,
Path path,
Supplier<Boolean> consumerDone,
Supplier<Boolean> producerDone,
Duration interval,
List<CSVContentElement> extraCsv) {
var filePath =
FileSystems.getDefault()
.getPath(
Expand All @@ -81,12 +92,18 @@ public static Runnable createFileWriter(
var writer = new BufferedWriter(Utils.packException(() -> new FileWriter(filePath.toFile())));
switch (reportFormat) {
case CSV -> {
initCSVFormat(writer, latencyAndIO());
Supplier<List<CSVContentElement>> elements =
() -> {
var e = new ArrayList<>(latencyAndIO());
e.addAll(extraCsv);
return e;
};
initCSVFormat(writer, elements.get());
return () -> {
try {
while (!(producerDone.get() && consumerDone.get())) {
logToCSV(writer, latencyAndIO());
Utils.sleep(Duration.ofSeconds(1));
logToCSV(writer, elements.get());
Utils.sleep(interval);
}
} finally {
Utils.close(writer);
Expand Down Expand Up @@ -138,7 +155,7 @@ static void logToJSON(BufferedWriter writer, List<CSVContentElement> elements) {
}

// Visible for test
interface CSVContentElement {
public interface CSVContentElement {
String title();

String value();
Expand Down

0 comments on commit 3a19894

Please sign in to comment.