From 4bc880b6bb6bac93db371a71216d43a8c7464096 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 8 Mar 2024 10:27:11 +0800 Subject: [PATCH] [COMMON] update kafka from 3.5.1 to 3.7.0 (#1845) --- common/src/main/java/org/astraea/common/Utils.java | 14 ++++++++++++++ .../java/org/astraea/common/consumer/Builder.java | 2 +- docker/start_broker.sh | 4 ++-- docker/start_controller.sh | 4 ++-- docker/start_worker.sh | 4 ++-- gradle/dependencies.gradle | 3 ++- it/build.gradle | 1 + it/src/main/java/org/astraea/it/BrokerCluster.java | 14 +++++++++++--- 8 files changed, 35 insertions(+), 11 deletions(-) diff --git a/common/src/main/java/org/astraea/common/Utils.java b/common/src/main/java/org/astraea/common/Utils.java index 4b4b4a81a1..43eb3412b6 100644 --- a/common/src/main/java/org/astraea/common/Utils.java +++ b/common/src/main/java/org/astraea/common/Utils.java @@ -392,6 +392,10 @@ public static Object member(Object object, String attribute) { return reflectionAttribute(object.getClass(), object, attribute); } + public static Object method(Object object, String name) { + return reflectionMethod(object.getClass(), object, name); + } + /** * reflection class attribute * @@ -415,6 +419,16 @@ private static Object reflectionAttribute(Class clz, Object object, String at throw new RuntimeException(attribute + " is not existent in " + object.getClass().getName()); } + private static Object reflectionMethod(Class clz, Object object, String attribute) { + try { + var method = clz.getDeclaredMethod(attribute); + method.setAccessible(true); + return method.invoke(object); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public static List constants( Class clz, Predicate variableNameFilter, Class cast) { return Arrays.stream(clz.getFields()) diff --git a/common/src/main/java/org/astraea/common/consumer/Builder.java b/common/src/main/java/org/astraea/common/consumer/Builder.java index 16045a476a..30387dbb1c 100644 --- a/common/src/main/java/org/astraea/common/consumer/Builder.java +++ b/common/src/main/java/org/astraea/common/consumer/Builder.java @@ -178,7 +178,7 @@ protected abstract static class BaseConsumer implements Consumer kafkaConsumer) { this.kafkaConsumer = kafkaConsumer; // KafkaConsumer does not expose client-id - this.clientId = (String) Utils.member(kafkaConsumer, "clientId"); + this.clientId = (String) Utils.method(kafkaConsumer, "clientId"); } @Override diff --git a/docker/start_broker.sh b/docker/start_broker.sh index a6e9f2f800..59ab403726 100755 --- a/docker/start_broker.sh +++ b/docker/start_broker.sh @@ -20,7 +20,7 @@ source $DOCKER_FOLDER/docker_build_common.sh # ===============================[global variables]=============================== declare -r ACCOUNT=${ACCOUNT:-skiptests} declare -r KAFKA_ACCOUNT=${KAFKA_ACCOUNT:-apache} -declare -r VERSION=${REVISION:-${VERSION:-3.5.1}} +declare -r VERSION=${REVISION:-${VERSION:-3.7.0}} declare -r DOCKERFILE=$DOCKER_FOLDER/broker.dockerfile declare -r DATA_FOLDER_IN_CONTAINER_PREFIX="/tmp/log-folder" declare -r EXPORTER_VERSION="0.16.1" @@ -61,7 +61,7 @@ function showHelp() { echo " ACCOUNT=skiptests set the github account for astraea repo" echo " HEAP_OPTS=\"-Xmx2G -Xms2G\" set broker JVM memory" echo " REVISION=trunk set revision of kafka source code to build container" - echo " VERSION=3.5.1 set version of kafka distribution" + echo " VERSION=3.7.0 set version of kafka distribution" echo " BUILD=false set true if you want to build image locally" echo " RUN=false set false if you want to build/pull image only" echo " DATA_FOLDERS=/tmp/folder1 set host folders used by broker" diff --git a/docker/start_controller.sh b/docker/start_controller.sh index da70903665..9ea0fc995e 100755 --- a/docker/start_controller.sh +++ b/docker/start_controller.sh @@ -20,7 +20,7 @@ source $DOCKER_FOLDER/docker_build_common.sh # ===============================[global variables]=============================== declare -r ACCOUNT=${ACCOUNT:-skiptests} declare -r KAFKA_ACCOUNT=${KAFKA_ACCOUNT:-apache} -declare -r VERSION=${REVISION:-${VERSION:-3.5.1}} +declare -r VERSION=${REVISION:-${VERSION:-3.7.0}} declare -r DOCKERFILE=$DOCKER_FOLDER/controller.dockerfile declare -r EXPORTER_VERSION="0.16.1" declare -r CLUSTER_ID=${CLUSTER_ID:-"$(randomString)"} @@ -52,7 +52,7 @@ function showHelp() { echo " ACCOUNT=skiptests set the github account for astraea repo" echo " HEAP_OPTS=\"-Xmx2G -Xms2G\" set controller JVM memory" echo " REVISION=trunk set revision of kafka source code to build container" - echo " VERSION=3.5.1 set version of kafka distribution" + echo " VERSION=3.7.0 set version of kafka distribution" echo " BUILD=false set true if you want to build image locally" echo " RUN=false set false if you want to build/pull image only" echo " META_FOLDER=/tmp/folder1 set host folder used by controller" diff --git a/docker/start_worker.sh b/docker/start_worker.sh index 96d76cdf15..a9d0282641 100755 --- a/docker/start_worker.sh +++ b/docker/start_worker.sh @@ -20,7 +20,7 @@ source $DOCKER_FOLDER/docker_build_common.sh # ===============================[global variables]=============================== declare -r ACCOUNT=${ACCOUNT:-skiptests} declare -r KAFKA_ACCOUNT=${KAFKA_ACCOUNT:-apache} -declare -r VERSION=${REVISION:-${VERSION:-3.5.1}} +declare -r VERSION=${REVISION:-${VERSION:-3.7.0}} declare -r DOCKERFILE=$DOCKER_FOLDER/worker.dockerfile declare -r WORKER_PORT=${WORKER_PORT:-"$(getRandomPort)"} declare -r CONTAINER_NAME="worker-$WORKER_PORT" @@ -50,7 +50,7 @@ function showHelp() { echo " ACCOUNT=skiptests set the github account for astraea repo" echo " HEAP_OPTS=\"-Xmx2G -Xms2G\" set worker JVM memory" echo " REVISION=trunk set revision of kafka source code to build container" - echo " VERSION=3.5.1 set version of kafka distribution" + echo " VERSION=3.7.0 set version of kafka distribution" echo " BUILD=false set true if you want to build image locally" echo " RUN=false set false if you want to build/pull image only" echo " WORKER_PLUGIN_PATH=/tmp/worker-plugins set plugin path to kafka worker" diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 53db1cb544..9f447ae707 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -30,7 +30,7 @@ def versions = [ "jmh-core" : project.properties['jmh-core.version'] ?: "1.36", "jmh-generator-annprocess": project.properties['jmh-generator.version'] ?: "1.36", junit : project.properties['junit.version'] ?: "5.9.3", - kafka : project.properties['kafka.version'] ?: "3.5.1", + kafka : project.properties['kafka.version'] ?: "3.7.0", mockito : project.properties['mockito.version'] ?: "5.4.0", "mockito-inline" : project.properties['mockito.version'] ?: "5.2.0", scala : project.properties['scala.version'] ?: "2.13.11", @@ -57,6 +57,7 @@ libs += [ "kafka-connect-runtime" : "org.apache.kafka:connect-runtime:${versions["kafka"]}", "kafka-core" : "org.apache.kafka:kafka_2.13:${versions["kafka"]}", "kafka-server-common" : "org.apache.kafka:kafka-server-common:${versions["kafka"]}", + "kafka-metadata" : "org.apache.kafka:kafka-metadata:${versions["kafka"]}", "mockito-core" : "org.mockito:mockito-core:${versions["mockito"]}", "mockito-inline" : "org.mockito:mockito-inline:${versions["mockito-inline"]}", scala : "org.scala-lang:scala-library:${versions["scala"]}", diff --git a/it/build.gradle b/it/build.gradle index bd227622fa..8cc3ed23cc 100644 --- a/it/build.gradle +++ b/it/build.gradle @@ -30,6 +30,7 @@ dependencies { implementation libs["junit"] implementation libs["kafka-core"] implementation libs["kafka-server-common"] + implementation libs["kafka-metadata"] implementation libs["kafka-connect-runtime"] implementation libs["kafka-connect-json"] implementation libs["hadoop-minicluster"] diff --git a/it/src/main/java/org/astraea/it/BrokerCluster.java b/it/src/main/java/org/astraea/it/BrokerCluster.java index ea3c4abf56..f56a4684b6 100644 --- a/it/src/main/java/org/astraea/it/BrokerCluster.java +++ b/it/src/main/java/org/astraea/it/BrokerCluster.java @@ -29,24 +29,32 @@ import java.util.stream.IntStream; import kafka.server.KafkaConfig; import kafka.server.KafkaRaftServer; -import kafka.server.MetaProperties; import kafka.server.Server; import kafka.tools.StorageTool; +import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.metadata.properties.MetaProperties; +import org.apache.kafka.metadata.properties.MetaPropertiesVersion; import org.apache.kafka.server.common.MetadataVersion; public interface BrokerCluster extends AutoCloseable { private static CompletableFuture> server( Map configs, Set folders, String clusterId, int nodeId) { + StorageTool.formatCommand( new PrintStream(new ByteArrayOutputStream()), scala.collection.JavaConverters.collectionAsScalaIterableConverter(folders) .asScala() .toSeq(), - new MetaProperties(clusterId, nodeId), - MetadataVersion.latest(), + new MetaProperties.Builder() + .setVersion(MetaPropertiesVersion.V1) + .setClusterId(clusterId) + .setNodeId(nodeId) + .setDirectoryId(DirectoryId.random()) + .build(), + MetadataVersion.latestProduction(), true); return CompletableFuture.supplyAsync(