diff --git a/app/src/main/java/org/astraea/app/App.java b/app/src/main/java/org/astraea/app/App.java index 4180d4c246..c53c89de60 100644 --- a/app/src/main/java/org/astraea/app/App.java +++ b/app/src/main/java/org/astraea/app/App.java @@ -23,8 +23,9 @@ import java.util.Map; import org.astraea.app.automation.Automation; import org.astraea.app.benchmark.BalancerBenchmarkApp; +import org.astraea.app.homework.Prepare; +import org.astraea.app.homework.SendYourData; import org.astraea.app.performance.Performance; -import org.astraea.app.performance.Prepare; import org.astraea.app.publisher.MetricPublisher; import org.astraea.app.version.Version; import org.astraea.app.web.WebService; @@ -36,6 +37,8 @@ public class App { Performance.class, "prepare", Prepare.class, + "send_your_data", + SendYourData.class, "automation", Automation.class, "web", diff --git a/app/src/main/java/org/astraea/app/performance/Prepare.java b/app/src/main/java/org/astraea/app/homework/Prepare.java similarity index 97% rename from app/src/main/java/org/astraea/app/performance/Prepare.java rename to app/src/main/java/org/astraea/app/homework/Prepare.java index 8b0d35ffc0..040fc66e99 100644 --- a/app/src/main/java/org/astraea/app/performance/Prepare.java +++ b/app/src/main/java/org/astraea/app/homework/Prepare.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.app.performance; +package org.astraea.app.homework; import com.beust.jcommander.Parameter; import java.util.List; @@ -114,8 +114,7 @@ public static class Argument extends org.astraea.app.argument.Argument { names = {"--topics"}, description = "List: topic names which you subscribed", validateWith = StringListField.class, - listConverter = StringListField.class, - required = false) + listConverter = StringListField.class) List topics; } } diff --git a/app/src/main/java/org/astraea/app/homework/SendYourData.java b/app/src/main/java/org/astraea/app/homework/SendYourData.java new file mode 100644 index 0000000000..67986893e5 --- /dev/null +++ b/app/src/main/java/org/astraea/app/homework/SendYourData.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.homework; + +import com.beust.jcommander.Parameter; +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.astraea.app.argument.NonNegativeIntegerField; +import org.astraea.app.argument.StringListField; +import org.astraea.common.admin.Admin; + +public class SendYourData { + + public static void main(String[] args) throws IOException { + execute(Argument.parse(new Argument(), args)); + } + + public static void execute(final Argument param) throws IOException { + try (var admin = Admin.of(param.bootstrapServers())) { + param.topics.forEach( + t -> + admin + .creator() + .topic(t) + .numberOfReplicas((short) 1) + .numberOfPartitions(10) + .run() + .toCompletableFuture() + .join()); + admin.waitCluster( + Set.copyOf(param.topics), + clusterInfo -> clusterInfo.topicNames().containsAll(param.topics), + Duration.ofSeconds(10), + 1); + } + var start = System.currentTimeMillis(); + var max = Runtime.getRuntime().totalMemory(); + var last = -1L; + try (var sender = new YourSender(param.bootstrapServers())) { + IntStream.range(0, param.count) + .forEach(index -> sender.send(param.topics, new Key(index, index, index))); + if (System.currentTimeMillis() - last > 5000) { + max = Math.max(max, Runtime.getRuntime().totalMemory()); + last = System.currentTimeMillis(); + } + } + System.out.println("elapsed=" + (System.currentTimeMillis() - start)); + System.out.println("memory=" + Math.max(max, Runtime.getRuntime().totalMemory())); + } + + public record Key(int v0, int v1, int v2) {} + + public static class YourSender implements Closeable { + private final KafkaProducer producer; + + @Override + public void close() throws IOException { + producer.close(); + } + + public YourSender(String bootstrapServers) { + Serializer serializer = + (topic, key) -> { + var buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + Long.BYTES); + buffer.putInt(key.v0); + buffer.putInt(key.v1); + buffer.putInt(key.v2); + buffer.flip(); + var bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + }; + producer = + new KafkaProducer<>( + Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers), + serializer, + new ByteArraySerializer()); + } + + public void send(List topic, Key key) { + topic.forEach(t -> producer.send(new ProducerRecord<>(t, key, null))); + } + } + + public static class Argument extends org.astraea.app.argument.Argument { + @Parameter( + names = {"--topics"}, + description = "List: topic names which you subscribed", + validateWith = StringListField.class, + listConverter = StringListField.class, + required = true) + List topics; + + @Parameter( + names = {"--count"}, + description = "Long: the record count", + validateWith = NonNegativeIntegerField.class) + int count = 100; + } +} diff --git a/common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java b/app/src/main/java/org/astraea/app/homework/YourPartitioner.java similarity index 97% rename from common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java rename to app/src/main/java/org/astraea/app/homework/YourPartitioner.java index 8ae5bc82e8..c0bea8ec55 100644 --- a/common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java +++ b/app/src/main/java/org/astraea/app/homework/YourPartitioner.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.common.partitioner; +package org.astraea.app.homework; import java.util.Map; import org.apache.kafka.clients.producer.Partitioner;