Skip to content

Commit

Permalink
Metrics via API, per-cluster Prometheus Config, OCP monitoring support
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Nov 1, 2024
1 parent b1ef079 commit ec9683b
Show file tree
Hide file tree
Showing 66 changed files with 1,505 additions and 950 deletions.
8 changes: 4 additions & 4 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,10 @@
<quarkus.profile>build</quarkus.profile>
<quarkus.docker.dockerfile-jvm-path>src/main/docker/Dockerfile</quarkus.docker.dockerfile-jvm-path>
<quarkus.container-image.build>true</quarkus.container-image.build>
<quarkus.container-image.registry>${docker.registry}</quarkus.container-image.registry>
<quarkus.container-image.group>${docker.group}</quarkus.container-image.group>
<quarkus.container-image.tag>${docker.tag}</quarkus.container-image.tag>
<quarkus.container-image.push>${docker.push}</quarkus.container-image.push>
<quarkus.container-image.registry>${container-image.registry}</quarkus.container-image.registry>
<quarkus.container-image.group>${container-image.group}</quarkus.container-image.group>
<quarkus.container-image.tag>${container-image.tag}</quarkus.container-image.tag>
<quarkus.container-image.push>${container-image.push}</quarkus.container-image.push>
</properties>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.jboss.logging.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.streamshub.console.api.service.MetricsService;
import com.github.streamshub.console.api.support.Holder;
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
Expand Down Expand Up @@ -154,6 +155,9 @@ public class ClientFactory {
@Named("kafkaAdminFilter")
UnaryOperator<Admin> kafkaAdminFilter = UnaryOperator.identity();

@Inject
MetricsService metricsService;

@Produces
@ApplicationScoped
Map<String, KafkaContext> produceKafkaContexts(Function<Map<String, Object>, Admin> adminBuilder) {
Expand All @@ -168,7 +172,7 @@ Map<String, KafkaContext> produceKafkaContexts(Function<Map<String, Object>, Adm
consoleConfig.getKafka().getClusters()
.stream()
.filter(c -> cachedKafkaResource(c).isEmpty())
.filter(Predicate.not(KafkaClusterConfig::hasNamespace))
//.filter(Predicate.not(KafkaClusterConfig::hasNamespace))
.forEach(clusterConfig -> putKafkaContext(contexts,
clusterConfig,
Optional.empty(),
Expand Down Expand Up @@ -302,6 +306,7 @@ void putKafkaContext(Map<String, KafkaContext> contexts,

KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
ctx.schemaRegistryClient(registryConfig, mapper);
ctx.prometheus(metricsService.createClient(consoleConfig, clusterConfig));

KafkaContext previous = contexts.put(clusterId, ctx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ static class Attributes {
@JsonProperty
boolean cruiseControlEnabled;

@JsonProperty
Metrics metrics = new Metrics();

Attributes(List<Node> nodes, Node controller, List<String> authorizedOperations) {
this.nodes = nodes;
this.controller = controller;
Expand Down Expand Up @@ -328,4 +331,8 @@ public void nodePools(List<String> nodePools) {
public void cruiseControlEnabled(boolean cruiseControlEnabled) {
attributes.cruiseControlEnabled = cruiseControlEnabled;
}

public Metrics metrics() {
return attributes.metrics;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.github.streamshub.console.api.model;

import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.eclipse.microprofile.openapi.annotations.media.Schema;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

public record Metrics(
@JsonProperty
Map<String, List<Metrics.ValueMetric>> values,

@JsonProperty
Map<String, List<Metrics.RangeMetric>> ranges) {

public Metrics() {
this(new LinkedHashMap<>(), new LinkedHashMap<>());
}

@Schema(additionalProperties = String.class)
public static record ValueMetric(
@JsonProperty
String value,

@JsonAnyGetter
@Schema(hidden = true)
Map<String, String> attributes) {
}

@Schema(additionalProperties = String.class)
public static record RangeMetric(
@JsonProperty
@Schema(implementation = String[][].class)
List<RangeEntry> range,

@JsonAnyGetter
@Schema(hidden = true)
Map<String, String> attributes) {
}

@JsonFormat(shape = JsonFormat.Shape.ARRAY)
@JsonPropertyOrder({"when", "value"})
public static record RangeEntry(Instant when, String value) {
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.github.streamshub.console.api.service;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -76,6 +80,9 @@ public class KafkaClusterService {
@Inject
ConsoleConfig consoleConfig;

@Inject
MetricsService metricsService;

@Inject
/**
* All Kafka contexts known to the application
Expand Down Expand Up @@ -148,6 +155,7 @@ public CompletionStage<KafkaCluster> describeCluster(List<String> fields) {
enumNames(get(result::authorizedOperations))))
.thenApplyAsync(this::addKafkaContextData, threadContext.currentContextExecutor())
.thenApply(this::addKafkaResourceData)
.thenCompose(cluster -> addMetrics(cluster, fields))
.thenApply(this::setManaged);
}

Expand Down Expand Up @@ -313,6 +321,41 @@ KafkaCluster setManaged(KafkaCluster cluster) {
return cluster;
}


CompletionStage<KafkaCluster> addMetrics(KafkaCluster cluster, List<String> fields) {
if (!fields.contains(KafkaCluster.Fields.METRICS)) {
return CompletableFuture.completedStage(cluster);
}

if (kafkaContext.prometheus() == null) {
logger.warnf("Kafka cluster metrics were requested, but Prometheus URL is not configured");
return CompletableFuture.completedStage(cluster);
}

String namespace = cluster.namespace();
String name = cluster.name();
String rangeQuery;
String valueQuery;

try (var rangesStream = getClass().getResourceAsStream("/metrics/queries/kafkaCluster_ranges.promql");
var valuesStream = getClass().getResourceAsStream("/metrics/queries/kafkaCluster_values.promql")) {
rangeQuery = new String(rangesStream.readAllBytes(), StandardCharsets.UTF_8)
.formatted(namespace, name);
valueQuery = new String(valuesStream.readAllBytes(), StandardCharsets.UTF_8)
.formatted(namespace, name);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

var rangeResults = metricsService.queryRanges(rangeQuery).toCompletableFuture();
var valueResults = metricsService.queryValues(valueQuery).toCompletableFuture();

return CompletableFuture.allOf(
rangeResults.thenAccept(cluster.metrics().ranges()::putAll),
valueResults.thenAccept(cluster.metrics().values()::putAll))
.thenApply(nothing -> cluster);
}

private Optional<Kafka> findCluster(KafkaCluster cluster) {
return findCluster(Cache.namespaceKeyFunc(cluster.namespace(), cluster.name()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package com.github.streamshub.console.api.service;

import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.json.JsonArray;
import jakarta.json.JsonObject;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.client.ClientRequestContext;
import jakarta.ws.rs.client.ClientRequestFilter;
import jakarta.ws.rs.core.HttpHeaders;

import org.eclipse.microprofile.rest.client.RestClientBuilder;
import org.jboss.logging.Logger;

import com.github.streamshub.console.api.model.Metrics;
import com.github.streamshub.console.api.model.Metrics.RangeEntry;
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.PrometheusAPI;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;
import com.github.streamshub.console.config.PrometheusConfig;
import com.github.streamshub.console.config.PrometheusConfig.Type;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.quarkus.tls.TlsConfiguration;
import io.quarkus.tls.TlsConfigurationRegistry;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

@ApplicationScoped
public class MetricsService {

public static final String METRIC_NAME = "__console_metric_name__";

@Inject
Logger logger;

@Inject
TlsConfigurationRegistry certificates;

@Inject
KubernetesClient k8s;

@Inject
KafkaContext kafkaContext;

ClientRequestFilter createAuthenticationFilter(PrometheusConfig config) {
return new ClientRequestFilter() {
@Override
public void filter(ClientRequestContext requestContext) throws IOException {
var authConfig = config.getAuthentication();
String authHeader = null;

if (authConfig instanceof PrometheusConfig.Basic basic) {
authHeader = "Basic " + Base64.getEncoder().encodeToString("%s:%s".formatted(
basic.getUsername(),
basic.getPassword())
.getBytes());
} else if (authConfig instanceof PrometheusConfig.Bearer bearer) {
authHeader = "Bearer " + bearer.getToken();
} else if (config.getType() == Type.OPENSHIFT_MONITORING) {
// ServiceAccount needs cluster role `cluster-monitoring-view`
authHeader = "Bearer " + k8s.getConfiguration().getAutoOAuthToken();
}

if (authHeader != null) {
requestContext.getHeaders().add(HttpHeaders.AUTHORIZATION, authHeader);
}
}
};
}

public PrometheusAPI createClient(ConsoleConfig consoleConfig, KafkaClusterConfig clusterConfig) {
PrometheusConfig prometheusConfig;

if (clusterConfig.getMetricsSource() != null) {
prometheusConfig = consoleConfig.getMetricsSources()
.stream()
.filter(source -> source.getName().equals(clusterConfig.getMetricsSource()))
.findFirst()
.orElseThrow();

var trustStore = certificates.getDefault().map(TlsConfiguration::getTrustStore).orElse(null);

return RestClientBuilder.newBuilder()
.baseUri(URI.create(prometheusConfig.getUrl()))
.trustStore(trustStore)
.register(createAuthenticationFilter(prometheusConfig))
.build(PrometheusAPI.class);
}

return null;
}

CompletionStage<Map<String, List<Metrics.ValueMetric>>> queryValues(String query) {
PrometheusAPI prometheusAPI = kafkaContext.prometheus();

return fetchMetrics(
() -> prometheusAPI.query(query, Instant.now()),
(metric, attributes) -> {
// ignore timestamp in first position
String value = metric.getJsonArray("value").getString(1);
return new Metrics.ValueMetric(value, attributes);
});
}

CompletionStage<Map<String, List<Metrics.RangeMetric>>> queryRanges(String query) {
PrometheusAPI prometheusAPI = kafkaContext.prometheus();

return fetchMetrics(
() -> {
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Instant start = now.minus(30, ChronoUnit.MINUTES);
Instant end = now;
return prometheusAPI.queryRange(query, start, end, "25");
},
(metric, attributes) -> {
List<RangeEntry> values = metric.getJsonArray("values")
.stream()
.map(JsonArray.class::cast)
.map(e -> new Metrics.RangeEntry(
Instant.ofEpochMilli((long) (e.getJsonNumber(0).doubleValue() * 1000d)),
e.getString(1)
))
.toList();

return new Metrics.RangeMetric(values, attributes);
});
}

<M> CompletionStage<Map<String, List<M>>> fetchMetrics(
Supplier<JsonObject> operation,
BiFunction<JsonObject, Map<String, String>, M> builder) {

return CompletableFuture.supplyAsync(() -> {
try {
return extractMetrics(operation.get(), builder);
} catch (WebApplicationException wae) {
logger.warnf("Failed to retrieve Kafka cluster metrics, status %d: %s",
wae.getResponse().getStatus(),
wae.getResponse().getEntity());
return Collections.emptyMap();
} catch (Exception e) {
logger.warnf(e, "Failed to retrieve Kafka cluster metrics");
return Collections.emptyMap();
}
});
}

<M> Map<String, List<M>> extractMetrics(JsonObject response,
BiFunction<JsonObject, Map<String, String>, M> builder) {

return response.getJsonObject("data").getJsonArray("result")
.stream()
.map(JsonObject.class::cast)
.map(metric -> {
JsonObject meta = metric.getJsonObject("metric");
String metricName = meta.getString(METRIC_NAME);

Map<String, String> attributes = meta.keySet()
.stream()
.filter(Predicate.not(METRIC_NAME::equals))
.map(key -> Map.entry(key, meta.getString(key)))
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));

return Map.entry(metricName, builder.apply(metric, attributes));
})
.collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toList())));
}
}
Loading

0 comments on commit ec9683b

Please sign in to comment.