diff --git a/api/pom.xml b/api/pom.xml
index 21146d1ad..d2ee65918 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -179,11 +179,6 @@
             <artifactId>docker-java-api</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.junit.platform</groupId>
-            <artifactId>junit-platform-launcher</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>io.strimzi</groupId>
             <artifactId>strimzi-test-container</artifactId>
diff --git a/api/src/main/java/com/github/streamshub/console/api/Annotations.java b/api/src/main/java/com/github/streamshub/console/api/Annotations.java
index 5cf13832c..988083660 100644
--- a/api/src/main/java/com/github/streamshub/console/api/Annotations.java
+++ b/api/src/main/java/com/github/streamshub/console/api/Annotations.java
@@ -14,17 +14,7 @@ public enum Annotations {
      * Annotation to identify a listener in Strimzi Kafka resources to be used for
      * connections directly from the Console API.
      */
-    CONSOLE_LISTENER("console-listener"),
-
-    /**
-     * Annotation to identify a listener in Strimzi Kafka resources to be used for
-     * public connections. This may be used to differentiate a listener to be
-     * exposed via the KafkaCluster resource and published in the UI.
-     *
-     * @deprecated
-     */
-    @Deprecated(forRemoval = true)
-    EXPOSED_LISTENER("exposed-listener");
+    CONSOLE_LISTENER("console-listener");
 
     private static final String NAMESPACE = "streamshub.github.com";
     private final String value;
diff --git a/api/src/main/java/com/github/streamshub/console/api/BrokersResource.java b/api/src/main/java/com/github/streamshub/console/api/BrokersResource.java
index 4ae3b85e5..7109aba39 100644
--- a/api/src/main/java/com/github/streamshub/console/api/BrokersResource.java
+++ b/api/src/main/java/com/github/streamshub/console/api/BrokersResource.java
@@ -1,7 +1,6 @@
 package com.github.streamshub.console.api;
 
 import java.util.concurrent.CompletionStage;
-import java.util.function.Supplier;
 
 import jakarta.inject.Inject;
 import jakarta.ws.rs.GET;
@@ -11,7 +10,6 @@
 import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.core.Response;
 
-import org.apache.kafka.clients.admin.Admin;
 import org.eclipse.microprofile.openapi.annotations.media.Content;
 import org.eclipse.microprofile.openapi.annotations.parameters.Parameter;
 import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
@@ -24,9 +22,6 @@
 @Tag(name = "Kafka Cluster Resources")
 public class BrokersResource {
 
-    @Inject
-    Supplier<Admin> clientSupplier;
-
     @Inject
     BrokerService brokerService;
 
diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
index 3b07aaa1b..2e2893eeb 100644
--- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
+++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -10,11 +10,15 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -27,7 +31,6 @@
 import jakarta.inject.Inject;
 import jakarta.inject.Named;
 import jakarta.ws.rs.NotFoundException;
-import jakarta.ws.rs.core.HttpHeaders;
 import jakarta.ws.rs.core.UriInfo;
 
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -39,6 +42,7 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -49,6 +53,8 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import com.github.streamshub.console.api.service.KafkaClusterService;
+import com.github.streamshub.console.api.support.Holder;
+import com.github.streamshub.console.api.support.KafkaContext;
 import com.github.streamshub.console.api.support.TrustAllCertificateManager;
 import com.github.streamshub.console.config.ConsoleConfig;
 import com.github.streamshub.console.config.KafkaClusterConfig;
@@ -86,12 +92,15 @@ public class ClientFactory {
     @Inject
     Config config;
 
+    @Inject
+    ScheduledExecutorService scheduler;
+
     @Inject
     @ConfigProperty(name = "console.config-path")
     Optional<String> configPath;
 
     @Inject
-    SharedIndexInformer<Kafka> kafkaInformer;
+    Holder<SharedIndexInformer<Kafka>> kafkaInformer;
 
     @Inject
     KafkaClusterService kafkaClusterService;
@@ -99,9 +108,6 @@ public class ClientFactory {
     @Inject
     Instance<TrustAllCertificateManager> trustManager;
 
-    @Inject
-    HttpHeaders headers;
-
     @Inject
     UriInfo requestUri;
 
@@ -165,152 +171,265 @@ public ConsoleConfig produceConsoleConfig() {
         });
     }
 
-    /**
-     * Provides the Strimzi Kafka custom resource addressed by the current request
-     * URL as an injectable bean. This allows for the Kafka to be obtained by
-     * application logic without an additional lookup.
-     *
-     * @return a supplier that gives the Strimzi Kafka CR specific to the current
-     *         request
-     * @throws IllegalStateException when an attempt is made to access an injected
-     *                               Kafka Supplier but the current request does not
-     *                               include the Kafka clusterId path parameter.
-     * @throws NotFoundException     when the provided Kafka clusterId does not
-     *                               match any known Kafka cluster.
-     */
     @Produces
-    @RequestScoped
-    public Supplier<Kafka> kafkaResourceSupplier() {
-        String clusterId = requestUri.getPathParameters().getFirst("clusterId");
+    @ApplicationScoped
+    Map<String, KafkaContext> produceKafkaContexts(ConsoleConfig consoleConfig,
+            Function<Map<String, Object>, Admin> adminBuilder) {
 
-        if (clusterId == null) {
-            throw new IllegalStateException("Admin client was accessed, "
-                    + "but the requested operation does not provide a Kafka cluster ID");
-        }
+        final Map<String, KafkaContext> contexts = new ConcurrentHashMap<>();
 
-        Kafka cluster = kafkaClusterService.findCluster(clusterId)
-                .orElseThrow(() -> noSuchKafka.apply(clusterId));
+        if (kafkaInformer.isPresent()) {
+            addKafkaEventHandler(contexts, consoleConfig, adminBuilder);
+        }
 
-        return () -> cluster;
+        // Configure clusters that will not be configured by events
+        consoleConfig.getKafka().getClusters()
+                .stream()
+                .filter(c -> cachedKafkaResource(c).isEmpty())
+                .forEach(clusterConfig -> putKafkaContext(contexts,
+                        clusterConfig,
+                        Optional.empty(),
+                        adminBuilder,
+                        false));
+
+        return contexts;
     }
 
-    @Produces
-    @ApplicationScoped
-    Map<String, Admin> getAdmins(ConsoleConfig consoleConfig, Function<Map<String, Object>, Admin> adminBuilder) {
-        final Map<String, Admin> adminClients = new HashMap<>();
+    void addKafkaEventHandler(Map<String, KafkaContext> contexts,
+            ConsoleConfig consoleConfig,
+            Function<Map<String, Object>, Admin> adminBuilder) {
 
-        kafkaInformer.addEventHandlerWithResyncPeriod(new ResourceEventHandler<Kafka>() {
+        kafkaInformer.get().addEventHandlerWithResyncPeriod(new ResourceEventHandler<Kafka>() {
             public void onAdd(Kafka kafka) {
-                put(kafka, "Adding");
+                findConfig(kafka).ifPresent(clusterConfig -> putKafkaContext(contexts,
+                        clusterConfig,
+                        Optional.of(kafka),
+                        adminBuilder,
+                        false));
             }
 
             public void onUpdate(Kafka oldKafka, Kafka newKafka) {
-                put(newKafka, "Updating");
+                findConfig(newKafka).ifPresent(clusterConfig -> putKafkaContext(contexts,
+                        clusterConfig,
+                        Optional.of(newKafka),
+                        adminBuilder,
+                        true));
             }
 
-            private void put(Kafka kafka, String eventType) {
-                String clusterKey = Cache.metaNamespaceKeyFunc(kafka);
-
-                consoleConfig.getKafka()
-                    .getCluster(clusterKey)
-                    .map(e -> {
-                        var configs = buildConfig(AdminClientConfig.configNames(), e, "admin", e::getAdminProperties, kafka);
-
-                        if (truststoreRequired(configs)) {
-                            log.warnf("""
-                                    %s Admin client for Kafka cluster %s failed. Connection \
-                                    requires truststore which could not be obtained from the \
-                                    Kafka resource status.
-                                    """
-                                    .formatted(eventType, kafka.getStatus().getClusterId()));
-                            return null;
-                        } else {
-                            logConfig("Admin[name=%s, namespace=%s, id=%s]".formatted(
-                                    e.getName(),
-                                    e.getNamespace(),
-                                    kafka.getStatus().getClusterId()),
-                                    configs);
-                            return adminBuilder.apply(configs);
-                        }
-                    })
-                    .ifPresent(client -> {
-                        log.info("%s Admin client for Kafka cluster %s".formatted(eventType, kafka.getStatus().getClusterId()));
-                        Admin previous = adminClients.put(clusterKey, client);
-                        Optional.ofNullable(previous).ifPresent(Admin::close);
-                    });
+            public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) {
+                findConfig(kafka).ifPresent(clusterConfig -> {
+                    String clusterKey = clusterConfig.clusterKey();
+                    String clusterId = Optional.ofNullable(clusterConfig.getId())
+                            .or(() -> Optional.ofNullable(kafka.getStatus()).map(KafkaStatus::getClusterId))
+                            .orElse(null);
+                    log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
+                    log.debugf("Known KafkaContext identifiers: %s", contexts.keySet());
+                    KafkaContext previous = contexts.remove(clusterId);
+                    Optional.ofNullable(previous).ifPresent(KafkaContext::close);
+                });
             }
 
-            public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) {
+            Optional<KafkaClusterConfig> findConfig(Kafka kafka) {
                 String clusterKey = Cache.metaNamespaceKeyFunc(kafka);
-                log.info("Removing Admin client for Kafka cluster %s".formatted(kafka.getStatus().getClusterId()));
-                Admin admin = adminClients.remove(clusterKey);
-                Optional.ofNullable(admin).ifPresent(Admin::close);
+                return consoleConfig.getKafka().getCluster(clusterKey);
             }
         }, TimeUnit.MINUTES.toMillis(1));
+    }
+
+    void putKafkaContext(Map<String, KafkaContext> contexts,
+            KafkaClusterConfig clusterConfig,
+            Optional<Kafka> kafkaResource,
+            Function<Map<String, Object>, Admin> adminBuilder,
+            boolean replace) {
+
+        var adminConfigs = buildConfig(AdminClientConfig.configNames(),
+                clusterConfig,
+                "admin",
+                clusterConfig::getAdminProperties,
+                requiredAdminConfig(),
+                kafkaResource);
+
+        Set<String> configNames = ConsumerConfig.configNames().stream()
+                // Do not allow a group Id to be set for this application
+                .filter(Predicate.not(ConsumerConfig.GROUP_ID_CONFIG::equals))
+                .collect(Collectors.toSet());
+
+        var consumerConfigs = buildConfig(configNames,
+                clusterConfig,
+                "consumer",
+                clusterConfig::getConsumerProperties,
+                requiredConsumerConfig(),
+                kafkaResource);
+
+        var producerConfigs = buildConfig(ProducerConfig.configNames(),
+                clusterConfig,
+                "producer",
+                clusterConfig::getProducerProperties,
+                requiredProducerConfig(),
+                kafkaResource);
+
+        Map<Class<?>, Map<String, Object>> clientConfigs = new HashMap<>();
+        clientConfigs.put(Admin.class, adminConfigs);
+        clientConfigs.put(Consumer.class, consumerConfigs);
+        clientConfigs.put(Producer.class, producerConfigs);
+
+        Admin admin = null;
+
+        if (establishGlobalConnection(clusterConfig, adminConfigs)) {
+            admin = adminBuilder.apply(adminConfigs);
+        }
+
+        String clusterKey = clusterConfig.clusterKey();
+        String clusterId = Optional.ofNullable(clusterConfig.getId())
+                .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId))
+                .orElse(null);
+
+        if (clusterId == null) {
+            log.warnf("""
+                    Ignoring Kafka cluster %s. Cluster id value missing in \
+                    configuration and no Strimzi Kafka resources found with matching \
+                    name and namespace.""", clusterKey);
+        } else if (!replace && contexts.containsKey(clusterId)) {
+            log.warnf("""
+                    Ignoring duplicate Kafka cluster id: %s for cluster %s. Cluster id values in \
+                    configuration must be unique and may not match id values of \
+                    clusters discovered using Strimzi Kafka Kubernetes API resources.""", clusterId, clusterKey);
+        } else if (truststoreRequired(adminConfigs)) {
+            if (contexts.containsKey(clusterId) && !truststoreRequired(contexts.get(clusterId).configs(Admin.class))) {
+                log.warnf("""
+                        Ignoring update to Kafka custom resource %s. Connection requires \
+                        trusted certificate which is no longer available.""", clusterKey);
+            }
+        } else {
+            KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
+            log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId);
+            KafkaContext previous = contexts.put(clusterId, ctx);
+            Optional.ofNullable(previous).ifPresent(KafkaContext::close);
+        }
+    }
+
+    Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
+        return kafkaInformer.map(SharedIndexInformer::getStore)
+                .map(store -> {
+                    String key = clusterConfig.clusterKey();
+                    Kafka resource = store.getByKey(key);
+                    if (resource == null) {
+                        log.warnf("Configuration references Kafka resource %s, but it was not found in cache", key);
+                    }
+                    return resource;
+                });
+    }
+
+    Map<String, Object> requiredAdminConfig() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
+        configs.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
+        return configs;
+    }
 
-        return adminClients;
+    Map<String, Object> requiredConsumerConfig() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
+        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
+        return configs;
     }
 
-    void closeAdmins(@Disposes Map<String, Admin> admins) {
-        admins.values().parallelStream().forEach(admin -> {
+    Map<String, Object> requiredProducerConfig() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        configs.put(ProducerConfig.ACKS_CONFIG, "all");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
+        configs.put(ProducerConfig.RETRIES_CONFIG, 0);
+        return configs;
+    }
+
+    static boolean establishGlobalConnection(KafkaClusterConfig clusterConfig, Map<String, Object> configs) {
+        if (!configs.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
+            return false;
+        }
+
+        if (truststoreRequired(configs)) {
+            return false;
+        }
+
+        if (configs.containsKey(SaslConfigs.SASL_MECHANISM)) {
+            return configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG);
+        }
+
+        return false;
+    }
+
+    void disposeKafkaContexts(@Disposes Map<String, KafkaContext> contexts) {
+        log.infof("Closing all known KafkaContexts");
+
+        contexts.values().parallelStream().forEach(context -> {
+            log.infof("Closing KafkaContext %s", Cache.metaNamespaceKeyFunc(context.resource()));
             try {
-                admin.close();
+                context.close();
             } catch (Exception e) {
-                log.warnf("Exception occurred closing admin: %s", e.getMessage());
+                log.warnf("Exception occurred closing context: %s", e.getMessage());
             }
         });
     }
 
+    /**
+     * Provides the Strimzi Kafka custom resource addressed by the current request
+     * URL as an injectable bean. This allows for the Kafka to be obtained by
+     * application logic without an additional lookup.
+     *
+     * @return a supplier that gives the Strimzi Kafka CR specific to the current
+     *         request
+     * @throws IllegalStateException when an attempt is made to access an injected
+     *                               Kafka Supplier but the current request does not
+     *                               include the Kafka clusterId path parameter.
+     * @throws NotFoundException     when the provided Kafka clusterId does not
+     *                               match any known Kafka cluster.
+     */
     @Produces
     @RequestScoped
-    public Supplier<Admin> adminClientSupplier(Supplier<Kafka> cluster, Map<String, Admin> admins, UnaryOperator<Admin> filter) {
-        String clusterKey = Cache.metaNamespaceKeyFunc(cluster.get());
+    public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,
+            UnaryOperator<Admin> filter,
+            Function<Map<String, Object>, Admin> adminBuilder) {
 
-        return Optional.ofNullable(admins.get(clusterKey))
-                .map(filter::apply)
-                .<Supplier<Admin>>map(client -> () -> client)
-                .orElseThrow(() -> noSuchKafka.apply(cluster.get().getStatus().getClusterId()));
-    }
+        String clusterId = requestUri.getPathParameters().getFirst("clusterId");
 
-    public void adminClientDisposer(@Disposes Supplier<Admin> client, Map<String, Admin> admins) {
-        Admin admin = client.get();
+        if (clusterId == null) {
+            return KafkaContext.EMPTY;
+        }
 
-        if (!admins.values().contains(admin)) {
-            admin.close();
+        return Optional.ofNullable(contexts.get(clusterId))
+                .map(ctx -> {
+                    Admin admin = Optional.ofNullable(ctx.admin())
+                            /*
+                             * Admin may be null if credentials were not given
+                             * in the configuration. The user must provide the
+                             * login secrets in the request in that case.
+                             */
+                            .orElseGet(() -> adminBuilder.apply(ctx.configs(Admin.class)));
+                    return new KafkaContext(ctx, filter.apply(admin));
+                })
+                .orElseThrow(() -> noSuchKafka.apply(clusterId));
+    }
+
+    public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts) {
+        if (!contexts.values().contains(context)) {
+            log.infof("Closing out-of-date KafkaContext %s", Cache.metaNamespaceKeyFunc(context.resource()));
+            context.close();
         }
     }
 
     @Produces
     @RequestScoped
-    public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, Supplier<Kafka> cluster) {
-        String clusterKey = Cache.metaNamespaceKeyFunc(cluster.get());
-
-        return consoleConfig.getKafka()
-            .getCluster(clusterKey)
-            .<Supplier<Consumer<byte[], byte[]>>>map(e -> {
-
-                Set<String> configNames = ConsumerConfig.configNames().stream()
-                    // Do not allow a group Id to be set for this application
-                    .filter(Predicate.not(ConsumerConfig.GROUP_ID_CONFIG::equals))
-                    .collect(Collectors.toSet());
-
-                var configs = buildConfig(configNames, e, "consumer", e::getConsumerProperties, cluster.get());
-                configs.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
-                configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-                configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-                configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
-                configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-                configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
-
-                logConfig("Consumer[name=%s, namespace=%s]".formatted(
-                        e.getName(),
-                        e.getNamespace()),
-                        configs);
-                @SuppressWarnings("resource") // no resource leak - client closed by disposer
-                Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs);
-                return () -> client;
-            })
-            .orElseThrow(() -> noSuchKafka.apply(cluster.get().getStatus().getClusterId()));
+    public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
+        var configs = context.configs(Consumer.class);
+        Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs);
+        return () -> client;
     }
 
     public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
@@ -319,29 +438,10 @@ public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consum
 
     @Produces
     @RequestScoped
-    public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, Supplier<Kafka> cluster) {
-        String clusterKey = Cache.metaNamespaceKeyFunc(cluster.get());
-
-        return consoleConfig.getKafka()
-            .getCluster(clusterKey)
-            .<Supplier<Producer<String, String>>>map(e -> {
-                var configs = buildConfig(ProducerConfig.configNames(), e, "producer", e::getProducerProperties, cluster.get());
-                configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-                configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-                configs.put(ProducerConfig.ACKS_CONFIG, "all");
-                configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
-                configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
-                configs.put(ProducerConfig.RETRIES_CONFIG, 0);
-
-                logConfig("Producer[name=%s, namespace=%s]".formatted(
-                        e.getName(),
-                        e.getNamespace()),
-                        configs);
-                @SuppressWarnings("resource") // no resource leak - client closed by disposer
-                Producer<String, String> client = new KafkaProducer<>(configs);
-                return () -> client;
-            })
-            .orElseThrow(() -> noSuchKafka.apply(cluster.get().getStatus().getClusterId()));
+    public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
+        var configs = context.configs(Producer.class);
+        Producer<String, String> client = new KafkaProducer<>(configs);
+        return () -> client;
     }
 
     public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
@@ -352,7 +452,8 @@ Map<String, Object> buildConfig(Set<String> configNames,
             KafkaClusterConfig config,
             String clientType,
             Supplier<Map<String, Object>> clientProperties,
-            Kafka cluster) {
+            Map<String, Object> overrideProperties,
+            Optional<Kafka> cluster) {
 
         Map<String, Object> cfg = configNames
             .stream()
@@ -362,10 +463,10 @@ Map<String, Object> buildConfig(Set<String> configNames,
                     .map(configValue -> Map.entry(configName, configValue)))
             .filter(Optional::isPresent)
             .map(Optional::get)
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));
 
         if (!cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
-            Optional.ofNullable(cluster.getStatus())
+            cluster.map(Kafka::getStatus)
                 .map(KafkaStatus::getListeners)
                 .map(Collection::stream)
                 .orElseGet(Stream::empty)
@@ -379,7 +480,7 @@ Map<String, Object> buildConfig(Set<String> configNames,
             if (trustManager.isResolvable()) {
                 trustManager.get().trustClusterCertificate(cfg);
             } else {
-                Optional.ofNullable(cluster.getStatus())
+                cluster.map(Kafka::getStatus)
                     .map(KafkaStatus::getListeners)
                     .map(Collection::stream)
                     .orElseGet(Stream::empty)
@@ -400,9 +501,24 @@ Map<String, Object> buildConfig(Set<String> configNames,
                     cfg.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
                     cfg.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, String.join("\n", certificates).trim());
                 });
+
+                if (truststoreRequired(cfg)) {
+                    log.warnf("""
+                            Failed to set configuration for %s client to Kafka cluster %s. Connection \
+                            requires truststore which could not be obtained from the Kafka resource status."""
+                            .formatted(clientType, config.clusterKey()));
+                }
             }
         }
 
+        cfg.putAll(overrideProperties);
+
+        logConfig("%s[key=%s, id=%s]".formatted(
+                clientType,
+                config.clusterKey(),
+                cluster.map(Kafka::getStatus).map(KafkaStatus::getClusterId).orElse("UNKNOWN")),
+                cfg);
+
         return cfg;
     }
 
@@ -442,7 +558,7 @@ String unquote(String cfg) {
         return BOUNDARY_QUOTES.matcher(cfg).replaceAll("");
     }
 
-    boolean truststoreRequired(Map<String, Object> cfg) {
+    static boolean truststoreRequired(Map<String, Object> cfg) {
         if (cfg.containsKey(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG)) {
             return false;
         }
@@ -453,15 +569,26 @@ boolean truststoreRequired(Map<String, Object> cfg) {
     }
 
     void logConfig(String clientType, Map<String, Object> config) {
-        if (log.isDebugEnabled()) {
+        if (log.isTraceEnabled()) {
             String msg = config.entrySet()
                 .stream()
-                .map(entry -> "\t%s = %s".formatted(entry.getKey(), entry.getValue()))
+                .map(entry -> {
+                    String value = String.valueOf(entry.getValue());
+
+                    if (SaslConfigs.SASL_JAAS_CONFIG.equals(entry.getKey())) {
+                        // Mask sensitive information in sasl.jaas.config
+                        Matcher m = EMBEDDED_STRING.matcher(value);
+                        value = m.replaceAll("\"******\"");
+                    }
+
+                    return "\t%s = %s".formatted(entry.getKey(), value);
+                })
                 .collect(Collectors.joining("\n", "%s configuration:\n", ""));
-            log.debugf(msg, clientType);
+
+            log.tracef(msg, clientType);
         }
     }
 
     private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)");
-
+    private static final Pattern EMBEDDED_STRING = Pattern.compile("\"[^\"]*\"");
 }
diff --git a/api/src/main/java/com/github/streamshub/console/api/errors/server/KafkaServerExceptionHandlers.java b/api/src/main/java/com/github/streamshub/console/api/errors/server/KafkaServerExceptionHandlers.java
new file mode 100644
index 000000000..0f3f7d3fa
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/errors/server/KafkaServerExceptionHandlers.java
@@ -0,0 +1,29 @@
+package com.github.streamshub.console.api.errors.server;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.ws.rs.ext.Provider;
+
+import org.apache.kafka.common.errors.SslAuthenticationException;
+
+import com.github.streamshub.console.api.support.ErrorCategory;
+
+public class KafkaServerExceptionHandlers {
+
+    private KafkaServerExceptionHandlers() {
+    }
+
+    @Provider
+    @ApplicationScoped
+    public static class AuthenticationExceptionHandler
+            extends AbstractServerExceptionHandler<SslAuthenticationException> {
+
+        public AuthenticationExceptionHandler() {
+            super(ErrorCategory.ServerError.class);
+        }
+
+        @Override
+        public boolean handlesException(Throwable thrown) {
+            return thrown instanceof SslAuthenticationException;
+        }
+    }
+}
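The logConfig change above traces client configuration with quoted values in sasl.jaas.config masked. A minimal, self-contained sketch of that masking approach — the EMBEDDED_STRING pattern and replacement come from the diff, while the sample JAAS value is purely illustrative:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class JaasMaskingSketch {
    // Same pattern as ClientFactory.EMBEDDED_STRING: any double-quoted segment
    private static final Pattern EMBEDDED_STRING = Pattern.compile("\"[^\"]*\"");

    public static void main(String[] args) {
        // Illustrative sasl.jaas.config value
        String value = "org.apache.kafka.common.security.plain.PlainLoginModule "
                + "required username=\"admin\" password=\"secret\";";
        Matcher m = EMBEDDED_STRING.matcher(value);
        // Prints the value with username="******" password="******"
        System.out.println(m.replaceAll("\"******\""));
    }
}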
rsrc.addMeta("page", listSupport.buildPageMeta(entry::toCursor)); rsrc.addMeta("configured", entry.isConfigured()); + rsrc.addMeta("managed", entry.isManaged()); return rsrc; }) .toList()); @@ -116,6 +111,8 @@ public SingleResponse(KafkaCluster data) { public static final class KafkaClusterResource extends Resource { public KafkaClusterResource(KafkaCluster data) { super(data.id, "kafkas", data); + addMeta("configured", data.isConfigured()); + addMeta("managed", data.isManaged()); } } @@ -123,7 +120,7 @@ public KafkaClusterResource(KafkaCluster data) { String namespace; // Strimzi Kafka CR only String creationTimestamp; // Strimzi Kafka CR only @JsonIgnore - final String id; + String id; // non-final, may be overridden by configuration final List nodes; final Node controller; final List authorizedOperations; @@ -137,6 +134,8 @@ public KafkaClusterResource(KafkaCluster data) { @JsonIgnore boolean configured; List nodePools; + @JsonIgnore + boolean managed; public KafkaCluster(String id, List nodes, Node controller, List authorizedOperations) { super(); @@ -211,6 +210,10 @@ public String getId() { return id; } + public void setId(String id) { + this.id = id; + } + public List getNodes() { return nodes; } @@ -270,4 +273,12 @@ public List getNodePools() { public void setNodePools(List nodePools) { this.nodePools = nodePools; } + + public void setManaged(boolean managed) { + this.managed = managed; + } + + public boolean isManaged() { + return managed; + } } diff --git a/api/src/main/java/com/github/streamshub/console/api/service/ConfigService.java b/api/src/main/java/com/github/streamshub/console/api/service/ConfigService.java index 33741dbc9..ed7cb3409 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/ConfigService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/ConfigService.java @@ -9,7 +9,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; -import java.util.function.Supplier; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; @@ -23,17 +22,18 @@ import com.github.streamshub.console.api.model.ConfigEntry; import com.github.streamshub.console.api.model.Either; +import com.github.streamshub.console.api.support.KafkaContext; @ApplicationScoped public class ConfigService { @Inject - Supplier clientSupplier; + KafkaContext kafkaContext; public CompletionStage> describeConfigs(ConfigResource.Type type, String name) { ConfigResource nodeKey = new ConfigResource(type, name); - return describeConfigs(clientSupplier.get(), List.of(nodeKey)) + return describeConfigs(kafkaContext.admin(), List.of(nodeKey)) .thenApply(configs -> configs.get(name)) .thenApply(configs -> configs.getOrThrow(CompletionException::new)); } @@ -52,7 +52,7 @@ public CompletionStage> describeConfigs(ConfigResource. 
* complete */ public CompletionStage alterConfigs(ConfigResource.Type type, String name, Map alteredConfigs, boolean validateOnly) { - Admin adminClient = clientSupplier.get(); + Admin adminClient = kafkaContext.admin(); var resourceKey = new ConfigResource(type, name); return adminClient.incrementalAlterConfigs(Map.of(resourceKey, fromMap(alteredConfigs)), new AlterConfigsOptions() diff --git a/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java b/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java index e576fb702..7219a9666 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java @@ -53,6 +53,7 @@ import com.github.streamshub.console.api.model.Topic; import com.github.streamshub.console.api.support.ConsumerGroupValidation; import com.github.streamshub.console.api.support.FetchFilterPredicate; +import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.KafkaOffsetSpec; import com.github.streamshub.console.api.support.ListRequestContext; import com.github.streamshub.console.api.support.UnknownTopicIdPatch; @@ -84,7 +85,7 @@ public class ConsumerGroupService { ThreadContext threadContext; @Inject - Supplier clientSupplier; + KafkaContext kafkaContext; @Inject TopicService topicService; @@ -99,7 +100,7 @@ public CompletionStage> listConsumerGroups(List incl public CompletionStage> listConsumerGroups(String topicId, List includes, ListRequestContext listSupport) { - Admin adminClient = clientSupplier.get(); + Admin adminClient = kafkaContext.admin(); Uuid id = Uuid.fromString(topicId); Executor asyncExec = threadContext.currentContextExecutor(); @@ -120,7 +121,7 @@ public CompletionStage> listConsumerGroups(String topicId, L } CompletionStage> listConsumerGroups(List groupIds, List includes, ListRequestContext listSupport) { - Admin adminClient = clientSupplier.get(); + Admin adminClient = kafkaContext.admin(); Set states = listSupport.filters() .stream() @@ -157,7 +158,7 @@ CompletionStage> listConsumerGroups(List groupIds, L } public CompletionStage describeConsumerGroup(String requestGroupId, List includes) { - Admin adminClient = clientSupplier.get(); + Admin adminClient = kafkaContext.admin(); String groupId = preprocessGroupId(requestGroupId); return assertConsumerGroupExists(adminClient, groupId) @@ -167,7 +168,7 @@ public CompletionStage describeConsumerGroup(String requestGroupI } public CompletionStage>> listConsumerGroupMembership(Collection topicIds) { - Admin adminClient = clientSupplier.get(); + Admin adminClient = kafkaContext.admin(); return adminClient.listConsumerGroups(new ListConsumerGroupsOptions() .inStates(Set.of( @@ -209,7 +210,7 @@ public CompletionStage>> listConsumerGroupMembership(Co } public CompletionStage patchConsumerGroup(ConsumerGroup patch) { - Admin adminClient = clientSupplier.get(); + Admin adminClient = kafkaContext.admin(); String groupId = preprocessGroupId(patch.getGroupId()); return assertConsumerGroupExists(adminClient, groupId) @@ -389,7 +390,7 @@ static CompletableFuture allOf(Collection deleteConsumerGroup(String requestGroupId) { - Admin adminClient = clientSupplier.get(); + Admin adminClient = kafkaContext.admin(); String groupId = preprocessGroupId(requestGroupId); return adminClient.deleteConsumerGroups(List.of(groupId)) diff --git 
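The service changes in this diff all follow the same substitution: the request-scoped KafkaContext replaces the former Supplier&lt;Admin&gt;, and admin() is called at the point of use. A sketch of the pattern in a hypothetical service — the class and method here are illustrative, not part of the diff:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import java.util.concurrent.CompletionStage;

import org.apache.kafka.clients.admin.Admin;

import com.github.streamshub.console.api.support.KafkaContext;

@ApplicationScoped
public class ExampleClusterService { // hypothetical
    @Inject
    KafkaContext kafkaContext; // resolved per request from the clusterId path parameter

    public CompletionStage<String> clusterId() {
        Admin admin = kafkaContext.admin(); // was: clientSupplier.get()
        return admin.describeCluster().clusterId().toCompletionStage();
    }
}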
diff --git a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java
index 69d4ba0c2..e51aa1f8e 100644
--- a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java
+++ b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java
@@ -3,11 +3,13 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import jakarta.enterprise.context.ApplicationScoped;
@@ -17,6 +19,7 @@
 import org.apache.kafka.clients.admin.DescribeClusterOptions;
 import org.apache.kafka.clients.admin.DescribeClusterResult;
 import org.apache.kafka.common.KafkaFuture;
+import org.eclipse.microprofile.context.ThreadContext;
 import org.jboss.logging.Logger;
 
 import com.github.streamshub.console.api.Annotations;
@@ -24,6 +27,8 @@
 import com.github.streamshub.console.api.model.KafkaCluster;
 import com.github.streamshub.console.api.model.KafkaListener;
 import com.github.streamshub.console.api.model.Node;
+import com.github.streamshub.console.api.support.Holder;
+import com.github.streamshub.console.api.support.KafkaContext;
 import com.github.streamshub.console.api.support.ListRequestContext;
 import com.github.streamshub.console.config.ConsoleConfig;
 
@@ -46,35 +51,80 @@ public class KafkaClusterService {
     @Inject
     Logger logger;
 
+    /**
+     * ThreadContext of the request thread. This is used to execute asynchronous
+     * tasks to allow access to request-scoped beans.
+     */
     @Inject
-    SharedIndexInformer<Kafka> kafkaInformer;
+    ThreadContext threadContext;
+
+    @Inject
+    Holder<SharedIndexInformer<Kafka>> kafkaInformer;
 
     @Inject
     ConsoleConfig consoleConfig;
 
     @Inject
-    Supplier<Admin> clientSupplier;
+    /**
+     * All Kafka contexts known to the application
+     */
+    Map<String, KafkaContext> kafkaContexts;
+
+    @Inject
+    /**
+     * Kafka context for a single-Kafka request.
+     * E.g. {@linkplain #describeCluster(List) describeCluster}.
+     */
+    KafkaContext kafkaContext;
 
     boolean listUnconfigured = false;
     Predicate<KafkaCluster> includeAll = k -> listUnconfigured;
 
     public List<KafkaCluster> listClusters(ListRequestContext<KafkaCluster> listSupport) {
-        return kafkaInformer.getStore()
-            .list()
-            .stream()
-            .filter(Predicate.not(k -> annotatedKafka(k, Annotations.CONSOLE_HIDDEN)))
+        List<KafkaCluster> kafkaResources = kafkaResources()
             .map(this::toKafkaCluster)
-            .filter(includeAll.or(KafkaCluster::isConfigured)) // Hide unconfigured clusters for now.
+            // Hide unconfigured clusters for now.
+            .filter(includeAll.or(KafkaCluster::isConfigured))
+            .toList();
+
+        Map<String, KafkaCluster> configuredClusters = kafkaContexts
+            .entrySet()
+            .stream()
+            .map(ctx -> {
+                String id = ctx.getKey();
+                var config = ctx.getValue().clusterConfig();
+
+                return kafkaResources.stream()
+                    .filter(k -> Objects.equals(k.getName(), config.getName()))
+                    .filter(k -> Objects.equals(k.getNamespace(), config.getNamespace()))
+                    .map(k -> addKafkaContextData(k, ctx.getValue()))
+                    .findFirst()
+                    .orElseGet(() -> {
+                        var k = new KafkaCluster(id, null, null, null);
+                        k.setConfigured(true);
+                        k.setName(config.getName());
+                        k.setNamespace(config.getNamespace());
+                        return k;
+                    });
+            })
+            .collect(Collectors.toMap(KafkaCluster::getId, Function.identity()));
+
+        List<KafkaCluster> otherClusters = kafkaResources.stream()
+            .filter(k -> !configuredClusters.containsKey(k.getId()))
+            .toList();
+
+        return Stream.concat(configuredClusters.values().stream(), otherClusters.stream())
             .map(listSupport::tally)
             .filter(listSupport::betweenCursors)
             .sorted(listSupport.getSortComparator())
             .dropWhile(listSupport::beforePageBegin)
             .takeWhile(listSupport::pageCapacityAvailable)
+            .map(this::setManaged)
             .toList();
     }
 
     public CompletionStage<KafkaCluster> describeCluster(List<String> fields) {
-        Admin adminClient = clientSupplier.get();
+        Admin adminClient = kafkaContext.admin();
         DescribeClusterOptions options = new DescribeClusterOptions()
                 .includeAuthorizedOperations(fields.contains(KafkaCluster.Fields.AUTHORIZED_OPERATIONS));
         DescribeClusterResult result = adminClient.describeCluster(options);
@@ -90,7 +140,9 @@ public CompletionStage<KafkaCluster> describeCluster(List<String> fields) {
                 get(result::nodes).stream().map(Node::fromKafkaModel).toList(),
                 Node.fromKafkaModel(get(result::controller)),
                 enumNames(get(result::authorizedOperations))))
-            .thenApply(this::addKafkaResourceData);
+            .thenApplyAsync(this::addKafkaContextData, threadContext.currentContextExecutor())
+            .thenApply(this::addKafkaResourceData)
+            .thenApply(this::setManaged);
     }
 
     KafkaCluster toKafkaCluster(Kafka kafka) {
@@ -104,10 +156,24 @@ KafkaCluster toKafkaCluster(Kafka kafka) {
         return cluster;
     }
 
-    KafkaCluster addKafkaResourceData(KafkaCluster cluster) {
-        findCluster(cluster.getId())
-            .ifPresent(kafka -> setKafkaClusterProperties(cluster, kafka));
+    KafkaCluster addKafkaContextData(KafkaCluster cluster, KafkaContext kafkaContext) {
+        var config = kafkaContext.clusterConfig();
+        cluster.setConfigured(true);
+        if (config.getId() != null) {
+            // configuration has overridden the id
+            cluster.setId(config.getId());
+        }
+        cluster.setName(config.getName());
+        cluster.setNamespace(config.getNamespace());
+        return cluster;
+    }
+
+    KafkaCluster addKafkaContextData(KafkaCluster cluster) {
+        return addKafkaContextData(cluster, kafkaContext);
+    }
 
+    KafkaCluster addKafkaResourceData(KafkaCluster cluster) {
+        findCluster(cluster).ifPresent(kafka -> setKafkaClusterProperties(cluster, kafka));
         return cluster;
     }
 
@@ -116,11 +182,8 @@ void setKafkaClusterProperties(KafkaCluster cluster, Kafka kafka) {
         cluster.setNamespace(kafka.getMetadata().getNamespace());
         cluster.setCreationTimestamp(kafka.getMetadata().getCreationTimestamp());
 
-        @SuppressWarnings("removal")
         var comparator = Comparator
             .comparingInt((GenericKafkaListener listener) ->
-                listenerSortKey(listener, Annotations.EXPOSED_LISTENER))
-            .thenComparingInt((GenericKafkaListener listener) ->
                 listenerSortKey(listener, Annotations.CONSOLE_LISTENER))
             .thenComparingInt((GenericKafkaListener listener) -> {
                 if (KafkaListenerType.INTERNAL.equals(listener.getType())) {
@@ -166,15 +229,32 @@ void setKafkaClusterStatus(KafkaCluster cluster, Kafka kafka) {
         });
     }
 
-    public Optional<Kafka> findCluster(String clusterId) {
-        return kafkaInformer.getStore()
-            .list()
-            .stream()
-            .filter(k -> Objects.equals(clusterId, k.getStatus().getClusterId()))
-            .filter(Predicate.not(k -> annotatedKafka(k, Annotations.CONSOLE_HIDDEN)))
+    KafkaCluster setManaged(KafkaCluster cluster) {
+        cluster.setManaged(findCluster(cluster)
+            .map(kafkaTopic -> Boolean.TRUE)
+            .orElse(Boolean.FALSE));
+        return cluster;
+    }
+
+    private Optional<Kafka> findCluster(KafkaCluster cluster) {
+        return findCluster(Cache.namespaceKeyFunc(cluster.getNamespace(), cluster.getName()));
+    }
+
+    private Optional<Kafka> findCluster(String clusterKey) {
+        return kafkaResources()
+            .filter(k -> Objects.equals(clusterKey, Cache.metaNamespaceKeyFunc(k)))
             .findFirst();
     }
 
+    private Stream<Kafka> kafkaResources() {
+        return kafkaInformer.map(informer -> informer
+                .getStore()
+                .list()
+                .stream()
+                .filter(Predicate.not(k -> annotatedKafka(k, Annotations.CONSOLE_HIDDEN))))
+            .orElseGet(Stream::empty);
+    }
+
     static int listenerSortKey(GenericKafkaListener listener, Annotations listenerAnnotation) {
         return annotatedListener(listener, listenerAnnotation) ? -1 : 1;
     }
@@ -207,16 +287,6 @@ static Optional<ListenerStatus> listenerStatus(Kafka kafka, GenericKafkaListener
             .findFirst();
     }
 
-    public static Optional<String> getAuthType(Kafka kafka, ListenerStatus listener) {
-        return kafka.getSpec()
-            .getKafka()
-            .getListeners()
-            .stream()
-            .filter(sl -> sl.getName().equals(listener.getName()))
-            .findFirst()
-            .flatMap(KafkaClusterService::getAuthType);
-    }
-
     static Optional<String> getAuthType(GenericKafkaListener listener) {
         return Optional.of(listener)
             .map(GenericKafkaListener::getAuth)
diff --git a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java
index 42169071e..0b8c40a3d 100644
--- a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java
+++ b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java
@@ -31,7 +31,6 @@
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -50,6 +49,7 @@
 import org.jboss.logging.Logger;
 
 import com.github.streamshub.console.api.model.KafkaRecord;
+import com.github.streamshub.console.api.support.KafkaContext;
 import com.github.streamshub.console.api.support.SizeLimitedSortedSet;
 
 import static java.util.Objects.requireNonNullElse;
@@ -64,7 +64,7 @@ public class RecordService {
     Logger logger;
 
     @Inject
-    Supplier<Admin> clientSupplier;
+    KafkaContext kafkaContext;
 
     @Inject
     Supplier<Consumer<byte[], byte[]>> consumerSupplier;
@@ -198,7 +198,7 @@ public byte[] value() {
     CompletionStage<String> topicNameForId(String topicId) {
         Uuid kafkaTopicId = Uuid.fromString(topicId);
 
-        return clientSupplier.get()
+        return kafkaContext.admin()
             .listTopics(new ListTopicsOptions().listInternal(true))
             .listings()
             .toCompletionStage()
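KafkaClusterService now resolves Strimzi Kafka resources through the informer cache key ("namespace/name") instead of the status cluster id. A small sketch of that key convention, using the fabric8 helpers the diff itself calls (metaNamespaceKeyFunc and namespaceKeyFunc); the resource values are illustrative:

import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaBuilder;

class ClusterKeySketch {
    public static void main(String[] args) {
        Kafka kafka = new KafkaBuilder()
                .withNewMetadata()
                    .withName("test-kafka1")
                    .withNamespace("default")
                .endMetadata()
                .build();

        // Both calls yield "default/test-kafka1", the clusterKey format used
        // by putKafkaContext, cachedKafkaResource, and the tests below
        System.out.println(Cache.metaNamespaceKeyFunc(kafka));
        System.out.println(Cache.namespaceKeyFunc("default", "test-kafka1"));
    }
}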
diff --git a/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java b/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java
index 965147aff..b431d3fd8 100644
--- a/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java
+++ b/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java
@@ -16,7 +16,6 @@
 import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -54,6 +53,7 @@
 import com.github.streamshub.console.api.model.ReplicaLocalStorage;
 import com.github.streamshub.console.api.model.Topic;
 import com.github.streamshub.console.api.model.TopicPatch;
+import com.github.streamshub.console.api.support.KafkaContext;
 import com.github.streamshub.console.api.support.KafkaOffsetSpec;
 import com.github.streamshub.console.api.support.ListRequestContext;
 import com.github.streamshub.console.api.support.TopicValidation;
@@ -101,10 +101,7 @@ public class TopicService {
     ValidationProxy validationService;
 
     @Inject
-    Supplier<Kafka> kafkaCluster;
-
-    @Inject
-    Supplier<Admin> clientSupplier;
+    KafkaContext kafkaContext;
 
     @Inject
     @Named("KafkaTopics")
@@ -120,8 +117,8 @@ public class TopicService {
     ConsumerGroupService consumerGroupService;
 
     public CompletionStage<NewTopic> createTopic(NewTopic topic, boolean validateOnly) {
-        Kafka kafka = kafkaCluster.get();
-        Admin adminClient = clientSupplier.get();
+        Kafka kafka = kafkaContext.resource();
+        Admin adminClient = kafkaContext.admin();
 
         validationService.validate(new TopicValidation.NewTopicInputs(kafka, Collections.emptyMap(), topic));
 
@@ -164,7 +161,7 @@
             fetchList.add(Topic.Fields.CONFIGS);
         }
 
-        Admin adminClient = clientSupplier.get();
+        Admin adminClient = kafkaContext.admin();
         final Map<String, Integer> statuses = new HashMap<>();
         listSupport.meta().put("summary", Map.of("statuses", statuses));
 
@@ -201,7 +198,7 @@ CompletableFuture<List<Topic>> listTopics(Admin adminClient, boolean list
     }
 
     public CompletionStage<Topic> describeTopic(String topicId, List<String> fields, String offsetSpec) {
-        Admin adminClient = clientSupplier.get();
+        Admin adminClient = kafkaContext.admin();
         Uuid id = Uuid.fromString(topicId);
 
         CompletableFuture<Topic> describePromise = describeTopics(adminClient, List.of(id), fields, offsetSpec)
@@ -232,7 +229,7 @@ public CompletionStage<Topic> describeTopic(String topicId, List<String> fields,
      *
      */
    public CompletionStage<Void> patchTopic(String topicId, TopicPatch patch, boolean validateOnly) {
-        Kafka kafka = kafkaCluster.get();
+        Kafka kafka = kafkaContext.resource();
 
         return describeTopic(topicId, List.of(Topic.Fields.CONFIGS), KafkaOffsetSpec.LATEST)
             .thenApply(topic -> validationService.validate(new TopicValidation.TopicPatchInputs(kafka, topic, patch)))
@@ -318,7 +315,7 @@ CompletableFuture<Void> maybeCreatePartitions(Topic topic, TopicPatch topicPatch
     }
 
     CompletionStage<Void> createPartitions(String topicName, int totalCount, List<List<Integer>> newAssignments, boolean validateOnly) {
-        Admin adminClient = clientSupplier.get();
+        Admin adminClient = kafkaContext.admin();
 
         org.apache.kafka.clients.admin.NewPartitions newPartitions;
 
@@ -357,7 +354,7 @@ List<CompletableFuture<Void>> maybeAlterPartitionAssignments(Topic topic, TopicP
             return Collections.emptyList();
         }
 
-        Admin adminClient = clientSupplier.get();
+        Admin adminClient = kafkaContext.admin();
 
         if (logger.isDebugEnabled()) {
             logPartitionReassignments(topic, alteredAssignments);
@@ -401,8 +398,8 @@ void logPartitionReassignments(Topic topic,
         });
 
         logger.debugf("Altering partition reassignments for cluster %s[%s], topic %s[%s], changes=[ %s ]",
-                kafkaCluster.get().getMetadata().getName(),
-                kafkaCluster.get().getStatus().getClusterId(),
+                kafkaContext.resource().getMetadata().getName(),
+                kafkaContext.resource().getStatus().getClusterId(),
                 topic.name(),
                 topic.getId(),
                 changes);
@@ -417,7 +414,7 @@ CompletableFuture<Void> maybeAlterConfigs(Topic topic, TopicPatch topicPatch, bo
     }
 
     public CompletionStage<Void> deleteTopic(String topicId) {
-        Admin adminClient = clientSupplier.get();
+        Admin adminClient = kafkaContext.admin();
         Uuid id = Uuid.fromString(topicId);
 
         return adminClient.deleteTopics(TopicCollection.ofTopicIds(List.of(id)))
@@ -434,12 +431,12 @@ Topic setManaged(Topic topic) {
     }
 
     Optional<KafkaTopic> getManagedTopic(String topicName) {
-        ObjectMeta kafkaMeta = kafkaCluster.get().getMetadata();
-
-        return Optional.ofNullable(managedTopics.get(kafkaMeta.getNamespace()))
-            .map(clustersInNamespace -> clustersInNamespace.get(kafkaMeta.getName()))
-            .map(topicsInCluster -> topicsInCluster.get(topicName))
-            .filter(this::isManaged);
+        return Optional.ofNullable(kafkaContext.resource())
+            .map(Kafka::getMetadata)
+            .flatMap(kafkaMeta -> Optional.ofNullable(managedTopics.get(kafkaMeta.getNamespace()))
+                .map(clustersInNamespace -> clustersInNamespace.get(kafkaMeta.getName()))
+                .map(topicsInCluster -> topicsInCluster.get(topicName))
+                .filter(this::isManaged));
     }
 
     boolean isManaged(KafkaTopic topic) {
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/Holder.java b/api/src/main/java/com/github/streamshub/console/api/support/Holder.java
new file mode 100644
index 000000000..10f70aec3
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/support/Holder.java
@@ -0,0 +1,87 @@
+package com.github.streamshub.console.api.support;
+
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Like {@linkplain java.util.Optional Optional}, but non-final so it may be
+ * used as a CDI type.
+ *
+ * @param <T> the type of value
+ * @see {@link java.util.Optional}
+ */
+public class Holder<T> implements Supplier<T> {
+
+    private static final Holder<?> EMPTY = new Holder<>(null);
+    private final T value;
+
+    private Holder(T value) {
+        this.value = value;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> Holder<T> empty() {
+        return (Holder<T>) EMPTY;
+    }
+
+    public static <T> Holder<T> of(T value) {
+        return new Holder<>(value);
+    }
+
+    /**
+     * If a value is present, returns {@code true}, otherwise {@code false}.
+     *
+     * @return {@code true} if a value is present, otherwise {@code false}
+     */
+    public boolean isPresent() {
+        return value != null;
+    }
+
+    /**
+     * If a value is present, performs the given action with the value,
+     * otherwise does nothing.
+     *
+     * @param action the action to be performed, if a value is present
+     * @throws NullPointerException if value is present and the given action is
+     *         {@code null}
+     */
+    public void ifPresent(Consumer<? super T> action) {
+        if (value != null) {
+            action.accept(value);
+        }
+    }
+
+    /**
+     * If a value is present, returns the value, otherwise throws
+     * {@code NoSuchElementException}.
+     *
+     * @apiNote
+     * The preferred alternative to this method is {@link #orElseThrow()}.
+     *
+     * @return the non-{@code null} value described by this {@code Optional}
+     * @throws NoSuchElementException if no value is present
+     */
+    @Override
+    public T get() {
+        if (value == null) {
+            throw new NoSuchElementException("No value present");
+        }
+        return value;
+    }
+
+    /**
+     * @see {@link java.util.Optional#map(Function)}
+     */
+    public <U> Optional<U> map(Function<? super T, ? extends U> mapper) {
+        Objects.requireNonNull(mapper);
+        if (!isPresent()) {
+            return Optional.empty();
+        } else {
+            return Optional.ofNullable(mapper.apply(value));
+        }
+    }
+}
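Holder exists because java.util.Optional is final and cannot serve as a CDI bean type, while an informer may legitimately be absent (e.g. when the Kubernetes connection is disabled). A brief usage sketch mirroring the call sites in this diff; the string values are illustrative:

import com.github.streamshub.console.api.support.Holder;

class HolderSketch {
    public static void main(String[] args) {
        Holder<String> absent = Holder.empty();
        Holder<String> present = Holder.of("informer");

        // map(...) bridges to Optional, as in ClientFactory.cachedKafkaResource
        System.out.println(absent.map(String::length));  // Optional.empty
        System.out.println(present.map(String::length)); // Optional[8]

        // ifPresent(...) guards cleanup, as in InformerFactory.disposeKafkaInformer
        present.ifPresent(System.out::println);

        // get() throws NoSuchElementException when empty, like Optional.get()
        System.out.println(present.get());
    }
}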
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/InformerFactory.java b/api/src/main/java/com/github/streamshub/console/api/support/InformerFactory.java
index 67c0e5482..fb67411b6 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/InformerFactory.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/InformerFactory.java
@@ -14,7 +14,10 @@
 
 import org.jboss.logging.Logger;
 
+import com.github.streamshub.console.config.ConsoleConfig;
+
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
 import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 import io.strimzi.api.kafka.model.kafka.Kafka;
@@ -32,15 +35,13 @@ public class InformerFactory {
     @Inject
     KubernetesClient k8s;
 
-    @Produces
-    @ApplicationScoped
-    @Named("KafkaInformer")
-    SharedIndexInformer<Kafka> kafkaInformer;
+    @Inject
+    ConsoleConfig consoleConfig;
 
     @Produces
     @ApplicationScoped
-    @Named("KafkaTopicInformer")
-    SharedIndexInformer<KafkaTopic> topicInformer;
+    @Named("KafkaInformer")
+    Holder<SharedIndexInformer<Kafka>> kafkaInformer = Holder.empty();
 
     @Produces
     @ApplicationScoped
@@ -48,62 +49,95 @@ public class InformerFactory {
     // Keys: namespace -> cluster name -> topic name
     Map<String, Map<String, Map<String, KafkaTopic>>> topics = new ConcurrentHashMap<>();
 
+    SharedIndexInformer<KafkaTopic> topicInformer;
+
     /**
      * Initialize CDI beans produced by this factory. Executed on application startup.
      *
      * @param event CDI startup event
     */
     void onStartup(@Observes Startup event) {
-        kafkaInformer = k8s.resources(Kafka.class).inAnyNamespace().inform();
-        topicInformer = k8s.resources(KafkaTopic.class).inAnyNamespace().inform();
-        topicInformer.addEventHandler(new ResourceEventHandler<KafkaTopic>() {
-            @Override
-            public void onAdd(KafkaTopic topic) {
-                topicMap(topic).ifPresent(map -> map.put(topicName(topic), topic));
-            }
-
-            @Override
-            public void onUpdate(KafkaTopic oldTopic, KafkaTopic topic) {
-                onDelete(oldTopic, false);
-                onAdd(topic);
-            }
+        if (consoleConfig.getKubernetes().isEnabled()) {
+            var kafkaResources = k8s.resources(Kafka.class).inAnyNamespace();
 
-            @Override
-            public void onDelete(KafkaTopic topic, boolean deletedFinalStateUnknown) {
-                topicMap(topic).ifPresent(map -> map.remove(topicName(topic)));
+            try {
+                kafkaInformer = Holder.of(kafkaResources.inform());
+            } catch (KubernetesClientException e) {
+                logger.warnf("Failed to create Strimzi Kafka informer: %s", e.getMessage());
             }
 
-            private static String topicName(KafkaTopic topic) {
-                return Optional.ofNullable(topic.getSpec())
-                    .map(KafkaTopicSpec::getTopicName)
-                    .orElseGet(() -> topic.getMetadata().getName());
+            try {
+                topicInformer = k8s.resources(KafkaTopic.class).inAnyNamespace().inform();
+                topicInformer.addEventHandler(new KafkaTopicEventHandler(topics));
+            } catch (KubernetesClientException e) {
+                logger.warnf("Failed to create Strimzi KafkaTopic informer: %s", e.getMessage());
             }
+        } else {
+            logger.warn("Kubernetes client connection is disabled. Custom resource information will not be available.");
+        }
+    }
 
-            Optional<Map<String, KafkaTopic>> topicMap(KafkaTopic topic) {
-                String namespace = topic.getMetadata().getNamespace();
-                String clusterName = topic.getMetadata().getLabels().get(STRIMZI_CLUSTER);
-
-                if (clusterName == null) {
-                    logger.warnf("KafkaTopic %s/%s is missing label %s and will be ignored",
-                            namespace,
-                            topic.getMetadata().getName(),
-                            STRIMZI_CLUSTER);
-                    return Optional.empty();
-                }
+    void disposeKafkaInformer(@Disposes @Named("KafkaInformer") Holder<SharedIndexInformer<Kafka>> informer) {
+        informer.ifPresent(SharedIndexInformer::close);
+    }
 
-                Map<String, KafkaTopic> map = topics.computeIfAbsent(namespace, k -> new ConcurrentHashMap<>())
-                        .computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>());
+    /**
+     * Close the KafkaTopic informer used to update the topics map being disposed.
+     *
+     * @param topics map of KafkaTopics being disposed.
+     */
+    void disposeKafkaTopics(@Disposes Map<String, Map<String, Map<String, KafkaTopic>>> topics) {
+        if (topicInformer != null) {
+            topicInformer.close();
+        }
+    }
 
-                return Optional.of(map);
+    private class KafkaTopicEventHandler implements ResourceEventHandler<KafkaTopic> {
+        Map<String, Map<String, Map<String, KafkaTopic>>> topics;
+
+        public KafkaTopicEventHandler(Map<String, Map<String, Map<String, KafkaTopic>>> topics) {
+            this.topics = topics;
+        }
+
+        @Override
+        public void onAdd(KafkaTopic topic) {
+            topicMap(topic).ifPresent(map -> map.put(topicName(topic), topic));
+        }
+
+        @Override
+        public void onUpdate(KafkaTopic oldTopic, KafkaTopic topic) {
+            onDelete(oldTopic, false);
+            onAdd(topic);
+        }
+
+        @Override
+        public void onDelete(KafkaTopic topic, boolean deletedFinalStateUnknown) {
+            topicMap(topic).ifPresent(map -> map.remove(topicName(topic)));
+        }
+
+        private static String topicName(KafkaTopic topic) {
+            return Optional.ofNullable(topic.getSpec())
+                .map(KafkaTopicSpec::getTopicName)
+                .orElseGet(() -> topic.getMetadata().getName());
+        }
+
+        Optional<Map<String, KafkaTopic>> topicMap(KafkaTopic topic) {
+            String namespace = topic.getMetadata().getNamespace();
+            String clusterName = topic.getMetadata().getLabels().get(STRIMZI_CLUSTER);
+
+            if (clusterName == null) {
+                logger.warnf("KafkaTopic %s/%s is missing label %s and will be ignored",
+                        namespace,
+                        topic.getMetadata().getName(),
+                        STRIMZI_CLUSTER);
+                return Optional.empty();
             }
-        });
-    }
 
-    public void disposeKafkaInformer(@Disposes @Named("KafkaInformer") SharedIndexInformer<Kafka> informer) {
-        informer.close();
-    }
+            Map<String, KafkaTopic> map = topics.computeIfAbsent(namespace, k -> new ConcurrentHashMap<>())
+                    .computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>());
 
-    public void disposeTopicInformer(@Disposes @Named("KafkaTopicInformer") SharedIndexInformer<KafkaTopic> informer) {
-        informer.close();
+            return Optional.of(map);
+        }
     }
+
 }
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
new file mode 100644
index 000000000..7226bb34d
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
@@ -0,0 +1,78 @@
+package com.github.streamshub.console.api.support;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.kafka.clients.admin.Admin;
+
+import com.github.streamshub.console.config.KafkaClusterConfig;
+
+import io.strimzi.api.kafka.model.kafka.Kafka;
+
+public class KafkaContext implements Closeable {
+
+    public static final KafkaContext EMPTY = new KafkaContext(null, null, Collections.emptyMap(), null);
+
+    final KafkaClusterConfig clusterConfig;
+    final Kafka resource;
+    final Map<Class<?>, Map<String, Object>> configs;
+    final Admin admin;
+
+    public KafkaContext(KafkaClusterConfig clusterConfig, Kafka resource, Map<Class<?>, Map<String, Object>> configs, Admin admin) {
+        this.clusterConfig = clusterConfig;
+        this.resource = resource;
+        this.configs = Map.copyOf(configs);
+        this.admin = admin;
+    }
+
+    public KafkaContext(KafkaContext other, Admin admin) {
+        this(other.clusterConfig, other.resource, other.configs, admin);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof KafkaContext)) {
+            return false;
+        }
+
+        KafkaContext other = (KafkaContext) obj;
+
+        return Objects.equals(clusterConfig, other.clusterConfig)
+                && Objects.equals(resource, other.resource)
+                && Objects.equals(configs, other.configs)
+                && Objects.equals(admin, other.admin);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clusterConfig, resource, configs, admin);
+    }
+
+    @Override
+    public void close() {
+        if (admin != null) {
+            admin.close();
+        }
+    }
+
+    public KafkaClusterConfig clusterConfig() {
+        return clusterConfig;
+    }
+
+    public Kafka resource() {
+        return resource;
+    }
+
+    public Map<Class<?>, Map<String, Object>> configs() {
+        return configs;
+    }
+
+    public Map<String, Object> configs(Class<?> type) {
+        return configs.get(type);
+    }
+
+    public Admin admin() {
+        return admin;
+    }
+}
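KafkaContext packages the cluster configuration, the optional Strimzi Kafka resource, the per-client-type configuration maps, and an optional shared Admin client. A sketch of the two ways contexts are used above — looking up configs by client type, and re-wrapping a global context with a request-scoped Admin; the method names in this sketch are illustrative:

import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.github.streamshub.console.api.support.KafkaContext;

class KafkaContextSketch {
    static Consumer<byte[], byte[]> newConsumer(KafkaContext ctx) {
        // Pre-built consumer configs are keyed by client interface
        Map<String, Object> configs = ctx.configs(Consumer.class);
        return new KafkaConsumer<>(configs);
    }

    static KafkaContext perRequest(KafkaContext global, Admin requestAdmin) {
        // The copy constructor keeps cluster config and resource but swaps the Admin,
        // e.g. when credentials arrive with the request rather than the configuration
        return new KafkaContext(global, requestAdmin);
    }
}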
io.strimzi.api.kafka.model.kafka.KafkaBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; @@ -35,7 +34,6 @@ import static org.hamcrest.Matchers.not; @QuarkusTest -@QuarkusTestResource(KubernetesServerTestResource.class) @TestHTTPEndpoint(BrokersResource.class) @TestProfile(TestPlainProfile.class) class BrokersResourceIT { @@ -46,6 +44,9 @@ class BrokersResourceIT { @Inject KubernetesClient client; + @Inject + ConsoleConfig consoleConfig; + @DeploymentManager.InjectDeploymentManager DeploymentManager deployments; @@ -60,10 +61,9 @@ void setup() { kafkaContainer = deployments.getKafkaContainer(); bootstrapServers = URI.create(kafkaContainer.getBootstrapServers()); utils = new TestHelper(bootstrapServers, config, null); - clusterId = utils.getClusterId(); client.resources(Kafka.class).inAnyNamespace().delete(); - client.resources(Kafka.class).resource(new KafkaBuilder() + utils.apply(client, new KafkaBuilder() .withNewMetadata() .withName("test-kafka1") .withNamespace("default") @@ -77,7 +77,7 @@ void setup() { .endKafka() .endSpec() .withNewStatus() - .withClusterId(clusterId) + .withClusterId(utils.getClusterId()) .addNewListener() .withName("listener0") .addNewAddress() @@ -86,8 +86,9 @@ void setup() { .endAddress() .endListener() .endStatus() - .build()) - .create(); + .build()); + + clusterId = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); } @Test diff --git a/api/src/test/java/com/github/streamshub/console/api/ConsumerGroupsResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/ConsumerGroupsResourceIT.java index 0d2a7d820..404f84748 100644 --- a/api/src/test/java/com/github/streamshub/console/api/ConsumerGroupsResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/ConsumerGroupsResourceIT.java @@ -49,6 +49,8 @@ import org.skyscreamer.jsonassert.JSONAssert; import org.skyscreamer.jsonassert.JSONCompareMode; +import com.github.streamshub.console.api.support.Holder; +import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager; import com.github.streamshub.console.kafka.systemtest.utils.ConsumerUtils; @@ -58,11 +60,9 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.common.http.TestHTTPEndpoint; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; -import io.quarkus.test.kubernetes.client.KubernetesServerTestResource; import io.strimzi.api.kafka.model.kafka.Kafka; import static com.github.streamshub.console.test.TestHelper.whenRequesting; @@ -86,7 +86,6 @@ import static org.mockito.Mockito.doAnswer; @QuarkusTest -@QuarkusTestResource(KubernetesServerTestResource.class) @TestHTTPEndpoint(ConsumerGroupsResource.class) @TestProfile(TestPlainProfile.class) class ConsumerGroupsResourceIT { @@ -98,7 +97,10 @@ class ConsumerGroupsResourceIT { KubernetesClient client; @Inject - SharedIndexInformer kafkaInformer; + ConsoleConfig consoleConfig; + + @Inject + Holder> kafkaInformer; @DeploymentManager.InjectDeploymentManager DeploymentManager deployments; @@ -121,17 +123,15 @@ void setup() throws IOException { utils = new TestHelper(bootstrapServers, config, null); - clusterId1 = utils.getClusterId(); - clusterId2 = UUID.randomUUID().toString(); - 
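The setup methods in these tests swap the bare `create()` calls for `utils.apply(client, ...)`. That helper, added to `TestHelper` further down in this diff, pairs server-side apply with a status patch: apply writes metadata and spec only, while the stubbed Kafka status (cluster id, listeners) that the assertions rely on lives in the status subresource and must be patched separately. A sketch of it, with the hypothetical class name `ApplyHelper` standing in for `TestHelper`:

```java
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;

public class ApplyHelper {
    // Create-or-update the resource, then persist its status stanza. The
    // status subresource is not written by serverSideApply(), so a follow-up
    // patchStatus() is needed before the informers can see the stubbed status.
    public static <C extends CustomResource<?, ?>> C apply(KubernetesClient client, C resource) {
        client.resource(resource).serverSideApply();
        return client.resource(resource).patchStatus();
    }
}
```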
client.resources(Kafka.class).inAnyNamespace().delete(); - client.resources(Kafka.class) - .resource(utils.buildKafkaResource("test-kafka1", clusterId1, bootstrapServers)) - .create(); + utils.apply(client, utils.buildKafkaResource("test-kafka1", utils.getClusterId(), bootstrapServers)); // Wait for the informer cache to be populated with all Kafka CRs await().atMost(10, TimeUnit.SECONDS) - .until(() -> Objects.equals(kafkaInformer.getStore().list().size(), 1)); + .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), 1)); + + clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); + clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId(); } @Test diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java index 509a6e098..ece32151f 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java @@ -24,7 +24,6 @@ import jakarta.ws.rs.core.Response.Status; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.eclipse.microprofile.config.Config; @@ -39,6 +38,8 @@ import com.github.streamshub.console.api.model.ListFetchParams; import com.github.streamshub.console.api.service.KafkaClusterService; import com.github.streamshub.console.api.support.ErrorCategory; +import com.github.streamshub.console.api.support.Holder; +import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager; @@ -48,12 +49,10 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Cache; -import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.common.http.TestHTTPEndpoint; import io.quarkus.test.junit.QuarkusMock; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; -import io.quarkus.test.kubernetes.client.KubernetesServerTestResource; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustomBuilder; @@ -81,7 +80,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @QuarkusTest -@QuarkusTestResource(KubernetesServerTestResource.class) @TestHTTPEndpoint(KafkaClustersResource.class) @TestProfile(TestPlainProfile.class) class KafkaClustersResourceIT { @@ -95,10 +93,10 @@ class KafkaClustersResourceIT { KubernetesClient client; @Inject - SharedIndexInformer kafkaInformer; + Holder> kafkaInformer; @Inject - Map configuredAdmins; + Map configuredContexts; @Inject KafkaClusterService kafkaClusterService; @@ -128,43 +126,40 @@ void setup() throws IOException { utils = new TestHelper(bootstrapServers, config, null); - clusterId1 = utils.getClusterId(); - clusterId2 = UUID.randomUUID().toString(); - client.resources(Kafka.class).inAnyNamespace().delete(); - client.resources(Kafka.class) - .resource(new 
KafkaBuilder(utils.buildKafkaResource("test-kafka1", clusterId1, bootstrapServers, + + utils.apply(client, new KafkaBuilder(utils.buildKafkaResource("test-kafka1", utils.getClusterId(), bootstrapServers, new KafkaListenerAuthenticationCustomBuilder() - .withSasl() - .addToListenerConfig("sasl.enabled.mechanisms", "oauthbearer") - .build())) - .editOrNewStatus() - .addNewCondition() - .withType("Ready") - .withStatus("True") - .endCondition() - .addNewKafkaNodePool() - .withName("my-node-pool") - .endKafkaNodePool() - .endStatus() - .build()) - .create(); + .withSasl() + .addToListenerConfig("sasl.enabled.mechanisms", "oauthbearer") + .build())) + .editOrNewStatus() + .addNewCondition() + .withType("Ready") + .withStatus("True") + .endCondition() + .addNewKafkaNodePool() + .withName("my-node-pool") + .endKafkaNodePool() + .endStatus() + .build()); + // Second cluster is offline/non-existent - client.resources(Kafka.class) - .resource(new KafkaBuilder(utils.buildKafkaResource("test-kafka2", clusterId2, randomBootstrapServers)) - .editOrNewStatus() - .addNewCondition() - .withType("NotReady") - .withStatus("True") - .endCondition() - .endStatus() - .build()) - .create(); + utils.apply(client, new KafkaBuilder(utils.buildKafkaResource("test-kafka2", UUID.randomUUID().toString(), randomBootstrapServers)) + .editOrNewStatus() + .addNewCondition() + .withType("NotReady") + .withStatus("True") + .endCondition() + .endStatus() + .build()); // Wait for the informer cache to be populated with all Kafka CRs await().atMost(10, TimeUnit.SECONDS) - .until(() -> Objects.equals(kafkaInformer.getStore().list().size(), 2)); + .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), 2)); + clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); + clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId(); kafkaClusterService.setListUnconfigured(false); } @@ -183,7 +178,7 @@ void testListClusters() { String k1Bootstrap = bootstrapServers.getHost() + ":" + bootstrapServers.getPort(); String k2Bootstrap = randomBootstrapServers.getHost() + ":" + randomBootstrapServers.getPort(); - whenRequesting(req -> req.get()) + whenRequesting(req -> req.queryParam("fields[kafkas]", "name,status,nodePools,listeners").get()) .assertThat() .statusCode(is(Status.OK.getStatusCode())) .body("data.size()", equalTo(2)) @@ -204,7 +199,7 @@ void testListClusters() { void testListClustersWithInformerError() { SharedIndexInformer informer = Mockito.mock(); - var informerType = new TypeLiteral>() { + var informerType = new TypeLiteral>>() { private static final long serialVersionUID = 1L; }; @@ -228,9 +223,9 @@ public synchronized Throwable fillInStackTrace() { } }); - QuarkusMock.installMockForType(informer, informerType, new NamedLiteral()); + QuarkusMock.installMockForType(Holder.of(informer), informerType, new NamedLiteral()); - whenRequesting(req -> req.get("{clusterId}", UUID.randomUUID().toString())) + whenRequesting(req -> req.get()) .assertThat() .statusCode(is(Status.INTERNAL_SERVER_ERROR.getStatusCode())) .body("errors.size()", is(1)) @@ -270,13 +265,13 @@ void testListClustersWithRangePaginationTruncated() { IntStream.range(3, 10) .mapToObj(i -> "test-kafka" + i) .map(name -> utils.buildKafkaResource(name, randomBootstrapServers)) - .map(kafka -> client.resources(Kafka.class).resource(kafka).create()) + .map(kafka -> utils.apply(client, kafka)) .map(kafka -> kafka.getMetadata().getName())) .toList(); // Wait for the informer cache to be populated with 
all Kafka CRs await().atMost(10, TimeUnit.SECONDS) - .until(() -> Objects.equals(kafkaInformer.getStore().list().size(), allKafkaNames.size())); + .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), allKafkaNames.size())); var fullResponse = whenRequesting(req -> req.queryParam("sort", "name").get()) .assertThat() @@ -327,13 +322,13 @@ void testListClustersWithPaginationCursors(Integer afterIndex, Integer beforeInd IntStream.range(3, 10) .mapToObj(i -> "test-kafka" + i) .map(name -> utils.buildKafkaResource(name, randomBootstrapServers)) - .map(kafka -> client.resources(Kafka.class).resource(kafka).create()) + .map(kafka -> utils.apply(client, kafka)) .map(kafka -> kafka.getMetadata().getName())) .toList(); // Wait for the informer cache to be populated with all Kafka CRs await().atMost(10, TimeUnit.SECONDS) - .until(() -> Objects.equals(kafkaInformer.getStore().list().size(), allKafkaNames.size())); + .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), allKafkaNames.size())); var fullResponse = whenRequesting(req -> req.queryParam("sort", "name").get()) .assertThat() @@ -510,7 +505,7 @@ void testListClustersWithUnconfiguredCluster() { // Wait for the informer cache to be populated with all Kafka CRs await().atMost(10, TimeUnit.SECONDS) - .until(() -> Objects.equals(kafkaInformer.getStore().list().size(), 3)); + .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), 3)); whenRequesting(req -> req.get()) .assertThat() @@ -563,10 +558,10 @@ void testDescribeClusterWithCertificates() { Map clientConfig = mockAdminClient(); - client.resources(Kafka.class).resource(kafka).create(); + utils.apply(client, kafka); await().atMost(Duration.ofSeconds(5)) - .until(() -> configuredAdmins.containsKey(Cache.metaNamespaceKeyFunc(kafka))); + .until(() -> configuredContexts.containsKey(clusterId)); whenRequesting(req -> req.get("{clusterId}", clusterId)) .assertThat() @@ -594,7 +589,7 @@ void testDescribeClusterWithTLSMissingCertificates() { client.resources(Kafka.class).resource(kafka).create(); await().atMost(Duration.ofSeconds(5)) - .until(() -> kafkaInformer + .until(() -> kafkaInformer.get() .getStore() .list() .stream() diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java new file mode 100644 index 000000000..8d34928b8 --- /dev/null +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java @@ -0,0 +1,91 @@ +package com.github.streamshub.console.api; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; + +import jakarta.inject.Inject; +import jakarta.ws.rs.core.Response.Status; + +import org.eclipse.microprofile.config.Config; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.github.streamshub.console.api.service.KafkaClusterService; +import com.github.streamshub.console.api.support.KafkaContext; +import com.github.streamshub.console.config.ConsoleConfig; +import com.github.streamshub.console.kafka.systemtest.TestPlainNoK8sProfile; +import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager; +import com.github.streamshub.console.test.TestHelper; + +import io.quarkus.test.common.http.TestHTTPEndpoint; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; +import io.strimzi.test.container.StrimziKafkaContainer; + +import static 
com.github.streamshub.console.test.TestHelper.whenRequesting; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +@QuarkusTest +@TestHTTPEndpoint(KafkaClustersResource.class) +@TestProfile(TestPlainNoK8sProfile.class) +class KafkaClustersResourceNoK8sIT { + + @Inject + Config config; + + @Inject + Map configuredContexts; + + @Inject + KafkaClusterService kafkaClusterService; + + @Inject + ConsoleConfig consoleConfig; + + @DeploymentManager.InjectDeploymentManager + DeploymentManager deployments; + + TestHelper utils; + + StrimziKafkaContainer kafkaContainer; + String clusterId1; + String clusterId2; + URI bootstrapServers; + URI randomBootstrapServers; + + @BeforeEach + void setup() throws IOException { + kafkaContainer = deployments.getKafkaContainer(); + bootstrapServers = URI.create(kafkaContainer.getBootstrapServers()); + randomBootstrapServers = URI.create(consoleConfig.getKafka() + .getCluster("default/test-kafka2") + .map(k -> k.getProperties().get("bootstrap.servers")) + .orElseThrow()); + + utils = new TestHelper(bootstrapServers, config, null); + + clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); + clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId(); + kafkaClusterService.setListUnconfigured(false); + } + + @Test + void testListClusters() { + whenRequesting(req -> req.queryParam("fields[kafkas]", "name,status,nodePools,listeners").get()) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.size()", equalTo(2)) + .body("data.id", containsInAnyOrder(clusterId1, clusterId2)) + .body("data.attributes.name", containsInAnyOrder("test-kafka1", "test-kafka2")) + .body("data.find { it.attributes.name == 'test-kafka1'}.attributes.status", is(nullValue())) + .body("data.find { it.attributes.name == 'test-kafka1'}.attributes.nodePools", is(nullValue())) + .body("data.find { it.attributes.name == 'test-kafka1'}.attributes.listeners", is(nullValue())) + .body("data.find { it.attributes.name == 'test-kafka2'}.attributes.status", is(nullValue())) + .body("data.find { it.attributes.name == 'test-kafka2'}.attributes.listeners", is(nullValue())); + } + +} diff --git a/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java index 00cb744d6..a8285ac6a 100644 --- a/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java @@ -43,11 +43,9 @@ import com.github.streamshub.console.test.TopicHelper; import io.fabric8.kubernetes.client.KubernetesClient; -import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.common.http.TestHTTPEndpoint; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; -import io.quarkus.test.kubernetes.client.KubernetesServerTestResource; import io.strimzi.api.kafka.model.kafka.Kafka; import static com.github.streamshub.console.test.TestHelper.whenRequesting; @@ -63,7 +61,6 @@ import static org.hamcrest.Matchers.is; @QuarkusTest -@QuarkusTestResource(KubernetesServerTestResource.class) @TestHTTPEndpoint(RecordsResource.class) @TestProfile(TestPlainProfile.class) class RecordsResourceIT { @@ -100,17 +97,14 @@ void setup() throws IOException { utils = new TestHelper(bootstrapServers, config, null); recordUtils = new 
RecordHelper(bootstrapServers, config, null); - clusterId1 = utils.getClusterId(); - clusterId2 = UUID.randomUUID().toString(); - client.resources(Kafka.class).inAnyNamespace().delete(); - client.resources(Kafka.class) - .resource(utils.buildKafkaResource("test-kafka1", clusterId1, bootstrapServers)) - .create(); + + utils.apply(client, utils.buildKafkaResource("test-kafka1", utils.getClusterId(), bootstrapServers)); // Second cluster is offline/non-existent - client.resources(Kafka.class) - .resource(utils.buildKafkaResource("test-kafka2", clusterId2, randomBootstrapServers)) - .create(); + utils.apply(client, utils.buildKafkaResource("test-kafka2", UUID.randomUUID().toString(), randomBootstrapServers)); + + clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); + clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId(); } @Test diff --git a/api/src/test/java/com/github/streamshub/console/api/TopicsResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/TopicsResourceIT.java index 7f3123236..1a9aa6f3b 100644 --- a/api/src/test/java/com/github/streamshub/console/api/TopicsResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/TopicsResourceIT.java @@ -64,6 +64,7 @@ import org.skyscreamer.jsonassert.JSONAssert; import org.skyscreamer.jsonassert.JSONCompareMode; +import com.github.streamshub.console.api.support.Holder; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager; @@ -75,11 +76,9 @@ import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.common.http.TestHTTPEndpoint; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; -import io.quarkus.test.kubernetes.client.KubernetesServerTestResource; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.topic.KafkaTopic; import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder; @@ -111,7 +110,6 @@ import static org.mockito.Mockito.doAnswer; @QuarkusTest -@QuarkusTestResource(KubernetesServerTestResource.class) @TestHTTPEndpoint(TopicsResource.class) @TestProfile(TestPlainProfile.class) class TopicsResourceIT { @@ -126,7 +124,7 @@ class TopicsResourceIT { KubernetesClient client; @Inject - SharedIndexInformer kafkaInformer; + Holder> kafkaInformer; @Inject @Named("KafkaTopics") @@ -158,23 +156,19 @@ void setup() throws IOException { utils = new TestHelper(bootstrapServers1, config, null); - clusterId1 = utils.getClusterId(); - clusterId2 = UUID.randomUUID().toString(); - client.resources(Kafka.class).inAnyNamespace().delete(); client.resources(KafkaTopic.class).inAnyNamespace().delete(); - client.resources(Kafka.class) - .resource(utils.buildKafkaResource(clusterName1, clusterId1, bootstrapServers1)) - .create(); + utils.apply(client, utils.buildKafkaResource(clusterName1, utils.getClusterId(), bootstrapServers1)); // Second cluster is offline/non-existent - client.resources(Kafka.class) - .resource(utils.buildKafkaResource("test-kafka2", clusterId2, randomBootstrapServers)) - .create(); + utils.apply(client, utils.buildKafkaResource("test-kafka2", UUID.randomUUID().toString(), randomBootstrapServers)); // Wait for the informer cache to be populated 
with all Kafka CRs await().atMost(10, TimeUnit.SECONDS) - .until(() -> Objects.equals(kafkaInformer.getStore().list().size(), 2)); + .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), 2)); + + clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); + clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId(); } @Test diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java new file mode 100644 index 000000000..b892be8d2 --- /dev/null +++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java @@ -0,0 +1,46 @@ +package com.github.streamshub.console.kafka.systemtest; + +import io.quarkus.test.junit.QuarkusTestProfile; + +import java.util.List; +import java.util.Map; + +import com.github.streamshub.console.kafka.systemtest.deployment.KafkaUnsecuredResourceManager; + +/** + * Same as profile {@linkplain TestPlainProfile}, but disables Kubernetes use by setting + * {@code kubernetes.enabled=false} in the application's configuration YAML and + * {@code quarkus.kubernetes-client.devservices.enabled=false} to disable the testing/mock + * Kubernetes API server. + */ +public class TestPlainNoK8sProfile extends TestPlainProfile implements QuarkusTestProfile { + + @Override + public List<TestResourceEntry> testResources() { + return List.of(new TestResourceEntry(KafkaUnsecuredResourceManager.class, Map.of("profile", PROFILE))); + }
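This no-Kubernetes profile is the reason the tests above inject `Holder<SharedIndexInformer<Kafka>>` rather than the informer itself: with the Kubernetes layer disabled there is no informer instance to supply, and `KafkaClustersResourceIT` installs its mock via `Holder.of(informer)`. The diff never shows the body of `Holder`, so the following is a minimal sketch consistent with the `of(...)`, `get()`, and `ifPresent(...)` calls used here, an assumed shape rather than the project's actual implementation:

```java
import java.util.NoSuchElementException;
import java.util.function.Consumer;

// Assumed shape of com.github.streamshub.console.api.support.Holder: an
// Optional-like, CDI-friendly wrapper that can always be injected, even when
// the wrapped value (e.g. a Kafka informer) does not exist.
public class Holder<T> {

    private final T value; // null when Kubernetes support is disabled

    private Holder(T value) {
        this.value = value;
    }

    public static <T> Holder<T> of(T value) {
        return new Holder<>(value);
    }

    public static <T> Holder<T> empty() {
        return new Holder<>(null);
    }

    public boolean isPresent() {
        return value != null;
    }

    public T get() {
        if (value == null) {
            throw new NoSuchElementException("No value present");
        }
        return value;
    }

    public void ifPresent(Consumer<? super T> action) {
        if (value != null) {
            action.accept(value);
        }
    }
}
```

This also explains why `disposeKafkaInformer` earlier in the diff guards the close with `informer.ifPresent(SharedIndexInformer::close)` instead of calling `close()` directly.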
@Override + public Map getConfigOverrides() { + var configFile = writeConfiguration(""" + kubernetes: + enabled: true + kafka: + clusters: + - name: test-kafka1 + namespace: default + id: k1-id + properties: + bootstrap.servers: ${console.test.external-bootstrap} + - name: test-kafka2 + namespace: default + id: k2-id + properties: + bootstrap.servers: ${console.test.random-bootstrap} + - name: test-kafka3 + namespace: default + # listener is named and bootstrap.servers not set (will be retrieved from Kafka CR) + listener: listener0 + properties: + security.protocol: SSL + """); + + return Map.of("console.config-path", configFile.getAbsolutePath()); } + protected File writeConfiguration(String configurationYaml) { + File configFile; + + try { + configFile = File.createTempFile("console-test-config-", ".yaml"); + configFile.deleteOnExit(); + + Files.writeString(configFile.toPath(), configurationYaml, StandardOpenOption.WRITE); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return configFile; + } } diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/KafkaUnsecuredResourceManager.java b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/KafkaUnsecuredResourceManager.java index 2eb9b6949..bf0c0f1a6 100644 --- a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/KafkaUnsecuredResourceManager.java +++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/KafkaUnsecuredResourceManager.java @@ -1,12 +1,9 @@ package com.github.streamshub.console.kafka.systemtest.deployment; -import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; import java.net.ServerSocket; import java.net.URI; -import java.nio.file.Files; -import java.nio.file.StandardOpenOption; import java.util.Map; import org.apache.kafka.clients.CommonClientConfigs; @@ -16,7 +13,6 @@ public class KafkaUnsecuredResourceManager extends KafkaResourceManager implements QuarkusTestResourceLifecycleManager { ServerSocket randomSocket; - File configFile; @Override public Map start() { @@ -33,36 +29,8 @@ public Map start() { URI randomBootstrapServers = URI.create("dummy://localhost:" + randomSocket.getLocalPort()); - try { - configFile = File.createTempFile("console-test-config-", ".yaml"); - configFile.deleteOnExit(); - - Files.writeString(configFile.toPath(), """ - kafka: - clusters: - - name: test-kafka1 - namespace: default - properties: - bootstrap.servers: ${console.test.external-bootstrap} - - name: test-kafka2 - namespace: default - properties: - bootstrap.servers: ${console.test.random-bootstrap} - - name: test-kafka3 - namespace: default - # listener is named and bootstrap.servers not set (will be retrieved from Kafka CR) - listener: listener0 - properties: - security.protocol: SSL - """, - StandardOpenOption.WRITE); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - return Map.ofEntries( Map.entry(profile + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, externalBootstrap), - Map.entry(profile + "console.config-path", configFile.getAbsolutePath()), Map.entry(profile + "console.test.external-bootstrap", externalBootstrap), Map.entry(profile + "console.test.random-bootstrap", randomBootstrapServers.toString())); } @@ -78,7 +46,5 @@ public void stop() { throw new UncheckedIOException(e); } } - - configFile.delete(); } } diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/StrimziCrdResourceManager.java 
b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/StrimziCrdResourceManager.java new file mode 100644 index 000000000..6d1a7be66 --- /dev/null +++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/deployment/StrimziCrdResourceManager.java @@ -0,0 +1,86 @@ +package com.github.streamshub.console.kafka.systemtest.deployment; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.quarkus.test.common.DevServicesContext; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import io.strimzi.api.kafka.Crds; + +/** + * This manager creates the Strimzi CRDs needed by the application before the test + * instance of the application is started. It is provided with the connection + * properties for the Kubernetes API server started by Quarkus dev services. + */ +public class StrimziCrdResourceManager implements QuarkusTestResourceLifecycleManager, DevServicesContext.ContextAware { + + private static final String PREFIX = "quarkus.kubernetes-client."; + + DevServicesContext context; + Map<String, String> devConfig; + + @Override + public void setIntegrationTestContext(DevServicesContext context) { + this.context = context; + devConfig = context.devServicesProperties(); + } + + Optional<String> get(String key) { + return get(key, Function.identity()); + } + + <T> Optional<T> get(String key, Function<String, T> mapper) { + return Optional.ofNullable(devConfig.get(PREFIX + key)) + .map(mapper); + } + + @Override + public Map<String, String> start() { + Config base = Config.autoConfigure(null); + + var k8s = new KubernetesClientBuilder() + .editOrNewConfig() + .withTrustCerts(get("trust-certs", Boolean::parseBoolean).orElseGet(base::isTrustCerts)) + .withWatchReconnectLimit(get("watch-reconnect-limit", Integer::parseInt).orElseGet(base::getWatchReconnectLimit)) + .withWatchReconnectInterval((int) get("watch-reconnect-interval", Duration::parse) + .orElse(Duration.ofMillis(base.getWatchReconnectInterval())).toMillis()) + .withConnectionTimeout((int) get("connection-timeout", Duration::parse) + .orElse(Duration.ofMillis(base.getConnectionTimeout())).toMillis()) + .withRequestTimeout((int) get("request-timeout", Duration::parse) + .orElse(Duration.ofMillis(base.getRequestTimeout())).toMillis()) + .withMasterUrl(get("api-server-url").or(() -> get("master-url")).orElse(base.getMasterUrl())) + .withNamespace(get("namespace").orElseGet(base::getNamespace)) + .withUsername(get("username").orElse(base.getUsername())) + .withPassword(get("password").orElse(base.getPassword())) + .withCaCertFile(get("ca-cert-file").orElse(base.getCaCertFile())) + .withCaCertData(get("ca-cert-data").orElse(base.getCaCertData())) + .withClientCertFile(get("client-cert-file").orElse(base.getClientCertFile())) + .withClientCertData(get("client-cert-data").orElse(base.getClientCertData())) + .withClientKeyFile(get("client-key-file").orElse(base.getClientKeyFile())) + .withClientKeyData(get("client-key-data").orElse(base.getClientKeyData())) + .withClientKeyPassphrase(get("client-key-passphrase").orElse(base.getClientKeyPassphrase())) + .withClientKeyAlgo(get("client-key-algo").orElse(base.getClientKeyAlgo())) + .withHttpProxy(get("http-proxy").orElse(base.getHttpProxy())) + .withHttpsProxy(get("https-proxy").orElse(base.getHttpsProxy())) + .withProxyUsername(get("proxy-username").orElse(base.getProxyUsername()))
.withProxyPassword(get("proxy-password").orElse(base.getProxyPassword())) + .withNoProxy(get("no-proxy", s -> s.split(",")).orElse(base.getNoProxy())) + .endConfig() + .build(); + + k8s.resource(Crds.kafka()).serverSideApply(); + k8s.resource(Crds.kafkaTopic()).serverSideApply(); + + return Collections.emptyMap(); + } + + @Override + public void stop() { + // No-op + } +} diff --git a/api/src/test/java/com/github/streamshub/console/test/TestHelper.java b/api/src/test/java/com/github/streamshub/console/test/TestHelper.java index dabedc1ce..2e2de2898 100644 --- a/api/src/test/java/com/github/streamshub/console/test/TestHelper.java +++ b/api/src/test/java/com/github/streamshub/console/test/TestHelper.java @@ -20,6 +20,8 @@ import com.github.streamshub.console.api.Annotations; import com.github.streamshub.console.kafka.systemtest.utils.ClientsConfig; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.KubernetesClient; import io.restassured.response.Response; import io.restassured.response.ValidatableResponse; import io.restassured.specification.RequestSpecification; @@ -119,6 +121,11 @@ public Kafka buildKafkaResource(String name, String id, URI bootstrapServers, Ka .build(); } + public > C apply(KubernetesClient client, C resource) { + client.resource(resource).serverSideApply(); + return client.resource(resource).patchStatus(); + } + public String getClusterId() { try (Admin admin = Admin.create(adminConfig)) { return admin.describeCluster().clusterId().get(10, TimeUnit.SECONDS); diff --git a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java index 3c496f52a..691eabd41 100644 --- a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java @@ -2,8 +2,17 @@ public class ConsoleConfig { + KubernetesConfig kubernetes = new KubernetesConfig(); KafkaConfig kafka = new KafkaConfig(); + public KubernetesConfig getKubernetes() { + return kubernetes; + } + + public void setKubernetes(KubernetesConfig kubernetes) { + this.kubernetes = kubernetes; + } + public KafkaConfig getKafka() { return kafka; } diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java index bf4cfd23c..6e91a6ddd 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java @@ -7,6 +7,7 @@ public class KafkaClusterConfig { + private String id; private String name; private String namespace; private String listener; @@ -17,9 +18,20 @@ public class KafkaClusterConfig { @JsonIgnore public String clusterKey() { + if (namespace == null || namespace.isBlank()) { + return name; + } return "%s/%s".formatted(namespace, name); } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public String getName() { return name; } diff --git a/common/src/main/java/com/github/streamshub/console/config/KubernetesConfig.java b/common/src/main/java/com/github/streamshub/console/config/KubernetesConfig.java new file mode 100644 index 000000000..38229655c --- /dev/null +++ b/common/src/main/java/com/github/streamshub/console/config/KubernetesConfig.java @@ -0,0 +1,14 @@ +package com.github.streamshub.console.config; + +public class 
KubernetesConfig { + + boolean enabled = true; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } +} diff --git a/console-config-example.yaml b/console-config-example.yaml index 369e6b851..44f539fde 100644 --- a/console-config-example.yaml +++ b/console-config-example.yaml @@ -1,7 +1,13 @@ +kubernetes: + # enable/disable use of Kubernetes to obtain additional information from Strimzi + # Kafka and KafkaTopic custom resources. Enabled by default + enabled: true + kafka: clusters: - name: my-kafka1 # name of the Strimzi Kafka CR - namespace: my-namespace1 # namespace of the Strimzi Kafka CR + namespace: my-namespace1 # namespace of the Strimzi Kafka CR (optional) + id: my-kafka1-id # value to be used as an identifier for the cluster. Must be specified when namespace is not. listener: "secure" # name of the listener to use for connections from the console # `properties` contains keys/values to use for any Kafka connection properties: diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConfigVar.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConfigVar.java new file mode 100644 index 000000000..f71ec7f0f --- /dev/null +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConfigVar.java @@ -0,0 +1,34 @@ +package com.github.streamshub.console.api.v1alpha1.spec; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import io.sundr.builder.annotations.Buildable; + +@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ConfigVar { + + @JsonProperty("name") + private String name; + + @JsonProperty("value") + private String value; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + +}
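`ConfigVar` above is the literal name/value form; `ConfigVarSource`, next, references a whole ConfigMap or Secret, optionally key-prefixed, much like `env`/`envFrom` on a Pod spec. The merge order is implemented by `setConfigVars` and `copyData` in `ConsoleSecret` later in this diff: `valuesFrom` entries are copied first, then the literal `values` are applied, so literals win on conflict. A runnable sketch of that ordering (key names borrowed from the reconciler test below; the class name is illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mirrors the merge order in ConsoleSecret.setConfigVars.
public class ConfigVarMergeSketch {
    public static void main(String[] args) {
        Map<String, String> target = new LinkedHashMap<>();

        // 1. valuesFrom: every entry of the referenced ConfigMap/Secret is copied,
        //    with the optional prefix prepended (Secret data is base64-decoded first).
        Map<String, String> configMapData = Map.of("x-consumer-prop-name", "x-consumer-prop-value");
        String prefix = "extra-";
        configMapData.forEach((key, value) -> target.put(prefix + key, value));

        // 2. values: literal name/value pairs are applied afterwards, so they
        //    override anything a ConfigMap or Secret contributed.
        target.put("x-prop-name", "x-prop-value");

        // -> {extra-x-consumer-prop-name=x-consumer-prop-value, x-prop-name=x-prop-value}
        System.out.println(target);
    }
}
```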
diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConfigVarSource.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConfigVarSource.java new file mode 100644 index 000000000..b0026fa3e --- /dev/null +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConfigVarSource.java @@ -0,0 +1,51 @@ +package com.github.streamshub.console.api.v1alpha1.spec; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import io.fabric8.generator.annotation.ValidationRule; +import io.fabric8.kubernetes.api.model.ConfigMapEnvSource; +import io.fabric8.kubernetes.api.model.SecretEnvSource; +import io.sundr.builder.annotations.Buildable; + +@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") +@JsonInclude(JsonInclude.Include.NON_NULL) +@ValidationRule( + value = "has(self.configMapRef) || has(self.secretRef)", + message = "One of `configMapRef` or `secretRef` is required") +public class ConfigVarSource { + + @JsonProperty("configMapRef") + private ConfigMapEnvSource configMapRef; + + @JsonProperty("prefix") + private String prefix; + + @JsonProperty("secretRef") + private SecretEnvSource secretRef; + + public ConfigMapEnvSource getConfigMapRef() { + return configMapRef; + } + + public void setConfigMapRef(ConfigMapEnvSource configMapRef) { + this.configMapRef = configMapRef; + } + + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } + + public SecretEnvSource getSecretRef() { + return secretRef; + } + + public void setSecretRef(SecretEnvSource secretRef) { + this.secretRef = secretRef; + } + +} diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConfigVars.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConfigVars.java new file mode 100644 index 000000000..b88aadfbc --- /dev/null +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConfigVars.java @@ -0,0 +1,34 @@ +package com.github.streamshub.console.api.v1alpha1.spec; + +import java.util.ArrayList; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonInclude; + +import io.sundr.builder.annotations.Buildable; + +@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ConfigVars { + + List<ConfigVar> values = new ArrayList<>(); + + List<ConfigVarSource> valuesFrom = new ArrayList<>(); + + public List<ConfigVar> getValues() { + return values; + } + + public void setValues(List<ConfigVar> values) { + this.values = values; + } + + public List<ConfigVarSource> getValuesFrom() { + return valuesFrom; + } + + public void setValuesFrom(List<ConfigVarSource> valuesFrom) { + this.valuesFrom = valuesFrom; + } + +} \ No newline at end of file diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java index 4dd94fee0..3a14c05b2 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java @@ -1,23 +1,76 @@ package com.github.streamshub.console.api.v1alpha1.spec; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import io.fabric8.generator.annotation.Required; +import io.fabric8.generator.annotation.ValidationRule; import io.sundr.builder.annotations.Buildable; @Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") @JsonInclude(JsonInclude.Include.NON_NULL) +@ValidationRule( + // The `namespace` property must be wrapped in double underscore to escape it + // due to it being a "reserved" word. + value = "has(self.id) || has(self.__namespace__)", + message = "One of `id` or `namespace` is required") +@ValidationRule( + // The `namespace` property must be wrapped in double underscore to escape it + // due to it being a "reserved" word. + value = "!has(self.listener) || has(self.__namespace__)", + message = "Property `listener` may not be used when `namespace` is omitted") public class KafkaCluster { - @Required - String name; + @JsonPropertyDescription(""" + Identifier to be used for this Kafka cluster in the console. When \ + the console is connected to Kubernetes and a Strimzi Kafka custom \ + resource may be discovered using the name and namespace properties, \ + this property is optional; when it is omitted, the Kafka cluster \ + identifier published in the Kafka resource's status will be used. If \ + namespace is not given, or the console is not connected to Kubernetes, \ + this property is required. + + When provided, this property will override the Kafka cluster id available \ + in the Kafka resource's status.""") + private String id; @Required - String namespace; + @JsonPropertyDescription(""" + The name of the Kafka cluster.
When the console is connected to \ + Kubernetes, a Strimzi Kafka custom resource may be discovered using \ + this property together with the namespace property. In any case, \ + this property will be displayed in the console for the Kafka cluster's \ + name.""") + private String name; + + @JsonPropertyDescription(""" + The namespace of the Kafka cluster. When the console is connected to \ + Kubernetes, a Strimzi Kafka custom resource may be discovered using \ + this property together with the name property.""") + private String namespace; + + @JsonPropertyDescription(""" + The name of the listener in the Strimzi Kafka Kubernetes resource that \ + should be used by the console to establish connections.""") + private String listener; + + private Credentials credentials; + + private ConfigVars properties = new ConfigVars(); + + private ConfigVars adminProperties = new ConfigVars(); - String listener; + private ConfigVars consumerProperties = new ConfigVars(); - Credentials credentials; + private ConfigVars producerProperties = new ConfigVars(); + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } public String getName() { return name; @@ -51,4 +104,35 @@ public void setCredentials(Credentials credentials) { this.credentials = credentials; } + public ConfigVars getProperties() { + return properties; + } + + public void setProperties(ConfigVars properties) { + this.properties = properties; + } + + public ConfigVars getAdminProperties() { + return adminProperties; + } + + public void setAdminProperties(ConfigVars adminProperties) { + this.adminProperties = adminProperties; + } + + public ConfigVars getConsumerProperties() { + return consumerProperties; + } + + public void setConsumerProperties(ConfigVars consumerProperties) { + this.consumerProperties = consumerProperties; + } + + public ConfigVars getProducerProperties() { + return producerProperties; + } + + public void setProducerProperties(ConfigVars producerProperties) { + this.producerProperties = producerProperties; + } } diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/status/Condition.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/status/Condition.java index 39df08428..a1c75007f 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/status/Condition.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/status/Condition.java @@ -1,17 +1,10 @@ package com.github.streamshub.console.api.v1alpha1.status; -import java.util.HashMap; -import java.util.Map; - -import com.fasterxml.jackson.annotation.JsonAnyGetter; -import com.fasterxml.jackson.annotation.JsonAnySetter; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonPropertyDescription; import io.sundr.builder.annotations.Buildable; -import static java.util.Collections.emptyMap; - @Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") @JsonInclude(JsonInclude.Include.NON_NULL) public class Condition { @@ -21,7 +14,6 @@ public class Condition { private String message; private String type; private String lastTransitionTime; - private Map additionalProperties; @JsonPropertyDescription("The status of the condition, either True, False or Unknown.") public String getStatus() { @@ -69,17 +61,4 @@ public String getMessage() { public void setMessage(String message) { this.message = message; } - - @JsonAnyGetter - public Map getAdditionalProperties() { - return this.additionalProperties != null ? 
this.additionalProperties : emptyMap(); - } - - @JsonAnySetter - public void setAdditionalProperty(String name, Object value) { - if (this.additionalProperties == null) { - this.additionalProperties = new HashMap<>(1); - } - this.additionalProperties.put(name, value); - } } diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleResource.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleResource.java index 76a287867..276aa1e73 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleResource.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleResource.java @@ -26,7 +26,7 @@ public interface ConsoleResource { static final String INSTANCE_LABEL = "app.kubernetes.io/instance"; static final String MANAGER = "streamshub-console-operator"; - static final Map MANAGEMENT_LABEL = Map.of(MANAGED_BY_LABEL, MANAGER); + public static final Map MANAGEMENT_LABEL = Map.of(MANAGED_BY_LABEL, MANAGER); static final String MANAGEMENT_SELECTOR = MANAGED_BY_LABEL + '=' + MANAGER; static final HexFormat DIGEST_FORMAT = HexFormat.of(); static final String DEFAULT_DIGEST = "0".repeat(40); diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java index 1727d4279..d9dc43ced 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java @@ -9,8 +9,10 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Random; +import java.util.function.Function; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -22,11 +24,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.github.streamshub.console.ReconciliationException; import com.github.streamshub.console.api.v1alpha1.Console; -import com.github.streamshub.console.api.v1alpha1.spec.CredentialsKafkaUser; +import com.github.streamshub.console.api.v1alpha1.spec.ConfigVars; +import com.github.streamshub.console.api.v1alpha1.spec.Credentials; import com.github.streamshub.console.api.v1alpha1.spec.KafkaCluster; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; +import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.api.model.SecretBuilder; @@ -108,96 +112,105 @@ private ConsoleConfig buildConfig(Console primary, Context context) { ConsoleConfig config = new ConsoleConfig(); for (var kafkaRef : primary.getSpec().getKafkaClusters()) { - addConfig(context, config, kafkaRef); + addConfig(primary, context, config, kafkaRef); } return config; } - private void addConfig(Context context, ConsoleConfig config, KafkaCluster kafkaRef) { + private void addConfig(Console primary, Context context, ConsoleConfig config, KafkaCluster kafkaRef) { String namespace = kafkaRef.getNamespace(); String name = kafkaRef.getName(); String listenerName = kafkaRef.getListener(); KafkaClusterConfig kcConfig = new KafkaClusterConfig(); + kcConfig.setId(kafkaRef.getId()); kcConfig.setNamespace(namespace); kcConfig.setName(name); kcConfig.setListener(listenerName); - config.getKafka().getClusters().add(kcConfig); - // 
TODO: add informer for Kafka CRs - Kafka kafka = getResource(context, Kafka.class, namespace, name); - - if (listenerName != null) { - GenericKafkaListener listenerSpec = kafka.getSpec() - .getKafka() - .getListeners() - .stream() - .filter(l -> l.getName().equals(listenerName)) - .findFirst() - .orElseThrow(() -> new ReconciliationException("Listener '" + listenerName + "' not found on Kafka " + name + " in namespace " + namespace)); - - StringBuilder protocol = new StringBuilder(); - String mechanism = null; - - if (listenerSpec.getAuth() != null) { - protocol.append("SASL_"); - - var auth = listenerSpec.getAuth(); - switch (auth.getType()) { - case "oauth": - mechanism = "OAUTHBEARER"; - break; - case "scram-sha-512": - mechanism = "SCRAM-SHA-512"; - break; - case "tls", "custom": - default: - // Nothing yet - break; - } - } + config.getKubernetes().setEnabled(Objects.nonNull(namespace)); + config.getKafka().getClusters().add(kcConfig); - if (listenerSpec.isTls()) { - protocol.append("SSL"); - } else { - protocol.append("PLAINTEXT"); - } + if (namespace != null && listenerName != null) { + // TODO: add informer for Kafka CRs + Kafka kafka = getResource(context, Kafka.class, namespace, name); + setListenerConfig(kcConfig.getProperties(), kafka, listenerName); + } - var properties = kcConfig.getProperties(); + Optional.ofNullable(kafkaRef.getCredentials()) + .map(Credentials::getKafkaUser) + .ifPresent(user -> { + String userNs = Optional.ofNullable(user.getNamespace()).orElse(namespace); + setKafkaUserConfig( + context, + getResource(context, KafkaUser.class, userNs, user.getName()), + kcConfig.getProperties()); + }); - properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); + setConfigVars(primary, context, kcConfig.getProperties(), kafkaRef.getProperties()); + setConfigVars(primary, context, kcConfig.getAdminProperties(), kafkaRef.getAdminProperties()); + setConfigVars(primary, context, kcConfig.getConsumerProperties(), kafkaRef.getConsumerProperties()); + setConfigVars(primary, context, kcConfig.getProducerProperties(), kafkaRef.getProducerProperties()); + } - if (mechanism != null) { - properties.put(SaslConfigs.SASL_MECHANISM, mechanism); + void setListenerConfig(Map properties, Kafka kafka, String listenerName) { + GenericKafkaListener listenerSpec = kafka.getSpec() + .getKafka() + .getListeners() + .stream() + .filter(l -> l.getName().equals(listenerName)) + .findFirst() + .orElseThrow(() -> new ReconciliationException("Listener '%s' not found on Kafka %s/%s" + .formatted(listenerName, kafka.getMetadata().getNamespace(), kafka.getMetadata().getName()))); + + StringBuilder protocol = new StringBuilder(); + String mechanism = null; + + if (listenerSpec.getAuth() != null) { + protocol.append("SASL_"); + + var auth = listenerSpec.getAuth(); + switch (auth.getType()) { + case "oauth": + mechanism = "OAUTHBEARER"; + break; + case "scram-sha-512": + mechanism = "SCRAM-SHA-512"; + break; + case "tls", "custom": + default: + // Nothing yet + break; } + } - ListenerStatus listenerStatus = Optional.ofNullable(kafka.getStatus()) - .map(KafkaStatus::getListeners) - .orElseGet(Collections::emptyList) - .stream() - .filter(l -> l.getName().equals(listenerName)) - .findFirst() - .orElse(null); - - Optional.ofNullable(listenerStatus) - .map(ListenerStatus::getBootstrapServers) - .or(() -> Optional.ofNullable(listenerSpec.getConfiguration()) - .map(GenericKafkaListenerConfiguration::getBootstrap) - .map(GenericKafkaListenerConfigurationBootstrap::getHost)) - 
.ifPresent(bootstrapServers -> properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); - - Optional.ofNullable(kafkaRef.getCredentials()).ifPresent(credentials -> { - if (credentials.getKafkaUser() != null) { - CredentialsKafkaUser user = credentials.getKafkaUser(); - String userNs = Optional.ofNullable(user.getNamespace()).orElse(namespace); - setKafkaUserConfig( - context, - getResource(context, KafkaUser.class, userNs, user.getName()), - properties); - } - }); + if (listenerSpec.isTls()) { + protocol.append("SSL"); + } else { + protocol.append("PLAINTEXT"); } + + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); + + if (mechanism != null) { + properties.put(SaslConfigs.SASL_MECHANISM, mechanism); + } + + ListenerStatus listenerStatus = Optional.ofNullable(kafka.getStatus()) + .map(KafkaStatus::getListeners) + .orElseGet(Collections::emptyList) + .stream() + .filter(l -> l.getName().equals(listenerName)) + .findFirst() + .orElse(null); + + Optional.ofNullable(listenerStatus) + .map(ListenerStatus::getBootstrapServers) + .or(() -> Optional.ofNullable(listenerSpec.getConfiguration()) + .map(GenericKafkaListenerConfiguration::getBootstrap) + .map(GenericKafkaListenerConfigurationBootstrap::getHost)) + .ifPresent(bootstrapServers -> properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); } void setKafkaUserConfig(Context context, KafkaUser user, Map properties) { @@ -219,8 +232,59 @@ void setKafkaUserConfig(Context context, KafkaUser user, Map context, Map target, ConfigVars source) { + String namespace = primary.getMetadata().getNamespace(); + + source.getValuesFrom().stream().forEach(fromSource -> { + String prefix = fromSource.getPrefix(); + var configMapRef = fromSource.getConfigMapRef(); + var secretRef = fromSource.getSecretRef(); + + if (configMapRef != null) { + copyData(context, target, ConfigMap.class, namespace, configMapRef.getName(), prefix, configMapRef.getOptional(), ConfigMap::getData); + } + + if (secretRef != null) { + copyData(context, target, Secret.class, namespace, secretRef.getName(), prefix, secretRef.getOptional(), Secret::getData); + } + }); + + source.getValues().forEach(configVar -> target.put(configVar.getName(), configVar.getValue())); + } + + @SuppressWarnings("java:S107") // Ignore Sonar warning for too many args + void copyData(Context context, + Map target, + Class sourceType, + String namespace, + String name, + String prefix, + Boolean optional, + Function> dataProvider) { + + S source = getResource(context, sourceType, namespace, name, Boolean.TRUE.equals(optional)); + + if (source != null) { + copyData(target, dataProvider.apply(source), prefix, Secret.class.equals(sourceType)); + } + } + + void copyData(Map target, Map source, String prefix, boolean decode) { + source.forEach((key, value) -> { + if (prefix != null) { + key = prefix + key; + } + target.put(key, decode ? 
decodeString(value) : value); + }); + } + static T getResource( Context context, Class resourceType, String namespace, String name) { + return getResource(context, resourceType, namespace, name, false); + } + + static T getResource( + Context context, Class resourceType, String namespace, String name, boolean optional) { T resource = context.getClient() .resources(resourceType) @@ -228,7 +292,7 @@ static T getResource( .withName(name) .get(); - if (resource == null) { + if (resource == null && !optional) { throw new ReconciliationException("No such %s resource: %s/%s".formatted(resourceType.getSimpleName(), namespace, name)); } diff --git a/operator/src/test/example-console.yaml b/operator/src/test/example-console.yaml index 6a5844979..21748f796 100644 --- a/operator/src/test/example-console.yaml +++ b/operator/src/test/example-console.yaml @@ -2,11 +2,18 @@ apiVersion: console.streamshub.github.com/v1alpha1 kind: Console metadata: name: example - namespace: streams-console spec: hostname: example-console.apps-crc.testing kafkaClusters: - - kafkaUserName: console-kafka-user1 + - id: my-console + credentials: + kafkaUser: + name: console-kafka-user1 + #namespace: same as kafkaCluster listener: secure name: console-kafka namespace: streams-console + properties: + values: + - name: x-some-test-property + value: the-value diff --git a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java index 1a64ea3ea..d4fac62ca 100644 --- a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java +++ b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java @@ -16,8 +16,11 @@ import com.github.streamshub.console.api.v1alpha1.Console; import com.github.streamshub.console.api.v1alpha1.ConsoleBuilder; import com.github.streamshub.console.config.ConsoleConfig; +import com.github.streamshub.console.dependents.ConsoleResource; import com.github.streamshub.console.dependents.ConsoleSecret; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.NamespaceBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.api.model.Secret; @@ -64,17 +67,20 @@ void setUp() throws Exception { var allConsoles = client.resources(Console.class).inAnyNamespace(); var allKafkas = client.resources(Kafka.class).inAnyNamespace(); var allKafkaUsers = client.resources(KafkaUser.class).inAnyNamespace(); - var allSecrets = client.resources(Secret.class).inAnyNamespace(); + var allConfigMaps = client.resources(ConfigMap.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL); + var allSecrets = client.resources(Secret.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL); allConsoles.delete(); allKafkas.delete(); allKafkaUsers.delete(); + allConfigMaps.delete(); allSecrets.delete(); await().atMost(LIMIT).untilAsserted(() -> { assertTrue(allConsoles.list().getItems().isEmpty()); assertTrue(allKafkas.list().getItems().isEmpty()); assertTrue(allKafkaUsers.list().getItems().isEmpty()); + assertTrue(allConfigMaps.list().getItems().isEmpty()); assertTrue(allSecrets.list().getItems().isEmpty()); }); @@ -362,6 +368,7 @@ void testConsoleReconciliationWithMissingJaasConfigKey() { .withNewMetadata() .withName("ku1") .withNamespace("ns1") + .addToLabels(ConsoleResource.MANAGEMENT_LABEL) .endMetadata() // no data map .build(); @@ -431,6 +438,7 @@ 
diff --git a/operator/src/test/example-console.yaml b/operator/src/test/example-console.yaml
index 6a5844979..21748f796 100644
--- a/operator/src/test/example-console.yaml
+++ b/operator/src/test/example-console.yaml
@@ -2,11 +2,18 @@ apiVersion: console.streamshub.github.com/v1alpha1
 kind: Console
 metadata:
   name: example
-  namespace: streams-console
 spec:
   hostname: example-console.apps-crc.testing
   kafkaClusters:
-  - kafkaUserName: console-kafka-user1
+  - id: my-console
+    credentials:
+      kafkaUser:
+        name: console-kafka-user1
+        #namespace: same as kafkaCluster
     listener: secure
     name: console-kafka
     namespace: streams-console
+    properties:
+      values:
+        - name: x-some-test-property
+          value: the-value
diff --git a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java
index 1a64ea3ea..d4fac62ca 100644
--- a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java
+++ b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java
@@ -16,8 +16,11 @@
 import com.github.streamshub.console.api.v1alpha1.Console;
 import com.github.streamshub.console.api.v1alpha1.ConsoleBuilder;
 import com.github.streamshub.console.config.ConsoleConfig;
+import com.github.streamshub.console.dependents.ConsoleResource;
 import com.github.streamshub.console.dependents.ConsoleSecret;
 
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
 import io.fabric8.kubernetes.api.model.NamespaceBuilder;
 import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
 import io.fabric8.kubernetes.api.model.Secret;
@@ -64,17 +67,20 @@ void setUp() throws Exception {
         var allConsoles = client.resources(Console.class).inAnyNamespace();
         var allKafkas = client.resources(Kafka.class).inAnyNamespace();
         var allKafkaUsers = client.resources(KafkaUser.class).inAnyNamespace();
-        var allSecrets = client.resources(Secret.class).inAnyNamespace();
+        var allConfigMaps = client.resources(ConfigMap.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL);
+        var allSecrets = client.resources(Secret.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL);
 
         allConsoles.delete();
         allKafkas.delete();
         allKafkaUsers.delete();
+        allConfigMaps.delete();
         allSecrets.delete();
 
         await().atMost(LIMIT).untilAsserted(() -> {
             assertTrue(allConsoles.list().getItems().isEmpty());
             assertTrue(allKafkas.list().getItems().isEmpty());
             assertTrue(allKafkaUsers.list().getItems().isEmpty());
+            assertTrue(allConfigMaps.list().getItems().isEmpty());
             assertTrue(allSecrets.list().getItems().isEmpty());
         });
 
@@ -362,6 +368,7 @@ void testConsoleReconciliationWithMissingJaasConfigKey() {
                 .withNewMetadata()
                     .withName("ku1")
                     .withNamespace("ns1")
+                    .addToLabels(ConsoleResource.MANAGEMENT_LABEL)
                 .endMetadata()
                 // no data map
                 .build();
@@ -431,6 +438,7 @@ void testConsoleReconciliationWithValidKafkaUser() {
                 .withNewMetadata()
                     .withName("ku1")
                     .withNamespace("ns1")
+                    .addToLabels(ConsoleResource.MANAGEMENT_LABEL)
                 .endMetadata()
                 .addToData(SaslConfigs.SASL_JAAS_CONFIG, Base64.getEncoder().encodeToString("jaas-config-value".getBytes()))
                 .build();
@@ -480,6 +488,86 @@ void testConsoleReconciliationWithValidKafkaUser() {
         });
     }
 
+    @Test
+    void testConsoleReconciliationWithKafkaProperties() {
+        client.resource(new ConfigMapBuilder()
+                .withNewMetadata()
+                    .withName("cm-1")
+                    .withNamespace("ns2")
+                    .addToLabels(ConsoleResource.MANAGEMENT_LABEL)
+                .endMetadata()
+                .addToData("x-consumer-prop-name", "x-consumer-prop-value")
+                .build())
+            .serverSideApply();
+
+        client.resource(new SecretBuilder()
+                .withNewMetadata()
+                    .withName("scrt-1")
+                    .withNamespace("ns2")
+                    .addToLabels(ConsoleResource.MANAGEMENT_LABEL)
+                .endMetadata()
+                .addToData("x-producer-prop-name",
+                        Base64.getEncoder().encodeToString("x-producer-prop-value".getBytes()))
+                .build())
+            .serverSideApply();
+
+        Console consoleCR = new ConsoleBuilder()
+                .withMetadata(new ObjectMetaBuilder()
+                        .withName("console-1")
+                        .withNamespace("ns2")
+                        .build())
+                .withNewSpec()
+                    .withHostname("example.com")
+                    .addNewKafkaCluster()
+                        .withId("custom-id")
+                        .withName(kafkaCR.getMetadata().getName())
+                        .withNewProperties()
+                            .addNewValue()
+                                .withName("x-prop-name")
+                                .withValue("x-prop-value")
+                            .endValue()
+                        .endProperties()
+                        .withNewAdminProperties()
+                            .addNewValue()
+                                .withName("x-admin-prop-name")
+                                .withValue("x-admin-prop-value")
+                            .endValue()
+                        .endAdminProperties()
+                        .withNewConsumerProperties()
+                            .addNewValuesFrom()
+                                .withPrefix("extra-")
+                                .withNewConfigMapRef("cm-1", false)
+                            .endValuesFrom()
+                            .addNewValuesFrom()
+                                .withNewConfigMapRef("cm-2", true)
+                            .endValuesFrom()
+                        .endConsumerProperties()
+                        .withNewProducerProperties()
+                            .addNewValuesFrom()
+                                .withNewSecretRef("scrt-1", false)
+                            .endValuesFrom()
+                        .endProducerProperties()
+                    .endKafkaCluster()
+                .endSpec()
+                .build();
+
+        client.resource(consoleCR).create();
+
+        await().ignoreException(NullPointerException.class).atMost(LIMIT).untilAsserted(() -> {
+            var consoleSecret = client.secrets().inNamespace("ns2").withName("console-1-" + ConsoleSecret.NAME).get();
+            assertNotNull(consoleSecret);
+            String configEncoded = consoleSecret.getData().get("console-config.yaml");
+            byte[] configDecoded = Base64.getDecoder().decode(configEncoded);
+            ConsoleConfig config = new ObjectMapper().readValue(configDecoded, ConsoleConfig.class);
+            var kafkaConfig = config.getKafka().getClusters().get(0);
+            assertEquals("x-prop-value", kafkaConfig.getProperties().get("x-prop-name"));
+            assertEquals("x-admin-prop-value", kafkaConfig.getAdminProperties().get("x-admin-prop-name"));
+            assertEquals("x-consumer-prop-value", kafkaConfig.getConsumerProperties().get("extra-x-consumer-prop-name"));
+            assertEquals("x-producer-prop-value", kafkaConfig.getProducerProperties().get("x-producer-prop-name"));
+        });
+    }
+
     // Utility
 
     private Deployment setReady(Deployment deployment) {
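Not part of the patch: the new `testConsoleReconciliationWithKafkaProperties` asserts that literal `values` entries land in the generated config keyed by name, while each `valuesFrom` source (ConfigMap or Secret) contributes all of its keys with the optional `prefix` prepended. A self-contained sketch of that flattening follows; the method shape and the referenced-source-wins collision behavior are assumptions, not confirmed by this diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative flattening of `values` plus one `valuesFrom` source into the
// properties map serialized into console-config.yaml. Names are hypothetical.
class PropertiesFlatteningSketch {

    static Map<String, String> flatten(Map<String, String> literalValues,
            String prefix, Map<String, String> referencedData) {
        Map<String, String> result = new LinkedHashMap<>(literalValues);
        String effectivePrefix = prefix == null ? "" : prefix;

        // Every key of the referenced ConfigMap/Secret becomes prefix + key;
        // assumption: a referenced key overwrites a literal value on collision.
        referencedData.forEach((key, value) -> result.put(effectivePrefix + key, value));
        return result;
    }

    public static void main(String[] args) {
        // Mirrors the consumerProperties case in the test above: cm-1 holds
        // x-consumer-prop-name and is referenced with prefix "extra-".
        Map<String, String> flattened = flatten(
                Map.of(),
                "extra-",
                Map.of("x-consumer-prop-name", "x-consumer-prop-value"));

        // Prints: {extra-x-consumer-prop-name=x-consumer-prop-value}
        System.out.println(flattened);
    }
}
```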
diff --git a/pom.xml b/pom.xml
index 83fece39c..97adf458a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,11 @@
     <sonar.host.url>https://sonarcloud.io</sonar.host.url>
     <sonar.organization>streamshub</sonar.organization>
     <sonar.exclusions>api/src/main/java/com/github/streamshub/console/api/support/TrustAllCertificateManager.java</sonar.exclusions>
+    <sonar.coverage.exclusions>
+      common/src/main/java/com/github/streamshub/console/config/*.java,
+      operator/src/main/java/com/github/streamshub/console/api/v1alpha1/**/*.java
+    </sonar.coverage.exclusions>
+
     <sonar.issue.ignore.multicriteria>e1,e2</sonar.issue.ignore.multicriteria>
     <sonar.issue.ignore.multicriteria.e1.ruleKey>java:S6813</sonar.issue.ignore.multicriteria.e1.ruleKey>
diff --git a/ui/api/kafka/actions.ts b/ui/api/kafka/actions.ts
index c01eb8d0e..e1927e1b7 100644
--- a/ui/api/kafka/actions.ts
+++ b/ui/api/kafka/actions.ts
@@ -76,7 +76,7 @@ export async function getKafkaClusterKpis(
 ): Promise<{ cluster: ClusterDetail; kpis: ClusterKpis | null } | null> {
   try {
     const cluster = await getKafkaCluster(clusterId);
-    if (!cluster) {
+    if (!cluster?.attributes?.namespace) {
       return null;
     }
 
@@ -238,7 +238,7 @@ export async function getKafkaClusterMetrics(
   try {
     const cluster = await getKafkaCluster(clusterId);
 
-    if (!cluster || !prom) {
+    if (!cluster?.attributes?.namespace || !prom) {
       return null;
     }
 
@@ -246,7 +246,7 @@ export async function getKafkaClusterMetrics(
       await Promise.all(
         metrics.map((m) =>
           getRangeByNodeId(
-            cluster.attributes.namespace,
+            cluster.attributes.namespace!,
             cluster.attributes.name,
             cluster.attributes.nodePools?.join("|") ?? "",
             m,
@@ -303,7 +303,7 @@ export async function getKafkaTopicMetrics(
   try {
     const cluster = await getKafkaCluster(clusterId);
 
-    if (!cluster || !prom) {
+    if (!cluster?.attributes?.namespace || !prom) {
       return null;
     }
 
@@ -311,7 +311,7 @@ export async function getKafkaTopicMetrics(
      await Promise.all(
        metrics.map((m) =>
          getRangeByNodeId(
-            cluster.attributes.namespace,
+            cluster.attributes.namespace!,
            cluster.attributes.name,
            cluster.attributes.nodePools?.join("|") ?? "",
            m,
diff --git a/ui/api/kafka/schema.ts b/ui/api/kafka/schema.ts
index ba05935bf..913986de9 100644
--- a/ui/api/kafka/schema.ts
+++ b/ui/api/kafka/schema.ts
@@ -15,8 +15,8 @@ export const ClusterListSchema = z.object({
   }),
   attributes: z.object({
     name: z.string(),
-    namespace: z.string(),
-    kafkaVersion: z.string().optional(),
+    namespace: z.string().nullable().optional(),
+    kafkaVersion: z.string().nullable().optional(),
   }),
 });
 export const ClustersResponseSchema = z.object({
@@ -28,10 +28,10 @@ const ClusterDetailSchema = z.object({
   type: z.literal("kafkas"),
   attributes: z.object({
     name: z.string(),
-    namespace: z.string(),
-    creationTimestamp: z.string(),
-    status: z.string(),
-    kafkaVersion: z.string().optional(),
+    namespace: z.string().nullable().optional(),
+    creationTimestamp: z.string().nullable().optional(),
+    status: z.string().nullable().optional(),
+    kafkaVersion: z.string().nullable().optional(),
     nodes: z.array(NodeSchema),
     controller: NodeSchema,
     authorizedOperations: z.array(z.string()),
@@ -41,7 +41,7 @@ const ClusterDetailSchema = z.object({
         bootstrapServers: z.string().nullable(),
         authType: z.string().nullable(),
       }),
-    ),
+    ).nullable().optional(),
     conditions: z.array(
       z.object({
         type: z.string().optional(),
@@ -50,7 +50,7 @@ const ClusterDetailSchema = z.object({
         message: z.string().optional(),
         lastTransitionTime: z.string().optional(),
       }),
-    ),
+    ).nullable().optional(),
     nodePools: z.array(z.string()).optional().nullable(),
   }),
 });
diff --git a/ui/app/[locale]/home/ClustersTable.tsx b/ui/app/[locale]/home/ClustersTable.tsx
index dc6f45863..f3337df5c 100644
--- a/ui/app/[locale]/home/ClustersTable.tsx
+++ b/ui/app/[locale]/home/ClustersTable.tsx
@@ -91,9 +91,9 @@ export function ClustersTable({
               {t("ClustersTable.connection_not_configured")}
             );
           case "version":
-            return <Td>{row.attributes.kafkaVersion ?? "n/a"}</Td>;
+            return <Td>{row.attributes.kafkaVersion ?? "Not Available"}</Td>;
           case "namespace":
-            return <Td>{row.attributes.namespace}</Td>;
+            return <Td>{row.attributes.namespace ?? "N/A"}</Td>;
         }
       }}
       renderActions={({ ActionsColumn, row }) => (