From 4d4d1e364224767733402b98ae77a96fd8d4f077 Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Thu, 1 Feb 2024 16:46:54 -0500 Subject: [PATCH 1/4] Obtain credentials from request when missing configuration Signed-off-by: Michael Edgar --- .../streamshub/console/api/ClientFactory.java | 236 ++++++++++++++---- .../console/api/support/KafkaContext.java | 7 + .../support/TrustAllCertificateManager.java | 21 +- api/src/main/resources/application.properties | 3 + .../console/api/KafkaClustersResourceIT.java | 4 +- .../deployment/StrimziCrdResourceManager.java | 70 +++--- .../console/config/KafkaClusterConfig.java | 10 +- 7 files changed, 260 insertions(+), 91 deletions(-) diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java index 2e2893eeb..e365c7c28 100644 --- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java @@ -4,6 +4,7 @@ import java.io.InputStream; import java.io.UncheckedIOException; import java.nio.file.Path; +import java.util.Base64; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -30,7 +31,9 @@ import jakarta.enterprise.inject.Produces; import jakarta.inject.Inject; import jakarta.inject.Named; +import jakarta.ws.rs.NotAuthorizedException; import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.UriInfo; import org.apache.kafka.clients.CommonClientConfigs; @@ -44,6 +47,9 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.scram.ScramLoginModule; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.microprofile.config.Config; @@ -63,7 +69,11 @@ import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.strimzi.api.kafka.model.kafka.Kafka; +import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec; +import io.strimzi.api.kafka.model.kafka.KafkaSpec; import io.strimzi.api.kafka.model.kafka.KafkaStatus; +import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener; +import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthentication; import io.strimzi.api.kafka.model.kafka.listener.ListenerStatus; /** @@ -82,6 +92,20 @@ @ApplicationScoped public class ClientFactory { + private static final String BEARER = "Bearer "; + private static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM; + private static final String STRIMZI_OAUTH_CALLBACK = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"; + private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName() + + " required" + + " oauth.access.token=\"%s\" ;"; + + private static final String BASIC = "Basic "; + private static final String SCRAM_SHA512 = "SCRAM-SHA-512"; + private static final String SASL_SCRAM_CONFIG_TEMPLATE = ScramLoginModule.class.getName() + + " required" + + " username=\"%s\"" + + " password=\"%s\" ;"; + static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not 
configured"; private final Function noSuchKafka = clusterName -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterName)); @@ -108,6 +132,9 @@ public class ClientFactory { @Inject Instance trustManager; + @Inject + HttpHeaders headers; + @Inject UriInfo requestUri; @@ -275,7 +302,7 @@ void putKafkaContext(Map contexts, Admin admin = null; - if (establishGlobalConnection(clusterConfig, adminConfigs)) { + if (establishGlobalConnection(adminConfigs)) { admin = adminBuilder.apply(adminConfigs); } @@ -309,7 +336,7 @@ void putKafkaContext(Map contexts, } Optional cachedKafkaResource(KafkaClusterConfig clusterConfig) { - return kafkaInformer.map(SharedIndexInformer::getStore) + return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore) .map(store -> { String key = clusterConfig.clusterKey(); Kafka resource = store.getByKey(key); @@ -317,7 +344,7 @@ Optional cachedKafkaResource(KafkaClusterConfig clusterConfig) { log.warnf("Configuration references Kafka resource %s, but it was not found in cache", key); } return resource; - }); + }) : Optional.empty(); } Map requiredAdminConfig() { @@ -349,7 +376,7 @@ Map requiredProducerConfig() { return configs; } - static boolean establishGlobalConnection(KafkaClusterConfig clusterConfig, Map configs) { + static boolean establishGlobalConnection(Map configs) { if (!configs.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) { return false; } @@ -405,21 +432,30 @@ public KafkaContext produceKafkaContext(Map contexts, return Optional.ofNullable(contexts.get(clusterId)) .map(ctx -> { - Admin admin = Optional.ofNullable(ctx.admin()) - /* - * Admin may be null if credentials were not given - * in the configuration. The user must provide the - * login secrets in the request in that case. - */ - .orElseGet(() -> adminBuilder.apply(ctx.configs(Admin.class))); - return new KafkaContext(ctx, filter.apply(admin)); + if (ctx.admin() == null) { + /* + * Admin may be null if credentials were not given in the + * configuration. The user must provide the login secrets + * in the request in that case. 
+ */ + var adminConfigs = maybeAuthenticate(ctx.configs(Admin.class)); + var admin = adminBuilder.apply(adminConfigs); + return new KafkaContext(ctx, filter.apply(admin)); + } + + return ctx; }) .orElseThrow(() -> noSuchKafka.apply(clusterId)); } public void disposeKafkaContext(@Disposes KafkaContext context, Map contexts) { if (!contexts.values().contains(context)) { - log.infof("Closing out-of-date KafkaContext %s", Cache.metaNamespaceKeyFunc(context.resource())); + var clusterKey = context.clusterConfig().clusterKey(); + if (context.applicationScoped()) { + log.infof("Closing out-of-date KafkaContext: %s", clusterKey); + } else { + log.debugf("Closing request-scoped KafkaContext: %s", clusterKey); + } context.close(); } } @@ -427,8 +463,8 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { - var configs = context.configs(Consumer.class); - Consumer client = new KafkaConsumer<>(configs); + var configs = maybeAuthenticate(context.configs(Consumer.class)); + Consumer client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer return () -> client; } @@ -439,8 +475,8 @@ public void consumerDisposer(@Disposes Supplier> consum @Produces @RequestScoped public Supplier> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { - var configs = context.configs(Producer.class); - Producer client = new KafkaProducer<>(configs); + var configs = maybeAuthenticate(context.configs(Producer.class)); + Producer client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer return () -> client; } @@ -448,6 +484,16 @@ public void producerDisposer(@Disposes Supplier> produc producer.get().close(); } + Map maybeAuthenticate(Map configs) { + if (configs.containsKey(SaslConfigs.SASL_MECHANISM) + && !configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) { + configs = new HashMap<>(configs); + configureAuthentication(configs); + } + + return configs; + } + Map buildConfig(Set configNames, KafkaClusterConfig config, String clientType, @@ -465,49 +511,43 @@ Map buildConfig(Set configNames, .map(Optional::get) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new)); - if (!cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) { - cluster.map(Kafka::getStatus) + var listenerSpec = cluster.map(Kafka::getSpec) + .map(KafkaSpec::getKafka) + .map(KafkaClusterSpec::getListeners) + .map(Collection::stream) + .orElseGet(Stream::empty) + .filter(listener -> listener.getName().equals(config.getListener())) + .findFirst(); + + var listenerStatus = cluster.map(Kafka::getStatus) .map(KafkaStatus::getListeners) .map(Collection::stream) .orElseGet(Stream::empty) .filter(listener -> listener.getName().equals(config.getListener())) - .map(ListenerStatus::getBootstrapServers) - .findFirst() - .ifPresent(bootstrapServers -> cfg.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); - } + .findFirst(); + + listenerSpec.ifPresent(listener -> applyListenerConfiguration(cfg, listener)); + + listenerStatus.map(ListenerStatus::getBootstrapServers) + .ifPresent(bootstrap -> cfg.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrap)); + + listenerStatus.map(ListenerStatus::getCertificates) + .filter(Objects::nonNull) + .filter(Predicate.not(Collection::isEmpty)) + .map(certificates -> String.join("\n", certificates).trim()) + .ifPresent(certificates -> { + cfg.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); + 
cfg.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, certificates); + }); if (truststoreRequired(cfg)) { - if (trustManager.isResolvable()) { + if (cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) && trustManager.isResolvable()) { trustManager.get().trustClusterCertificate(cfg); } else { - cluster.map(Kafka::getStatus) - .map(KafkaStatus::getListeners) - .map(Collection::stream) - .orElseGet(Stream::empty) - .filter(listener -> { - if (listener.getName().equals(config.getListener())) { - return true; - } - - return cfg.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "") - .toString() - .contains(listener.getBootstrapServers()); - }) - .map(ListenerStatus::getCertificates) - .filter(Objects::nonNull) - .filter(Predicate.not(Collection::isEmpty)) - .findFirst() - .ifPresent(certificates -> { - cfg.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); - cfg.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, String.join("\n", certificates).trim()); - }); - - if (truststoreRequired(cfg)) { - log.warnf(""" - Failed to set configuration for %s client to Kafka cluster %s. Connection \ - requires truststore which could not be obtained from the Kafka resource status.""" - .formatted(clientType, config.clusterKey())); - } + log.warnf(""" + Failed to set configuration for %s client to Kafka cluster %s. Connection \ + requires truststore which could not be obtained from the Kafka resource status.""" + .formatted(clientType, config.clusterKey())); } } @@ -522,6 +562,42 @@ Map buildConfig(Set configNames, return cfg; } + private void applyListenerConfiguration(Map cfg, GenericKafkaListener listener) { + var authType = Optional.ofNullable(listener.getAuth()) + .map(KafkaListenerAuthentication::getType) + .orElse(""); + boolean saslEnabled; + + switch (authType) { + case "oauth": + saslEnabled = true; + cfg.putIfAbsent(SaslConfigs.SASL_MECHANISM, OAUTHBEARER); + cfg.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, STRIMZI_OAUTH_CALLBACK); + break; + case "scram-sha-512": + cfg.putIfAbsent(SaslConfigs.SASL_MECHANISM, SCRAM_SHA512); + saslEnabled = true; + break; + default: + saslEnabled = false; + break; + } + + StringBuilder protocol = new StringBuilder(); + + if (saslEnabled) { + protocol.append("SASL_"); + } + + if (listener.isTls()) { + protocol.append(SecurityProtocol.SSL.name); + } else { + protocol.append(SecurityProtocol.PLAINTEXT.name); + } + + cfg.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); + } + private void resolveValues(Map properties) { properties.entrySet().forEach(entry -> entry.setValue(resolveValue(entry.getValue()))); @@ -589,6 +665,62 @@ void logConfig(String clientType, Map config) { } } + void configureAuthentication(Map configs) { + switch (configs.get(SaslConfigs.SASL_MECHANISM).toString()) { + case OAUTHBEARER: + configureOAuthBearer(configs); + break; + case SCRAM_SHA512: + configureScram(configs); + break; + default: + throw new NotAuthorizedException("Unknown"); + } + } + + void configureOAuthBearer(Map configs) { + log.trace("SASL/OAUTHBEARER enabled"); + + configs.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, STRIMZI_OAUTH_CALLBACK); + // Do not attempt token refresh ahead of expiration (ExpiringCredentialRefreshingLogin) + // May still cause warnings to be logged when token will expire in less than SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS. 
+ configs.putIfAbsent(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "0"); + + String jaasConfig = getAuthorization(BEARER) + .map(SASL_OAUTH_CONFIG_TEMPLATE::formatted) + .orElseThrow(() -> new NotAuthorizedException(BEARER.trim())); + + configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); + } + + void configureScram(Map configs) { + log.trace("SASL/SCRAM enabled"); + + String jaasConfig = getBasicAuthentication() + .map(SASL_SCRAM_CONFIG_TEMPLATE::formatted) + .orElseThrow(() -> new NotAuthorizedException(BASIC.trim())); + + configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); + } + + Optional getBasicAuthentication() { + return getAuthorization(BASIC) + .map(Base64.getDecoder()::decode) + .map(String::new) + .filter(authn -> authn.indexOf(':') >= 0) + .map(authn -> new String[] { + authn.substring(0, authn.indexOf(':')), + authn.substring(authn.indexOf(':') + 1) + }) + .filter(userPass -> !userPass[0].isEmpty() && !userPass[1].isEmpty()); + } + + Optional getAuthorization(String scheme) { + return Optional.ofNullable(headers.getHeaderString(HttpHeaders.AUTHORIZATION)) + .filter(header -> header.regionMatches(true, 0, scheme, 0, scheme.length())) + .map(header -> header.substring(scheme.length())); + } + private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)"); private static final Pattern EMBEDDED_STRING = Pattern.compile("\"[^\"]*\""); } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java index 7226bb34d..48601ee9c 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java @@ -19,16 +19,19 @@ public class KafkaContext implements Closeable { final Kafka resource; final Map, Map> configs; final Admin admin; + boolean applicationScoped; public KafkaContext(KafkaClusterConfig clusterConfig, Kafka resource, Map, Map> configs, Admin admin) { this.clusterConfig = clusterConfig; this.resource = resource; this.configs = Map.copyOf(configs); this.admin = admin; + this.applicationScoped = true; } public KafkaContext(KafkaContext other, Admin admin) { this(other.clusterConfig, other.resource, other.configs, admin); + this.applicationScoped = false; } @Override @@ -75,4 +78,8 @@ public Map configs(Class type) { public Admin admin() { return admin; } + + public boolean applicationScoped() { + return applicationScoped; + } } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/TrustAllCertificateManager.java b/api/src/main/java/com/github/streamshub/console/api/support/TrustAllCertificateManager.java index 3e13eb08a..112c967b1 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/TrustAllCertificateManager.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/TrustAllCertificateManager.java @@ -7,6 +7,7 @@ import java.security.cert.X509Certificate; import java.util.Base64; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; @@ -32,14 +33,26 @@ public class TrustAllCertificateManager implements X509TrustManager { @Inject Logger log; + Map trustedCertificates = new ConcurrentHashMap<>(); + public void trustClusterCertificate(Map cfg) { + String bootstrap = (String) cfg.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + String trusted = trustedCertificates.computeIfAbsent(bootstrap, 
this::loadCertificates); + + if (trusted != null) { + cfg.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, trusted); + cfg.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); + } + } + + private String loadCertificates(String bootstrap) { TrustManager[] trustAllCerts = {this}; + String certificates = null; try { SSLContext sc = SSLContext.getInstance("TLSv1.2"); sc.init(null, trustAllCerts, new SecureRandom()); SSLSocketFactory factory = sc.getSocketFactory(); - String bootstrap = (String) cfg.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); String[] hostport = bootstrap.split(",")[0].split(":"); ByteArrayOutputStream certificateOut = new ByteArrayOutputStream(); @@ -52,13 +65,13 @@ public void trustClusterCertificate(Map cfg) { } } - cfg.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, - new String(certificateOut.toByteArray(), StandardCharsets.UTF_8).trim()); - cfg.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); + certificates = new String(certificateOut.toByteArray(), StandardCharsets.UTF_8).trim(); log.warnf("Certificate hosted at %s:%s is automatically trusted", hostport[0], hostport[1]); } catch (Exception e) { log.infof("Exception setting up trusted certificate: %s", e.getMessage()); } + + return certificates; } public X509Certificate[] getAcceptedIssuers() { diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties index 6f2baaade..c70fec245 100644 --- a/api/src/main/resources/application.properties +++ b/api/src/main/resources/application.properties @@ -63,8 +63,11 @@ console.kafka.admin.default.api.timeout.ms=10000 ######## #%dev.quarkus.http.auth.proactive=false #%dev.quarkus.http.auth.permission."oidc".policy=permit +%dev.quarkus.kubernetes-client.trust-certs=true %dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF %dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG +#%dev.quarkus.log.category."io.vertx.core.http".level=DEBUG +#%dev.quarkus.log.category."io.netty".level=DEBUG ######## %testplain.quarkus.devservices.enabled=true diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java index ece32151f..879f392ab 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java @@ -56,7 +56,6 @@ import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustomBuilder; -import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuthBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationScramSha512Builder; import io.strimzi.test.container.StrimziKafkaContainer; @@ -543,8 +542,7 @@ void testDescribeClusterWithCertificates() { * Create a Kafka CR with OAuth that proxies to kafka1. 
* test-kafka3 is predefined in KafkaUnsecuredResourceManager with SSL */ - Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers, - new KafkaListenerAuthenticationOAuthBuilder().build())) + Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers)) .editStatus() .editMatchingListener(l -> "listener0".equals(l.getName())) .addToCertificates(""" diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/StrimziCrdResourceManager.java b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/StrimziCrdResourceManager.java index 6d1a7be66..2f7d0d625 100644 --- a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/StrimziCrdResourceManager.java +++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/StrimziCrdResourceManager.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.Optional; import java.util.function.Function; +import java.util.function.Supplier; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClientBuilder; @@ -30,13 +31,29 @@ public void setIntegrationTestContext(DevServicesContext context) { devConfig = context.devServicesProperties(); } + Optional get(String key, Function mapper) { + return Optional.ofNullable(devConfig.get(PREFIX + key)) + .map(mapper); + } + Optional get(String key) { return get(key, Function.identity()); } - Optional get(String key, Function mapper) { - return Optional.ofNullable(devConfig.get(PREFIX + key)) - .map(mapper); + T get(String key, Function mapper, Supplier defaultValue) { + return get(key, mapper).orElseGet(defaultValue); + } + + String get(String key, Supplier defaultValue) { + return get(key, Function.identity()) + .orElseGet(defaultValue); + } + + Integer durationMs(String key, Supplier defaultValue) { + return get(key, Duration::parse) + .map(Duration::toMillis) + .map(Integer.class::cast) + .orElseGet(defaultValue); } @Override @@ -45,31 +62,28 @@ public Map start() { var k8s = new KubernetesClientBuilder() .editOrNewConfig() - .withTrustCerts(get("trust-certs", Boolean::parseBoolean).orElseGet(base::isTrustCerts)) - .withWatchReconnectLimit(get("watch-reconnect-limit", Integer::parseInt).orElseGet(base::getWatchReconnectLimit)) - .withWatchReconnectInterval((int) get("watch-reconnect-interval", Duration::parse) - .orElse(Duration.ofMillis(base.getWatchReconnectInterval())).toMillis()) - .withConnectionTimeout((int) get("connection-timeout", Duration::parse) - .orElse(Duration.ofMillis(base.getConnectionTimeout())).toMillis()) - .withRequestTimeout((int) get("request-timeout", Duration::parse) - .orElse(Duration.ofMillis(base.getRequestTimeout())).toMillis()) - .withMasterUrl(get("api-server-url").or(() -> get("master-url")).orElse(base.getMasterUrl())) - .withNamespace(get("namespace").orElseGet(base::getNamespace)) - .withUsername(get("username").orElse(base.getUsername())) - .withPassword(get("password").orElse(base.getPassword())) - .withCaCertFile(get("ca-cert-file").orElse(base.getCaCertFile())) - .withCaCertData(get("ca-cert-data").orElse(base.getCaCertData())) - .withClientCertFile(get("client-cert-file").orElse(base.getClientCertFile())) - .withClientCertData(get("client-cert-data").orElse(base.getClientCertData())) - .withClientKeyFile(get("client-key-file").orElse(base.getClientKeyFile())) - .withClientKeyData(get("client-key-data").orElse(base.getClientKeyData())) - 
.withClientKeyPassphrase(get("client-key-passphrase").orElse(base.getClientKeyPassphrase())) - .withClientKeyAlgo(get("client-key-algo").orElse(base.getClientKeyAlgo())) - .withHttpProxy(get("http-proxy").orElse(base.getHttpProxy())) - .withHttpsProxy(get("https-proxy").orElse(base.getHttpsProxy())) - .withProxyUsername(get("proxy-username").orElse(base.getProxyUsername())) - .withProxyPassword(get("proxy-password").orElse(base.getProxyPassword())) - .withNoProxy(get("no-proxy", s -> s.split(",")).orElse(base.getNoProxy())) + .withTrustCerts(get("trust-certs", Boolean::parseBoolean, base::isTrustCerts)) + .withWatchReconnectLimit(get("watch-reconnect-limit", Integer::parseInt, base::getWatchReconnectLimit)) + .withWatchReconnectInterval(durationMs("watch-reconnect-interval", base::getWatchReconnectInterval)) + .withConnectionTimeout(durationMs("connection-timeout", base::getConnectionTimeout)) + .withRequestTimeout(durationMs("request-timeout", base::getRequestTimeout)) + .withMasterUrl(get("api-server-url").or(() -> get("master-url")).orElseGet(base::getMasterUrl)) + .withNamespace(get("namespace", base::getNamespace)) + .withUsername(get("username", base::getUsername)) + .withPassword(get("password", base::getPassword)) + .withCaCertFile(get("ca-cert-file", base::getCaCertFile)) + .withCaCertData(get("ca-cert-data", base::getCaCertData)) + .withClientCertFile(get("client-cert-file", base::getClientCertFile)) + .withClientCertData(get("client-cert-data", base::getClientCertData)) + .withClientKeyFile(get("client-key-file", base::getClientKeyFile)) + .withClientKeyData(get("client-key-data", base::getClientKeyData)) + .withClientKeyPassphrase(get("client-key-passphrase", base::getClientKeyPassphrase)) + .withClientKeyAlgo(get("client-key-algo", base::getClientKeyAlgo)) + .withHttpProxy(get("http-proxy", base::getHttpProxy)) + .withHttpsProxy(get("https-proxy", base::getHttpsProxy)) + .withProxyUsername(get("proxy-username", base::getProxyUsername)) + .withProxyPassword(get("proxy-password", base::getProxyPassword)) + .withNoProxy(get("no-proxy", s -> s.split(",")).orElseGet(base::getNoProxy)) .endConfig() .build(); diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java index 6e91a6ddd..1f7f058fd 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java @@ -18,10 +18,12 @@ public class KafkaClusterConfig { @JsonIgnore public String clusterKey() { - if (namespace == null || namespace.isBlank()) { - return name; - } - return "%s/%s".formatted(namespace, name); + return hasNamespace() ? 
"%s/%s".formatted(namespace, name) : name; + } + + @JsonIgnore + public boolean hasNamespace() { + return namespace != null && !namespace.isBlank(); } public String getId() { From 7c016e9a824994bf726dc271a68bdae54e021712 Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Wed, 31 Jul 2024 17:35:08 -0400 Subject: [PATCH 2/4] Return auth type in Kafka meta, treat PLAIN like SCRAM-SHA Signed-off-by: Michael Edgar --- api/README.md | 5 +- .../streamshub/console/api/ClientFactory.java | 75 ++++--- .../console/api/model/KafkaCluster.java | 203 +++++++++--------- .../console/api/model/RelatableResource.java | 46 +--- .../console/api/model/Resource.java | 12 +- .../console/api/service/BrokerService.java | 2 +- .../api/service/KafkaClusterService.java | 56 +++-- .../console/api/support/KafkaContext.java | 29 +++ .../console/api/support/OASModelFilter.java | 2 +- api/src/main/resources/application.properties | 3 - .../api/KafkaClustersResourceNoK8sIT.java | 6 +- .../systemtest/TestPlainNoK8sProfile.java | 6 +- common/pom.xml | 8 +- .../console/config/KafkaClusterConfig.java | 3 + .../console/config/KafkaConfig.java | 8 + console-config-example.yaml | 10 +- .../api/v1alpha1/spec/KafkaCluster.java | 11 +- 17 files changed, 253 insertions(+), 232 deletions(-) diff --git a/api/README.md b/api/README.md index b28e76746..140151e7b 100644 --- a/api/README.md +++ b/api/README.md @@ -40,10 +40,11 @@ create instances of both Prometheus and Kafka. ### Start Console API in Development Mode -Start the API in development mode from the repository root directory. +Start the API in development mode from the repository root directory. Ensure that the config-path given points to a +valid `console-config.yaml`. See [console-config-example.yaml](../console-config-example.yaml) for an example. 
```shell -mvn -am -pl api quarkus:dev +mvn -am -pl api quarkus:dev -Dconsole.config-path=$(pwd)/console-config.yaml ``` ### Using the Instance API diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java index e365c7c28..251f75d21 100644 --- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java @@ -49,6 +49,7 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.plain.PlainLoginModule; import org.apache.kafka.common.security.scram.ScramLoginModule; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -62,6 +63,7 @@ import com.github.streamshub.console.api.support.Holder; import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.TrustAllCertificateManager; +import com.github.streamshub.console.api.support.ValidationProxy; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; @@ -92,19 +94,21 @@ @ApplicationScoped public class ClientFactory { + public static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM; + public static final String PLAIN = "PLAIN"; + public static final String SCRAM_SHA256 = "SCRAM-SHA-256"; + public static final String SCRAM_SHA512 = "SCRAM-SHA-512"; + private static final String BEARER = "Bearer "; - private static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM; private static final String STRIMZI_OAUTH_CALLBACK = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"; private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName() + " required" + " oauth.access.token=\"%s\" ;"; private static final String BASIC = "Basic "; - private static final String SCRAM_SHA512 = "SCRAM-SHA-512"; - private static final String SASL_SCRAM_CONFIG_TEMPLATE = ScramLoginModule.class.getName() - + " required" - + " username=\"%s\"" - + " password=\"%s\" ;"; + private static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;"; + private static final String SASL_PLAIN_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(PlainLoginModule.class.getName()); + private static final String SASL_SCRAM_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(ScramLoginModule.class.getName()); static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured"; private final Function noSuchKafka = @@ -129,6 +133,9 @@ public class ClientFactory { @Inject KafkaClusterService kafkaClusterService; + @Inject + ValidationProxy validationService; + @Inject Instance trustManager; @@ -192,8 +199,9 @@ public ConsoleConfig produceConsoleConfig() { return consoleConfig; }) + .map(validationService::validate) .orElseGet(() -> { - log.infof("Console configuration not specified"); + log.warn("Console configuration has not been specified using `console.config-path` property"); return new ConsoleConfig(); }); } @@ -213,6 +221,7 @@ Map produceKafkaContexts(ConsoleConfig consoleConfig, consoleConfig.getKafka().getClusters() .stream() .filter(c -> cachedKafkaResource(c).isEmpty()) + 
.filter(Predicate.not(KafkaClusterConfig::hasNamespace)) .forEach(clusterConfig -> putKafkaContext(contexts, clusterConfig, Optional.empty(), @@ -309,19 +318,14 @@ void putKafkaContext(Map contexts, String clusterKey = clusterConfig.clusterKey(); String clusterId = Optional.ofNullable(clusterConfig.getId()) .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId)) - .orElse(null); + .orElseGet(clusterConfig::getName); - if (clusterId == null) { - log.warnf(""" - Ignoring Kafka cluster %s. Cluster id value missing in \ - configuration and no Strimzi Kafka resources found with matching \ - name and namespace.""", clusterKey); - } else if (!replace && contexts.containsKey(clusterId)) { + if (!replace && contexts.containsKey(clusterId)) { log.warnf(""" Ignoring duplicate Kafka cluster id: %s for cluster %s. Cluster id values in \ configuration must be unique and may not match id values of \ clusters discovered using Strimzi Kafka Kubernetes API resources.""", clusterId, clusterKey); - } else if (truststoreRequired(adminConfigs)) { + } else if (kafkaResource.isPresent() && truststoreRequired(adminConfigs)) { if (contexts.containsKey(clusterId) && !truststoreRequired(contexts.get(clusterId).configs(Admin.class))) { log.warnf(""" Ignoring update to Kafka custom resource %s. Connection requires \ @@ -337,13 +341,17 @@ void putKafkaContext(Map contexts, Optional cachedKafkaResource(KafkaClusterConfig clusterConfig) { return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore) - .map(store -> { + .map(store -> store.getByKey(clusterConfig.clusterKey())) + .or(() -> { String key = clusterConfig.clusterKey(); - Kafka resource = store.getByKey(key); - if (resource == null) { - log.warnf("Configuration references Kafka resource %s, but it was not found in cache", key); + + if (kafkaInformer.isPresent()) { + log.warnf("Configuration references Kubernetes Kafka resource %s, but it was not found", key); + } else { + log.warnf("Configuration references Kubernetes Kafka resource %s, but Kubernetes access is disabled", key); } - return resource; + + return Optional.empty(); }) : Optional.empty(); } @@ -438,7 +446,7 @@ public KafkaContext produceKafkaContext(Map contexts, * configuration. The user must provide the login secrets * in the request in that case. 
*/ - var adminConfigs = maybeAuthenticate(ctx.configs(Admin.class)); + var adminConfigs = maybeAuthenticate(ctx, Admin.class); var admin = adminBuilder.apply(adminConfigs); return new KafkaContext(ctx, filter.apply(admin)); } @@ -463,7 +471,7 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { - var configs = maybeAuthenticate(context.configs(Consumer.class)); + var configs = maybeAuthenticate(context, Consumer.class); Consumer client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer return () -> client; } @@ -475,7 +483,7 @@ public void consumerDisposer(@Disposes Supplier> consum @Produces @RequestScoped public Supplier> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { - var configs = maybeAuthenticate(context.configs(Producer.class)); + var configs = maybeAuthenticate(context, Producer.class); Producer client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer return () -> client; } @@ -484,11 +492,13 @@ public void producerDisposer(@Disposes Supplier> produc producer.get().close(); } - Map maybeAuthenticate(Map configs) { + Map maybeAuthenticate(KafkaContext context, Class clientType) { + Map configs = context.configs(clientType); + if (configs.containsKey(SaslConfigs.SASL_MECHANISM) && !configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) { configs = new HashMap<>(configs); - configureAuthentication(configs); + configureAuthentication(context.saslMechanism(clientType), configs); } return configs; @@ -665,13 +675,16 @@ void logConfig(String clientType, Map config) { } } - void configureAuthentication(Map configs) { - switch (configs.get(SaslConfigs.SASL_MECHANISM).toString()) { + void configureAuthentication(String saslMechanism, Map configs) { + switch (saslMechanism) { case OAUTHBEARER: configureOAuthBearer(configs); break; - case SCRAM_SHA512: - configureScram(configs); + case PLAIN: + configureBasic(configs, SASL_PLAIN_CONFIG_TEMPLATE); + break; + case SCRAM_SHA256, SCRAM_SHA512: + configureBasic(configs, SASL_SCRAM_CONFIG_TEMPLATE); break; default: throw new NotAuthorizedException("Unknown"); @@ -693,11 +706,11 @@ void configureOAuthBearer(Map configs) { configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); } - void configureScram(Map configs) { + void configureBasic(Map configs, String template) { log.trace("SASL/SCRAM enabled"); String jaasConfig = getBasicAuthentication() - .map(SASL_SCRAM_CONFIG_TEMPLATE::formatted) + .map(template::formatted) .orElseThrow(() -> new NotAuthorizedException(BASIC.trim())); configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java index 46da46f1a..e0e3ef9e5 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java @@ -16,15 +16,15 @@ import com.fasterxml.jackson.annotation.JsonFilter; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.github.streamshub.console.api.support.ComparatorBuilder; import com.github.streamshub.console.api.support.ListRequestContext; import static java.util.Comparator.comparing; import static java.util.Comparator.nullsLast; -@Schema(name = "KafkaClusterAttributes") -@JsonFilter("fieldFilter") -public class KafkaCluster { +@Schema(name = 
"KafkaCluster") +public class KafkaCluster extends Resource { public static class Fields { public static final String NAME = "name"; @@ -46,9 +46,9 @@ public static class Fields { static final Map>> COMPARATORS = ComparatorBuilder.bidirectional( Map.of("id", ID_COMPARATOR, - NAME, comparing(KafkaCluster::getName), - NAMESPACE, comparing(KafkaCluster::getNamespace), - CREATION_TIMESTAMP, comparing(KafkaCluster::getCreationTimestamp))); + NAME, comparing(KafkaCluster::name), + NAMESPACE, comparing(KafkaCluster::namespace), + CREATION_TIMESTAMP, comparing(KafkaCluster::creationTimestamp))); public static final ComparatorBuilder COMPARATOR_BUILDER = new ComparatorBuilder<>(KafkaCluster.Fields::comparator, KafkaCluster.Fields.defaultComparator()); @@ -84,15 +84,12 @@ public static Comparator comparator(String fieldName, boolean desc } @Schema(name = "KafkaClusterListResponse") - public static final class ListResponse extends DataList { + public static final class ListResponse extends DataList { public ListResponse(List data, ListRequestContext listSupport) { super(data.stream() .map(entry -> { - var rsrc = new KafkaClusterResource(entry); - rsrc.addMeta("page", listSupport.buildPageMeta(entry::toCursor)); - rsrc.addMeta("configured", entry.isConfigured()); - rsrc.addMeta("managed", entry.isManaged()); - return rsrc; + entry.addMeta("page", listSupport.buildPageMeta(entry::toCursor)); + return entry; }) .toList()); addMeta("page", listSupport.buildPageMeta()); @@ -101,48 +98,56 @@ public ListResponse(List data, ListRequestContext li } @Schema(name = "KafkaClusterResponse") - public static final class SingleResponse extends DataSingleton { + public static final class SingleResponse extends DataSingleton { public SingleResponse(KafkaCluster data) { - super(new KafkaClusterResource(data)); + super(data); } } - @Schema(name = "KafkaCluster") - public static final class KafkaClusterResource extends Resource { - public KafkaClusterResource(KafkaCluster data) { - super(data.id, "kafkas", data); - addMeta("configured", data.isConfigured()); - addMeta("managed", data.isManaged()); + @JsonFilter("fieldFilter") + static class Attributes { + @JsonProperty + String name; // Strimzi Kafka CR only + + @JsonProperty + String namespace; // Strimzi Kafka CR only + + @JsonProperty + String creationTimestamp; // Strimzi Kafka CR only + + @JsonProperty + final List nodes; + + @JsonProperty + final Node controller; + + @JsonProperty + final List authorizedOperations; + + @JsonProperty + List listeners; // Strimzi Kafka CR only + + @JsonProperty + String kafkaVersion; + + @JsonProperty + String status; + + @JsonProperty + List conditions; + + @JsonProperty + List nodePools; + + Attributes(List nodes, Node controller, List authorizedOperations) { + this.nodes = nodes; + this.controller = controller; + this.authorizedOperations = authorizedOperations; } } - String name; // Strimzi Kafka CR only - String namespace; // Strimzi Kafka CR only - String creationTimestamp; // Strimzi Kafka CR only - @JsonIgnore - String id; // non-final, may be overridden by configuration - final List nodes; - final Node controller; - final List authorizedOperations; - List listeners; // Strimzi Kafka CR only - @Schema(readOnly = true, description = """ - Contains the set of metrics optionally retrieved only in a describe operation. 
- """) - String kafkaVersion; - String status; - List conditions; - @JsonIgnore - boolean configured; - List nodePools; - @JsonIgnore - boolean managed; - public KafkaCluster(String id, List nodes, Node controller, List authorizedOperations) { - super(); - this.id = id; - this.nodes = nodes; - this.controller = controller; - this.authorizedOperations = authorizedOperations; + super(id, "kafkas", new Attributes(nodes, controller, authorizedOperations)); } /** @@ -156,9 +161,9 @@ public static KafkaCluster fromCursor(JsonObject cursor) { KafkaCluster cluster = new KafkaCluster(cursor.getString("id"), null, null, null); JsonObject attr = cursor.getJsonObject("attributes"); - cluster.setName(attr.getString(Fields.NAME, null)); - cluster.setNamespace(attr.getString(Fields.NAMESPACE, null)); - cluster.setCreationTimestamp(attr.getString(Fields.CREATION_TIMESTAMP, null)); + cluster.name(attr.getString(Fields.NAME, null)); + cluster.namespace(attr.getString(Fields.NAMESPACE, null)); + cluster.creationTimestamp(attr.getString(Fields.CREATION_TIMESTAMP, null)); return cluster; } @@ -168,9 +173,9 @@ public String toCursor(List sortFields) { .add("id", id == null ? Json.createValue("") : Json.createValue(id)); JsonObjectBuilder attrBuilder = Json.createObjectBuilder(); - maybeAddAttribute(attrBuilder, sortFields, Fields.NAME, name); - maybeAddAttribute(attrBuilder, sortFields, Fields.NAMESPACE, namespace); - maybeAddAttribute(attrBuilder, sortFields, Fields.CREATION_TIMESTAMP, creationTimestamp); + maybeAddAttribute(attrBuilder, sortFields, Fields.NAME, attributes.name); + maybeAddAttribute(attrBuilder, sortFields, Fields.NAMESPACE, attributes.namespace); + maybeAddAttribute(attrBuilder, sortFields, Fields.CREATION_TIMESTAMP, attributes.creationTimestamp); cursor.add("attributes", attrBuilder.build()); return Base64.getUrlEncoder().encodeToString(cursor.build().toString().getBytes(StandardCharsets.UTF_8)); @@ -182,103 +187,93 @@ static void maybeAddAttribute(JsonObjectBuilder attrBuilder, List sortFi } } - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; + public String name() { + return attributes.name; } - public String getNamespace() { - return namespace; + public void name(String name) { + attributes.name = name; } - public void setNamespace(String namespace) { - this.namespace = namespace; + public String namespace() { + return attributes.namespace; } - public String getCreationTimestamp() { - return creationTimestamp; + public void namespace(String namespace) { + attributes.namespace = namespace; } - public void setCreationTimestamp(String creationTimestamp) { - this.creationTimestamp = creationTimestamp; + public String creationTimestamp() { + return attributes.creationTimestamp; } - public String getId() { - return id; + public void creationTimestamp(String creationTimestamp) { + attributes.creationTimestamp = creationTimestamp; } public void setId(String id) { this.id = id; } - public List getNodes() { - return nodes; + public List nodes() { + return attributes.nodes; } - public Node getController() { - return controller; + public Node controller() { + return attributes.controller; } - public List getAuthorizedOperations() { - return authorizedOperations; + public List authorizedOperations() { + return attributes.authorizedOperations; } - public List getListeners() { - return listeners; + public List listeners() { + return attributes.listeners; } - public void setListeners(List listeners) { - this.listeners = listeners; + public void 
listeners(List listeners) { + attributes.listeners = listeners; } - public String getKafkaVersion() { - return kafkaVersion; + public String kafkaVersion() { + return attributes.kafkaVersion; } - public void setKafkaVersion(String kafkaVersion) { - this.kafkaVersion = kafkaVersion; + public void kafkaVersion(String kafkaVersion) { + attributes.kafkaVersion = kafkaVersion; } - public String getStatus() { - return status; + public String status() { + return attributes.status; } - public void setStatus(String status) { - this.status = status; + public void status(String status) { + attributes.status = status; } - public List getConditions() { - return conditions; + public List conditions() { + return attributes.conditions; } - public void setConditions(List conditions) { - this.conditions = conditions; + public void conditions(List conditions) { + attributes.conditions = conditions; } + @JsonIgnore public boolean isConfigured() { - return configured; + return Boolean.TRUE.equals(getMeta("configured")); } + @JsonIgnore public void setConfigured(boolean configured) { - this.configured = configured; - } - - public List getNodePools() { - return nodePools; - } - - public void setNodePools(List nodePools) { - this.nodePools = nodePools; + addMeta("configured", configured); } - public void setManaged(boolean managed) { - this.managed = managed; + public List nodePools() { + return attributes.nodePools; } - public boolean isManaged() { - return managed; + public void nodePools(List nodePools) { + attributes.nodePools = nodePools; } } diff --git a/api/src/main/java/com/github/streamshub/console/api/model/RelatableResource.java b/api/src/main/java/com/github/streamshub/console/api/model/RelatableResource.java index 304e59e63..de3fcde72 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/RelatableResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/RelatableResource.java @@ -1,14 +1,7 @@ package com.github.streamshub.console.api.model; -import java.util.LinkedHashMap; -import java.util.Map; - -import jakarta.validation.Valid; -import jakarta.validation.constraints.NotNull; - import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.github.streamshub.console.api.support.ErrorCategory; /** * A "resource object", as described by JSON:API. 
@@ -19,51 +12,16 @@ * @param the type of the relationships model */ @JsonInclude(Include.NON_NULL) -public abstract class RelatableResource { - - protected final String id; - @NotNull(payload = ErrorCategory.InvalidResource.class) - protected final String type; - protected Map meta; - - @Valid - @NotNull(payload = ErrorCategory.InvalidResource.class) - protected final A attributes; +public abstract class RelatableResource extends Resource { protected final R relationships; protected RelatableResource(String id, String type, A attributes, R relationships) { - this.id = id; - this.type = type; - this.attributes = attributes; + super(id, type, attributes); this.relationships = relationships; } - public String getId() { - return id; - } - - public String getType() { - return type; - } - - public A getAttributes() { - return attributes; - } - public R getRelationships() { return relationships; } - - public Map getMeta() { - return meta; - } - - public RelatableResource addMeta(String key, Object value) { - if (meta == null) { - meta = new LinkedHashMap<>(); - } - meta.put(key, value); - return this; - } } diff --git a/api/src/main/java/com/github/streamshub/console/api/model/Resource.java b/api/src/main/java/com/github/streamshub/console/api/model/Resource.java index 452afab23..65a812694 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/Resource.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/Resource.java @@ -20,13 +20,13 @@ @JsonInclude(Include.NON_NULL) public abstract class Resource { - private final String id; + protected String id; @NotNull(payload = ErrorCategory.InvalidResource.class) - private final String type; - private Map meta; + protected final String type; + protected Map meta; @Valid @NotNull(payload = ErrorCategory.InvalidResource.class) - private final T attributes; + protected final T attributes; protected Resource(String id, String type, T attributes) { this.id = id; @@ -50,6 +50,10 @@ public Map getMeta() { return meta; } + public Object getMeta(String key) { + return meta != null ? 
meta.get(key) : null; + } + public Resource addMeta(String key, Object value) { if (meta == null) { meta = new LinkedHashMap<>(); diff --git a/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java b/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java index 7c432d062..d546742f7 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java @@ -29,7 +29,7 @@ public class BrokerService { public CompletionStage> describeConfigs(String nodeId) { return clusterService.describeCluster(Collections.emptyList()) .thenApply(cluster -> { - if (cluster.getNodes().stream().mapToInt(Node::id).mapToObj(String::valueOf).noneMatch(nodeId::equals)) { + if (cluster.nodes().stream().mapToInt(Node::id).mapToObj(String::valueOf).noneMatch(nodeId::equals)) { throw new NotFoundException("No such node: " + nodeId); } return cluster; diff --git a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java index e51aa1f8e..674b365e7 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java @@ -2,6 +2,7 @@ import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -23,6 +24,7 @@ import org.jboss.logging.Logger; import com.github.streamshub.console.api.Annotations; +import com.github.streamshub.console.api.ClientFactory; import com.github.streamshub.console.api.model.Condition; import com.github.streamshub.console.api.model.KafkaCluster; import com.github.streamshub.console.api.model.KafkaListener; @@ -95,17 +97,11 @@ public List listClusters(ListRequestContext listSupp var config = ctx.getValue().clusterConfig(); return kafkaResources.stream() - .filter(k -> Objects.equals(k.getName(), config.getName())) - .filter(k -> Objects.equals(k.getNamespace(), config.getNamespace())) + .filter(k -> Objects.equals(k.name(), config.getName())) + .filter(k -> Objects.equals(k.namespace(), config.getNamespace())) .map(k -> addKafkaContextData(k, ctx.getValue())) .findFirst() - .orElseGet(() -> { - var k = new KafkaCluster(id, null, null, null); - k.setConfigured(true); - k.setName(config.getName()); - k.setNamespace(config.getNamespace()); - return k; - }); + .orElseGet(() -> addKafkaContextData(new KafkaCluster(id, null, null, null), ctx.getValue())); }) .collect(Collectors.toMap(KafkaCluster::getId, Function.identity())); @@ -159,12 +155,28 @@ KafkaCluster toKafkaCluster(Kafka kafka) { KafkaCluster addKafkaContextData(KafkaCluster cluster, KafkaContext kafkaContext) { var config = kafkaContext.clusterConfig(); cluster.setConfigured(true); + cluster.name(config.getName()); + cluster.namespace(config.getNamespace()); + if (config.getId() != null) { // configuration has overridden the id cluster.setId(config.getId()); } - cluster.setName(config.getName()); - cluster.setNamespace(config.getNamespace()); + + switch (kafkaContext.saslMechanism(Admin.class)) { + case ClientFactory.OAUTHBEARER: + Map authMeta = new HashMap<>(2); + authMeta.put("method", "oauth"); + authMeta.put("tokenUrl", kafkaContext.tokenUrl().orElse(null)); + cluster.addMeta("authentication", authMeta); + break; + case ClientFactory.PLAIN, ClientFactory.SCRAM_SHA256, 
ClientFactory.SCRAM_SHA512: + cluster.addMeta("authentication", Map.of("method", "basic")); + break; + default: + break; + } + return cluster; } @@ -178,9 +190,9 @@ KafkaCluster addKafkaResourceData(KafkaCluster cluster) { } void setKafkaClusterProperties(KafkaCluster cluster, Kafka kafka) { - cluster.setName(kafka.getMetadata().getName()); - cluster.setNamespace(kafka.getMetadata().getNamespace()); - cluster.setCreationTimestamp(kafka.getMetadata().getCreationTimestamp()); + cluster.name(kafka.getMetadata().getName()); + cluster.namespace(kafka.getMetadata().getNamespace()); + cluster.creationTimestamp(kafka.getMetadata().getCreationTimestamp()); var comparator = Comparator .comparingInt((GenericKafkaListener listener) -> @@ -205,39 +217,39 @@ void setKafkaClusterProperties(KafkaCluster cluster, Kafka kafka) { getAuthType(listener).orElse(null))) .toList(); - cluster.setListeners(listeners); + cluster.listeners(listeners); setKafkaClusterStatus(cluster, kafka); } void setKafkaClusterStatus(KafkaCluster cluster, Kafka kafka) { Optional.ofNullable(kafka.getStatus()) .ifPresent(status -> { - cluster.setKafkaVersion(status.getKafkaVersion()); + cluster.kafkaVersion(status.getKafkaVersion()); Optional.ofNullable(status.getConditions()) .ifPresent(conditions -> { - cluster.setConditions(conditions.stream().map(Condition::new).toList()); + cluster.conditions(conditions.stream().map(Condition::new).toList()); conditions.stream() .filter(c -> "NotReady".equals(c.getType()) && "True".equals(c.getStatus())) .findFirst() .ifPresentOrElse( - c -> cluster.setStatus("NotReady"), - () -> cluster.setStatus("Ready")); + c -> cluster.status("NotReady"), + () -> cluster.status("Ready")); }); Optional.ofNullable(status.getKafkaNodePools()) - .ifPresent(pools -> cluster.setNodePools(pools.stream().map(pool -> pool.getName()).toList())); + .ifPresent(pools -> cluster.nodePools(pools.stream().map(pool -> pool.getName()).toList())); }); } KafkaCluster setManaged(KafkaCluster cluster) { - cluster.setManaged(findCluster(cluster) + cluster.addMeta("managed", findCluster(cluster) .map(kafkaTopic -> Boolean.TRUE) .orElse(Boolean.FALSE)); return cluster; } private Optional findCluster(KafkaCluster cluster) { - return findCluster(Cache.namespaceKeyFunc(cluster.getNamespace(), cluster.getName())); + return findCluster(Cache.namespaceKeyFunc(cluster.namespace(), cluster.name())); } private Optional findCluster(String clusterKey) { diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java index 48601ee9c..89cd34efe 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java @@ -1,15 +1,24 @@ package com.github.streamshub.console.api.support; import java.io.Closeable; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.stream.Stream; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.config.SaslConfigs; import com.github.streamshub.console.config.KafkaClusterConfig; import io.strimzi.api.kafka.model.kafka.Kafka; +import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec; +import io.strimzi.api.kafka.model.kafka.KafkaSpec; +import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener; +import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth; 
+import io.strimzi.kafka.oauth.client.ClientConfig; public class KafkaContext implements Closeable { @@ -82,4 +91,24 @@ public Admin admin() { public boolean applicationScoped() { return applicationScoped; } + + public String saslMechanism(Class clientType) { + return configs(clientType).get(SaslConfigs.SASL_MECHANISM) instanceof String auth ? auth : ""; + } + + public Optional tokenUrl() { + return Optional.ofNullable(clusterConfig.getProperties().get(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI)) + .or(() -> Optional.ofNullable(resource()) + .map(Kafka::getSpec) + .map(KafkaSpec::getKafka) + .map(KafkaClusterSpec::getListeners) + .map(Collection::stream) + .orElseGet(Stream::empty) + .filter(listener -> listener.getName().equals(clusterConfig.getListener())) + .findFirst() + .map(GenericKafkaListener::getAuth) + .filter(KafkaListenerAuthenticationOAuth.class::isInstance) + .map(KafkaListenerAuthenticationOAuth.class::cast) + .map(KafkaListenerAuthenticationOAuth::getTokenEndpointUri)); + } } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java b/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java index 97a2d823c..5b2bf4769 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java @@ -54,7 +54,7 @@ public RequestBody filterRequestBody(RequestBody requestBody) { .filter(example -> Objects.nonNull(example.getExternalValue())) .forEach(example -> { try (InputStream stream = getClass().getResourceAsStream(example.getExternalValue())) { - LOGGER.infof("Loading Example externalValue: %s", example.getExternalValue()); + LOGGER.debugf("Loading Example externalValue: %s", example.getExternalValue()); example.setValue(objectMapper.get().readTree(stream)); example.setExternalValue(null); } catch (IOException e) { diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties index c70fec245..703261c30 100644 --- a/api/src/main/resources/application.properties +++ b/api/src/main/resources/application.properties @@ -66,14 +66,11 @@ console.kafka.admin.default.api.timeout.ms=10000 %dev.quarkus.kubernetes-client.trust-certs=true %dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF %dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG -#%dev.quarkus.log.category."io.vertx.core.http".level=DEBUG -#%dev.quarkus.log.category."io.netty".level=DEBUG ######## %testplain.quarkus.devservices.enabled=true %testplain.quarkus.kubernetes-client.devservices.enabled=true %testplain.quarkus.kubernetes-client.devservices.override-kubeconfig=true -%testplain.quarkus.log.category."io.fabric8.kubernetes".level=DEBUG #%testplain.quarkus.http.auth.proactive=false #%testplain.quarkus.http.auth.permission."oidc".policy=permit diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java index 8d34928b8..4dc3047d5 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java @@ -62,14 +62,14 @@ void setup() throws IOException { kafkaContainer = deployments.getKafkaContainer(); bootstrapServers = URI.create(kafkaContainer.getBootstrapServers()); randomBootstrapServers = URI.create(consoleConfig.getKafka() - 
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java b/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java
index 97a2d823c..5b2bf4769 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java
@@ -54,7 +54,7 @@ public RequestBody filterRequestBody(RequestBody requestBody) {
             .filter(example -> Objects.nonNull(example.getExternalValue()))
             .forEach(example -> {
                 try (InputStream stream = getClass().getResourceAsStream(example.getExternalValue())) {
-                    LOGGER.infof("Loading Example externalValue: %s", example.getExternalValue());
+                    LOGGER.debugf("Loading Example externalValue: %s", example.getExternalValue());
                     example.setValue(objectMapper.get().readTree(stream));
                     example.setExternalValue(null);
                 } catch (IOException e) {
diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties
index c70fec245..703261c30 100644
--- a/api/src/main/resources/application.properties
+++ b/api/src/main/resources/application.properties
@@ -66,14 +66,11 @@ console.kafka.admin.default.api.timeout.ms=10000
 %dev.quarkus.kubernetes-client.trust-certs=true
 %dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF
 %dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG
-#%dev.quarkus.log.category."io.vertx.core.http".level=DEBUG
-#%dev.quarkus.log.category."io.netty".level=DEBUG
 
 ########
 %testplain.quarkus.devservices.enabled=true
 %testplain.quarkus.kubernetes-client.devservices.enabled=true
 %testplain.quarkus.kubernetes-client.devservices.override-kubeconfig=true
-%testplain.quarkus.log.category."io.fabric8.kubernetes".level=DEBUG
 
 #%testplain.quarkus.http.auth.proactive=false
 #%testplain.quarkus.http.auth.permission."oidc".policy=permit
diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java
index 8d34928b8..4dc3047d5 100644
--- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java
+++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java
@@ -62,14 +62,14 @@ void setup() throws IOException {
         kafkaContainer = deployments.getKafkaContainer();
         bootstrapServers = URI.create(kafkaContainer.getBootstrapServers());
         randomBootstrapServers = URI.create(consoleConfig.getKafka()
-                .getCluster("default/test-kafka2")
+                .getCluster("test-kafka2")
                 .map(k -> k.getProperties().get("bootstrap.servers"))
                 .orElseThrow());
 
         utils = new TestHelper(bootstrapServers, config, null);
 
-        clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId();
-        clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId();
+        clusterId1 = consoleConfig.getKafka().getCluster("test-kafka1").get().getId();
+        clusterId2 = consoleConfig.getKafka().getCluster("test-kafka2").get().getId();
 
         kafkaClusterService.setListUnconfigured(false);
     }
diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java
index b892be8d2..8d479bb04 100644
--- a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java
+++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java
@@ -28,15 +28,17 @@ public Map<String, String> getConfigOverrides() {
                 kafka:
                   clusters:
                     - name: test-kafka1
-                      namespace: default
                       id: k1-id
                       properties:
                         bootstrap.servers: ${console.test.external-bootstrap}
                     - name: test-kafka2
-                      namespace: default
                       id: k2-id
                       properties:
                         bootstrap.servers: ${console.test.random-bootstrap}
+                    - name: test-kafka3
+                      namespace: default
+                      id: k3-id
+                      listener: listener0
                 """);
 
         return Map.of(
diff --git a/common/pom.xml b/common/pom.xml
index 4173178a4..920b0f14a 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,6 +28,10 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
     </dependency>
+    <dependency>
+      <groupId>jakarta.validation</groupId>
+      <artifactId>jakarta.validation-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
-
-</project>
\ No newline at end of file
+</project>
diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java
index 1f7f058fd..b5a8334a9 100644
--- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java
+++ b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java
@@ -3,11 +3,14 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import jakarta.validation.constraints.NotBlank;
+
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
 public class KafkaClusterConfig {
 
     private String id;
+    @NotBlank
     private String name;
     private String namespace;
     private String listener;
diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java
index f42a54ed4..9c68d1622 100644
--- a/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java
+++ b/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java
@@ -4,12 +4,20 @@
 import java.util.List;
 import java.util.Optional;
 
+import jakarta.validation.constraints.AssertTrue;
+
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
 public class KafkaConfig {
 
     List<KafkaClusterConfig> clusters = new ArrayList<>();
 
+    @JsonIgnore
+    @AssertTrue(message = "Kafka cluster names must be unique")
+    public boolean hasUniqueClusterNames() {
+        return clusters.stream().map(KafkaClusterConfig::getName).distinct().count() == clusters.size();
+    }
+
     @JsonIgnore
     public Optional<KafkaClusterConfig> getCluster(String clusterKey) {
         return clusters.stream()
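The @NotBlank and @AssertTrue constraints above only take effect when a Bean Validation pass runs over the loaded configuration. A minimal sketch with the jakarta.validation API; the bootstrap below is illustrative (it assumes a provider such as Hibernate Validator on the classpath), and `consoleConfig` stands in for the deserialized configuration object:

    import jakarta.validation.Validation;
    import jakarta.validation.Validator;

    Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
    // A blank cluster name or a duplicated name each surface as a violation; the
    // duplicate case carries the message "Kafka cluster names must be unique".
    validator.validate(consoleConfig)
            .forEach(v -> System.err.println(v.getPropertyPath() + ": " + v.getMessage()));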
diff --git a/console-config-example.yaml b/console-config-example.yaml
index 44f539fde..3b55847b7 100644
--- a/console-config-example.yaml
+++ b/console-config-example.yaml
@@ -1,9 +1,9 @@
-kafka:
-  kubernetes:
-    # enable/disable use of Kubernetes to obtain additional information from Strimzi
-    # Kafka and KafkaTopic custom resources. Enabled by default
-    enabled: true
+kubernetes:
+  # enable/disable use of Kubernetes to obtain additional information from Strimzi
+  # Kafka and KafkaTopic custom resources. Enabled by default
+  enabled: true
 
+kafka:
   clusters:
     - name: my-kafka1 # name of the Strimzi Kafka CR
       namespace: my-namespace1 # namespace of the Strimzi Kafka CR (optional)
diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java
index 3a14c05b2..1d3323971 100644
--- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java
+++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java
@@ -9,11 +9,6 @@
 @Buildable(builderPackage = "io.fabric8.kubernetes.api.builder")
 @JsonInclude(JsonInclude.Include.NON_NULL)
-@ValidationRule(
-        // The `namespace` property must be wrapped in double underscore to escape it
-        // due to it being a "reserved" word.
-        value = "has(self.id) || has(self.__namespace__)",
-        message = "One of `id` or `namespace` is required")
 @ValidationRule(
         // The `namespace` property must be wrapped in double underscore to escape it
         // due to it being a "reserved" word.
@@ -27,10 +22,10 @@ public class KafkaCluster {
             resource may be discovered using the name and namespace properties, \
             this property is optional. Otherwise, the Kafka cluster identifier \
             published in the Kafka resource's status will be used. If namespace \
-            is not given or the console or Kubernetes is not in use, this property \
-            is required.
+            is not given or the console or Kubernetes is not in use, and this \
+            property is not provided, the ID will default to the name.
 
-            When provided, this property will override the Kafka cluster id available \
+            When provided, this property will override the Kafka cluster ID available \
             in the Kafka resource's status.""")
     private String id;
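The resolution order the reworded Javadoc above describes, an explicit id first, then the cluster ID Strimzi publishes in the Kafka CR status, then the configured name as a last resort, is exactly what the clusterId(...) helper introduced in the next patch implements:

    String clusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kafkaResource) {
        return Optional.ofNullable(clusterConfig.getId())                  // 1. explicit configuration
                .or(() -> kafkaResource.map(Kafka::getStatus)
                        .map(KafkaStatus::getClusterId))                   // 2. ID reported by Strimzi
                .orElseGet(clusterConfig::getName);                        // 3. fall back to the name
    }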

From 536a88e2ca48b8e1d2f8e9d9d9ed8c5354bb66e3 Mon Sep 17 00:00:00 2001
From: Michael Edgar
Date: Thu, 1 Aug 2024 12:10:50 -0400
Subject: [PATCH 3/4] Refine ClientFactory config handling, add SCRAM/OAuth tests

Signed-off-by: Michael Edgar
---
 .../streamshub/console/api/ClientFactory.java | 112 ++++++++----
 .../console/api/KafkaClustersResourceIT.java  | 184 ++++++++++++------
 .../kafka/systemtest/TestPlainProfile.java    |   2 -
 install/000-install-dependency-operators.sh   |   0
 install/001-deploy-prometheus.sh              |   0
 5 files changed, 207 insertions(+), 91 deletions(-)
 mode change 100644 => 100755 install/000-install-dependency-operators.sh
 mode change 100644 => 100755 install/001-deploy-prometheus.sh

diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
index 251f75d21..cccbd5e54 100644
--- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
+++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -6,6 +6,7 @@
 import java.nio.file.Path;
 import java.util.Base64;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -228,7 +229,7 @@ Map<String, KafkaContext> produceKafkaContexts(ConsoleConfig consoleConfig,
                 adminBuilder,
                 false));
 
-        return contexts;
+        return Collections.unmodifiableMap(contexts);
     }
 
     void addKafkaEventHandler(Map<String, KafkaContext> contexts,
@@ -237,32 +238,52 @@ void addKafkaEventHandler(Map<String, KafkaContext> contexts,
         kafkaInformer.get().addEventHandlerWithResyncPeriod(new ResourceEventHandler<Kafka>() {
             public void onAdd(Kafka kafka) {
-                findConfig(kafka).ifPresent(clusterConfig -> putKafkaContext(contexts,
-                        clusterConfig,
-                        Optional.of(kafka),
-                        adminBuilder,
-                        false));
+                if (log.isDebugEnabled()) {
+                    log.debugf("Kafka resource %s added", Cache.metaNamespaceKeyFunc(kafka));
+                }
+                findConfig(kafka).ifPresentOrElse(
+                        clusterConfig -> {
+                            if (defaultedClusterId(clusterConfig, Optional.of(kafka))) {
+                                log.debugf("Ignoring added Kafka resource %s, cluster ID not yet available and not provided via configuration",
+                                        Cache.metaNamespaceKeyFunc(kafka));
+                            } else {
+                                putKafkaContext(contexts,
+                                        clusterConfig,
+                                        Optional.of(kafka),
+                                        adminBuilder,
+                                        false);
+                            }
+                        },
+                        () -> log.debugf("Ignoring added Kafka resource %s, not found in configuration", Cache.metaNamespaceKeyFunc(kafka)));
             }
 
             public void onUpdate(Kafka oldKafka, Kafka newKafka) {
-                findConfig(newKafka).ifPresent(clusterConfig -> putKafkaContext(contexts,
+                if (log.isDebugEnabled()) {
+                    log.debugf("Kafka resource %s updated", Cache.metaNamespaceKeyFunc(oldKafka));
+                }
+                findConfig(newKafka).ifPresentOrElse(
+                        clusterConfig -> putKafkaContext(contexts,
                                 clusterConfig,
                                 Optional.of(newKafka),
                                 adminBuilder,
-                                true));
+                                true),
+                        () -> log.debugf("Ignoring updated Kafka resource %s, not found in configuration", Cache.metaNamespaceKeyFunc(newKafka)));
             }
 
             public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) {
-                findConfig(kafka).ifPresent(clusterConfig -> {
-                    String clusterKey = clusterConfig.clusterKey();
-                    String clusterId = Optional.ofNullable(clusterConfig.getId())
-                            .or(() -> Optional.ofNullable(kafka.getStatus()).map(KafkaStatus::getClusterId))
-                            .orElse(null);
-                    log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
-                    log.debugf("Known KafkaContext identifiers: %s", contexts.keySet());
-                    KafkaContext previous = contexts.remove(clusterId);
-                    Optional.ofNullable(previous).ifPresent(KafkaContext::close);
-                });
+                if (log.isDebugEnabled()) {
+                    log.debugf("Kafka resource %s deleted", Cache.metaNamespaceKeyFunc(kafka));
+                }
+                findConfig(kafka).ifPresentOrElse(
+                        clusterConfig -> {
+                            String clusterKey = clusterConfig.clusterKey();
+                            String clusterId = clusterId(clusterConfig, Optional.of(kafka));
+                            log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
+                            log.debugf("Known KafkaContext identifiers: %s", contexts.keySet());
+                            KafkaContext previous = contexts.remove(clusterId);
+                            Optional.ofNullable(previous).ifPresent(KafkaContext::close);
+                        },
+                        () -> log.debugf("Ignoring deleted Kafka resource %s, not found in configuration", Cache.metaNamespaceKeyFunc(kafka)));
             }
 
             Optional<KafkaClusterConfig> findConfig(Kafka kafka) {
@@ -305,9 +326,9 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
                 kafkaResource);
 
         Map<Class<?>, Map<String, Object>> clientConfigs = new HashMap<>();
-        clientConfigs.put(Admin.class, adminConfigs);
-        clientConfigs.put(Consumer.class, consumerConfigs);
-        clientConfigs.put(Producer.class, producerConfigs);
+        clientConfigs.put(Admin.class, Collections.unmodifiableMap(adminConfigs));
+        clientConfigs.put(Consumer.class, Collections.unmodifiableMap(consumerConfigs));
+        clientConfigs.put(Producer.class, Collections.unmodifiableMap(producerConfigs));
 
         Admin admin = null;
 
@@ -316,29 +337,58 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
         }
 
         String clusterKey = clusterConfig.clusterKey();
-        String clusterId = Optional.ofNullable(clusterConfig.getId())
-                .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId))
-                .orElseGet(clusterConfig::getName);
+        String clusterId = clusterId(clusterConfig, kafkaResource);
 
         if (!replace && contexts.containsKey(clusterId)) {
             log.warnf("""
                     Ignoring duplicate Kafka cluster id: %s for cluster %s. Cluster id values in \
                     configuration must be unique and may not match id values of \
                     clusters discovered using Strimzi Kafka Kubernetes API resources.""", clusterId, clusterKey);
-        } else if (kafkaResource.isPresent() && truststoreRequired(adminConfigs)) {
-            if (contexts.containsKey(clusterId) && !truststoreRequired(contexts.get(clusterId).configs(Admin.class))) {
+        } else {
+            boolean truststoreNowRequired = truststoreRequired(adminConfigs);
+
+            if (resourceStatusDroppedCertificates(contexts.get(clusterId), kafkaResource, truststoreNowRequired)) {
                 log.warnf("""
                         Ignoring update to Kafka custom resource %s. Connection requires \
                         trusted certificate which is no longer available.""", clusterKey);
+            } else {
+                if (truststoreNowRequired && kafkaResource.isPresent()) {
+                    log.warnf("""
+                            Connection requires trusted certificate(s) which are not present \
+                            in the Kafka CR status of resource %s.""", clusterKey);
+                }
+
+                KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
+                log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId);
+                KafkaContext previous = contexts.put(clusterId, ctx);
+                Optional.ofNullable(previous).ifPresent(KafkaContext::close);
             }
-        } else {
-            KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
-            log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId);
-            KafkaContext previous = contexts.put(clusterId, ctx);
-            Optional.ofNullable(previous).ifPresent(KafkaContext::close);
         }
     }
 
+    boolean defaultedClusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kafkaResource) {
+        return clusterConfig.getId() == null && kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId).isEmpty();
+    }
+
+    String clusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kafkaResource) {
+        return Optional.ofNullable(clusterConfig.getId())
+                .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId))
+                .orElseGet(clusterConfig::getName);
+    }
+
+    /**
+     * Checks whether the previous KafkaContext contained TLS trusted certificates
+     * that are no longer present because the Strimzi Kafka CR is in a transient
+     * state. When that is the case, the update is ignored and the old KafkaContext
+     * is kept.
+     */
+    boolean resourceStatusDroppedCertificates(KafkaContext context, Optional<Kafka> kafkaResource, boolean truststoreNowRequired) {
+        if (!truststoreNowRequired || context == null || kafkaResource.isEmpty()) {
+            return false;
+        }
+
+        return !truststoreRequired(context.configs(Admin.class));
+    }
+
     Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
         return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore)
                 .map(store -> store.getByKey(clusterConfig.clusterKey()))
diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java
index 879f392ab..aec59c4d1 100644
--- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java
+++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java
@@ -24,12 +24,13 @@ import jakarta.ws.rs.core.Response.Status;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.eclipse.microprofile.config.Config;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -56,7 +57,9 @@
 import io.strimzi.api.kafka.model.kafka.Kafka;
 import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
 import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustomBuilder;
+import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuthBuilder;
 import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationScramSha512Builder;
+import io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler;
 import io.strimzi.test.container.StrimziKafkaContainer;
 
 import static com.github.streamshub.console.test.TestHelper.whenRequesting;
@@ -65,7 +68,6 @@
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -75,6 +77,7 @@
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -491,32 +494,6 @@ void testListClustersWithUnexpectedPageCursorData() {
                 .body("errors.source.parameter", contains("page[after]"));
     }
 
-    @Test
-    @Disabled("Only configured clusters are returned at this time")
-    void testListClustersWithUnconfiguredCluster() {
-        String clusterId = UUID.randomUUID().toString();
-        String clusterName = "test-kafka-" + clusterId;
-
-        // Create a Kafka CR with SCRAM-SHA that proxies to kafka1
-        client.resources(Kafka.class)
-            .resource(utils.buildKafkaResource(clusterName, clusterId, bootstrapServers))
-            .create();
-
-        // Wait for the informer cache to be populated with all Kafka CRs
-        await().atMost(10, TimeUnit.SECONDS)
-            .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), 3));
-
-        whenRequesting(req -> req.get())
-            .assertThat()
-            .statusCode(is(Status.OK.getStatusCode()))
-            .body("data.size()", equalTo(3))
-            .body("data.id", containsInAnyOrder(clusterId1, clusterId2, clusterId))
-            .body("data.attributes.name", containsInAnyOrder("test-kafka1", "test-kafka2", clusterName))
-            .body("data.find { it.attributes.name == 'test-kafka1'}.meta.configured", is(true))
-            .body("data.find { it.attributes.name == 'test-kafka2'}.meta.configured", is(true))
-            .body("data.find { it.attributes.name == '" + clusterName + "'}.meta.configured", is(false));
-    }
-
     @Test
     void testDescribeClusterWithCustomAuthType() {
         mockAdminClient();
@@ -539,10 +516,17 @@ void testDescribeClusterWithCertificates() {
         String clusterId = UUID.randomUUID().toString();
 
         /*
-         * Create a Kafka CR with OAuth that proxies to kafka1.
-         * test-kafka3 is predefined in KafkaUnsecuredResourceManager with SSL
+         * Create a Kafka CR that proxies to kafka1.
+         * test-kafka3 is predefined in KafkaUnsecuredResourceManager
          */
         Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers))
+                .editSpec()
+                    .editKafka()
+                        .editMatchingListener(l -> "listener0".equals(l.getName()))
+                            .withTls(true)
+                        .endListener()
+                    .endKafka()
+                .endSpec()
                 .editStatus()
                     .editMatchingListener(l -> "listener0".equals(l.getName()))
                         .addToCertificates("""
@@ -563,7 +547,8 @@ void testDescribeClusterWithCertificates() {
 
         whenRequesting(req -> req.get("{clusterId}", clusterId))
             .assertThat()
-            .statusCode(is(Status.OK.getStatusCode()));
+            .statusCode(is(Status.OK.getStatusCode()))
+            .body("data.attributes.name", equalTo("test-kafka3"));
             // Ignoring response data since they are from test-kafka-1
 
         assertEquals("SSL", clientConfig.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
@@ -577,14 +562,24 @@ void testDescribeClusterWithTLSMissingCertificates() {
 
         /*
          * Create a Kafka CR without certificates
-         * test-kafka3 is predefined in KafkaUnsecuredResourceManager with SSL
+         * test-kafka3 is predefined in KafkaUnsecuredResourceManager
          */
         Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers))
+                .editSpec()
+                    .editKafka()
+                        .editMatchingListener(l -> "listener0".equals(l.getName()))
+                            .withTls(true)
+                        .endListener()
+                    .endKafka()
+                .endSpec()
                 .build();
 
         Map<String, Object> clientConfig = mockAdminClient();
 
-        client.resources(Kafka.class).resource(kafka).create();
+        utils.apply(client, kafka);
+
+        await().atMost(Duration.ofSeconds(5))
+            .until(() -> configuredContexts.containsKey(clusterId));
 
         await().atMost(Duration.ofSeconds(5))
            .until(() -> kafkaInformer.get()
@@ -597,29 +592,13 @@ void testDescribeClusterWithTLSMissingCertificates() {
 
         whenRequesting(req -> req.get("{clusterId}", clusterId))
             .assertThat()
-            .statusCode(is(Status.NOT_FOUND.getStatusCode()));
+            .statusCode(is(Status.OK.getStatusCode()))
+            .body("data.attributes.name", equalTo("test-kafka3"));
             // Ignoring response data since they are from test-kafka-1
 
-        // adminBuilder was never called to update configuration
-        assertThat(clientConfig, is(anEmptyMap()));
-    }
-
-    @Test
-    void testDescribeClusterWithScram() {
-        String clusterId = UUID.randomUUID().toString();
-
-        // Create a Kafka CR with SCRAM-SHA that proxies to kafka1
-        client.resources(Kafka.class)
-            .resource(utils.buildKafkaResource("test-kafka-" + clusterId, clusterId, bootstrapServers,
-                    new KafkaListenerAuthenticationScramSha512Builder().build()))
-            .create();
-
-        whenRequesting(req -> req.get("{clusterId}", clusterId))
-            .assertThat()
-            .statusCode(is(Status.NOT_FOUND.getStatusCode()))
-            .body("errors.size()", is(1))
-            .body("errors.status", contains("404"))
-            .body("errors.code", contains("4041"));
+        assertEquals("SSL", clientConfig.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+        assertThat(clientConfig, not(hasKey(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)));
+        assertThat(clientConfig, not(hasKey(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG)));
     }
 
     @Test
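The test above pins down the relaxed behavior when certificates were never available; its counterpart guard in putKafkaContext covers the opposite case, where a CR status update briefly drops certificates that an existing context already trusts. A hedged unit-test sketch of resourceStatusDroppedCertificates; the fixture helpers and the `factory` instance are hypothetical, not part of the patch:

    @Test
    void ignoresUpdateThatDropsPreviouslyTrustedCertificates() {
        KafkaContext existing = contextWithTrustedCertificates();               // hypothetical fixture
        Optional<Kafka> resource = Optional.of(kafkaWithEmptyListenerStatus()); // hypothetical fixture

        // A truststore is now required, but the existing context already holds certificates: ignore
        assertTrue(factory.resourceStatusDroppedCertificates(existing, resource, true));

        // No truststore needed, or no prior context to preserve: proceed with the replacement
        assertFalse(factory.resourceStatusDroppedCertificates(existing, resource, false));
        assertFalse(factory.resourceStatusDroppedCertificates(null, resource, true));
    }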
@@ -627,10 +606,8 @@ void testDescribeClusterWithCustomNonOAuth() {
         String clusterId = UUID.randomUUID().toString();
 
         // Create a Kafka CR with generic custom authentication that proxies to kafka1
-        client.resources(Kafka.class)
-            .resource(utils.buildKafkaResource("test-kafka-" + clusterId, clusterId, bootstrapServers,
-                    new KafkaListenerAuthenticationCustomBuilder().build()))
-            .create();
+        utils.apply(client, utils.buildKafkaResource("test-kafka-" + clusterId, clusterId, bootstrapServers,
+                new KafkaListenerAuthenticationCustomBuilder().build()));
 
         whenRequesting(req -> req.get("{clusterId}", clusterId))
             .assertThat()
@@ -660,6 +637,97 @@ void testDescribeClusterWithNoSuchCluster() {
                 .body("errors.code", contains("4041"));
     }
 
+    @ParameterizedTest
+    @CsvSource({
+        "true, SASL_SSL",
+        "false, SASL_PLAINTEXT"
+    })
+    void testDescribeClusterWithOAuthTokenUrl(boolean tls, String expectedProtocol) {
+        String clusterId = UUID.randomUUID().toString();
+
+        /*
+         * Create a Kafka CR that proxies to kafka1.
+         * test-kafka3 is predefined in KafkaUnsecuredResourceManager
+         */
+        Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers,
+                new KafkaListenerAuthenticationOAuthBuilder()
+                    .withTokenEndpointUri("https://example.com/token")
+                    .build()))
+                .editSpec()
+                    .editKafka()
+                        .editMatchingListener(l -> "listener0".equals(l.getName()))
+                            .withTls(tls)
+                        .endListener()
+                    .endKafka()
+                .endSpec()
+                .build();
+
+        Map<String, Object> clientConfig = mockAdminClient();
+
+        utils.apply(client, kafka);
+
+        await().atMost(Duration.ofSeconds(5))
+            .until(() -> configuredContexts.containsKey(clusterId));
+
+        whenRequesting(req -> req
+                .auth().oauth2("my-secure-token")
+                .get("{clusterId}", clusterId))
+            .assertThat()
+            .statusCode(is(Status.OK.getStatusCode()))
+            .body("data.attributes.name", is("test-kafka3"))
+            .body("data.meta.authentication.method", is("oauth"))
+            .body("data.meta.authentication.tokenUrl", is("https://example.com/token"));
+
+        assertEquals(expectedProtocol, clientConfig.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+        assertEquals("OAUTHBEARER", clientConfig.get(SaslConfigs.SASL_MECHANISM));
+        assertEquals(JaasClientOauthLoginCallbackHandler.class.getName(),
+                clientConfig.get(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS));
+    }
+
+    @ParameterizedTest
+    @CsvSource({
+        "true, SASL_SSL",
+        "false, SASL_PLAINTEXT"
+    })
+    void testDescribeClusterWithScram(boolean tls, String expectedProtocol) {
+        String clusterId = UUID.randomUUID().toString();
+
+        /*
+         * Create a Kafka CR with SCRAM-SHA that proxies to kafka1
+         * test-kafka3 is predefined in KafkaUnsecuredResourceManager
+         */
+        Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers,
+                new KafkaListenerAuthenticationScramSha512Builder().build()))
+                .editSpec()
+                    .editKafka()
+                        .editMatchingListener(l -> "listener0".equals(l.getName()))
+                            .withTls(tls)
+                        .endListener()
+                    .endKafka()
+                .endSpec()
+                .build();
+
+        Map<String, Object> clientConfig = mockAdminClient();
+
+        utils.apply(client, kafka);
+
+        await().atMost(Duration.ofSeconds(5))
+            .until(() -> configuredContexts.containsKey(clusterId));
+
+        whenRequesting(req -> req
+                .auth().basic("u", "p")
+                .get("{clusterId}", clusterId))
+            .assertThat()
+            .statusCode(is(Status.OK.getStatusCode()))
+            .body("data.attributes.name", is("test-kafka3"))
+            .body("data.meta.authentication.method", is("basic"));
+
+        assertEquals(expectedProtocol, clientConfig.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+        assertEquals("SCRAM-SHA-512", clientConfig.get(SaslConfigs.SASL_MECHANISM));
+        assertThat(String.valueOf(clientConfig.get(SaslConfigs.SASL_JAAS_CONFIG)),
+                containsString(ScramLoginModule.class.getName()));
+    }
+
     // Helper methods
 
     static Map<String, Object> mockAdminClient() {
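Both new parameterized tests pass credentials with the request rather than from configuration: the REST Assured-style auth().basic(...) and auth().oauth2(...) calls simply set the Authorization header that the console consumes. The equivalent with plain java.net.http, where the host, API path, and credentials are placeholders, not values defined by the patch:

    import java.net.URI;
    import java.net.http.HttpRequest;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    // SCRAM/PLAIN listeners: HTTP Basic, username:password Base64-encoded
    HttpRequest basic = HttpRequest.newBuilder(
            URI.create("https://console.example.com/api/kafkas/" + clusterId))
        .header("Authorization", "Basic " + Base64.getEncoder()
                .encodeToString("u:p".getBytes(StandardCharsets.UTF_8)))
        .build();

    // OAuth listeners: a bearer access token
    HttpRequest bearer = HttpRequest.newBuilder(
            URI.create("https://console.example.com/api/kafkas/" + clusterId))
        .header("Authorization", "Bearer my-secure-token")
        .build();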
diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java
index 340b7c08d..2f1b5e290 100644
--- a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java
+++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java
@@ -52,8 +52,6 @@ public Map<String, String> getConfigOverrides() {
                       namespace: default
                       # listener is named and bootstrap.servers not set (will be retrieved from Kafka CR)
                       listener: listener0
-                      properties:
-                        security.protocol: SSL
                 """);
 
         return Map.of("console.config-path", configFile.getAbsolutePath());
diff --git a/install/000-install-dependency-operators.sh b/install/000-install-dependency-operators.sh
old mode 100644
new mode 100755
diff --git a/install/001-deploy-prometheus.sh b/install/001-deploy-prometheus.sh
old mode 100644
new mode 100755

From 9c92de851cac9594f2e34ee62da9ecd610ffe130 Mon Sep 17 00:00:00 2001
From: Michael Edgar
Date: Thu, 8 Aug 2024 16:00:16 -0400
Subject: [PATCH 4/4] Support anonymous mode when JAAS configuration given via YAML

Signed-off-by: Michael Edgar
---
 .../api/service/KafkaClusterService.java      | 33 ++++++++++++++-----
 .../console/api/support/KafkaContext.java     |  4 +++
 api/src/main/resources/application.properties |  2 +-
 3 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java
index 674b365e7..5a175aa5d 100644
--- a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java
+++ b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java
@@ -50,6 +50,9 @@
 @ApplicationScoped
 public class KafkaClusterService {
 
+    private static final String AUTHN_KEY = "authentication";
+    private static final String AUTHN_METHOD_KEY = "method";
+
     @Inject
     Logger logger;
 
@@ -163,25 +166,37 @@ KafkaCluster addKafkaContextData(KafkaCluster cluster, KafkaContext kafkaContext
             cluster.setId(config.getId());
         }
 
+        if (kafkaContext.applicationScoped()) {
+            if (kafkaContext.hasCredentials(Admin.class)) {
+                cluster.addMeta(AUTHN_KEY, Map.of(AUTHN_METHOD_KEY, "anonymous"));
+            } else {
+                addAuthenticationMethod(cluster, kafkaContext);
+            }
+        } else {
+            addAuthenticationMethod(cluster, kafkaContext);
+        }
+
+        return cluster;
+    }
+
+    KafkaCluster addKafkaContextData(KafkaCluster cluster) {
+        return addKafkaContextData(cluster, kafkaContext);
+    }
+
+    void addAuthenticationMethod(KafkaCluster cluster, KafkaContext kafkaContext) {
         switch (kafkaContext.saslMechanism(Admin.class)) {
             case ClientFactory.OAUTHBEARER:
                 Map<String, Object> authMeta = new HashMap<>(2);
-                authMeta.put("method", "oauth");
+                authMeta.put(AUTHN_METHOD_KEY, "oauth");
                 authMeta.put("tokenUrl", kafkaContext.tokenUrl().orElse(null));
-                cluster.addMeta("authentication", authMeta);
+                cluster.addMeta(AUTHN_KEY, authMeta);
                 break;
             case ClientFactory.PLAIN, ClientFactory.SCRAM_SHA256, ClientFactory.SCRAM_SHA512:
-                cluster.addMeta("authentication", Map.of("method", "basic"));
+                cluster.addMeta(AUTHN_KEY, Map.of(AUTHN_METHOD_KEY, "basic"));
                 break;
             default:
                 break;
         }
-
-        return cluster;
-    }
-
-    KafkaCluster addKafkaContextData(KafkaCluster cluster) {
-        return addKafkaContextData(cluster, kafkaContext);
     }
 
     KafkaCluster addKafkaResourceData(KafkaCluster cluster) {
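Condensed, the authentication hint now resolves like this: for an application-scoped context, a SASL JAAS config already present in the console's own configuration means end users never log in, so the method is reported as anonymous; otherwise the configured SASL mechanism picks the prompt. A sketch of the decision, assuming the mechanism strings behind the ClientFactory constants:

    // Mirrors addKafkaContextData + addAuthenticationMethod above; sketch only.
    static String authenticationMethod(KafkaContext ctx) {
        if (ctx.applicationScoped() && ctx.hasCredentials(Admin.class)) {
            return "anonymous"; // credentials come from configuration, not the request
        }
        return switch (ctx.saslMechanism(Admin.class)) {
            case "OAUTHBEARER" -> "oauth";
            case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512" -> "basic";
            default -> null; // no authentication hint is published
        };
    }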
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
index 89cd34efe..2c505f1f4 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
@@ -96,6 +96,10 @@ public String saslMechanism(Class<?> clientType) {
         return configs(clientType).get(SaslConfigs.SASL_MECHANISM) instanceof String auth ? auth : "";
     }
 
+    public boolean hasCredentials(Class<?> clientType) {
+        return configs(clientType).get(SaslConfigs.SASL_JAAS_CONFIG) instanceof String;
+    }
+
     public Optional<String> tokenUrl() {
         return Optional.ofNullable(clusterConfig.getProperties().get(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI))
             .or(() -> Optional.ofNullable(resource())
diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties
index 703261c30..0378be55a 100644
--- a/api/src/main/resources/application.properties
+++ b/api/src/main/resources/application.properties
@@ -1,7 +1,7 @@
 quarkus.http.access-log.enabled=true
 quarkus.http.record-request-start-time=true
 # Default access-log pattern with `%u` removed. Due to the mixing of Quarkus and Vert.x authorization, the authenticated user cannot be obtained at this time
-quarkus.http.access-log.pattern=%{REMOTE_HOST} %l "%{REQUEST_LINE}" %{RESPONSE_CODE} %{RESPONSE_TIME} %b
+quarkus.http.access-log.pattern=%{REMOTE_HOST} %l "%{REQUEST_LINE}" %{RESPONSE_CODE} %{RESPONSE_TIME}ms %{BYTES_SENT}
 quarkus.http.access-log.exclude-pattern=(?:/health(/live|/ready|/started)?|/metrics)
 quarkus.http.non-application-root-path=${quarkus.http.root-path}
 quarkus.http.http2=false