diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 53db1cb544..9f447ae707 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -30,7 +30,7 @@ def versions = [ "jmh-core" : project.properties['jmh-core.version'] ?: "1.36", "jmh-generator-annprocess": project.properties['jmh-generator.version'] ?: "1.36", junit : project.properties['junit.version'] ?: "5.9.3", - kafka : project.properties['kafka.version'] ?: "3.5.1", + kafka : project.properties['kafka.version'] ?: "3.7.0", mockito : project.properties['mockito.version'] ?: "5.4.0", "mockito-inline" : project.properties['mockito.version'] ?: "5.2.0", scala : project.properties['scala.version'] ?: "2.13.11", @@ -57,6 +57,7 @@ libs += [ "kafka-connect-runtime" : "org.apache.kafka:connect-runtime:${versions["kafka"]}", "kafka-core" : "org.apache.kafka:kafka_2.13:${versions["kafka"]}", "kafka-server-common" : "org.apache.kafka:kafka-server-common:${versions["kafka"]}", + "kafka-metadata" : "org.apache.kafka:kafka-metadata:${versions["kafka"]}", "mockito-core" : "org.mockito:mockito-core:${versions["mockito"]}", "mockito-inline" : "org.mockito:mockito-inline:${versions["mockito-inline"]}", scala : "org.scala-lang:scala-library:${versions["scala"]}", diff --git a/it/build.gradle b/it/build.gradle index bd227622fa..8cc3ed23cc 100644 --- a/it/build.gradle +++ b/it/build.gradle @@ -30,6 +30,7 @@ dependencies { implementation libs["junit"] implementation libs["kafka-core"] implementation libs["kafka-server-common"] + implementation libs["kafka-metadata"] implementation libs["kafka-connect-runtime"] implementation libs["kafka-connect-json"] implementation libs["hadoop-minicluster"] diff --git a/it/src/main/java/org/astraea/it/BrokerCluster.java b/it/src/main/java/org/astraea/it/BrokerCluster.java index ea3c4abf56..f56a4684b6 100644 --- a/it/src/main/java/org/astraea/it/BrokerCluster.java +++ b/it/src/main/java/org/astraea/it/BrokerCluster.java @@ -29,24 +29,32 @@ import java.util.stream.IntStream; import kafka.server.KafkaConfig; import kafka.server.KafkaRaftServer; -import kafka.server.MetaProperties; import kafka.server.Server; import kafka.tools.StorageTool; +import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.metadata.properties.MetaProperties; +import org.apache.kafka.metadata.properties.MetaPropertiesVersion; import org.apache.kafka.server.common.MetadataVersion; public interface BrokerCluster extends AutoCloseable { private static CompletableFuture> server( Map configs, Set folders, String clusterId, int nodeId) { + StorageTool.formatCommand( new PrintStream(new ByteArrayOutputStream()), scala.collection.JavaConverters.collectionAsScalaIterableConverter(folders) .asScala() .toSeq(), - new MetaProperties(clusterId, nodeId), - MetadataVersion.latest(), + new MetaProperties.Builder() + .setVersion(MetaPropertiesVersion.V1) + .setClusterId(clusterId) + .setNodeId(nodeId) + .setDirectoryId(DirectoryId.random()) + .build(), + MetadataVersion.latestProduction(), true); return CompletableFuture.supplyAsync(