Skip to content

Commit

Permalink
Obtain credentials from request when missing configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Jul 19, 2024
1 parent bf8b726 commit dcda0db
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 63 deletions.
236 changes: 184 additions & 52 deletions api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -30,7 +31,9 @@
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.ws.rs.NotAuthorizedException;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.UriInfo;

import org.apache.kafka.clients.CommonClientConfigs;
Expand All @@ -44,6 +47,9 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
Expand All @@ -63,7 +69,11 @@
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
import io.strimzi.api.kafka.model.kafka.KafkaSpec;
import io.strimzi.api.kafka.model.kafka.KafkaStatus;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthentication;
import io.strimzi.api.kafka.model.kafka.listener.ListenerStatus;

/**
Expand All @@ -82,6 +92,20 @@
@ApplicationScoped
public class ClientFactory {

private static final String BEARER = "Bearer ";
private static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
private static final String STRIMZI_OAUTH_CALLBACK = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";
private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName()
+ " required"
+ " oauth.access.token=\"%s\" ;";

private static final String BASIC = "Basic ";
private static final String SCRAM_SHA512 = "SCRAM-SHA-512";
private static final String SASL_SCRAM_CONFIG_TEMPLATE = ScramLoginModule.class.getName()
+ " required"
+ " username=\"%s\""
+ " password=\"%s\" ;";

static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured";
private final Function<String, NotFoundException> noSuchKafka =
clusterName -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterName));
Expand All @@ -108,6 +132,9 @@ public class ClientFactory {
@Inject
Instance<TrustAllCertificateManager> trustManager;

@Inject
HttpHeaders headers;

@Inject
UriInfo requestUri;

Expand Down Expand Up @@ -275,7 +302,7 @@ void putKafkaContext(Map<String, KafkaContext> contexts,

Admin admin = null;

if (establishGlobalConnection(clusterConfig, adminConfigs)) {
if (establishGlobalConnection(adminConfigs)) {
admin = adminBuilder.apply(adminConfigs);
}

Expand Down Expand Up @@ -309,15 +336,15 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
}

Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
return kafkaInformer.map(SharedIndexInformer::getStore)
return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore)
.map(store -> {
String key = clusterConfig.clusterKey();
Kafka resource = store.getByKey(key);
if (resource == null) {
log.warnf("Configuration references Kafka resource %s, but it was not found in cache", key);
}
return resource;
});
}) : Optional.empty();
}

Map<String, Object> requiredAdminConfig() {
Expand Down Expand Up @@ -349,7 +376,7 @@ Map<String, Object> requiredProducerConfig() {
return configs;
}

static boolean establishGlobalConnection(KafkaClusterConfig clusterConfig, Map<String, Object> configs) {
static boolean establishGlobalConnection(Map<String, Object> configs) {
if (!configs.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
return false;
}
Expand Down Expand Up @@ -405,30 +432,39 @@ public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,

return Optional.ofNullable(contexts.get(clusterId))
.map(ctx -> {
Admin admin = Optional.ofNullable(ctx.admin())
/*
* Admin may be null if credentials were not given
* in the configuration. The user must provide the
* login secrets in the request in that case.
*/
.orElseGet(() -> adminBuilder.apply(ctx.configs(Admin.class)));
return new KafkaContext(ctx, filter.apply(admin));
if (ctx.admin() == null) {
/*
* Admin may be null if credentials were not given in the
* configuration. The user must provide the login secrets
* in the request in that case.
*/
var adminConfigs = maybeAuthenticate(ctx.configs(Admin.class));
var admin = adminBuilder.apply(adminConfigs);
return new KafkaContext(ctx, filter.apply(admin));
}

return ctx;
})
.orElseThrow(() -> noSuchKafka.apply(clusterId));
}

public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts) {
if (!contexts.values().contains(context)) {
log.infof("Closing out-of-date KafkaContext %s", Cache.metaNamespaceKeyFunc(context.resource()));
var clusterKey = context.clusterConfig().clusterKey();
if (context.applicationScoped()) {
log.infof("Closing out-of-date KafkaContext: %s", clusterKey);
} else {
log.debugf("Closing request-scoped KafkaContext: %s", clusterKey);
}
context.close();
}
}

@Produces
@RequestScoped
public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = context.configs(Consumer.class);
Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs);
var configs = maybeAuthenticate(context.configs(Consumer.class));
Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
return () -> client;
}

Expand All @@ -439,15 +475,25 @@ public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consum
@Produces
@RequestScoped
public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = context.configs(Producer.class);
Producer<String, String> client = new KafkaProducer<>(configs);
var configs = maybeAuthenticate(context.configs(Producer.class));
Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
return () -> client;
}

public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
producer.get().close();
}

Map<String, Object> maybeAuthenticate(Map<String, Object> configs) {
if (configs.containsKey(SaslConfigs.SASL_MECHANISM)
&& !configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
configs = new HashMap<>(configs);
configureAuthentication(configs);
}

return configs;
}

Map<String, Object> buildConfig(Set<String> configNames,
KafkaClusterConfig config,
String clientType,
Expand All @@ -465,49 +511,43 @@ Map<String, Object> buildConfig(Set<String> configNames,
.map(Optional::get)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));

if (!cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
cluster.map(Kafka::getStatus)
var listenerSpec = cluster.map(Kafka::getSpec)
.map(KafkaSpec::getKafka)
.map(KafkaClusterSpec::getListeners)
.map(Collection::stream)
.orElseGet(Stream::empty)
.filter(listener -> listener.getName().equals(config.getListener()))
.findFirst();

var listenerStatus = cluster.map(Kafka::getStatus)
.map(KafkaStatus::getListeners)
.map(Collection::stream)
.orElseGet(Stream::empty)
.filter(listener -> listener.getName().equals(config.getListener()))
.map(ListenerStatus::getBootstrapServers)
.findFirst()
.ifPresent(bootstrapServers -> cfg.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
}
.findFirst();

listenerSpec.ifPresent(listener -> applyListenerConfiguration(cfg, listener));

listenerStatus.map(ListenerStatus::getBootstrapServers)
.ifPresent(bootstrap -> cfg.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrap));

listenerStatus.map(ListenerStatus::getCertificates)
.filter(Objects::nonNull)
.filter(Predicate.not(Collection::isEmpty))
.map(certificates -> String.join("\n", certificates).trim())
.ifPresent(certificates -> {
cfg.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
cfg.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, certificates);
});

if (truststoreRequired(cfg)) {
if (trustManager.isResolvable()) {
if (cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) && trustManager.isResolvable()) {
trustManager.get().trustClusterCertificate(cfg);
} else {
cluster.map(Kafka::getStatus)
.map(KafkaStatus::getListeners)
.map(Collection::stream)
.orElseGet(Stream::empty)
.filter(listener -> {
if (listener.getName().equals(config.getListener())) {
return true;
}

return cfg.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "")
.toString()
.contains(listener.getBootstrapServers());
})
.map(ListenerStatus::getCertificates)
.filter(Objects::nonNull)
.filter(Predicate.not(Collection::isEmpty))
.findFirst()
.ifPresent(certificates -> {
cfg.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
cfg.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, String.join("\n", certificates).trim());
});

if (truststoreRequired(cfg)) {
log.warnf("""
Failed to set configuration for %s client to Kafka cluster %s. Connection \
requires truststore which could not be obtained from the Kafka resource status."""
.formatted(clientType, config.clusterKey()));
}
log.warnf("""
Failed to set configuration for %s client to Kafka cluster %s. Connection \
requires truststore which could not be obtained from the Kafka resource status."""
.formatted(clientType, config.clusterKey()));
}
}

Expand All @@ -522,6 +562,42 @@ Map<String, Object> buildConfig(Set<String> configNames,
return cfg;
}

private void applyListenerConfiguration(Map<String, Object> cfg, GenericKafkaListener listener) {
var authType = Optional.ofNullable(listener.getAuth())
.map(KafkaListenerAuthentication::getType)
.orElse("");
boolean saslEnabled;

switch (authType) {
case "oauth":
saslEnabled = true;
cfg.putIfAbsent(SaslConfigs.SASL_MECHANISM, OAUTHBEARER);
cfg.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, STRIMZI_OAUTH_CALLBACK);
break;
case "scram-sha-512":
cfg.putIfAbsent(SaslConfigs.SASL_MECHANISM, SCRAM_SHA512);
saslEnabled = true;
break;
default:
saslEnabled = false;
break;
}

StringBuilder protocol = new StringBuilder();

if (saslEnabled) {
protocol.append("SASL_");
}

if (listener.isTls()) {
protocol.append(SecurityProtocol.SSL.name);
} else {
protocol.append(SecurityProtocol.PLAINTEXT.name);
}

cfg.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString());
}

private void resolveValues(Map<String, String> properties) {
properties.entrySet().forEach(entry ->
entry.setValue(resolveValue(entry.getValue())));
Expand Down Expand Up @@ -589,6 +665,62 @@ void logConfig(String clientType, Map<String, Object> config) {
}
}

void configureAuthentication(Map<String, Object> configs) {
switch (configs.get(SaslConfigs.SASL_MECHANISM).toString()) {
case OAUTHBEARER:
configureOAuthBearer(configs);
break;
case SCRAM_SHA512:
configureScram(configs);
break;
default:
throw new NotAuthorizedException("Unknown");
}
}

void configureOAuthBearer(Map<String, Object> configs) {
log.trace("SASL/OAUTHBEARER enabled");

configs.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, STRIMZI_OAUTH_CALLBACK);
// Do not attempt token refresh ahead of expiration (ExpiringCredentialRefreshingLogin)
// May still cause warnings to be logged when token will expire in less than SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS.
configs.putIfAbsent(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "0");

String jaasConfig = getAuthorization(BEARER)
.map(SASL_OAUTH_CONFIG_TEMPLATE::formatted)
.orElseThrow(() -> new NotAuthorizedException(BEARER.trim()));

configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
}

void configureScram(Map<String, Object> configs) {
log.trace("SASL/SCRAM enabled");

String jaasConfig = getBasicAuthentication()
.map(SASL_SCRAM_CONFIG_TEMPLATE::formatted)
.orElseThrow(() -> new NotAuthorizedException(BASIC.trim()));

configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
}

Optional<String[]> getBasicAuthentication() {
return getAuthorization(BASIC)
.map(Base64.getDecoder()::decode)
.map(String::new)
.filter(authn -> authn.indexOf(':') >= 0)
.map(authn -> new String[] {
authn.substring(0, authn.indexOf(':')),
authn.substring(authn.indexOf(':') + 1)
})
.filter(userPass -> !userPass[0].isEmpty() && !userPass[1].isEmpty());
}

Optional<String> getAuthorization(String scheme) {
return Optional.ofNullable(headers.getHeaderString(HttpHeaders.AUTHORIZATION))
.filter(header -> header.regionMatches(true, 0, scheme, 0, scheme.length()))
.map(header -> header.substring(scheme.length()));
}

private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)");
private static final Pattern EMBEDDED_STRING = Pattern.compile("\"[^\"]*\"");
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ public class KafkaContext implements Closeable {
final Kafka resource;
final Map<Class<?>, Map<String, Object>> configs;
final Admin admin;
boolean applicationScoped;

public KafkaContext(KafkaClusterConfig clusterConfig, Kafka resource, Map<Class<?>, Map<String, Object>> configs, Admin admin) {
this.clusterConfig = clusterConfig;
this.resource = resource;
this.configs = Map.copyOf(configs);
this.admin = admin;
this.applicationScoped = true;
}

public KafkaContext(KafkaContext other, Admin admin) {
this(other.clusterConfig, other.resource, other.configs, admin);
this.applicationScoped = false;
}

@Override
Expand Down Expand Up @@ -75,4 +78,8 @@ public Map<String, Object> configs(Class<?> type) {
public Admin admin() {
return admin;
}

public boolean applicationScoped() {
return applicationScoped;
}
}
Loading

0 comments on commit dcda0db

Please sign in to comment.