diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java index 1b95087fb..f226c66ee 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java @@ -335,4 +335,8 @@ public void cruiseControlEnabled(boolean cruiseControlEnabled) { public Metrics metrics() { return attributes.metrics; } + + public void metrics(Metrics metrics) { + attributes.metrics = metrics; + } } diff --git a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java index fb53fb0cd..b6963e5d9 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java @@ -329,6 +329,7 @@ CompletionStage addMetrics(KafkaCluster cluster, List fiel if (kafkaContext.prometheus() == null) { logger.warnf("Kafka cluster metrics were requested, but Prometheus URL is not configured"); + cluster.metrics(null); return CompletableFuture.completedStage(cluster); } diff --git a/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java b/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java index 2753c4a43..84d261ba9 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java @@ -8,6 +8,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; @@ -61,6 +62,12 @@ public class MetricsService { @Inject KafkaContext kafkaContext; + Optional additionalFilter = Optional.empty(); + + public /* test */ void setAdditionalFilter(Optional additionalFilter) { + this.additionalFilter = additionalFilter; + } + ClientRequestFilter createAuthenticationFilter(PrometheusConfig config) { return new ClientRequestFilter() { @Override @@ -99,11 +106,14 @@ public PrometheusAPI createClient(ConsoleConfig consoleConfig, KafkaClusterConfi var trustStore = certificates.getDefault().map(TlsConfiguration::getTrustStore).orElse(null); - return RestClientBuilder.newBuilder() + RestClientBuilder builder = RestClientBuilder.newBuilder() .baseUri(URI.create(prometheusConfig.getUrl())) .trustStore(trustStore) - .register(createAuthenticationFilter(prometheusConfig)) - .build(PrometheusAPI.class); + .register(createAuthenticationFilter(prometheusConfig)); + + additionalFilter.ifPresent(builder::register); + + return builder.build(PrometheusAPI.class); } return null; diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java index 71d186103..5264979d5 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java @@ -37,6 +37,7 @@ import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mockito; +import com.github.streamshub.console.api.model.KafkaCluster; import com.github.streamshub.console.api.model.ListFetchParams; import com.github.streamshub.console.api.service.KafkaClusterService; import com.github.streamshub.console.api.support.ErrorCategory; @@ -739,6 +740,20 @@ void testDescribeClusterWithScram(boolean tls, String expectedProtocol) { containsString(ScramLoginModule.class.getName())); } + @Test + /* + * Tests with metrics enabled are in KafkaClustersResourceMetricsIT + */ + void testDescribeClusterWithMetricsNotEnabled() { + whenRequesting(req -> req + .param("fields[" + KafkaCluster.API_TYPE + "]", "name,metrics") + .get("{clusterId}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.attributes.name", equalTo("test-kafka1")) + .body("data.attributes", hasEntry(is("metrics"), nullValue())); + } + @ParameterizedTest @CsvSource({ "true", diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceMetricsIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceMetricsIT.java new file mode 100644 index 000000000..d646d4926 --- /dev/null +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceMetricsIT.java @@ -0,0 +1,323 @@ +package com.github.streamshub.console.api; + +import java.io.IOException; +import java.net.URI; +import java.time.Instant; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import jakarta.inject.Inject; +import jakarta.json.Json; +import jakarta.json.JsonObject; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.client.ClientRequestContext; +import jakarta.ws.rs.client.ClientRequestFilter; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.Response.Status; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.eclipse.microprofile.config.Config; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.github.streamshub.console.api.model.KafkaCluster; +import com.github.streamshub.console.api.service.MetricsService; +import com.github.streamshub.console.api.support.KafkaContext; +import com.github.streamshub.console.config.ConsoleConfig; +import com.github.streamshub.console.config.KafkaClusterConfig; +import com.github.streamshub.console.config.PrometheusConfig; +import com.github.streamshub.console.config.PrometheusConfig.Type; +import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; +import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager; +import com.github.streamshub.console.test.AdminClientSpy; +import com.github.streamshub.console.test.TestHelper; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.quarkus.test.common.http.TestHTTPEndpoint; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; +import io.strimzi.api.kafka.model.kafka.Kafka; +import io.strimzi.api.kafka.model.kafka.KafkaBuilder; +import io.strimzi.test.container.StrimziKafkaContainer; + +import static com.github.streamshub.console.test.TestHelper.whenRequesting; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@QuarkusTest +@TestHTTPEndpoint(KafkaClustersResource.class) +@TestProfile(TestPlainProfile.class) +class KafkaClustersResourceMetricsIT implements ClientRequestFilter { + + static final JsonObject EMPTY_METRICS = Json.createObjectBuilder() + .add("data", Json.createObjectBuilder() + .add("result", Json.createArrayBuilder())) + .build(); + + @Inject + Config config; + + @Inject + KubernetesClient client; + + @Inject + Map configuredContexts; + + @Inject + ConsoleConfig consoleConfig; + + @Inject + MetricsService metricsService; + + @DeploymentManager.InjectDeploymentManager + DeploymentManager deployments; + + TestHelper utils; + + StrimziKafkaContainer kafkaContainer; + String clusterId1; + URI bootstrapServers; + + Consumer filterQuery; + Consumer filterQueryRange; + + @Override + public void filter(ClientRequestContext requestContext) throws IOException { + var requestUri = requestContext.getUri(); + + if (requestUri.getPath().endsWith("query")) { + filterQuery.accept(requestContext); + } else if (requestUri.getPath().endsWith("query_range")) { + filterQueryRange.accept(requestContext); + } + } + + @BeforeEach + void setup() throws IOException { + filterQuery = ctx -> { /* No-op */ }; + filterQueryRange = ctx -> { /* No-op */ }; + metricsService.setAdditionalFilter(Optional.of(this)); + + /* + * Create a mock Prometheus configuration and point test-kafka1 to use it. A client + * will be created when the Kafka CR is discovered. The request filter mock created + * above is our way to intercept outbound requests and abort them with the desired + * response for each test. + */ + var prometheusConfig = new PrometheusConfig(); + prometheusConfig.setName("test"); + prometheusConfig.setType(Type.fromValue("standalone")); + prometheusConfig.setUrl("http://prometheus.example.com"); + + var prometheusAuthN = new PrometheusConfig.Basic(); + prometheusAuthN.setUsername("pr0m3th3u5"); + prometheusAuthN.setPassword("password42"); + prometheusConfig.setAuthentication(prometheusAuthN); + + consoleConfig.setMetricsSources(List.of(prometheusConfig)); + consoleConfig.getKafka().getCluster("default/test-kafka1").get().setMetricsSource("test"); + + kafkaContainer = deployments.getKafkaContainer(); + bootstrapServers = URI.create(kafkaContainer.getBootstrapServers()); + + utils = new TestHelper(bootstrapServers, config, null); + + client.resources(Kafka.class).inAnyNamespace().delete(); + + Kafka kafka1 = new KafkaBuilder(utils.buildKafkaResource("test-kafka1", utils.getClusterId(), bootstrapServers)) + .editOrNewStatus() + .addNewCondition() + .withType("Ready") + .withStatus("True") + .endCondition() + .addNewKafkaNodePool() + .withName("my-node-pool") + .endKafkaNodePool() + .endStatus() + .build(); + + utils.apply(client, kafka1); + + // Wait for the added cluster to be configured in the context map + await().atMost(10, TimeUnit.SECONDS) + .until(() -> configuredContexts.values() + .stream() + .map(KafkaContext::clusterConfig) + .map(KafkaClusterConfig::clusterKey) + .anyMatch(Cache.metaNamespaceKeyFunc(kafka1)::equals)); + + // Wait for the context map to be populated with all Kafka configurations + //await().atMost(10, TimeUnit.SECONDS).until(() -> configuredContexts.size() == STATIC_KAFKAS.size()); + + clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); + } + + @AfterEach + void teardown() throws IOException { + client.resources(Kafka.class).inAnyNamespace().delete(); + } + + @Test + void testDescribeClusterWithMetricsSetsBasicHeader() { + AtomicReference queryAuthHeader = new AtomicReference<>(); + AtomicReference queryRangeAuthHeader = new AtomicReference<>(); + + filterQuery = ctx -> { + queryAuthHeader.set(ctx.getHeaderString(HttpHeaders.AUTHORIZATION)); + ctx.abortWith(Response.ok(EMPTY_METRICS).build()); + }; + + filterQueryRange = ctx -> { + queryRangeAuthHeader.set(ctx.getHeaderString(HttpHeaders.AUTHORIZATION)); + ctx.abortWith(Response.ok(EMPTY_METRICS).build()); + }; + + whenRequesting(req -> req + .param("fields[" + KafkaCluster.API_TYPE + "]", "name,metrics") + .get("{clusterId}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())); + + String expected = "Basic " + Base64.getEncoder().encodeToString("pr0m3th3u5:password42".getBytes()); + assertEquals(expected, queryAuthHeader.get()); + assertEquals(expected, queryRangeAuthHeader.get()); + } + + @Test + void testDescribeClusterWithEmptyMetrics() { + filterQuery = ctx -> { + ctx.abortWith(Response.ok(EMPTY_METRICS).build()); + }; + + filterQueryRange = ctx -> { + ctx.abortWith(Response.ok(EMPTY_METRICS).build()); + }; + + whenRequesting(req -> req + .param("fields[" + KafkaCluster.API_TYPE + "]", "name,metrics") + .get("{clusterId}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.attributes.name", equalTo("test-kafka1")) + .body("data.attributes.metrics", allOf( + hasEntry(is("values"), anEmptyMap()), + hasEntry(is("ranges"), anEmptyMap()))); + } + + @Test + void testDescribeClusterWithMetricsErrors() { + filterQuery = ctx -> { + Response error = Response.status(Status.SERVICE_UNAVAILABLE) + .entity("EXPECTED: Prometheus is not available") + .build(); + throw new WebApplicationException(error); + }; + + filterQueryRange = ctx -> { + throw new RuntimeException("EXPECTED"); + }; + + whenRequesting(req -> req + .param("fields[" + KafkaCluster.API_TYPE + "]", "name,metrics") + .get("{clusterId}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.attributes.name", equalTo("test-kafka1")) + .body("data.attributes.metrics", allOf( + hasEntry(is("values"), anEmptyMap()), + hasEntry(is("ranges"), anEmptyMap()))); + } + + @Test + void testDescribeClusterWithMetricsValues() { + Instant t1 = Instant.now().minusSeconds(1); + Instant t2 = Instant.now(); + + filterQuery = ctx -> { + ctx.abortWith(Response.ok(Json.createObjectBuilder() + .add("data", Json.createObjectBuilder() + .add("result", Json.createArrayBuilder() + .add(Json.createObjectBuilder() + .add("metric", Json.createObjectBuilder() + .add(MetricsService.METRIC_NAME, "value-metric1") + .add("custom-attribute", "attribute-value")) + .add("value", Json.createArrayBuilder() + .add(t1.toEpochMilli() / 1000f) + .add("42"))))) + .build()) + .build()); + }; + + filterQueryRange = ctx -> { + ctx.abortWith(Response.ok(Json.createObjectBuilder() + .add("data", Json.createObjectBuilder() + .add("result", Json.createArrayBuilder() + .add(Json.createObjectBuilder() + .add("metric", Json.createObjectBuilder() + .add(MetricsService.METRIC_NAME, "range-metric1") + .add("custom-attribute", "attribute-value")) + .add("values", Json.createArrayBuilder() + .add(Json.createArrayBuilder() + .add((double) t1.toEpochMilli() / 1000f) + .add("2.718")) + .add(Json.createArrayBuilder() + .add((double) t2.toEpochMilli() / 1000f) + .add("3.1415")))))) + .build()) + .build()); + }; + + whenRequesting(req -> req + .param("fields[" + KafkaCluster.API_TYPE + "]", "name,metrics") + .get("{clusterId}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.attributes.name", equalTo("test-kafka1")) + .body("data.attributes.metrics.values.value-metric1", contains(allOf( + aMapWithSize(2), + hasEntry("value", "42"), + hasEntry("custom-attribute", "attribute-value")))) + .body("data.attributes.metrics.ranges.range-metric1", contains(allOf( + aMapWithSize(2), + //hasEntry("range", arrayContaining("", "")), + hasEntry("custom-attribute", "attribute-value") + ))); + } + + // Helper methods + + static Map mockAdminClient() { + return mockAdminClient(Map.of(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name)); + } + + static Map mockAdminClient(Map overrides) { + Map clientConfig = new HashMap<>(); + + AdminClientSpy.install(config -> { + clientConfig.putAll(config); + + Map newConfig = new HashMap<>(config); + newConfig.putAll(overrides); + return newConfig; + }, client -> { /* No-op */ }); + + return clientConfig; + } +} diff --git a/common/src/main/java/com/github/streamshub/console/config/PrometheusConfig.java b/common/src/main/java/com/github/streamshub/console/config/PrometheusConfig.java index 2f330cb0b..3dd88cf49 100644 --- a/common/src/main/java/com/github/streamshub/console/config/PrometheusConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/PrometheusConfig.java @@ -14,9 +14,10 @@ @JsonInclude(Include.NON_NULL) public class PrometheusConfig implements Named { - @NotBlank + @NotBlank(message = "Metrics source `name` is required") private String name; private Type type; + @NotBlank(message = "Metrics source `url` is required") private String url; @Valid private Authentication authentication; diff --git a/common/src/test/java/com/github/streamshub/console/config/ConsoleConfigTest.java b/common/src/test/java/com/github/streamshub/console/config/ConsoleConfigTest.java index b8debb754..41c81e829 100644 --- a/common/src/test/java/com/github/streamshub/console/config/ConsoleConfigTest.java +++ b/common/src/test/java/com/github/streamshub/console/config/ConsoleConfigTest.java @@ -93,14 +93,20 @@ void testKafkaNameMissingFailsValidation() { } @Test - void testRegistryNamePassesValidation() { + void testKnownReferenceNamesPassValidation() { SchemaRegistryConfig registry = new SchemaRegistryConfig(); registry.setName("known-registry"); registry.setUrl("http://example.com"); config.getSchemaRegistries().add(registry); + PrometheusConfig metrics = new PrometheusConfig(); + metrics.setName("known-prometheus"); + metrics.setUrl("http://example.com"); + config.getMetricsSources().add(metrics); + KafkaClusterConfig cluster = new KafkaClusterConfig(); cluster.setName("name1"); + cluster.setMetricsSource("known-prometheus"); cluster.setSchemaRegistry("known-registry"); config.getKafka().getClusters().add(cluster); @@ -110,15 +116,47 @@ void testRegistryNamePassesValidation() { } @Test - void testUnknownRegistryNameFailsValidation() { + void testUnknownReferenceNamesFailValidation() { KafkaClusterConfig cluster = new KafkaClusterConfig(); cluster.setName("name1"); + cluster.setMetricsSource("unknown-prometheus"); cluster.setSchemaRegistry("unknown-registry"); config.getKafka().getClusters().add(cluster); var violations = validator.validate(config); + assertEquals(2, violations.size()); + List messages = violations.stream().map(ConstraintViolation::getMessage).toList(); + assertTrue(messages.contains("Kafka cluster references an unknown metrics source")); + assertTrue(messages.contains("Kafka cluster references an unknown schema registry")); + } + + @Test + void testMetricsSourceNamesNotUniqueFailsValidation() { + for (String name : List.of("name1", "name2", "name1")) { + PrometheusConfig metrics = new PrometheusConfig(); + metrics.setName(name); + metrics.setUrl("http://example.com"); + config.getMetricsSources().add(metrics); + } + + var violations = validator.validate(config); + assertEquals(1, violations.size()); - assertEquals("Kafka cluster references an unknown schema registry", violations.iterator().next().getMessage()); + assertEquals("Metrics source names must be unique", violations.iterator().next().getMessage()); + } + + @Test + void testMetricsSourceNamesUniquePassesValidation() { + for (String name : List.of("name1", "name2", "name3")) { + PrometheusConfig metrics = new PrometheusConfig(); + metrics.setName(name); + metrics.setUrl("http://example.com"); + config.getMetricsSources().add(metrics); + } + + var violations = validator.validate(config); + + assertTrue(violations.isEmpty()); } } diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java index 1c0cec1d1..bd3b29787 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java @@ -63,6 +63,7 @@ public class ConsoleSecret extends CRUDKubernetesDependentResource implements ConsoleResource { public static final String NAME = "console-secret"; + private static final String EMBEDDED_METRICS_NAME = "streamshub.console.embedded-prometheus"; private static final Random RANDOM = new SecureRandom(); @Inject @@ -140,6 +141,14 @@ private ConsoleConfig buildConfig(Console primary, Context context) { private void addMetricsSources(Console primary, ConsoleConfig config, Context context) { var metricsSources = coalesce(primary.getSpec().getMetricsSources(), Collections::emptyList); + if (metricsSources.isEmpty()) { + var prometheusConfig = new PrometheusConfig(); + prometheusConfig.setName(EMBEDDED_METRICS_NAME); + prometheusConfig.setUrl(prometheusService.getUrl(primary, context)); + config.getMetricsSources().add(prometheusConfig); + return; + } + for (Prometheus prometheus : metricsSources) { var prometheusConfig = new PrometheusConfig(); prometheusConfig.setName(prometheus.getName()); @@ -173,13 +182,6 @@ private void addMetricsSources(Console primary, ConsoleConfig config, Context context) { @@ -218,8 +220,8 @@ private void addConfig(Console primary, Context context, ConsoleConfig kcConfig.setSchemaRegistry(kafkaRef.getSchemaRegistry()); if (kafkaRef.getMetricsSource() == null) { - if (config.getMetricsSources().stream().anyMatch(src -> src.getName().equals("embedded-prometheus"))) { - kcConfig.setMetricsSource("embedded-prometheus"); + if (config.getMetricsSources().stream().anyMatch(src -> src.getName().equals(EMBEDDED_METRICS_NAME))) { + kcConfig.setMetricsSource(EMBEDDED_METRICS_NAME); } } else { kcConfig.setMetricsSource(kafkaRef.getMetricsSource()); diff --git a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java index bd9ca3d4f..ae7633119 100644 --- a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java +++ b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.function.Consumer; import jakarta.inject.Inject; @@ -18,19 +19,23 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.github.streamshub.console.api.v1alpha1.Console; import com.github.streamshub.console.api.v1alpha1.ConsoleBuilder; +import com.github.streamshub.console.api.v1alpha1.spec.Prometheus.Type; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.dependents.ConsoleResource; import com.github.streamshub.console.dependents.ConsoleSecret; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.NamespaceBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.api.model.SecretBuilder; +import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionBuilder; import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.openshift.api.model.Route; +import io.fabric8.openshift.api.model.RouteBuilder; import io.javaoperatorsdk.operator.Operator; import io.quarkus.test.junit.QuarkusTest; import io.strimzi.api.kafka.Crds; @@ -67,7 +72,7 @@ class ConsoleReconcilerTest { Kafka kafkaCR; - public static > C apply(KubernetesClient client, C resource) { + public static T apply(KubernetesClient client, T resource) { client.resource(resource).serverSideApply(); return client.resource(resource).patchStatus(); } @@ -76,6 +81,36 @@ public static > C apply(KubernetesClient cl void setUp() throws Exception { client.resource(Crds.kafka()).serverSideApply(); client.resource(Crds.kafkaUser()).serverSideApply(); + client.resource(new CustomResourceDefinitionBuilder() + .withNewMetadata() + .withName("routes.route.openshift.io") + .endMetadata() + .withNewSpec() + .withScope("Namespaced") + .withGroup("route.openshift.io") + .addNewVersion() + .withName("v1") + .withNewSubresources() + .withNewStatus() + .endStatus() + .endSubresources() + .withNewSchema() + .withNewOpenAPIV3Schema() + .withType("object") + .withXKubernetesPreserveUnknownFields(true) + .endOpenAPIV3Schema() + .endSchema() + .withStorage(true) + .withServed(true) + .endVersion() + .withNewNames() + .withSingular("route") + .withPlural("routes") + .withKind("Route") + .endNames() + .endSpec() + .build()) + .serverSideApply(); var allConsoles = client.resources(Console.class).inAnyNamespace(); var allKafkas = client.resources(Kafka.class).inAnyNamespace(); @@ -624,24 +659,7 @@ void testConsoleReconciliationWithSchemaRegistryUrl() { client.resource(consoleCR).create(); - await().ignoreException(NullPointerException.class).atMost(LIMIT).untilAsserted(() -> { - var console = client.resources(Console.class) - .inNamespace(consoleCR.getMetadata().getNamespace()) - .withName(consoleCR.getMetadata().getName()) - .get(); - assertEquals(1, console.getStatus().getConditions().size()); - var ready = console.getStatus().getConditions().get(0); - assertEquals("Ready", ready.getType()); - assertEquals("False", ready.getStatus()); - assertEquals("DependentsNotReady", ready.getReason()); - - var consoleSecret = client.secrets().inNamespace("ns2").withName("console-1-" + ConsoleSecret.NAME).get(); - assertNotNull(consoleSecret); - String configEncoded = consoleSecret.getData().get("console-config.yaml"); - byte[] configDecoded = Base64.getDecoder().decode(configEncoded); - Logger.getLogger(getClass()).infof("config YAML: %s", new String(configDecoded)); - ConsoleConfig consoleConfig = YAML.readValue(configDecoded, ConsoleConfig.class); - + assertConsoleConfig(consoleCR, consoleConfig -> { String registryName = consoleConfig.getSchemaRegistries().get(0).getName(); assertEquals("example-registry", registryName); String registryUrl = consoleConfig.getSchemaRegistries().get(0).getUrl(); @@ -652,8 +670,80 @@ void testConsoleReconciliationWithSchemaRegistryUrl() { }); } + @Test + void testConsoleReconciliationWithOpenShiftMonitoring() { + String thanosQueryHost = "thanos.example.com"; + + client.resource(new NamespaceBuilder() + .withNewMetadata() + .withName("openshift-monitoring") + .endMetadata() + .build()) + .serverSideApply(); + + Route thanosQuerier = new RouteBuilder() + .withNewMetadata() + .withNamespace("openshift-monitoring") + .withName("thanos-querier") + .endMetadata() + .withNewSpec() + .endSpec() + .withNewStatus() + .addNewIngress() + .withHost(thanosQueryHost) + .endIngress() + .endStatus() + .build(); + + apply(client, thanosQuerier); + + Console consoleCR = new ConsoleBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName("console-1") + .withNamespace("ns2") + .build()) + .withNewSpec() + .withHostname("example.com") + .addNewMetricsSource() + .withName("ocp-platform-monitoring") + .withType(Type.OPENSHIFT_MONITORING) + .endMetricsSource() + .addNewKafkaCluster() + .withName(kafkaCR.getMetadata().getName()) + .withNamespace(kafkaCR.getMetadata().getNamespace()) + .withListener(kafkaCR.getSpec().getKafka().getListeners().get(0).getName()) + .withMetricsSource("ocp-platform-monitoring") + .endKafkaCluster() + .endSpec() + .build(); + + client.resource(consoleCR).create(); + + assertConsoleConfig(consoleCR, consoleConfig -> { + String metricsName = consoleConfig.getMetricsSources().get(0).getName(); + assertEquals("ocp-platform-monitoring", metricsName); + String metricsUrl = consoleConfig.getMetricsSources().get(0).getUrl(); + assertEquals("https://" + thanosQueryHost, metricsUrl); + + String metricsRef = consoleConfig.getKafka().getClusters().get(0).getMetricsSource(); + assertEquals("ocp-platform-monitoring", metricsRef); + }); + } + // Utility + private void assertConsoleConfig(Console targetResource, Consumer assertion) { + await().ignoreException(NullPointerException.class).atMost(LIMIT).untilAsserted(() -> { + var consoleSecret = client.secrets().inNamespace("ns2").withName("console-1-" + ConsoleSecret.NAME).get(); + assertNotNull(consoleSecret); + String configEncoded = consoleSecret.getData().get("console-config.yaml"); + byte[] configDecoded = Base64.getDecoder().decode(configEncoded); + Logger.getLogger(getClass()).infof("config YAML: %s", new String(configDecoded)); + ConsoleConfig consoleConfig = YAML.readValue(configDecoded, ConsoleConfig.class); + assertion.accept(consoleConfig); + }); + } + private Deployment setReady(Deployment deployment) { int desiredReplicas = Optional.ofNullable(deployment.getSpec().getReplicas()).orElse(1);