From 53655cc196baddac018582b22febe9aa0fdc00ec Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Thu, 1 Aug 2024 12:10:50 -0400 Subject: [PATCH] Refine ClientFactory config handling, add SCRAM/OAuth tests Signed-off-by: Michael Edgar --- .../streamshub/console/api/ClientFactory.java | 112 ++++++++--- .../console/api/KafkaClustersResourceIT.java | 184 ++++++++++++------ .../kafka/systemtest/TestPlainProfile.java | 2 - install/000-install-dependency-operators.sh | 0 install/001-deploy-prometheus.sh | 0 5 files changed, 207 insertions(+), 91 deletions(-) mode change 100644 => 100755 install/000-install-dependency-operators.sh mode change 100644 => 100755 install/001-deploy-prometheus.sh diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java index 251f75d21..cccbd5e54 100644 --- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java @@ -6,6 +6,7 @@ import java.nio.file.Path; import java.util.Base64; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -228,7 +229,7 @@ Map produceKafkaContexts(ConsoleConfig consoleConfig, adminBuilder, false)); - return contexts; + return Collections.unmodifiableMap(contexts); } void addKafkaEventHandler(Map contexts, @@ -237,32 +238,52 @@ void addKafkaEventHandler(Map contexts, kafkaInformer.get().addEventHandlerWithResyncPeriod(new ResourceEventHandler() { public void onAdd(Kafka kafka) { - findConfig(kafka).ifPresent(clusterConfig -> putKafkaContext(contexts, - clusterConfig, - Optional.of(kafka), - adminBuilder, - false)); + if (log.isDebugEnabled()) { + log.debugf("Kafka resource %s added", Cache.metaNamespaceKeyFunc(kafka)); + } + findConfig(kafka).ifPresentOrElse( + clusterConfig -> { + if (defaultedClusterId(clusterConfig, Optional.of(kafka))) { + log.debugf("Ignoring added Kafka resource %s, cluster ID not yet available and not provided via configuration", + Cache.metaNamespaceKeyFunc(kafka)); + } else { + putKafkaContext(contexts, + clusterConfig, + Optional.of(kafka), + adminBuilder, + false); + } + }, + () -> log.debugf("Ignoring added Kafka resource %s, not found in configuration", Cache.metaNamespaceKeyFunc(kafka))); } public void onUpdate(Kafka oldKafka, Kafka newKafka) { - findConfig(newKafka).ifPresent(clusterConfig -> putKafkaContext(contexts, + if (log.isDebugEnabled()) { + log.debugf("Kafka resource %s updated", Cache.metaNamespaceKeyFunc(oldKafka)); + } + findConfig(newKafka).ifPresentOrElse( + clusterConfig -> putKafkaContext(contexts, clusterConfig, Optional.of(newKafka), adminBuilder, - true)); + true), + () -> log.debugf("Ignoring updated Kafka resource %s, not found in configuration", Cache.metaNamespaceKeyFunc(newKafka))); } public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) { - findConfig(kafka).ifPresent(clusterConfig -> { - String clusterKey = clusterConfig.clusterKey(); - String clusterId = Optional.ofNullable(clusterConfig.getId()) - .or(() -> Optional.ofNullable(kafka.getStatus()).map(KafkaStatus::getClusterId)) - .orElse(null); - log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId); - log.debugf("Known KafkaContext identifiers: %s", contexts.keySet()); - KafkaContext previous = contexts.remove(clusterId); - Optional.ofNullable(previous).ifPresent(KafkaContext::close); - }); + if (log.isDebugEnabled()) { + log.debugf("Kafka resource %s deleted", Cache.metaNamespaceKeyFunc(kafka)); + } + findConfig(kafka).ifPresentOrElse( + clusterConfig -> { + String clusterKey = clusterConfig.clusterKey(); + String clusterId = clusterId(clusterConfig, Optional.of(kafka)); + log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId); + log.debugf("Known KafkaContext identifiers: %s", contexts.keySet()); + KafkaContext previous = contexts.remove(clusterId); + Optional.ofNullable(previous).ifPresent(KafkaContext::close); + }, + () -> log.debugf("Ignoring deleted Kafka resource %s, not found in configuration", Cache.metaNamespaceKeyFunc(kafka))); } Optional findConfig(Kafka kafka) { @@ -305,9 +326,9 @@ void putKafkaContext(Map contexts, kafkaResource); Map, Map> clientConfigs = new HashMap<>(); - clientConfigs.put(Admin.class, adminConfigs); - clientConfigs.put(Consumer.class, consumerConfigs); - clientConfigs.put(Producer.class, producerConfigs); + clientConfigs.put(Admin.class, Collections.unmodifiableMap(adminConfigs)); + clientConfigs.put(Consumer.class, Collections.unmodifiableMap(consumerConfigs)); + clientConfigs.put(Producer.class, Collections.unmodifiableMap(producerConfigs)); Admin admin = null; @@ -316,29 +337,58 @@ void putKafkaContext(Map contexts, } String clusterKey = clusterConfig.clusterKey(); - String clusterId = Optional.ofNullable(clusterConfig.getId()) - .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId)) - .orElseGet(clusterConfig::getName); + String clusterId = clusterId(clusterConfig, kafkaResource); if (!replace && contexts.containsKey(clusterId)) { log.warnf(""" Ignoring duplicate Kafka cluster id: %s for cluster %s. Cluster id values in \ configuration must be unique and may not match id values of \ clusters discovered using Strimzi Kafka Kubernetes API resources.""", clusterId, clusterKey); - } else if (kafkaResource.isPresent() && truststoreRequired(adminConfigs)) { - if (contexts.containsKey(clusterId) && !truststoreRequired(contexts.get(clusterId).configs(Admin.class))) { + } else { + boolean truststoreNowRequired = truststoreRequired(adminConfigs); + + if (resourceStatusDroppedCertificates(contexts.get(clusterId), kafkaResource, truststoreNowRequired)) { log.warnf(""" Ignoring update to Kafka custom resource %s. Connection requires \ trusted certificate which is no longer available.""", clusterKey); + } else { + if (truststoreNowRequired && kafkaResource.isPresent()) { + log.warnf(""" + Connection requires trusted certificate(s) which are not present \ + in the Kafka CR status of resource %s.""", clusterKey); + } + + KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin); + log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId); + KafkaContext previous = contexts.put(clusterId, ctx); + Optional.ofNullable(previous).ifPresent(KafkaContext::close); } - } else { - KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin); - log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId); - KafkaContext previous = contexts.put(clusterId, ctx); - Optional.ofNullable(previous).ifPresent(KafkaContext::close); } } + boolean defaultedClusterId(KafkaClusterConfig clusterConfig, Optional kafkaResource) { + return clusterConfig.getId() == null && kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId).isEmpty(); + } + + String clusterId(KafkaClusterConfig clusterConfig, Optional kafkaResource) { + return Optional.ofNullable(clusterConfig.getId()) + .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId)) + .orElseGet(clusterConfig::getName); + } + + /** + * Checks whether the previous KafkaContext contained TLS trusted certificates, but due to them being + * removed from the Strimzi Kafka CR being in a transient state, they are no longer present. We will ignore + * this update and keep the old KafkaContext. + */ + boolean resourceStatusDroppedCertificates(KafkaContext context, Optional kafkaResource, boolean truststoreNowRequired) { + if (!truststoreNowRequired || context == null || kafkaResource.isEmpty()) { + return false; + } + + return !truststoreRequired(context.configs(Admin.class)); + } + Optional cachedKafkaResource(KafkaClusterConfig clusterConfig) { return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore) .map(store -> store.getByKey(clusterConfig.clusterKey())) diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java index 879f392ab..aec59c4d1 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java @@ -24,12 +24,13 @@ import jakarta.ws.rs.core.Response.Status; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.scram.ScramLoginModule; import org.eclipse.microprofile.config.Config; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -56,7 +57,9 @@ import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustomBuilder; +import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuthBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationScramSha512Builder; +import io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler; import io.strimzi.test.container.StrimziKafkaContainer; import static com.github.streamshub.console.test.TestHelper.whenRequesting; @@ -65,7 +68,6 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -75,6 +77,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -491,32 +494,6 @@ void testListClustersWithUnexpectedPageCursorData() { .body("errors.source.parameter", contains("page[after]")); } - @Test - @Disabled("Only configured clusters are returned at this time") - void testListClustersWithUnconfiguredCluster() { - String clusterId = UUID.randomUUID().toString(); - String clusterName = "test-kafka-" + clusterId; - - // Create a Kafka CR with SCRAM-SHA that proxies to kafka1 - client.resources(Kafka.class) - .resource(utils.buildKafkaResource(clusterName, clusterId, bootstrapServers)) - .create(); - - // Wait for the informer cache to be populated with all Kafka CRs - await().atMost(10, TimeUnit.SECONDS) - .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), 3)); - - whenRequesting(req -> req.get()) - .assertThat() - .statusCode(is(Status.OK.getStatusCode())) - .body("data.size()", equalTo(3)) - .body("data.id", containsInAnyOrder(clusterId1, clusterId2, clusterId)) - .body("data.attributes.name", containsInAnyOrder("test-kafka1", "test-kafka2", clusterName)) - .body("data.find { it.attributes.name == 'test-kafka1'}.meta.configured", is(true)) - .body("data.find { it.attributes.name == 'test-kafka2'}.meta.configured", is(true)) - .body("data.find { it.attributes.name == '" + clusterName + "'}.meta.configured", is(false)); - } - @Test void testDescribeClusterWithCustomAuthType() { mockAdminClient(); @@ -539,10 +516,17 @@ void testDescribeClusterWithCertificates() { String clusterId = UUID.randomUUID().toString(); /* - * Create a Kafka CR with OAuth that proxies to kafka1. - * test-kafka3 is predefined in KafkaUnsecuredResourceManager with SSL + * Create a Kafka CR that proxies to kafka1. + * test-kafka3 is predefined in KafkaUnsecuredResourceManager */ Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers)) + .editSpec() + .editKafka() + .editMatchingListener(l -> "listener0".equals(l.getName())) + .withTls(true) + .endListener() + .endKafka() + .endSpec() .editStatus() .editMatchingListener(l -> "listener0".equals(l.getName())) .addToCertificates(""" @@ -563,7 +547,8 @@ void testDescribeClusterWithCertificates() { whenRequesting(req -> req.get("{clusterId}", clusterId)) .assertThat() - .statusCode(is(Status.OK.getStatusCode())); + .statusCode(is(Status.OK.getStatusCode())) + .body("data.attributes.name", equalTo("test-kafka3")); // Ignoring response data since they are from test-kafka-1 assertEquals("SSL", clientConfig.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); @@ -577,14 +562,24 @@ void testDescribeClusterWithTLSMissingCertificates() { /* * Create a Kafka CR without certificates - * test-kafka3 is predefined in KafkaUnsecuredResourceManager with SSL + * test-kafka3 is predefined in KafkaUnsecuredResourceManager */ Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers)) + .editSpec() + .editKafka() + .editMatchingListener(l -> "listener0".equals(l.getName())) + .withTls(true) + .endListener() + .endKafka() + .endSpec() .build(); Map clientConfig = mockAdminClient(); - client.resources(Kafka.class).resource(kafka).create(); + utils.apply(client, kafka); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> configuredContexts.containsKey(clusterId)); await().atMost(Duration.ofSeconds(5)) .until(() -> kafkaInformer.get() @@ -597,29 +592,13 @@ void testDescribeClusterWithTLSMissingCertificates() { whenRequesting(req -> req.get("{clusterId}", clusterId)) .assertThat() - .statusCode(is(Status.NOT_FOUND.getStatusCode())); + .statusCode(is(Status.OK.getStatusCode())) + .body("data.attributes.name", equalTo("test-kafka3")); // Ignoring response data since they are from test-kafka-1 - // adminBuilder was never called to update configuration - assertThat(clientConfig, is(anEmptyMap())); - } - - @Test - void testDescribeClusterWithScram() { - String clusterId = UUID.randomUUID().toString(); - - // Create a Kafka CR with SCRAM-SHA that proxies to kafka1 - client.resources(Kafka.class) - .resource(utils.buildKafkaResource("test-kafka-" + clusterId, clusterId, bootstrapServers, - new KafkaListenerAuthenticationScramSha512Builder().build())) - .create(); - - whenRequesting(req -> req.get("{clusterId}", clusterId)) - .assertThat() - .statusCode(is(Status.NOT_FOUND.getStatusCode())) - .body("errors.size()", is(1)) - .body("errors.status", contains("404")) - .body("errors.code", contains("4041")); + assertEquals("SSL", clientConfig.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); + assertThat(clientConfig, not(hasKey(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG))); + assertThat(clientConfig, not(hasKey(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG))); } @Test @@ -627,10 +606,8 @@ void testDescribeClusterWithCustomNonOAuth() { String clusterId = UUID.randomUUID().toString(); // Create a Kafka CR with generic custom authentication that proxies to kafka1 - client.resources(Kafka.class) - .resource(utils.buildKafkaResource("test-kafka-" + clusterId, clusterId, bootstrapServers, - new KafkaListenerAuthenticationCustomBuilder().build())) - .create(); + utils.apply(client, utils.buildKafkaResource("test-kafka-" + clusterId, clusterId, bootstrapServers, + new KafkaListenerAuthenticationCustomBuilder().build())); whenRequesting(req -> req.get("{clusterId}", clusterId)) .assertThat() @@ -660,6 +637,97 @@ void testDescribeClusterWithNoSuchCluster() { .body("errors.code", contains("4041")); } + @ParameterizedTest + @CsvSource({ + "true, SASL_SSL", + "false, SASL_PLAINTEXT" + }) + void testDescribeClusterWithOAuthTokenUrl(boolean tls, String expectedProtocol) { + String clusterId = UUID.randomUUID().toString(); + + /* + * Create a Kafka CR that proxies to kafka1. + * test-kafka3 is predefined in KafkaUnsecuredResourceManager + */ + Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers, + new KafkaListenerAuthenticationOAuthBuilder() + .withTokenEndpointUri("https://example.com/token") + .build())) + .editSpec() + .editKafka() + .editMatchingListener(l -> "listener0".equals(l.getName())) + .withTls(tls) + .endListener() + .endKafka() + .endSpec() + .build(); + + Map clientConfig = mockAdminClient(); + + utils.apply(client, kafka); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> configuredContexts.containsKey(clusterId)); + + whenRequesting(req -> req + .auth().oauth2("my-secure-token") + .get("{clusterId}", clusterId)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.attributes.name", is("test-kafka3")) + .body("data.meta.authentication.method", is("oauth")) + .body("data.meta.authentication.tokenUrl", is("https://example.com/token")); + + assertEquals(expectedProtocol, clientConfig.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); + assertEquals("OAUTHBEARER", clientConfig.get(SaslConfigs.SASL_MECHANISM)); + assertEquals(JaasClientOauthLoginCallbackHandler.class.getName(), + clientConfig.get(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS)); + } + + @ParameterizedTest + @CsvSource({ + "true, SASL_SSL", + "false, SASL_PLAINTEXT" + }) + void testDescribeClusterWithScram(boolean tls, String expectedProtocol) { + String clusterId = UUID.randomUUID().toString(); + + /* + * Create a Kafka CR with SCRAM-SHA that proxies to kafka1 + * test-kafka3 is predefined in KafkaUnsecuredResourceManager + */ + Kafka kafka = new KafkaBuilder(utils.buildKafkaResource("test-kafka3", clusterId, bootstrapServers, + new KafkaListenerAuthenticationScramSha512Builder().build())) + .editSpec() + .editKafka() + .editMatchingListener(l -> "listener0".equals(l.getName())) + .withTls(tls) + .endListener() + .endKafka() + .endSpec() + .build(); + + Map clientConfig = mockAdminClient(); + + utils.apply(client, kafka); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> configuredContexts.containsKey(clusterId)); + + whenRequesting(req -> req + .auth().basic("u", "p") + .get("{clusterId}", clusterId)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.attributes.name", is("test-kafka3")) + .body("data.meta.authentication.method", is("basic")); + + assertEquals(expectedProtocol, clientConfig.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); + assertEquals("SCRAM-SHA-512", clientConfig.get(SaslConfigs.SASL_MECHANISM)); + assertThat(String.valueOf(clientConfig.get(SaslConfigs.SASL_JAAS_CONFIG)), + containsString(ScramLoginModule.class.getName())); + } + // Helper methods static Map mockAdminClient() { diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java index 340b7c08d..2f1b5e290 100644 --- a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java +++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java @@ -52,8 +52,6 @@ public Map getConfigOverrides() { namespace: default # listener is named and bootstrap.servers not set (will be retrieved from Kafka CR) listener: listener0 - properties: - security.protocol: SSL """); return Map.of("console.config-path", configFile.getAbsolutePath()); diff --git a/install/000-install-dependency-operators.sh b/install/000-install-dependency-operators.sh old mode 100644 new mode 100755 diff --git a/install/001-deploy-prometheus.sh b/install/001-deploy-prometheus.sh old mode 100644 new mode 100755