diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
index 2e2893eeb..e365c7c28 100644
--- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
+++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -4,6 +4,7 @@
 import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.nio.file.Path;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,7 +31,9 @@
 import jakarta.enterprise.inject.Produces;
 import jakarta.inject.Inject;
 import jakarta.inject.Named;
+import jakarta.ws.rs.NotAuthorizedException;
 import jakarta.ws.rs.NotFoundException;
+import jakarta.ws.rs.core.HttpHeaders;
 import jakarta.ws.rs.core.UriInfo;
 
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -44,6 +47,9 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.microprofile.config.Config;
@@ -63,7 +69,11 @@
 import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.strimzi.api.kafka.model.kafka.Kafka;
+import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
+import io.strimzi.api.kafka.model.kafka.KafkaSpec;
 import io.strimzi.api.kafka.model.kafka.KafkaStatus;
+import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
+import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthentication;
 import io.strimzi.api.kafka.model.kafka.listener.ListenerStatus;
 
 /**
@@ -82,6 +92,20 @@
 @ApplicationScoped
 public class ClientFactory {
 
+    private static final String BEARER = "Bearer ";
+    private static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+    private static final String STRIMZI_OAUTH_CALLBACK = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";
+    private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName()
+            + " required"
+            + " oauth.access.token=\"%s\" ;";
+
+    private static final String BASIC = "Basic ";
+    private static final String SCRAM_SHA512 = "SCRAM-SHA-512";
+    private static final String SASL_SCRAM_CONFIG_TEMPLATE = ScramLoginModule.class.getName()
+            + " required"
+            + " username=\"%s\""
+            + " password=\"%s\" ;";
+
     static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured";
     private final Function<String, NotFoundException> noSuchKafka =
             clusterName -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterName));
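Note: the two JAAS templates above render directly into the sasl.jaas.config value handed to the
Kafka clients. A minimal sketch of the rendered strings (token and credentials are placeholders):

    // Illustrative only: the strings produced by the templates defined above.
    String oauthJaas = SASL_OAUTH_CONFIG_TEMPLATE.formatted("<access-token>");
    // org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.access.token="<access-token>" ;

    String scramJaas = SASL_SCRAM_CONFIG_TEMPLATE.formatted("alice", "alice-secret");
    // org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret" ;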
@@ -108,6 +132,9 @@ public class ClientFactory {
     @Inject
     Instance<TrustAllCertificateManager> trustManager;
 
+    @Inject
+    HttpHeaders headers;
+
     @Inject
     UriInfo requestUri;
 
@@ -275,7 +302,7 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
 
         Admin admin = null;
 
-        if (establishGlobalConnection(clusterConfig, adminConfigs)) {
+        if (establishGlobalConnection(adminConfigs)) {
             admin = adminBuilder.apply(adminConfigs);
         }
 
@@ -309,7 +336,7 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
     }
 
     Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
-        return kafkaInformer.map(SharedIndexInformer::getStore)
+        return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore)
                 .map(store -> {
                     String key = clusterConfig.clusterKey();
                     Kafka resource = store.getByKey(key);
@@ -317,7 +344,7 @@ Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
                         log.warnf("Configuration references Kafka resource %s, but it was not found in cache", key);
                     }
                     return resource;
-                });
+                }) : Optional.empty();
     }
 
     Map<String, Object> requiredAdminConfig() {
@@ -349,7 +376,7 @@ Map<String, Object> requiredProducerConfig() {
         return configs;
    }
 
-    static boolean establishGlobalConnection(KafkaClusterConfig clusterConfig, Map<String, Object> configs) {
+    static boolean establishGlobalConnection(Map<String, Object> configs) {
         if (!configs.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
             return false;
         }
@@ -405,21 +432,30 @@ public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,
 
         return Optional.ofNullable(contexts.get(clusterId))
                 .map(ctx -> {
-                    Admin admin = Optional.ofNullable(ctx.admin())
-                            /*
-                             * Admin may be null if credentials were not given
-                             * in the configuration. The user must provide the
-                             * login secrets in the request in that case.
-                             */
-                            .orElseGet(() -> adminBuilder.apply(ctx.configs(Admin.class)));
-                    return new KafkaContext(ctx, filter.apply(admin));
+                    if (ctx.admin() == null) {
+                        /*
+                         * Admin may be null if credentials were not given in the
+                         * configuration. The user must provide the login secrets
+                         * in the request in that case.
+                         */
+                        var adminConfigs = maybeAuthenticate(ctx.configs(Admin.class));
+                        var admin = adminBuilder.apply(adminConfigs);
+                        return new KafkaContext(ctx, filter.apply(admin));
+                    }
+
+                    return ctx;
                 })
                 .orElseThrow(() -> noSuchKafka.apply(clusterId));
     }
 
     public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts) {
         if (!contexts.values().contains(context)) {
-            log.infof("Closing out-of-date KafkaContext %s", Cache.metaNamespaceKeyFunc(context.resource()));
+            var clusterKey = context.clusterConfig().clusterKey();
+            if (context.applicationScoped()) {
+                log.infof("Closing out-of-date KafkaContext: %s", clusterKey);
+            } else {
+                log.debugf("Closing request-scoped KafkaContext: %s", clusterKey);
+            }
             context.close();
         }
     }
@@ -427,8 +463,8 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts) {
 
     @Produces
     @RequestScoped
     public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
-        var configs = context.configs(Consumer.class);
-        Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs);
+        var configs = maybeAuthenticate(context.configs(Consumer.class));
+        Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
         return () -> client;
     }
 
@@ -439,8 +475,8 @@ public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
 
     @Produces
     @RequestScoped
     public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
-        var configs = context.configs(Producer.class);
-        Producer<String, String> client = new KafkaProducer<>(configs);
+        var configs = maybeAuthenticate(context.configs(Producer.class));
+        Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
         return () -> client;
     }
 
@@ -448,6 +484,16 @@ public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
         producer.get().close();
     }
 
+    Map<String, Object> maybeAuthenticate(Map<String, Object> configs) {
+        if (configs.containsKey(SaslConfigs.SASL_MECHANISM)
+                && !configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+            configs = new HashMap<>(configs);
+            configureAuthentication(configs);
+        }
+
+        return configs;
+    }
+
     Map<String, Object> buildConfig(Set<String> configNames,
             KafkaClusterConfig config,
             String clientType,
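Note: maybeAuthenticate only applies when a SASL mechanism is configured but no JAAS entry was
given, and it copies the map before mutating it, so the shared per-context configs stay untouched.
A hedged usage sketch (the clientFactory instance and its injected request headers are hypothetical):

    // Hypothetical: SASL mechanism present, JAAS config absent, so the JAAS
    // entry is built from the current request's Authorization header.
    Map<String, Object> base = Map.of(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
    Map<String, Object> authenticated = clientFactory.maybeAuthenticate(base);
    // authenticated now carries SaslConfigs.SASL_JAAS_CONFIG; base is unmodified.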
@@ -465,49 +511,43 @@ Map<String, Object> buildConfig(Set<String> configNames,
                 .map(Optional::get)
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));
 
-        if (!cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
-            cluster.map(Kafka::getStatus)
+        var listenerSpec = cluster.map(Kafka::getSpec)
+                .map(KafkaSpec::getKafka)
+                .map(KafkaClusterSpec::getListeners)
+                .map(Collection::stream)
+                .orElseGet(Stream::empty)
+                .filter(listener -> listener.getName().equals(config.getListener()))
+                .findFirst();
+
+        var listenerStatus = cluster.map(Kafka::getStatus)
                 .map(KafkaStatus::getListeners)
                 .map(Collection::stream)
                 .orElseGet(Stream::empty)
                 .filter(listener -> listener.getName().equals(config.getListener()))
-                .map(ListenerStatus::getBootstrapServers)
-                .findFirst()
-                .ifPresent(bootstrapServers -> cfg.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
-        }
+                .findFirst();
+
+        listenerSpec.ifPresent(listener -> applyListenerConfiguration(cfg, listener));
+
+        listenerStatus.map(ListenerStatus::getBootstrapServers)
+                .ifPresent(bootstrap -> cfg.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrap));
+
+        listenerStatus.map(ListenerStatus::getCertificates)
+                .filter(Objects::nonNull)
+                .filter(Predicate.not(Collection::isEmpty))
+                .map(certificates -> String.join("\n", certificates).trim())
+                .ifPresent(certificates -> {
+                    cfg.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
+                    cfg.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, certificates);
+                });
 
         if (truststoreRequired(cfg)) {
-            if (trustManager.isResolvable()) {
+            if (cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) && trustManager.isResolvable()) {
                 trustManager.get().trustClusterCertificate(cfg);
             } else {
-                cluster.map(Kafka::getStatus)
-                        .map(KafkaStatus::getListeners)
-                        .map(Collection::stream)
-                        .orElseGet(Stream::empty)
-                        .filter(listener -> {
-                            if (listener.getName().equals(config.getListener())) {
-                                return true;
-                            }
-
-                            return cfg.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "")
-                                .toString()
-                                .contains(listener.getBootstrapServers());
-                        })
-                        .map(ListenerStatus::getCertificates)
-                        .filter(Objects::nonNull)
-                        .filter(Predicate.not(Collection::isEmpty))
-                        .findFirst()
-                        .ifPresent(certificates -> {
-                            cfg.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
-                            cfg.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, String.join("\n", certificates).trim());
-                        });
-
-                if (truststoreRequired(cfg)) {
-                    log.warnf("""
-                            Failed to set configuration for %s client to Kafka cluster %s. Connection \
-                            requires truststore which could not be obtained from the Kafka resource status."""
-                            .formatted(clientType, config.clusterKey()));
-                }
+                log.warnf("""
+                        Failed to set configuration for %s client to Kafka cluster %s. Connection \
+                        requires truststore which could not be obtained from the Kafka resource status."""
+                        .formatted(clientType, config.clusterKey()));
             }
         }
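Note: every value derived from the Kafka resource above is applied with putIfAbsent, so values
given explicitly in the console configuration always win. A minimal illustration of that precedence:

    Map<String, Object> cfg = new HashMap<>();
    cfg.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "explicit:9092");        // console config
    cfg.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "derived:9093"); // listener status
    assert "explicit:9092".equals(cfg.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));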
@@ -522,6 +562,42 @@ Map<String, Object> buildConfig(Set<String> configNames,
         return cfg;
     }
 
+    private void applyListenerConfiguration(Map<String, Object> cfg, GenericKafkaListener listener) {
+        var authType = Optional.ofNullable(listener.getAuth())
+                .map(KafkaListenerAuthentication::getType)
+                .orElse("");
+        boolean saslEnabled;
+
+        switch (authType) {
+            case "oauth":
+                saslEnabled = true;
+                cfg.putIfAbsent(SaslConfigs.SASL_MECHANISM, OAUTHBEARER);
+                cfg.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, STRIMZI_OAUTH_CALLBACK);
+                break;
+            case "scram-sha-512":
+                cfg.putIfAbsent(SaslConfigs.SASL_MECHANISM, SCRAM_SHA512);
+                saslEnabled = true;
+                break;
+            default:
+                saslEnabled = false;
+                break;
+        }
+
+        StringBuilder protocol = new StringBuilder();
+
+        if (saslEnabled) {
+            protocol.append("SASL_");
+        }
+
+        if (listener.isTls()) {
+            protocol.append(SecurityProtocol.SSL.name);
+        } else {
+            protocol.append(SecurityProtocol.PLAINTEXT.name);
+        }
+
+        cfg.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString());
+    }
+
     private void resolveValues(Map<String, String> properties) {
         properties.entrySet().forEach(entry ->
             entry.setValue(resolveValue(entry.getValue())));
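Note: the protocol derivation above yields one of four values. A small sketch (a hypothetical
helper mirroring the patch's logic, not part of the patch) makes the outcomes explicit:

    // saslEnabled comes from the listener auth type, tls from listener.isTls().
    static String protocolFor(boolean saslEnabled, boolean tls) {
        return (saslEnabled ? "SASL_" : "") + (tls ? "SSL" : "PLAINTEXT");
    }
    // protocolFor(true, true)   -> "SASL_SSL"        (oauth or scram-sha-512 over TLS)
    // protocolFor(true, false)  -> "SASL_PLAINTEXT"
    // protocolFor(false, true)  -> "SSL"
    // protocolFor(false, false) -> "PLAINTEXT"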
@@ -589,6 +665,62 @@ void logConfig(String clientType, Map<String, Object> config) {
         }
     }
 
+    void configureAuthentication(Map<String, Object> configs) {
+        switch (configs.get(SaslConfigs.SASL_MECHANISM).toString()) {
+            case OAUTHBEARER:
+                configureOAuthBearer(configs);
+                break;
+            case SCRAM_SHA512:
+                configureScram(configs);
+                break;
+            default:
+                throw new NotAuthorizedException("Unknown");
+        }
+    }
+
+    void configureOAuthBearer(Map<String, Object> configs) {
+        log.trace("SASL/OAUTHBEARER enabled");
+
+        configs.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, STRIMZI_OAUTH_CALLBACK);
+        // Do not attempt token refresh ahead of expiration (ExpiringCredentialRefreshingLogin)
+        // May still cause warnings to be logged when token will expire in less than SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS.
+        configs.putIfAbsent(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "0");
+
+        String jaasConfig = getAuthorization(BEARER)
+                .map(SASL_OAUTH_CONFIG_TEMPLATE::formatted)
+                .orElseThrow(() -> new NotAuthorizedException(BEARER.trim()));
+
+        configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
+    }
+
+    void configureScram(Map<String, Object> configs) {
+        log.trace("SASL/SCRAM enabled");
+
+        String jaasConfig = getBasicAuthentication()
+                .map(SASL_SCRAM_CONFIG_TEMPLATE::formatted)
+                .orElseThrow(() -> new NotAuthorizedException(BASIC.trim()));
+
+        configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
+    }
+
+    Optional<String[]> getBasicAuthentication() {
+        return getAuthorization(BASIC)
+                .map(Base64.getDecoder()::decode)
+                .map(String::new)
+                .filter(authn -> authn.indexOf(':') >= 0)
+                .map(authn -> new String[] {
+                    authn.substring(0, authn.indexOf(':')),
+                    authn.substring(authn.indexOf(':') + 1)
+                })
+                .filter(userPass -> !userPass[0].isEmpty() && !userPass[1].isEmpty());
+    }
+
+    Optional<String> getAuthorization(String scheme) {
+        return Optional.ofNullable(headers.getHeaderString(HttpHeaders.AUTHORIZATION))
+                .filter(header -> header.regionMatches(true, 0, scheme, 0, scheme.length()))
+                .map(header -> header.substring(scheme.length()));
+    }
+
     private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)");
     private static final Pattern EMBEDDED_STRING = Pattern.compile("\"[^\"]*\"");
 }
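Note: getBasicAuthentication decodes the standard Basic scheme and splits at the first colon, so
passwords containing colons survive. An illustrative round trip (credentials are placeholders):

    // What a client sends ...
    String header = "Basic " + Base64.getEncoder()
            .encodeToString("alice:s3cr:et".getBytes(StandardCharsets.UTF_8));
    // ... and what getBasicAuthentication() recovers: { "alice", "s3cr:et" },
    // which configureScram formats into the ScramLoginModule JAAS entry.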
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
index 7226bb34d..48601ee9c 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
@@ -19,16 +19,19 @@ public class KafkaContext implements Closeable {
     final Kafka resource;
     final Map<Class<?>, Map<String, Object>> configs;
     final Admin admin;
+    boolean applicationScoped;
 
     public KafkaContext(KafkaClusterConfig clusterConfig, Kafka resource, Map<Class<?>, Map<String, Object>> configs, Admin admin) {
         this.clusterConfig = clusterConfig;
         this.resource = resource;
         this.configs = Map.copyOf(configs);
         this.admin = admin;
+        this.applicationScoped = true;
     }
 
     public KafkaContext(KafkaContext other, Admin admin) {
         this(other.clusterConfig, other.resource, other.configs, admin);
+        this.applicationScoped = false;
     }
 
     @Override
@@ -75,4 +78,8 @@ public Map<String, Object> configs(Class<?> type) {
     public Admin admin() {
         return admin;
     }
+
+    public boolean applicationScoped() {
+        return applicationScoped;
+    }
 }
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/TrustAllCertificateManager.java b/api/src/main/java/com/github/streamshub/console/api/support/TrustAllCertificateManager.java
index 3e13eb08a..112c967b1 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/TrustAllCertificateManager.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/TrustAllCertificateManager.java
@@ -7,6 +7,7 @@
 import java.security.cert.X509Certificate;
 import java.util.Base64;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSocket;
@@ -32,14 +33,26 @@ public class TrustAllCertificateManager implements X509TrustManager {
     @Inject
     Logger log;
 
+    Map<String, String> trustedCertificates = new ConcurrentHashMap<>();
+
     public void trustClusterCertificate(Map<String, Object> cfg) {
+        String bootstrap = (String) cfg.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        String trusted = trustedCertificates.computeIfAbsent(bootstrap, this::loadCertificates);
+
+        if (trusted != null) {
+            cfg.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, trusted);
+            cfg.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
+        }
+    }
+
+    private String loadCertificates(String bootstrap) {
         TrustManager[] trustAllCerts = {this};
+        String certificates = null;
 
         try {
             SSLContext sc = SSLContext.getInstance("TLSv1.2");
             sc.init(null, trustAllCerts, new SecureRandom());
             SSLSocketFactory factory = sc.getSocketFactory();
-            String bootstrap = (String) cfg.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
             String[] hostport = bootstrap.split(",")[0].split(":");
             ByteArrayOutputStream certificateOut = new ByteArrayOutputStream();
 
@@ -52,13 +65,13 @@ public void trustClusterCertificate(Map<String, Object> cfg) {
                 }
             }
 
-            cfg.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG,
-                    new String(certificateOut.toByteArray(), StandardCharsets.UTF_8).trim());
-            cfg.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
+            certificates = new String(certificateOut.toByteArray(), StandardCharsets.UTF_8).trim();
             log.warnf("Certificate hosted at %s:%s is automatically trusted", hostport[0], hostport[1]);
         } catch (Exception e) {
             log.infof("Exception setting up trusted certificate: %s", e.getMessage());
         }
+
+        return certificates;
     }
 
     public X509Certificate[] getAcceptedIssuers() {
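Note: the per-bootstrap cache above leans on a documented ConcurrentHashMap guarantee: a mapping
function that returns null stores no entry, so a failed certificate fetch is retried on the next
request rather than cached as a permanent failure. A quick demonstration:

    Map<String, String> cache = new ConcurrentHashMap<>();
    cache.computeIfAbsent("kafka:9093", key -> null); // loader failed, nothing stored
    assert !cache.containsKey("kafka:9093");          // next call invokes the loader again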
diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties
index 6f2baaade..c70fec245 100644
--- a/api/src/main/resources/application.properties
+++ b/api/src/main/resources/application.properties
@@ -63,8 +63,11 @@ console.kafka.admin.default.api.timeout.ms=10000
 ########
 #%dev.quarkus.http.auth.proactive=false
 #%dev.quarkus.http.auth.permission."oidc".policy=permit
+%dev.quarkus.kubernetes-client.trust-certs=true
 %dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF
 %dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG
+#%dev.quarkus.log.category."io.vertx.core.http".level=DEBUG
+#%dev.quarkus.log.category."io.netty".level=DEBUG
 
 ########
 %testplain.quarkus.devservices.enabled=true
diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java
index ece32151f..879f392ab 100644
--- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java
+++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java
@@ -56,7 +56,6 @@
 import io.strimzi.api.kafka.model.kafka.Kafka;
 import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
 import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustomBuilder;
-import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuthBuilder;
 import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationScramSha512Builder;
 import io.strimzi.test.container.StrimziKafkaContainer;
 
@@ -543,8 +542,7 @@ void testDescribeClusterWithCertificates() {
          * Create a Kafka CR with OAuth that proxies to kafka1.
          * test-kafka3 is predefined in KafkaUnsecuredResourceManager with SSL
          */
-        Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers,
-                new KafkaListenerAuthenticationOAuthBuilder().build()))
+        Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers))
                 .editStatus()
                     .editMatchingListener(l -> "listener0".equals(l.getName()))
                         .addToCertificates("""
diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java
index 6e91a6ddd..1f7f058fd 100644
--- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java
+++ b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java
@@ -18,10 +18,12 @@ public class KafkaClusterConfig {
 
     @JsonIgnore
     public String clusterKey() {
-        if (namespace == null || namespace.isBlank()) {
-            return name;
-        }
-        return "%s/%s".formatted(namespace, name);
+        return hasNamespace() ? "%s/%s".formatted(namespace, name) : name;
+    }
+
+    @JsonIgnore
+    public boolean hasNamespace() {
+        return namespace != null && !namespace.isBlank();
     }
 
     public String getId() {
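Note: the refactored clusterKey()/hasNamespace() pair keeps the existing key format. A sketch of
the expected behavior (bean-style setters are assumed here; they are not shown in the patch):

    KafkaClusterConfig config = new KafkaClusterConfig();
    config.setName("my-kafka");           // assumed setter
    config.setNamespace("my-namespace");  // assumed setter
    assert config.hasNamespace();
    assert "my-namespace/my-kafka".equals(config.clusterKey());

    config.setNamespace(null);
    assert !config.hasNamespace();
    assert "my-kafka".equals(config.clusterKey());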