From 98f1cd33fd5307233d975f01c3ed5468c40b077a Mon Sep 17 00:00:00 2001 From: kwall Date: Fri, 23 Feb 2024 13:09:44 +0000 Subject: [PATCH 1/4] Upgrade to Apache Kafka 3.7 why: the RC is close to being voted, so let's get ready for the release --- kafka-server/pom.xml | 4 ++++ .../com/ozangunalp/kafka/server/Storage.java | 4 ++-- pom.xml | 20 +++++++++++++++++-- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/kafka-server/pom.xml b/kafka-server/pom.xml index 16c2bd0..18b33d5 100644 --- a/kafka-server/pom.xml +++ b/kafka-server/pom.xml @@ -33,6 +33,10 @@ org.apache.kafka kafka_2.13 + + org.apache.kafka + kafka-metadata + org.apache.kafka kafka-server-common diff --git a/kafka-server/src/main/java/com/ozangunalp/kafka/server/Storage.java b/kafka-server/src/main/java/com/ozangunalp/kafka/server/Storage.java index f3804ee..cd89bb4 100644 --- a/kafka-server/src/main/java/com/ozangunalp/kafka/server/Storage.java +++ b/kafka-server/src/main/java/com/ozangunalp/kafka/server/Storage.java @@ -15,7 +15,7 @@ import org.jboss.logging.Logger; import kafka.server.KafkaConfig; -import kafka.server.MetaProperties; +import org.apache.kafka.metadata.properties.MetaProperties; import kafka.tools.StorageTool; import scala.collection.immutable.Seq; import scala.jdk.CollectionConverters; @@ -58,7 +58,7 @@ public static void formatStorageFromConfig(KafkaConfig config, String clusterId, } public static void formatStorage(List directories, String clusterId, int nodeId, boolean ignoreFormatted) { - MetaProperties metaProperties = new MetaProperties(clusterId, nodeId); + MetaProperties metaProperties = new MetaProperties.Builder().setClusterId(clusterId).setNodeId(nodeId).build(); Seq dirs = CollectionConverters.ListHasAsScala(directories).asScala().toSeq(); StorageTool.formatCommand(LoggingOutputStream.loggerPrintStream(LOGGER), dirs, metaProperties, MINIMUM_BOOTSTRAP_VERSION, ignoreFormatted); diff --git a/pom.xml b/pom.xml index bd89a47..33d56bd 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,7 @@ io.quarkus.platform 3.7.4 - 3.6.1 + 3.7.0 3.9.1 2.13.13 @@ -123,6 +123,11 @@ kafka-storage ${kafka.version} + + org.apache.kafka + kafka-metadata + ${kafka.version} + org.apache.kafka kafka-group-coordinator @@ -207,6 +212,17 @@ + + + + apache staging + https://repository.apache.org/content/groups/staging/ + + true + + + + @@ -436,4 +452,4 @@ - + \ No newline at end of file From ff2c602eccdfb869a78c5c10f9733bd8f539d273 Mon Sep 17 00:00:00 2001 From: kwall Date: Fri, 23 Feb 2024 17:03:07 +0000 Subject: [PATCH 2/4] Update substitutions to reflect changes in Kafka 3.7.0 why: * KAFKA-14481: Move LogSegment/LogSegments to storage module (#14529) * KAFKA-16013: Add metric for expiration rate of delayed remote fetch (#15014) Signed-off-by: kwall --- .../KafkaServerExtensionProcessor.java | 5 ++++- .../runtime/KafkaServerSubstitutions.java | 17 +---------------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/quarkus-kafka-server-extension/deployment/src/main/java/com/ozangunalp/kafka/server/extension/deployment/KafkaServerExtensionProcessor.java b/quarkus-kafka-server-extension/deployment/src/main/java/com/ozangunalp/kafka/server/extension/deployment/KafkaServerExtensionProcessor.java index 7d3f0ea..68bfa7f 100644 --- a/quarkus-kafka-server-extension/deployment/src/main/java/com/ozangunalp/kafka/server/extension/deployment/KafkaServerExtensionProcessor.java +++ b/quarkus-kafka-server-extension/deployment/src/main/java/com/ozangunalp/kafka/server/extension/deployment/KafkaServerExtensionProcessor.java @@ -127,9 +127,12 @@ void build(BuildProducer reflectiveClass, "org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin")); producer.produce(new RuntimeInitializedClassBuildItem( "org.apache.kafka.common.security.kerberos.KerberosLogin")); - + producer.produce(new RuntimeInitializedClassBuildItem( + "org.apache.kafka.storage.internals.log.LogSegment")); + producer.produce(new RuntimeInitializedClassBuildItem("kafka.server.DelayedFetchMetrics$")); producer.produce(new RuntimeInitializedClassBuildItem("kafka.server.DelayedProduceMetrics$")); + producer.produce(new RuntimeInitializedClassBuildItem("kafka.server.DelayedRemoteFetchMetrics$")); producer.produce(new RuntimeInitializedClassBuildItem("kafka.server.DelayedDeleteRecordsMetrics$")); reflectiveClass.produce(ReflectiveClassBuildItem.builder(org.apache.kafka.common.metrics.JmxReporter.class).build()); diff --git a/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java b/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java index ca9cb84..435e36d 100644 --- a/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java +++ b/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java @@ -18,7 +18,6 @@ import org.jboss.logmanager.LogContext; import com.oracle.svm.core.annotate.Alias; -import com.oracle.svm.core.annotate.Delete; import com.oracle.svm.core.annotate.InjectAccessors; import com.oracle.svm.core.annotate.Substitute; import com.oracle.svm.core.annotate.TargetClass; @@ -105,21 +104,7 @@ public long randomSegmentJitter() { } -@TargetClass(className = "kafka.log.LogFlushStats$") -final class Target_LogFlushStats { - - @Delete - private static Target_com_yammer_metrics_core_Timer logFlushTimer; - -} - -// Used in kafka.log.LogFlushStats.logFlushTimer -@TargetClass(className = "com.yammer.metrics.core.Timer") -final class Target_com_yammer_metrics_core_Timer { - -} - -@TargetClass(className = "kafka.log.LogSegment") +@TargetClass(className = "org.apache.kafka.storage.internals.log.LogSegment") final class Target_LogSegment { @Alias From 8ac9dce2f2c345b4a1089512cf0c5a4793767b8e Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Mon, 26 Feb 2024 11:36:19 +0100 Subject: [PATCH 3/4] Remove static initializer from LogSegment class when building in native mode --- .../KafkaServerExtensionProcessor.java | 61 ++++++++++++++++--- .../runtime/KafkaServerSubstitutions.java | 9 +++ 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/quarkus-kafka-server-extension/deployment/src/main/java/com/ozangunalp/kafka/server/extension/deployment/KafkaServerExtensionProcessor.java b/quarkus-kafka-server-extension/deployment/src/main/java/com/ozangunalp/kafka/server/extension/deployment/KafkaServerExtensionProcessor.java index 68bfa7f..eb23723 100644 --- a/quarkus-kafka-server-extension/deployment/src/main/java/com/ozangunalp/kafka/server/extension/deployment/KafkaServerExtensionProcessor.java +++ b/quarkus-kafka-server-extension/deployment/src/main/java/com/ozangunalp/kafka/server/extension/deployment/KafkaServerExtensionProcessor.java @@ -1,6 +1,11 @@ package com.ozangunalp.kafka.server.extension.deployment; +import static org.objectweb.asm.Opcodes.INVOKESTATIC; +import static org.objectweb.asm.Opcodes.PUTSTATIC; +import static org.objectweb.asm.Opcodes.ACONST_NULL; + import java.util.Set; +import java.util.function.BiFunction; import javax.security.auth.spi.LoginModule; @@ -18,15 +23,18 @@ import org.apache.kafka.common.security.scram.internals.ScramSaslServerProvider; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; import org.apache.kafka.server.metrics.KafkaYammerMetrics; +import org.apache.kafka.storage.internals.log.LogSegment; import org.jboss.jandex.ClassInfo; import org.jboss.jandex.DotName; import org.objectweb.asm.ClassVisitor; import org.objectweb.asm.FieldVisitor; import org.objectweb.asm.MethodVisitor; import org.objectweb.asm.Opcodes; +import org.objectweb.asm.Type; import com.ozangunalp.kafka.server.extension.runtime.JsonPathConfigRecorder; import com.sun.security.auth.module.Krb5LoginModule; + import io.quarkus.deployment.IsNormal; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; @@ -41,6 +49,7 @@ import io.quarkus.deployment.builditem.nativeimage.NativeImageSecurityProviderBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem; +import io.quarkus.deployment.pkg.steps.NativeBuild; import io.quarkus.gizmo.Gizmo; class KafkaServerExtensionProcessor { @@ -106,7 +115,7 @@ FeatureBuildItem feature() { RunTimeConfigurationDefaultBuildItem deleteDirsOnClose() { return new RunTimeConfigurationDefaultBuildItem("server.delete-dirs-on-close", "true"); } - + @BuildStep void index(BuildProducer indexDependency) { indexDependency.produce(new IndexDependencyBuildItem("org.apache.kafka", "kafka_2.13")); @@ -117,7 +126,7 @@ void index(BuildProducer indexDependency) { indexDependency.produce(new IndexDependencyBuildItem("io.strimzi", "kafka-oauth-server-plain")); indexDependency.produce(new IndexDependencyBuildItem("io.strimzi", "kafka-oauth-client")); } - + @BuildStep void build(BuildProducer reflectiveClass, BuildProducer producer) { @@ -127,8 +136,6 @@ void build(BuildProducer reflectiveClass, "org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin")); producer.produce(new RuntimeInitializedClassBuildItem( "org.apache.kafka.common.security.kerberos.KerberosLogin")); - producer.produce(new RuntimeInitializedClassBuildItem( - "org.apache.kafka.storage.internals.log.LogSegment")); producer.produce(new RuntimeInitializedClassBuildItem("kafka.server.DelayedFetchMetrics$")); producer.produce(new RuntimeInitializedClassBuildItem("kafka.server.DelayedProduceMetrics$")); @@ -284,11 +291,8 @@ public void visitCode() { private static class CoreUtilsClassVisitor extends ClassVisitor { - private final String fqcn; - protected CoreUtilsClassVisitor(String fqcn, ClassVisitor classVisitor) { super(Gizmo.ASM_API_VERSION, classVisitor); - this.fqcn = fqcn; } @Override @@ -319,4 +323,47 @@ public void visitCode() { } } + + @BuildStep(onlyIf = { NativeBuild.class }) + void logSegmentStaticBlock(BuildProducer producer) { + producer.produce(new BytecodeTransformerBuildItem(LogSegment.class.getName(), LogSegmentStaticBlockRemover::new)); + } + + private class LogSegmentStaticBlockRemover extends ClassVisitor { + public LogSegmentStaticBlockRemover(String fqcn, ClassVisitor cv) { + super(Gizmo.ASM_API_VERSION, cv); + } + + // invoked for every method + @Override + public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) { + MethodVisitor visitor = super.visitMethod(access, name, desc, signature, exceptions); + if (visitor == null) { + return null; + } + if (name.equals("")) { + return new MethodVisitor(Gizmo.ASM_API_VERSION, visitor) { + @Override + public void visitCode() { + // Load LogSegment class + visitLdcInsn(Type.getType(LogSegment.class)); + // Invoke LoggerFactory.getLogger + visitMethodInsn(INVOKESTATIC, "org/slf4j/LoggerFactory", "getLogger", "(Ljava/lang/Class;)Lorg/slf4j/Logger;", false); + // Store the result in the LOGGER field + visitFieldInsn(PUTSTATIC, Type.getInternalName(LogSegment.class), "LOGGER", "Lorg/slf4j/Logger;"); + // Load null onto the stack + visitInsn(ACONST_NULL); + // Store null into LOG_FLUSH_TIMER field + visitFieldInsn(PUTSTATIC, Type.getInternalName(LogSegment.class), "LOG_FLUSH_TIMER", "Lcom/yammer/metrics/core/Timer;"); + // Continue with the original static block code + mv.visitInsn(Opcodes.RETURN);// our new code + } + }; + } + visitor.visitMaxs(0, 0); + return visitor; + } + + } + } diff --git a/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java b/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java index 435e36d..58a6e71 100644 --- a/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java +++ b/quarkus-kafka-server-extension/runtime/src/main/java/com/ozangunalp/kafka/server/extension/runtime/KafkaServerSubstitutions.java @@ -18,6 +18,7 @@ import org.jboss.logmanager.LogContext; import com.oracle.svm.core.annotate.Alias; +import com.oracle.svm.core.annotate.Delete; import com.oracle.svm.core.annotate.InjectAccessors; import com.oracle.svm.core.annotate.Substitute; import com.oracle.svm.core.annotate.TargetClass; @@ -104,9 +105,17 @@ public long randomSegmentJitter() { } +@TargetClass(className = "com.yammer.metrics.core.Timer") +final class Target_com_yammer_metrics_core_Timer { + +} + @TargetClass(className = "org.apache.kafka.storage.internals.log.LogSegment") final class Target_LogSegment { + @Delete + static Target_com_yammer_metrics_core_Timer LOG_FLUSH_TIMER; + @Alias FileRecords log; From 2d650e3e72f411ceeefe3fd7c037e825773656cf Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 27 Feb 2024 07:58:57 +0000 Subject: [PATCH 4/4] Remove repository override --- pom.xml | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index 33d56bd..c38b5da 100644 --- a/pom.xml +++ b/pom.xml @@ -212,17 +212,6 @@ - - - - apache staging - https://repository.apache.org/content/groups/staging/ - - true - - - - @@ -452,4 +441,4 @@ - \ No newline at end of file +