Refine ClientFactory config handling, add SCRAM/OAuth tests
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Aug 2, 2024
1 parent c8e0cfe commit 53655cc
Showing 5 changed files with 207 additions and 91 deletions.
api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
112 changes: 81 additions & 31 deletions
@@ -6,6 +6,7 @@
import java.nio.file.Path;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -228,7 +229,7 @@ Map<String, KafkaContext> produceKafkaContexts(ConsoleConfig consoleConfig,
adminBuilder,
false));

return contexts;
return Collections.unmodifiableMap(contexts);
}

void addKafkaEventHandler(Map<String, KafkaContext> contexts,
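The hunk above changes produceKafkaContexts to return Collections.unmodifiableMap(contexts) rather than the raw map, so injection points receive a read-only view while the factory (and the informer handler below) keeps mutating the single underlying map; the same wrapping is applied to the per-client config maps later in this diff. A minimal, self-contained sketch of the pattern, with hypothetical names not taken from this commit:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ContextRegistry {
    // The one writable map; only the owning component mutates it.
    private final Map<String, String> contexts = new HashMap<>();

    void register(String clusterId, String context) {
        contexts.put(clusterId, context);
    }

    // Consumers get a live, read-only view: later register() calls are
    // visible through it, but view.put(...) throws UnsupportedOperationException.
    Map<String, String> view() {
        return Collections.unmodifiableMap(contexts);
    }
}

Note that unmodifiableMap returns a view, not a copy, which is what lets the event handler keep the published map current.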
@@ -237,32 +238,52 @@ void addKafkaEventHandler(Map<String, KafkaContext> contexts,

kafkaInformer.get().addEventHandlerWithResyncPeriod(new ResourceEventHandler<Kafka>() {
public void onAdd(Kafka kafka) {
findConfig(kafka).ifPresent(clusterConfig -> putKafkaContext(contexts,
clusterConfig,
Optional.of(kafka),
adminBuilder,
false));
if (log.isDebugEnabled()) {
log.debugf("Kafka resource %s added", Cache.metaNamespaceKeyFunc(kafka));
}
findConfig(kafka).ifPresentOrElse(
clusterConfig -> {
if (defaultedClusterId(clusterConfig, Optional.of(kafka))) {
log.debugf("Ignoring added Kafka resource %s, cluster ID not yet available and not provided via configuration",
Cache.metaNamespaceKeyFunc(kafka));
} else {
putKafkaContext(contexts,
clusterConfig,
Optional.of(kafka),
adminBuilder,
false);
}
},
() -> log.debugf("Ignoring added Kafka resource %s, not found in configuration", Cache.metaNamespaceKeyFunc(kafka)));
}

public void onUpdate(Kafka oldKafka, Kafka newKafka) {
findConfig(newKafka).ifPresent(clusterConfig -> putKafkaContext(contexts,
if (log.isDebugEnabled()) {
log.debugf("Kafka resource %s updated", Cache.metaNamespaceKeyFunc(oldKafka));
}
findConfig(newKafka).ifPresentOrElse(
clusterConfig -> putKafkaContext(contexts,
clusterConfig,
Optional.of(newKafka),
adminBuilder,
true));
true),
() -> log.debugf("Ignoring updated Kafka resource %s, not found in configuration", Cache.metaNamespaceKeyFunc(newKafka)));
}

public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) {
findConfig(kafka).ifPresent(clusterConfig -> {
String clusterKey = clusterConfig.clusterKey();
String clusterId = Optional.ofNullable(clusterConfig.getId())
.or(() -> Optional.ofNullable(kafka.getStatus()).map(KafkaStatus::getClusterId))
.orElse(null);
log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
log.debugf("Known KafkaContext identifiers: %s", contexts.keySet());
KafkaContext previous = contexts.remove(clusterId);
Optional.ofNullable(previous).ifPresent(KafkaContext::close);
});
if (log.isDebugEnabled()) {
log.debugf("Kafka resource %s deleted", Cache.metaNamespaceKeyFunc(kafka));
}
findConfig(kafka).ifPresentOrElse(
clusterConfig -> {
String clusterKey = clusterConfig.clusterKey();
String clusterId = clusterId(clusterConfig, Optional.of(kafka));
log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
log.debugf("Known KafkaContext identifiers: %s", contexts.keySet());
KafkaContext previous = contexts.remove(clusterId);
Optional.ofNullable(previous).ifPresent(KafkaContext::close);
},
() -> log.debugf("Ignoring deleted Kafka resource %s, not found in configuration", Cache.metaNamespaceKeyFunc(kafka)));
}

Optional<KafkaClusterConfig> findConfig(Kafka kafka) {
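Each informer callback now pairs a debug line with Optional.ifPresentOrElse, so a Kafka resource with no matching entry in the console configuration is logged and skipped rather than silently ignored. A standalone sketch of the same pattern using java.util.logging (hypothetical names, not the project's handler):

import java.util.Optional;
import java.util.logging.Logger;

class EventHandlerSketch {
    private static final Logger log = Logger.getLogger(EventHandlerSketch.class.getName());

    void onAdd(String resourceKey, Optional<String> clusterConfig) {
        clusterConfig.ifPresentOrElse(
                cfg -> log.info(() -> "Adding context for " + resourceKey),
                // The explicit empty branch is the point of the change: the
                // "not configured" path now leaves a trace in the logs.
                () -> log.fine(() -> "Ignoring added resource " + resourceKey
                        + ", not found in configuration"));
    }
}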
@@ -305,9 +326,9 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
kafkaResource);

Map<Class<?>, Map<String, Object>> clientConfigs = new HashMap<>();
clientConfigs.put(Admin.class, adminConfigs);
clientConfigs.put(Consumer.class, consumerConfigs);
clientConfigs.put(Producer.class, producerConfigs);
clientConfigs.put(Admin.class, Collections.unmodifiableMap(adminConfigs));
clientConfigs.put(Consumer.class, Collections.unmodifiableMap(consumerConfigs));
clientConfigs.put(Producer.class, Collections.unmodifiableMap(producerConfigs));

Admin admin = null;

@@ -316,29 +337,58 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
}

String clusterKey = clusterConfig.clusterKey();
String clusterId = Optional.ofNullable(clusterConfig.getId())
.or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId))
.orElseGet(clusterConfig::getName);
String clusterId = clusterId(clusterConfig, kafkaResource);

if (!replace && contexts.containsKey(clusterId)) {
log.warnf("""
Ignoring duplicate Kafka cluster id: %s for cluster %s. Cluster id values in \
configuration must be unique and may not match id values of \
clusters discovered using Strimzi Kafka Kubernetes API resources.""", clusterId, clusterKey);
} else if (kafkaResource.isPresent() && truststoreRequired(adminConfigs)) {
if (contexts.containsKey(clusterId) && !truststoreRequired(contexts.get(clusterId).configs(Admin.class))) {
} else {
boolean truststoreNowRequired = truststoreRequired(adminConfigs);

if (resourceStatusDroppedCertificates(contexts.get(clusterId), kafkaResource, truststoreNowRequired)) {
log.warnf("""
Ignoring update to Kafka custom resource %s. Connection requires \
trusted certificate which is no longer available.""", clusterKey);
} else {
if (truststoreNowRequired && kafkaResource.isPresent()) {
log.warnf("""
Connection requires trusted certificate(s) which are not present \
in the Kafka CR status of resource %s.""", clusterKey);
}

KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId);
KafkaContext previous = contexts.put(clusterId, ctx);
Optional.ofNullable(previous).ifPresent(KafkaContext::close);
}
} else {
KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId);
KafkaContext previous = contexts.put(clusterId, ctx);
Optional.ofNullable(previous).ifPresent(KafkaContext::close);
}
}
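Both installation paths end with the same swap-and-close step: contexts.put returns the previous mapping, and closing it releases the old Admin client instead of leaking it. A minimal sketch of that discipline with a hypothetical Context type:

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class ContextSwapper {
    record Context(String clusterId) implements AutoCloseable {
        @Override
        public void close() {
            // Release underlying clients (Admin, Consumer, Producer) here.
        }
    }

    private final Map<String, Context> contexts = new ConcurrentHashMap<>();

    void install(String clusterId, Context replacement) {
        // put() hands back the displaced entry (or null); closing it is what
        // keeps repeated onUpdate events from leaking client connections.
        Context previous = contexts.put(clusterId, replacement);
        Optional.ofNullable(previous).ifPresent(Context::close);
    }
}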

boolean defaultedClusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kafkaResource) {
return clusterConfig.getId() == null && kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId).isEmpty();
}

String clusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kafkaResource) {
return Optional.ofNullable(clusterConfig.getId())
.or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId))
.orElseGet(clusterConfig::getName);
}
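The two new helpers make the resolution order explicit: an id set in the configuration wins, then the cluster id reported in the Kafka CR status, and finally the configured name as a fallback, while defaultedClusterId flags the case where only the fallback would be used. A self-contained sketch of the same Optional chain with simplified types (names hypothetical):

import java.util.Optional;

class ClusterIdResolution {
    record ClusterConfig(String id, String name) { }
    record KafkaStatus(String clusterId) { }

    // Precedence mirrors the commit: config id -> CR status id -> config name.
    static String clusterId(ClusterConfig config, Optional<KafkaStatus> status) {
        return Optional.ofNullable(config.id())
                .or(() -> status.map(KafkaStatus::clusterId))
                .orElseGet(config::name);
    }

    // True when neither the configuration nor the CR status supplies an id;
    // onAdd uses this to defer a resource until Strimzi reports its cluster id.
    static boolean defaultedClusterId(ClusterConfig config, Optional<KafkaStatus> status) {
        return config.id() == null && status.map(KafkaStatus::clusterId).isEmpty();
    }
}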

/**
 * Checks whether the previous KafkaContext contained TLS trusted certificates that are
 * no longer present, e.g. because the Strimzi Kafka CR is in a transient state and its
 * status has dropped them. When that happens, the update is ignored and the old
 * KafkaContext is kept.
 */
boolean resourceStatusDroppedCertificates(KafkaContext context, Optional<Kafka> kafkaResource, boolean truststoreNowRequired) {
if (!truststoreNowRequired || context == null || kafkaResource.isEmpty()) {
return false;
}

return !truststoreRequired(context.configs(Admin.class));
}
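truststoreRequired itself is outside this hunk; judging only from its call sites, it reports whether the client configs call for TLS trust material that has not been supplied. A hedged sketch of plausible logic, using real Kafka client config constants but assumed behavior, not the project's actual implementation:

import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;

class TruststoreCheck {
    // Assumption: trust material is "required" when an SSL-based protocol is
    // configured but no PEM truststore certificates are present in the configs.
    static boolean truststoreRequired(Map<String, Object> configs) {
        String protocol = String.valueOf(
                configs.getOrDefault(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ""));
        boolean usesTls = protocol.contains("SSL"); // SSL or SASL_SSL
        boolean hasCerts = configs.containsKey(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);
        return usesTls && !hasCerts;
    }
}

Combined with resourceStatusDroppedCertificates, the factory then refuses to replace a context that still holds working certificates with one that would need them but no longer has them.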

Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore)
.map(store -> store.getByKey(clusterConfig.clusterKey()))