diff --git a/app/src/main/java/org/astraea/app/App.java b/app/src/main/java/org/astraea/app/App.java index c53c89de60..76cea37259 100644 --- a/app/src/main/java/org/astraea/app/App.java +++ b/app/src/main/java/org/astraea/app/App.java @@ -23,6 +23,8 @@ import java.util.Map; import org.astraea.app.automation.Automation; import org.astraea.app.benchmark.BalancerBenchmarkApp; +import org.astraea.app.homework.BulkChecker; +import org.astraea.app.homework.BulkSender; import org.astraea.app.homework.Prepare; import org.astraea.app.homework.SendYourData; import org.astraea.app.performance.Performance; @@ -33,6 +35,10 @@ public class App { private static final Map> MAIN_CLASSES = Map.of( + "bulk_sender", + BulkSender.class, + "bulk_checker", + BulkChecker.class, "performance", Performance.class, "prepare", diff --git a/app/src/main/java/org/astraea/app/homework/BulkChecker.java b/app/src/main/java/org/astraea/app/homework/BulkChecker.java new file mode 100644 index 0000000000..baa48dad35 --- /dev/null +++ b/app/src/main/java/org/astraea/app/homework/BulkChecker.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.homework; + +import com.beust.jcommander.Parameter; +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.astraea.app.argument.StringListField; +import org.astraea.common.admin.Admin; +import org.astraea.common.admin.Replica; +import org.astraea.common.consumer.Consumer; +import org.astraea.common.consumer.ConsumerConfigs; + +public class BulkChecker { + public static void main(String[] args) throws IOException, InterruptedException { + execute(Argument.parse(new Argument(), args)); + } + + public static void execute(final Argument param) throws IOException, InterruptedException { + var sizes = new HashMap(); + var count = 0L; + try (var admin = Admin.of(param.bootstrapServers()); + var consumer = + Consumer.forTopics(Set.copyOf(param.topics)) + .bootstrapServers(param.bootstrapServers()) + .config( + ConsumerConfigs.AUTO_OFFSET_RESET_CONFIG, + ConsumerConfigs.AUTO_OFFSET_RESET_EARLIEST) + .build()) { + var tpsAndOffsets = + admin + .clusterInfo(Set.copyOf(param.topics)) + .thenApply( + c -> + c.replicaLeaders().stream() + .map(Replica::topicPartition) + .collect(Collectors.toSet())) + .thenCompose(admin::latestOffsets) + .toCompletableFuture() + .join(); + var totalRecords = tpsAndOffsets.values().stream().mapToLong(Long::longValue).sum(); + while (count < totalRecords) { + var records = consumer.poll(Duration.ofSeconds(3)); + records.forEach( + r -> + sizes.put( + r.topic(), + sizes.getOrDefault(r.topic(), 0L) + + r.serializedKeySize() + + r.serializedValueSize())); + count += records.size(); + } + } + System.out.println("records=" + count); + System.out.println("size=" + sizes.values().stream().mapToLong(Long::longValue).sum()); + sizes.forEach((k, v) -> System.out.println(k + "=" + v)); + } + + public static class Argument extends org.astraea.app.argument.Argument { + @Parameter( + names = {"--topics"}, + description = "List: topic names which you should check", + validateWith = StringListField.class, + listConverter = StringListField.class, + required = true) + List topics; + } +} diff --git a/app/src/main/java/org/astraea/app/homework/BulkSender.java b/app/src/main/java/org/astraea/app/homework/BulkSender.java new file mode 100644 index 0000000000..73eb8ce79c --- /dev/null +++ b/app/src/main/java/org/astraea/app/homework/BulkSender.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.homework; + +import com.beust.jcommander.Parameter; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.astraea.app.argument.DataSizeField; +import org.astraea.app.argument.StringListField; +import org.astraea.common.DataSize; +import org.astraea.common.admin.AdminConfigs; + +public class BulkSender { + public static void main(String[] args) throws IOException, InterruptedException { + execute(Argument.parse(new Argument(), args)); + } + + public static void execute(final Argument param) throws IOException, InterruptedException { + // you must create topics for best configs + try (var admin = + Admin.create(Map.of(AdminConfigs.BOOTSTRAP_SERVERS_CONFIG, param.bootstrapServers()))) { + for (var t : param.topics) { + admin.createTopics(List.of(new NewTopic(t, 1, (short) 1))).all(); + } + } + // you must manage producers for best performance + try (var producer = + new KafkaProducer<>( + Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, param.bootstrapServers()), + new StringSerializer(), + new StringSerializer())) { + var size = new AtomicLong(0); + var key = "key"; + var value = "value"; + while (size.get() < param.dataSize.bytes()) { + var topic = param.topics.get((int) (Math.random() * param.topics.size())); + producer.send( + new ProducerRecord<>(topic, key, value), + (m, e) -> { + if (e == null) size.addAndGet(m.serializedKeySize() + m.serializedValueSize()); + }); + } + } + } + + public static class Argument extends org.astraea.app.argument.Argument { + @Parameter( + names = {"--topics"}, + description = "List: topic names which you should send", + validateWith = StringListField.class, + listConverter = StringListField.class, + required = true) + List topics; + + @Parameter( + names = {"--dataSize"}, + description = "data size: total size you have to send", + validateWith = DataSizeField.class, + converter = DataSizeField.class) + DataSize dataSize = DataSize.GB.of(10); + } +}