Skip to content

Commit

Permalink
Fix #181: default the metadata version from the IBP version in the sa…
Browse files Browse the repository at this point in the history
…me manner as StorageTool

Signed-off-by: kwall <[email protected]>
  • Loading branch information
k-wall committed Jun 1, 2024
1 parent 05025ea commit 76aecc8
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import com.github.dockerjava.api.command.InspectContainerResponse;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@
import java.io.IOException;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static java.util.regex.Pattern.quote;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

Expand Down Expand Up @@ -107,9 +103,9 @@ void testFixedPortContainer() {
}

@Test
void testSaslContainer() {
void testSaslPlainContainer() {
try (var container = createKafkaNativeContainer()
.withServerProperties(MountableFile.forClasspathResource("sasl_plaintext.properties"))
.withServerProperties(MountableFile.forClasspathResource("sasl_plain_plaintext.properties"))
.withAdvertisedListeners(c -> String.format("SASL_PLAINTEXT://%s:%s", c.getHost(), c.getExposedKafkaPort()))) {
container.start();
checkProduceConsume(container, Map.of(
Expand All @@ -118,6 +114,18 @@ void testSaslContainer() {
SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"client\" password=\"client-secret\";"));
}
}
@Test
void testSaslScramContainer() {
try (var container = createKafkaNativeContainer()
.withServerProperties(MountableFile.forClasspathResource("sasl_scram_plaintext.properties"))
.withAdvertisedListeners(c -> String.format("SASL_PLAINTEXT://%s:%s", c.getHost(), c.getExposedKafkaPort()))) {
container.start();
checkProduceConsume(container, Map.of(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT",
SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512",
SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"client\" password=\"client-secret\";"));
}
}

@Test
void testSslContainer() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
sasl.enabled.mechanisms=SCRAM-SHA-512

listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="broker" \
password="broker-secret";
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.ozangunalp.kafka.server;

import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;

import java.io.Closeable;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -17,6 +19,7 @@
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.jboss.logging.Logger;

import kafka.cluster.EndPoint;
Expand Down Expand Up @@ -202,7 +205,9 @@ public synchronized EmbeddedKafkaBroker start() {
if (zkMode) {
server = new KafkaServer(config, Time.SYSTEM, Option.apply(KAFKA_PREFIX), false);
} else {
Storage.formatStorageFromConfig(config, clusterId, true);
// Default the metadata version from the IBP version in the same way as kafka.tools.StorageTool.
var metadataVersion = MetadataVersion.fromVersionString(brokerConfig.getProperty(KafkaConfig.InterBrokerProtocolVersionProp(), MetadataVersion.LATEST_PRODUCTION.version()));
Storage.formatStorageFromConfig(config, clusterId, true, metadataVersion);
server = new KafkaRaftServer(config, Time.SYSTEM);
}
server.startup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.unchecked.Unchecked;
import org.apache.kafka.server.common.MetadataVersion;

import java.util.Locale;

import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;

@ApplicationScoped
public class Startup {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package com.ozangunalp.kafka.server;

import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.server.common.MetadataVersion;
import org.jboss.logging.Logger;

import kafka.server.KafkaConfig;
import org.apache.kafka.metadata.properties.MetaProperties;
import kafka.tools.StorageTool;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters;

public final class Storage {

Expand Down Expand Up @@ -50,18 +47,11 @@ public static void createAndSetLogDir(Properties properties) {
}
}

public static void formatStorageFromConfig(KafkaConfig config, String clusterId, boolean ignoreFormatted) {
public static void formatStorageFromConfig(KafkaConfig config, String clusterId, boolean ignoreFormatted, MetadataVersion metadataVersion) {
Seq<String> directories = StorageTool.configToLogDirectories(config);
MetaProperties metaProperties = StorageTool.buildMetadataProperties(clusterId, config);
StorageTool.formatCommand(LoggingOutputStream.loggerPrintStream(LOGGER), directories, metaProperties,
MINIMUM_BOOTSTRAP_VERSION, ignoreFormatted);
}

public static void formatStorage(List<String> directories, String clusterId, int nodeId, boolean ignoreFormatted) {
MetaProperties metaProperties = new MetaProperties.Builder().setClusterId(clusterId).setNodeId(nodeId).build();
Seq<String> dirs = CollectionConverters.ListHasAsScala(directories).asScala().toSeq();
StorageTool.formatCommand(LoggingOutputStream.loggerPrintStream(LOGGER), dirs, metaProperties,
MINIMUM_BOOTSTRAP_VERSION, ignoreFormatted);
StorageTool.formatCommand(LoggingOutputStream.loggerPrintStream(LOGGER), directories, metaProperties,
metadataVersion, ignoreFormatted);
}

public static class LoggingOutputStream extends java.io.OutputStream {
Expand Down

0 comments on commit 76aecc8

Please sign in to comment.