diff --git a/app/src/main/java/org/astraea/app/App.java b/app/src/main/java/org/astraea/app/App.java index 4180d4c246..c53c89de60 100644 --- a/app/src/main/java/org/astraea/app/App.java +++ b/app/src/main/java/org/astraea/app/App.java @@ -23,8 +23,9 @@ import java.util.Map; import org.astraea.app.automation.Automation; import org.astraea.app.benchmark.BalancerBenchmarkApp; +import org.astraea.app.homework.Prepare; +import org.astraea.app.homework.SendYourData; import org.astraea.app.performance.Performance; -import org.astraea.app.performance.Prepare; import org.astraea.app.publisher.MetricPublisher; import org.astraea.app.version.Version; import org.astraea.app.web.WebService; @@ -36,6 +37,8 @@ public class App { Performance.class, "prepare", Prepare.class, + "send_your_data", + SendYourData.class, "automation", Automation.class, "web", diff --git a/app/src/main/java/org/astraea/app/performance/Prepare.java b/app/src/main/java/org/astraea/app/homework/Prepare.java similarity index 97% rename from app/src/main/java/org/astraea/app/performance/Prepare.java rename to app/src/main/java/org/astraea/app/homework/Prepare.java index 8b0d35ffc0..040fc66e99 100644 --- a/app/src/main/java/org/astraea/app/performance/Prepare.java +++ b/app/src/main/java/org/astraea/app/homework/Prepare.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.app.performance; +package org.astraea.app.homework; import com.beust.jcommander.Parameter; import java.util.List; @@ -114,8 +114,7 @@ public static class Argument extends org.astraea.app.argument.Argument { names = {"--topics"}, description = "List: topic names which you subscribed", validateWith = StringListField.class, - listConverter = StringListField.class, - required = false) + listConverter = StringListField.class) List topics; } } diff --git a/app/src/main/java/org/astraea/app/homework/SendYourData.java b/app/src/main/java/org/astraea/app/homework/SendYourData.java new file mode 100644 index 0000000000..443cca4921 --- /dev/null +++ b/app/src/main/java/org/astraea/app/homework/SendYourData.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.homework; + +import com.beust.jcommander.Parameter; +import java.io.Closeable; +import java.io.IOException; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.astraea.app.argument.DurationField; +import org.astraea.app.argument.StringListField; +import org.astraea.common.DataSize; +import org.astraea.common.admin.Admin; +import org.astraea.common.admin.TopicPartition; + +public class SendYourData { + + private static final int NUMBER_OF_PARTITIONS = 4; + + public static void main(String[] args) throws IOException, InterruptedException { + execute(Argument.parse(new Argument(), args)); + } + + public static void execute(final Argument param) throws IOException, InterruptedException { + try (var admin = Admin.of(param.bootstrapServers())) { + param.topics.forEach( + t -> + admin + .creator() + .topic(t) + .numberOfReplicas((short) 1) + .numberOfPartitions(NUMBER_OF_PARTITIONS) + .run() + .toCompletableFuture() + .join()); + admin.waitCluster( + Set.copyOf(param.topics), + clusterInfo -> clusterInfo.topicNames().containsAll(param.topics), + Duration.ofSeconds(10), + 1); + } + var keys = + List.of( + new Key(IntStream.range(0, 1000).mapToObj(Long::valueOf).toList()), + new Key(IntStream.range(0, 2500).mapToObj(Long::valueOf).toList()), + new Key(IntStream.range(0, 3000).mapToObj(Long::valueOf).toList())); + var max = Runtime.getRuntime().totalMemory(); + try (var sender = new YourSender(param.bootstrapServers())) { + var start = System.currentTimeMillis(); + var fs = + IntStream.range(0, 5) + .mapToObj( + __ -> + CompletableFuture.runAsync( + () -> { + while (System.currentTimeMillis() - start + <= param.duration.toMillis()) { + keys.forEach(k -> sender.send(param.topics, k)); + } + })) + .toList(); + while (!fs.stream().allMatch(CompletableFuture::isDone)) { + max = Math.max(max, Runtime.getRuntime().totalMemory()); + TimeUnit.MILLISECONDS.sleep(300); + } + } + try (var admin = Admin.of(param.bootstrapServers())) { + var offsets = + param.topics.stream() + .collect( + Collectors.toUnmodifiableMap( + t -> t, + t -> + admin + .latestOffsets( + IntStream.range(0, NUMBER_OF_PARTITIONS) + .mapToObj(i -> TopicPartition.of(t, i)) + .collect(Collectors.toSet())) + .toCompletableFuture() + .join() + .values() + .stream() + .mapToLong(i -> i) + .sum())); + var gc = ManagementFactory.getGarbageCollectorMXBeans(); + System.out.println("memory=" + DataSize.Byte.of(max)); + System.out.println( + "gc_count=" + + ManagementFactory.getGarbageCollectorMXBeans().stream() + .mapToLong(GarbageCollectorMXBean::getCollectionCount) + .sum()); + System.out.println( + "gc_time=" + + ManagementFactory.getGarbageCollectorMXBeans().stream() + .mapToLong(GarbageCollectorMXBean::getCollectionTime) + .sum()); + offsets.forEach((t, o) -> System.out.println(t + "=" + o)); + } + } + + public record Key(List vs) {} + + public static class YourSender implements Closeable { + private final KafkaProducer producer; + + @Override + public void close() throws IOException { + producer.close(); + } + + public YourSender(String bootstrapServers) { + Serializer serializer = + (topic, key) -> { + var buffer = ByteBuffer.allocate(Long.BYTES * key.vs.size()); + key.vs.forEach(buffer::putLong); + buffer.flip(); + var bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + }; + producer = + new KafkaProducer<>( + Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers), + serializer, + new ByteArraySerializer()); + } + + public void send(List topic, Key key) { + // topic.forEach(t -> producer.send(new ProducerRecord<>(t, key, null))); + } + } + + public static class Argument extends org.astraea.app.argument.Argument { + @Parameter( + names = {"--topics"}, + description = "List: topic names which you subscribed", + validateWith = StringListField.class, + listConverter = StringListField.class, + required = true) + List topics; + + @Parameter( + names = {"--duration"}, + description = "duration: the time to test", + validateWith = DurationField.class, + converter = DurationField.class) + Duration duration = Duration.ofSeconds(20); + } +} diff --git a/common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java b/app/src/main/java/org/astraea/app/homework/YourPartitioner.java similarity index 97% rename from common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java rename to app/src/main/java/org/astraea/app/homework/YourPartitioner.java index 8ae5bc82e8..c0bea8ec55 100644 --- a/common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java +++ b/app/src/main/java/org/astraea/app/homework/YourPartitioner.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.common.partitioner; +package org.astraea.app.homework; import java.util.Map; import org.apache.kafka.clients.producer.Partitioner;