Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Mar 8, 2024
1 parent 35aff76 commit a2dc6c8
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 5 deletions.
20 changes: 20 additions & 0 deletions common/src/main/java/org/astraea/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -392,6 +393,10 @@ public static Object member(Object object, String attribute) {
return reflectionAttribute(object.getClass(), object, attribute);
}

public static Object method(Object object, String name) {
return reflectionMethod(object.getClass(), object, name);
}

/**
* reflection class attribute
*
Expand All @@ -415,6 +420,21 @@ private static Object reflectionAttribute(Class<?> clz, Object object, String at
throw new RuntimeException(attribute + " is not existent in " + object.getClass().getName());
}

private static Object reflectionMethod(Class<?> clz, Object object, String attribute) {
do {
try {
var method = clz.getDeclaredMethod(attribute);
method.setAccessible(true);
return method.invoke(object);
} catch (NoSuchMethodException e) {
clz = clz.getSuperclass();
} catch (Exception e) {
throw new RuntimeException(e);
}
} while (clz != null);
throw new RuntimeException(attribute + " is not existent in " + object.getClass().getName());
}

public static <T> List<T> constants(
Class<?> clz, Predicate<String> variableNameFilter, Class<T> cast) {
return Arrays.stream(clz.getFields())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ protected abstract static class BaseConsumer<Key, Value> implements Consumer<Key
public BaseConsumer(org.apache.kafka.clients.consumer.Consumer<Key, Value> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
// KafkaConsumer does not expose client-id
this.clientId = (String) Utils.member(kafkaConsumer, "clientId");
this.clientId = (String) Utils.method(kafkaConsumer, "clientId");
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def versions = [
"jmh-core" : project.properties['jmh-core.version'] ?: "1.36",
"jmh-generator-annprocess": project.properties['jmh-generator.version'] ?: "1.36",
junit : project.properties['junit.version'] ?: "5.9.3",
kafka : project.properties['kafka.version'] ?: "3.5.1",
kafka : project.properties['kafka.version'] ?: "3.7.0",
mockito : project.properties['mockito.version'] ?: "5.4.0",
"mockito-inline" : project.properties['mockito.version'] ?: "5.2.0",
scala : project.properties['scala.version'] ?: "2.13.11",
Expand All @@ -57,6 +57,7 @@ libs += [
"kafka-connect-runtime" : "org.apache.kafka:connect-runtime:${versions["kafka"]}",
"kafka-core" : "org.apache.kafka:kafka_2.13:${versions["kafka"]}",
"kafka-server-common" : "org.apache.kafka:kafka-server-common:${versions["kafka"]}",
"kafka-metadata" : "org.apache.kafka:kafka-metadata:${versions["kafka"]}",
"mockito-core" : "org.mockito:mockito-core:${versions["mockito"]}",
"mockito-inline" : "org.mockito:mockito-inline:${versions["mockito-inline"]}",
scala : "org.scala-lang:scala-library:${versions["scala"]}",
Expand Down
1 change: 1 addition & 0 deletions it/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation libs["junit"]
implementation libs["kafka-core"]
implementation libs["kafka-server-common"]
implementation libs["kafka-metadata"]
implementation libs["kafka-connect-runtime"]
implementation libs["kafka-connect-json"]
implementation libs["hadoop-minicluster"]
Expand Down
14 changes: 11 additions & 3 deletions it/src/main/java/org/astraea/it/BrokerCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,32 @@
import java.util.stream.IntStream;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.MetaProperties;
import kafka.server.Server;
import kafka.tools.StorageTool;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
import org.apache.kafka.server.common.MetadataVersion;

public interface BrokerCluster extends AutoCloseable {

private static CompletableFuture<Map.Entry<Integer, Server>> server(
Map<String, String> configs, Set<String> folders, String clusterId, int nodeId) {

StorageTool.formatCommand(
new PrintStream(new ByteArrayOutputStream()),
scala.collection.JavaConverters.collectionAsScalaIterableConverter(folders)
.asScala()
.toSeq(),
new MetaProperties(clusterId, nodeId),
MetadataVersion.latest(),
new MetaProperties.Builder()
.setVersion(MetaPropertiesVersion.V1)
.setClusterId(clusterId)
.setNodeId(nodeId)
.setDirectoryId(DirectoryId.random())
.build(),
MetadataVersion.latestProduction(),
true);

return CompletableFuture.supplyAsync(
Expand Down

0 comments on commit a2dc6c8

Please sign in to comment.