Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Nov 21, 2024
1 parent 4c38461 commit 9dcb214
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 0 deletions.
6 changes: 6 additions & 0 deletions app/src/main/java/org/astraea/app/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Map;
import org.astraea.app.automation.Automation;
import org.astraea.app.benchmark.BalancerBenchmarkApp;
import org.astraea.app.homework.BulkChecker;
import org.astraea.app.homework.BulkSender;
import org.astraea.app.homework.Prepare;
import org.astraea.app.homework.SendYourData;
import org.astraea.app.performance.Performance;
Expand All @@ -33,6 +35,10 @@
public class App {
private static final Map<String, Class<?>> MAIN_CLASSES =
Map.of(
"bulk_sender",
BulkSender.class,
"bulk_checker",
BulkChecker.class,
"performance",
Performance.class,
"prepare",
Expand Down
86 changes: 86 additions & 0 deletions app/src/main/java/org/astraea/app/homework/BulkChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.homework;

import com.beust.jcommander.Parameter;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.astraea.app.argument.StringListField;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.Replica;
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerConfigs;

public class BulkChecker {
public static void main(String[] args) throws IOException, InterruptedException {
execute(Argument.parse(new Argument(), args));
}

public static void execute(final Argument param) throws IOException, InterruptedException {
var sizes = new HashMap<String, Long>();
var count = 0L;
try (var admin = Admin.of(param.bootstrapServers());
var consumer =
Consumer.forTopics(Set.copyOf(param.topics))
.bootstrapServers(param.bootstrapServers())
.config(
ConsumerConfigs.AUTO_OFFSET_RESET_CONFIG,
ConsumerConfigs.AUTO_OFFSET_RESET_EARLIEST)
.build()) {
var tpsAndOffsets =
admin
.clusterInfo(Set.copyOf(param.topics))
.thenApply(
c ->
c.replicaLeaders().stream()
.map(Replica::topicPartition)
.collect(Collectors.toSet()))
.thenCompose(admin::latestOffsets)
.toCompletableFuture()
.join();
var totalRecords = tpsAndOffsets.values().stream().mapToLong(Long::longValue).sum();
while (count < totalRecords) {
var records = consumer.poll(Duration.ofSeconds(3));
records.forEach(
r ->
sizes.put(
r.topic(),
sizes.getOrDefault(r.topic(), 0L)
+ r.serializedKeySize()
+ r.serializedValueSize()));
count += records.size();
}
}
System.out.println("records=" + count);
System.out.println("size=" + sizes.values().stream().mapToLong(Long::longValue).sum());
sizes.forEach((k, v) -> System.out.println(k + "=" + v));
}

public static class Argument extends org.astraea.app.argument.Argument {
@Parameter(
names = {"--topics"},
description = "List<String>: topic names which you should check",
validateWith = StringListField.class,
listConverter = StringListField.class,
required = true)
List<String> topics;
}
}
84 changes: 84 additions & 0 deletions app/src/main/java/org/astraea/app/homework/BulkSender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.homework;

import com.beust.jcommander.Parameter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.astraea.app.argument.DataSizeField;
import org.astraea.app.argument.StringListField;
import org.astraea.common.DataSize;
import org.astraea.common.admin.AdminConfigs;

public class BulkSender {
public static void main(String[] args) throws IOException, InterruptedException {
execute(Argument.parse(new Argument(), args));
}

public static void execute(final Argument param) throws IOException, InterruptedException {
// you must create topics for best configs
try (var admin =
Admin.create(Map.of(AdminConfigs.BOOTSTRAP_SERVERS_CONFIG, param.bootstrapServers()))) {
for (var t : param.topics) {
admin.createTopics(List.of(new NewTopic(t, 1, (short) 1))).all();
}
}
// you must manage producers for best performance
try (var producer =
new KafkaProducer<>(
Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, param.bootstrapServers()),
new StringSerializer(),
new StringSerializer())) {
var size = new AtomicLong(0);
var key = "key";
var value = "value";
while (size.get() < param.dataSize.bytes()) {
var topic = param.topics.get((int) (Math.random() * param.topics.size()));
producer.send(
new ProducerRecord<>(topic, key, value),
(m, e) -> {
if (e == null) size.addAndGet(m.serializedKeySize() + m.serializedValueSize());
});
}
}
}

public static class Argument extends org.astraea.app.argument.Argument {
@Parameter(
names = {"--topics"},
description = "List<String>: topic names which you should send",
validateWith = StringListField.class,
listConverter = StringListField.class,
required = true)
List<String> topics;

@Parameter(
names = {"--dataSize"},
description = "data size: total size you have to send",
validateWith = DataSizeField.class,
converter = DataSizeField.class)
DataSize dataSize = DataSize.GB.of(10);
}
}

0 comments on commit 9dcb214

Please sign in to comment.