Skip to content

Commit

Permalink
Merge pull request #148 from k-wall/kafka-3.7-test-fire
Browse files Browse the repository at this point in the history
Upgrade to Apache Kafka 3.7.0
  • Loading branch information
ozangunalp authored Feb 27, 2024
2 parents eeee886 + 2d650e3 commit 231682b
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 19 deletions.
4 changes: 4 additions & 0 deletions kafka-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-metadata</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-server-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.jboss.logging.Logger;

import kafka.server.KafkaConfig;
import kafka.server.MetaProperties;
import org.apache.kafka.metadata.properties.MetaProperties;
import kafka.tools.StorageTool;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters;
Expand Down Expand Up @@ -58,7 +58,7 @@ public static void formatStorageFromConfig(KafkaConfig config, String clusterId,
}

public static void formatStorage(List<String> directories, String clusterId, int nodeId, boolean ignoreFormatted) {
MetaProperties metaProperties = new MetaProperties(clusterId, nodeId);
MetaProperties metaProperties = new MetaProperties.Builder().setClusterId(clusterId).setNodeId(nodeId).build();
Seq<String> dirs = CollectionConverters.ListHasAsScala(directories).asScala().toSeq();
StorageTool.formatCommand(LoggingOutputStream.loggerPrintStream(LOGGER), dirs, metaProperties,
MINIMUM_BOOTSTRAP_VERSION, ignoreFormatted);
Expand Down
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.7.4</quarkus.platform.version>

<kafka.version>3.6.1</kafka.version>
<kafka.version>3.7.0</kafka.version>
<zookeeper.version>3.9.1</zookeeper.version>
<scala.version>2.13.13</scala.version>

Expand Down Expand Up @@ -123,6 +123,11 @@
<artifactId>kafka-storage</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-metadata</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-group-coordinator</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package com.ozangunalp.kafka.server.extension.deployment;

import static org.objectweb.asm.Opcodes.INVOKESTATIC;
import static org.objectweb.asm.Opcodes.PUTSTATIC;
import static org.objectweb.asm.Opcodes.ACONST_NULL;

import java.util.Set;
import java.util.function.BiFunction;

import javax.security.auth.spi.LoginModule;

Expand All @@ -18,15 +23,18 @@
import org.apache.kafka.common.security.scram.internals.ScramSaslServerProvider;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.objectweb.asm.ClassVisitor;
import org.objectweb.asm.FieldVisitor;
import org.objectweb.asm.MethodVisitor;
import org.objectweb.asm.Opcodes;
import org.objectweb.asm.Type;

import com.ozangunalp.kafka.server.extension.runtime.JsonPathConfigRecorder;
import com.sun.security.auth.module.Krb5LoginModule;

import io.quarkus.deployment.IsNormal;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
Expand All @@ -41,6 +49,7 @@
import io.quarkus.deployment.builditem.nativeimage.NativeImageSecurityProviderBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
import io.quarkus.deployment.pkg.steps.NativeBuild;
import io.quarkus.gizmo.Gizmo;

class KafkaServerExtensionProcessor {
Expand Down Expand Up @@ -106,7 +115,7 @@ FeatureBuildItem feature() {
RunTimeConfigurationDefaultBuildItem deleteDirsOnClose() {
return new RunTimeConfigurationDefaultBuildItem("server.delete-dirs-on-close", "true");
}

@BuildStep
void index(BuildProducer<IndexDependencyBuildItem> indexDependency) {
indexDependency.produce(new IndexDependencyBuildItem("org.apache.kafka", "kafka_2.13"));
Expand All @@ -117,7 +126,7 @@ void index(BuildProducer<IndexDependencyBuildItem> indexDependency) {
indexDependency.produce(new IndexDependencyBuildItem("io.strimzi", "kafka-oauth-server-plain"));
indexDependency.produce(new IndexDependencyBuildItem("io.strimzi", "kafka-oauth-client"));
}

@BuildStep
void build(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<RuntimeInitializedClassBuildItem> producer) {
Expand All @@ -127,9 +136,10 @@ void build(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
"org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin"));
producer.produce(new RuntimeInitializedClassBuildItem(
"org.apache.kafka.common.security.kerberos.KerberosLogin"));

producer.produce(new RuntimeInitializedClassBuildItem("kafka.server.DelayedFetchMetrics$"));
producer.produce(new RuntimeInitializedClassBuildItem("kafka.server.DelayedProduceMetrics$"));
producer.produce(new RuntimeInitializedClassBuildItem("kafka.server.DelayedRemoteFetchMetrics$"));
producer.produce(new RuntimeInitializedClassBuildItem("kafka.server.DelayedDeleteRecordsMetrics$"));

reflectiveClass.produce(ReflectiveClassBuildItem.builder(org.apache.kafka.common.metrics.JmxReporter.class).build());
Expand Down Expand Up @@ -281,11 +291,8 @@ public void visitCode() {

private static class CoreUtilsClassVisitor extends ClassVisitor {

private final String fqcn;

protected CoreUtilsClassVisitor(String fqcn, ClassVisitor classVisitor) {
super(Gizmo.ASM_API_VERSION, classVisitor);
this.fqcn = fqcn;
}

@Override
Expand Down Expand Up @@ -316,4 +323,47 @@ public void visitCode() {
}

}

@BuildStep(onlyIf = { NativeBuild.class })
void logSegmentStaticBlock(BuildProducer<BytecodeTransformerBuildItem> producer) {
producer.produce(new BytecodeTransformerBuildItem(LogSegment.class.getName(), LogSegmentStaticBlockRemover::new));
}

private class LogSegmentStaticBlockRemover extends ClassVisitor {
public LogSegmentStaticBlockRemover(String fqcn, ClassVisitor cv) {
super(Gizmo.ASM_API_VERSION, cv);
}

// invoked for every method
@Override
public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) {
MethodVisitor visitor = super.visitMethod(access, name, desc, signature, exceptions);
if (visitor == null) {
return null;
}
if (name.equals("<clinit>")) {
return new MethodVisitor(Gizmo.ASM_API_VERSION, visitor) {
@Override
public void visitCode() {
// Load LogSegment class
visitLdcInsn(Type.getType(LogSegment.class));
// Invoke LoggerFactory.getLogger
visitMethodInsn(INVOKESTATIC, "org/slf4j/LoggerFactory", "getLogger", "(Ljava/lang/Class;)Lorg/slf4j/Logger;", false);
// Store the result in the LOGGER field
visitFieldInsn(PUTSTATIC, Type.getInternalName(LogSegment.class), "LOGGER", "Lorg/slf4j/Logger;");
// Load null onto the stack
visitInsn(ACONST_NULL);
// Store null into LOG_FLUSH_TIMER field
visitFieldInsn(PUTSTATIC, Type.getInternalName(LogSegment.class), "LOG_FLUSH_TIMER", "Lcom/yammer/metrics/core/Timer;");
// Continue with the original static block code
mv.visitInsn(Opcodes.RETURN);// our new code
}
};
}
visitor.visitMaxs(0, 0);
return visitor;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,17 @@ public long randomSegmentJitter() {

}

@TargetClass(className = "kafka.log.LogFlushStats$")
final class Target_LogFlushStats {

@Delete
private static Target_com_yammer_metrics_core_Timer logFlushTimer;

}

// Used in kafka.log.LogFlushStats.logFlushTimer
@TargetClass(className = "com.yammer.metrics.core.Timer")
final class Target_com_yammer_metrics_core_Timer {

}

@TargetClass(className = "kafka.log.LogSegment")
@TargetClass(className = "org.apache.kafka.storage.internals.log.LogSegment")
final class Target_LogSegment {

@Delete
static Target_com_yammer_metrics_core_Timer LOG_FLUSH_TIMER;

@Alias
FileRecords log;

Expand Down

0 comments on commit 231682b

Please sign in to comment.