Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Nov 2, 2024
1 parent 8fc7470 commit da19d80
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 213 deletions.
77 changes: 30 additions & 47 deletions app/src/main/java/org/astraea/app/performance/ConsumerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand All @@ -38,7 +35,7 @@
import org.astraea.common.metrics.Sensor;
import org.astraea.common.metrics.stats.Avg;

public interface ConsumerThread extends AbstractThread {
interface ConsumerThread extends AbstractThread {
String DOMAIN_NAME = "org.astraea";
String TYPE_PROPERTY = "type";
String TYPE_VALUE = "consumer";
Expand Down Expand Up @@ -72,26 +69,12 @@ static List<ConsumerThread> create(
BiFunction<String, ConsumerRebalanceListener, SubscribedConsumer<byte[], byte[]>>
consumerSupplier) {
if (consumers == 0) return List.of();
var closeLatches =
IntStream.range(0, consumers).mapToObj(ignored -> new CountDownLatch(1)).toList();
var executors = Executors.newFixedThreadPool(consumers);
// monitor
CompletableFuture.runAsync(
() -> {
try {
closeLatches.forEach(l -> Utils.swallowException(l::await));
} finally {
executors.shutdown();
Utils.swallowException(() -> executors.awaitTermination(30, TimeUnit.SECONDS));
}
});
return IntStream.range(0, consumers)
.mapToObj(
index -> {
var clientId = Performance.CLIENT_ID_PREFIX + "-consumer-" + index;
var consumer = consumerSupplier.apply(clientId, new PartitionRatioListener(clientId));
var closed = new AtomicBoolean(false);
var closeLatch = closeLatches.get(index);
var subscribed = new AtomicBoolean(true);
var sensor =
Sensor.builder()
Expand All @@ -109,43 +92,43 @@ static List<ConsumerThread> create(
Double.class,
() -> sensor.measure(EXP_WEIGHT_BY_TIME_PROPERTY))
.register();
executors.execute(
() -> {
try {
while (!closed.get()) {
if (subscribed.get()) consumer.resubscribe();
else {
consumer.unsubscribe();
Utils.sleep(Duration.ofSeconds(1));
continue;
var future =
CompletableFuture.runAsync(
() -> {
try {
while (!closed.get()) {
if (subscribed.get()) consumer.resubscribe();
else {
consumer.unsubscribe();
Utils.sleep(Duration.ofSeconds(1));
continue;
}
consumer.poll(Duration.ofSeconds(1)).stream()
.mapToLong(r -> System.currentTimeMillis() - r.timestamp())
.average()
.ifPresent(sensor::record);
}
} catch (WakeupException ignore) {
// Stop polling and being ready to clean up
} finally {
Utils.close(consumer);
closed.set(true);
CLIENT_ID_ASSIGNED_PARTITIONS.remove(clientId);
CLIENT_ID_REVOKED_PARTITIONS.remove(clientId);
NON_STICKY_SENSOR.remove(clientId);
DIFFERENCE_SENSOR.remove(clientId);
}
consumer.poll(Duration.ofSeconds(1)).stream()
.mapToLong(r -> System.currentTimeMillis() - r.timestamp())
.average()
.ifPresent(sensor::record);
}
} catch (WakeupException ignore) {
// Stop polling and being ready to clean up
} finally {
Utils.close(consumer);
closeLatch.countDown();
closed.set(true);
CLIENT_ID_ASSIGNED_PARTITIONS.remove(clientId);
CLIENT_ID_REVOKED_PARTITIONS.remove(clientId);
NON_STICKY_SENSOR.remove(clientId);
DIFFERENCE_SENSOR.remove(clientId);
}
});
});
return new ConsumerThread() {

@Override
public void waitForDone() {
Utils.swallowException(closeLatch::await);
Utils.swallowException(future::join);
}

@Override
public boolean closed() {
return closeLatch.getCount() == 0;
return future.isDone();
}

@Override
Expand All @@ -161,7 +144,7 @@ public void unsubscribe() {
@Override
public void close() {
closed.set(true);
Utils.swallowException(closeLatch::await);
waitForDone();
}
};
})
Expand Down
94 changes: 27 additions & 67 deletions app/src/main/java/org/astraea/app/performance/DataGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@
package org.astraea.app.performance;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.LongStream;
import org.astraea.common.Configuration;
Expand All @@ -34,13 +29,11 @@
import org.astraea.common.producer.Record;
import org.astraea.common.producer.RecordGenerator;

public interface DataGenerator extends AbstractThread {
interface DataGenerator extends AbstractThread {
static DataGenerator of(
List<ArrayBlockingQueue<List<Record<byte[], byte[]>>>> queues,
BlockingQueue<List<Record<byte[], byte[]>>> queue,
Supplier<TopicPartition> partitionSelector,
Performance.Argument argument) {
if (queues.size() == 0) return terminatedGenerator();

var keyDistConfig = new Configuration(argument.keyDistributionConfig);
var keySizeDistConfig = new Configuration(argument.keySizeDistributionConfig);
var valueDistConfig = new Configuration(argument.valueDistributionConfig);
Expand All @@ -61,61 +54,43 @@ static DataGenerator of(
argument.valueSize.measurement(DataUnit.Byte).intValue(), valueDistConfig))
.throughput(tp -> argument.throttles.getOrDefault(tp, argument.throughput))
.build();
var closeLatch = new CountDownLatch(1);
var executor = Executors.newFixedThreadPool(1);
var closed = new AtomicBoolean(false);
var start = System.currentTimeMillis();
var dataCount = new AtomicLong(0);

// monitor the data generator if close or not
CompletableFuture.runAsync(
() -> {
try {
Utils.swallowException(closeLatch::await);
} finally {
executor.shutdown();
Utils.swallowException(() -> executor.awaitTermination(30, TimeUnit.SECONDS));
}
});

// put the data into blocking queue
CompletableFuture.runAsync(
() ->
executor.execute(
() -> {
try {
var future =
CompletableFuture.runAsync(
() -> {
try {
long dataCount = 0;
while (!closed.get()) {
// check the generator is finished or not
if (argument.exeTime.percentage(dataCount, System.currentTimeMillis() - start)
>= 100D) return;
var tp = partitionSelector.get();
var records = dataSupplier.apply(tp);
dataCount += records.size();

while (!closed.get()) {
// check the generator is finished or not
if (argument.exeTime.percentage(
dataCount.getAndIncrement(), System.currentTimeMillis() - start)
>= 100D) return;
// throttled data wouldn't put into the queue
if (records.isEmpty()) continue;
queue.put(records);
}
} catch (InterruptedException e) {
throw new RuntimeException("The data generator didn't close properly", e);
} finally {
closed.set(true);
}
});

var tp = partitionSelector.get();
var records = dataSupplier.apply(tp);

// throttled data wouldn't put into the queue
if (records.isEmpty()) continue;
var queue = queues.get(ThreadLocalRandom.current().nextInt(queues.size()));
queue.put(records);
}
} catch (InterruptedException e) {
if (closeLatch.getCount() != 0 || closed.get())
throw new RuntimeException(e + ", The data generator didn't close properly");
} finally {
closeLatch.countDown();
closed.set(true);
}
}));
return new DataGenerator() {
@Override
public void waitForDone() {
Utils.swallowException(closeLatch::await);
Utils.swallowException(future::join);
}

@Override
public boolean closed() {
return closeLatch.getCount() == 0;
return future.isDone();
}

@Override
Expand All @@ -125,19 +100,4 @@ public void close() {
}
};
}

static DataGenerator terminatedGenerator() {
return new DataGenerator() {
@Override
public void waitForDone() {}

@Override
public boolean closed() {
return true;
}

@Override
public void close() {}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerConfigs;

public class MonkeyThread implements AbstractThread {
class MonkeyThread implements AbstractThread {

private final CountDownLatch closeLatch;
private final AtomicBoolean close;
Expand Down
22 changes: 13 additions & 9 deletions app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -33,7 +32,6 @@
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.app.argument.DataRateField;
import org.astraea.app.argument.DataSizeField;
import org.astraea.app.argument.DistributionTypeField;
Expand Down Expand Up @@ -75,10 +73,7 @@ public static void main(String[] args) {
}

public static List<String> execute(final Argument param) {
var blockingQueues =
IntStream.range(0, param.producers)
.mapToObj(i -> new ArrayBlockingQueue<List<Record<byte[], byte[]>>>(3000))
.toList();
var dataQueue = new ArrayBlockingQueue<List<Record<byte[], byte[]>>>(3000);
// ensure topics are existent
System.out.println("checking topics: " + String.join(",", param.topics));
param.checkTopics();
Expand All @@ -87,14 +82,16 @@ public static List<String> execute(final Argument param) {
var latestOffsets = param.lastOffsets();

System.out.println("creating threads");
var producerThreads = ProducerThread.create(blockingQueues, param::createProducer);
var producerThreads =
ProducerThread.create(
dataQueue, param::createProducer, param.producers, param.producerThreads);
var consumerThreads =
param.monkeys != null
? Collections.synchronizedList(new ArrayList<>(consumers(param, latestOffsets)))
: consumers(param, latestOffsets);

System.out.println("creating data generator");
var dataGenerator = DataGenerator.of(blockingQueues, param.topicPartitionSelector(), param);
var dataGenerator = DataGenerator.of(dataQueue, param.topicPartitionSelector(), param);

System.out.println("creating tracker");
var tracker =
Expand Down Expand Up @@ -150,7 +147,7 @@ public static List<String> execute(final Argument param) {
while (true) {
var current = Report.recordsConsumedTotal();

if (blockingQueues.stream().allMatch(Collection::isEmpty)) {
if (dataQueue.isEmpty()) {
var unfinishedProducers =
producerThreads.stream().filter(p -> !p.closed()).toList();
unfinishedProducers.forEach(AbstractThread::close);
Expand Down Expand Up @@ -233,6 +230,13 @@ Map<TopicPartition, Long> lastOffsets() {
}
}

@Parameter(
names = {"--producerThreads"},
description = "Integer: number of producer threads",
validateWith = NonNegativeShortField.class,
converter = NonNegativeShortField.class)
int producerThreads = 1;

@Parameter(
names = {"--producers"},
description = "Integer: number of producers to produce records",
Expand Down
Loading

0 comments on commit da19d80

Please sign in to comment.