feat: improve metrics filter
sauljabin committed Feb 6, 2024
1 parent a21be3c commit d80f5fc
Showing 14 changed files with 129 additions and 60 deletions.
2 changes: 1 addition & 1 deletion canary/canary.properties
@@ -1,5 +1,5 @@
# Canary settings
lh.canary.id=dev
lh.canary.id=canary-dev
lh.canary.topic.creation.enable=true
lh.canary.topic.creation.replicas=1
lh.canary.topic.creation.partitions=1
7 changes: 6 additions & 1 deletion canary/src/main/java/io/littlehorse/canary/Main.java
@@ -19,6 +19,7 @@ public static void main(final String[] args) throws IOException, InterruptedException
final CanaryConfig config = args.length > 0 ? ConfigLoader.load(Paths.get(args[0])) : ConfigLoader.load();

log.info("Canary configurations: {}", config);
log.info("Canary active metrics: {}", config.getEnabledMetrics());
log.info("KafkaAdmin configurations: {}", config.toKafkaAdminConfig());
log.info("KafkaProducer configurations: {}", config.toKafkaProducerConfig());
log.info("KafkaStreams configurations: {}", config.toKafkaStreamsConfig());
@@ -40,7 +41,11 @@ public static void main(final String[] args) throws IOException, InterruptedException

private static void initializeBootstraps(final CanaryConfig config) {
final PrometheusExporterBootstrap prometheusExporterBootstrap = new PrometheusExporterBootstrap(
config.getMetricsPort(), config.getMetricsPath(), config.isMetricsFilterEnabled(), config.getId());
config.getMetricsPort(),
config.getMetricsPath(),
config.isMetricsFilterEnabled(),
config.getEnabledMetrics(),
config.getId());

final KafkaTopicBootstrap kafkaTopicBootstrap = new KafkaTopicBootstrap(
config.getTopicName(),
canary/src/main/java/io/littlehorse/canary/aggregator/AggregatorBootstrap.java
@@ -1,10 +1,12 @@
package io.littlehorse.canary.aggregator;

import io.littlehorse.canary.aggregator.internal.LatencyMetricExporter;
import io.littlehorse.canary.aggregator.internal.MetricTimeExtractor;
import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes;
import io.littlehorse.canary.aggregator.topology.TaskRunLatencyTopology;
import io.littlehorse.canary.prometheus.Measurable;
import io.littlehorse.canary.proto.Metric;
import io.littlehorse.canary.proto.MetricAverage;
import io.littlehorse.canary.util.Shutdown;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics;
@@ -13,31 +15,24 @@
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@Slf4j
public class AggregatorBootstrap implements Measurable {

private static final Consumed<String, Metric> SERDES =
Consumed.with(Serdes.String(), ProtobufSerdes.Metric()).withTimestampExtractor(new MetricTimeExtractor());
private final KafkaStreams kafkaStreams;
private final KafkaStreamsMetrics kafkaStreamsMetrics;
private final Map<String, Object> kafkaStreamsConfigMap;

public AggregatorBootstrap(final String metricsTopicName, final Map<String, Object> kafkaStreamsConfigMap) {
this.kafkaStreamsConfigMap = kafkaStreamsConfigMap;
kafkaStreams = new KafkaStreams(buildTopology(metricsTopicName), new StreamsConfig(this.kafkaStreamsConfigMap));
Shutdown.addShutdownHook("Aggregator Topology", kafkaStreams);
kafkaStreams.start();

kafkaStreamsMetrics = new KafkaStreamsMetrics(kafkaStreams);
Shutdown.addShutdownHook("Aggregator Topology: Prometheus Exporter", kafkaStreamsMetrics);

log.trace("Initialized");
}

@@ -54,9 +49,17 @@ public String getStateDir() {

@Override
public void bindTo(final MeterRegistry registry) {
final KafkaStreamsMetrics kafkaStreamsMetrics = new KafkaStreamsMetrics(kafkaStreams);
Shutdown.addShutdownHook("Aggregator Topology: Prometheus Exporter", kafkaStreamsMetrics);
kafkaStreamsMetrics.bindTo(registry);

final DiskSpaceMetrics diskSpaceMetrics = new DiskSpaceMetrics(new File(getStateDir()));
diskSpaceMetrics.bindTo(registry);

final ReadOnlyKeyValueStore<String, MetricAverage> store = null;
// kafkaStreams.store(
// StoreQueryParameters.fromNameAndType("latency-metrics", QueryableStoreTypes.keyValueStore()));
final LatencyMetricExporter latencyMetricExporter = new LatencyMetricExporter(store);
latencyMetricExporter.bindTo(registry);
}
}
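Note that the store lookup above is left commented out and the exporter is bound to a null store, so the latency gauge is not yet backed by real data. A minimal sketch of what the interactive-query wiring could look like, assuming the Kafka Streams app has already reached the RUNNING state and that org.apache.kafka.streams.state.QueryableStoreTypes is imported:

    // Sketch only: query the "latency-metrics" store created by TaskRunLatencyTopology.
    // kafkaStreams.store(...) throws InvalidStateStoreException until the app is RUNNING,
    // so a real implementation would wait for that state or retry.
    final ReadOnlyKeyValueStore<String, MetricAverage> store = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType("latency-metrics", QueryableStoreTypes.keyValueStore()));
    final LatencyMetricExporter latencyMetricExporter = new LatencyMetricExporter(store);
    latencyMetricExporter.bindTo(registry);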
canary/src/main/java/io/littlehorse/canary/aggregator/internal/LatencyMetricExporter.java
@@ -0,0 +1,25 @@
package io.littlehorse.canary.aggregator.internal;

import io.littlehorse.canary.proto.MetricAverage;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class LatencyMetricExporter implements MeterBinder {

private final ReadOnlyKeyValueStore<String, MetricAverage> store;

public LatencyMetricExporter(final ReadOnlyKeyValueStore<String, MetricAverage> store) {
this.store = store;
}

@Override
public void bindTo(final MeterRegistry registry) {
Gauge.builder("test", () -> (int) (Math.random() * 1000))
.description("Usable space for path")
.baseUnit("ms")
.strongReference(true)
.register(registry);
}
}
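As committed, the exporter registers a single gauge named "test" that reports random values, which reads like a placeholder. A hedged sketch of a store-backed binding instead (the metric name and tag are illustrative, it assumes bindTo() runs after the store is populated, and it needs KeyValueIterator/KeyValue imports from org.apache.kafka.streams):

    // Sketch only: one gauge per server key, read lazily from the state store.
    try (final KeyValueIterator<String, MetricAverage> records = store.all()) {
        while (records.hasNext()) {
            final KeyValue<String, MetricAverage> entry = records.next();
            Gauge.builder("taskrun_latency_avg", () -> store.get(entry.key).getAvg())
                    .description("Average TaskRun latency reported by the canary")
                    .baseUnit("ms")
                    .tag("server", entry.key)
                    .strongReference(true)
                    .register(registry);
        }
    }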
canary/src/main/java/io/littlehorse/canary/aggregator/topology/TaskRunLatencyTopology.java
@@ -6,11 +6,10 @@
import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

@Slf4j
public class TaskRunLatencyTopology {
@@ -23,12 +22,15 @@ public TaskRunLatencyTopology(final KStream<String, Metric> metricStream) {
.aggregate(
() -> MetricAverage.newBuilder().build(),
(key, value, aggregate) -> aggregate(value, aggregate),
Named.as("test"),
Materialized.with(Serdes.String(), ProtobufSerdes.MetricAverage()))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.map((key, value) -> KeyValue.pair(key.key(), value))
.peek((key, value) -> log.info(
"server={}, count={}, sum={}, avg={}", key, value.getCount(), value.getSum(), value.getAvg()));
.peek((key, value) -> log.debug(
"server={}, count={}, sum={}, avg={}", key, value.getCount(), value.getSum(), value.getAvg()))
.toTable(Materialized.<String, MetricAverage, KeyValueStore<Bytes, byte[]>>as("latency-metrics")
.with(Serdes.String(), ProtobufSerdes.MetricAverage()));
}

private static MetricAverage aggregate(final Metric value, final MetricAverage aggregate) {
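The aggregate helper's body is collapsed in this view; judging from the count/sum/avg fields logged above, a typical rolling-average body would look roughly like the sketch below (the Metric latency getter and the proto field types are assumptions, not the committed code):

    // Sketch only: accumulate count and sum per window, then derive the average.
    private static MetricAverage aggregate(final Metric value, final MetricAverage aggregate) {
        final int count = aggregate.getCount() + 1;
        final long sum = aggregate.getSum() + value.getLatency(); // getLatency() is hypothetical
        return MetricAverage.newBuilder()
                .setCount(count)
                .setSum(sum)
                .setAvg(sum / count)
                .build();
    }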
59 changes: 46 additions & 13 deletions canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java
@@ -1,11 +1,28 @@
package io.littlehorse.canary.config;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

public class CanaryConfig implements Config {

public static final String LH_CANARY_PREFIX = "lh.canary.";
public static final String LH_CANARY_KAFKA_PREFIX = LH_CANARY_PREFIX + "kafka.";

public static final String TOPIC_NAME = "topic.name";
public static final String TOPIC_CREATION_PARTITIONS = "topic.creation.partitions";
public static final String TOPIC_CREATION_REPLICAS = "topic.creation.replicas";
public static final String METRONOME_ENABLE = "metronome.enable";
public static final String AGGREGATOR_ENABLE = "aggregator.enable";
public static final String METRONOME_FREQUENCY_MS = "metronome.frequency.ms";
public static final String METRONOME_THREADS = "metronome.threads";
public static final String METRONOME_RUNS = "metronome.runs";
public static final String API_PORT = "api.port";
public static final String ID = "id";
public static final String METRICS_PORT = "metrics.port";
public static final String METRICS_PATH = "metrics.path";
public static final String METRICS_FILTER_ENABLE = "metrics.filter.enable";
private final Map<String, Object> configs;

public CanaryConfig(final Map<String, Object> configs) {
@@ -14,6 +31,10 @@ public CanaryConfig(final Map<String, Object> configs) {
.collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue));
}

public static String prefix(final String configName) {
return LH_CANARY_PREFIX + configName;
}

@Override
public Map<String, Object> toMap() {
return configs;
@@ -41,54 +62,66 @@ public KafkaStreamsConfig toKafkaStreamsConfig() {
}

public String getTopicName() {
return configs.get(LH_CANARY_TOPIC_NAME).toString();
return getConfig(TOPIC_NAME);
}

private String getConfig(final String configName) {
return configs.get(prefix(configName)).toString();
}

public int getTopicPartitions() {
return Integer.parseInt(configs.get(LH_CANARY_TOPIC_CREATION_PARTITIONS).toString());
return Integer.parseInt(getConfig(TOPIC_CREATION_PARTITIONS));
}

public int getApiPort() {
return Integer.parseInt(configs.get(LH_CANARY_API_PORT).toString());
return Integer.parseInt(getConfig(API_PORT));
}

public int getMetricsPort() {
return Integer.parseInt(configs.get(LH_CANARY_METRICS_PORT).toString());
return Integer.parseInt(getConfig(METRICS_PORT));
}

public String getMetricsPath() {
return configs.get(LH_CANARY_METRICS_PATH).toString();
return getConfig(METRICS_PATH);
}

public boolean isMetricsFilterEnabled() {
return Boolean.parseBoolean(configs.get(LH_CANARY_METRICS_FILTER_ENABLE).toString());
return Boolean.parseBoolean(getConfig(METRICS_FILTER_ENABLE));
}

public short getTopicReplicas() {
return Short.parseShort(configs.get(LH_CANARY_TOPIC_CREATION_REPLICAS).toString());
return Short.parseShort(getConfig(TOPIC_CREATION_REPLICAS));
}

public boolean isMetronomeEnabled() {
return Boolean.parseBoolean(configs.get(LH_CANARY_METRONOME_ENABLE).toString());
return Boolean.parseBoolean(getConfig(METRONOME_ENABLE));
}

public boolean isAggregatorEnabled() {
return Boolean.parseBoolean(configs.get(LH_CANARY_AGGREGATOR_ENABLE).toString());
return Boolean.parseBoolean(getConfig(AGGREGATOR_ENABLE));
}

public long getMetronomeFrequency() {
return Long.parseLong(configs.get(LH_CANARY_METRONOME_FREQUENCY_MS).toString());
return Long.parseLong(getConfig(METRONOME_FREQUENCY_MS));
}

public int getMetronomeThreads() {
return Integer.parseInt(configs.get(LH_CANARY_METRONOME_THREADS).toString());
return Integer.parseInt(getConfig(METRONOME_THREADS));
}

public int getMetronomeRuns() {
return Integer.parseInt(configs.get(LH_CANARY_METRONOME_RUNS).toString());
return Integer.parseInt(getConfig(METRONOME_RUNS));
}

public String getId() {
return configs.get(LH_CANARY_ID).toString();
return getConfig(ID);
}

public List<String> getEnabledMetrics() {
return configs.entrySet().stream()
.filter(entry -> entry.getKey().matches("%s\\[\\d+\\]".formatted(prefix(METRICS_FILTER_ENABLE))))
.map(Entry::getValue)
.map(Object::toString)
.toList();
}
}
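getEnabledMetrics() collects every property whose key matches the indexed form of metrics.filter.enable. Under that reading, the allow-list would be declared in canary.properties along these lines (key shape inferred from the regex; the first metric name comes from the filter's previous hard-coded default, the second is illustrative):

    # Assumed configuration shape for the metrics allow-list
    lh.canary.metrics.filter.enable=true
    lh.canary.metrics.filter.enable[0]=jvm_memory_used_bytes
    lh.canary.metrics.filter.enable[1]=disk_free_bytes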
16 changes: 0 additions & 16 deletions canary/src/main/java/io/littlehorse/canary/config/Config.java
@@ -3,22 +3,6 @@
import java.util.Map;

public interface Config {
String LH_CANARY_PREFIX = "lh.canary.";

String LH_CANARY_KAFKA_PREFIX = LH_CANARY_PREFIX + "kafka.";
String LH_CANARY_TOPIC_NAME = LH_CANARY_PREFIX + "topic.name";
String LH_CANARY_TOPIC_CREATION_PARTITIONS = LH_CANARY_PREFIX + "topic.creation.partitions";
String LH_CANARY_TOPIC_CREATION_REPLICAS = LH_CANARY_PREFIX + "topic.creation.replicas";
String LH_CANARY_METRONOME_ENABLE = LH_CANARY_PREFIX + "metronome.enable";
String LH_CANARY_AGGREGATOR_ENABLE = LH_CANARY_PREFIX + "aggregator.enable";
String LH_CANARY_METRONOME_FREQUENCY_MS = LH_CANARY_PREFIX + "metronome.frequency.ms";
String LH_CANARY_METRONOME_THREADS = LH_CANARY_PREFIX + "metronome.threads";
String LH_CANARY_METRONOME_RUNS = LH_CANARY_PREFIX + "metronome.runs";
String LH_CANARY_API_PORT = LH_CANARY_PREFIX + "api.port";
String LH_CANARY_ID = LH_CANARY_PREFIX + "id";
String LH_CANARY_METRICS_PORT = LH_CANARY_PREFIX + "metrics.port";
String LH_CANARY_METRICS_PATH = LH_CANARY_PREFIX + "metrics.path";
String LH_CANARY_METRICS_FILTER_ENABLE = LH_CANARY_PREFIX + "metrics.filter.enable";

Map<String, Object> toMap();
}
canary/src/main/java/io/littlehorse/canary/config/KafkaAdminConfig.java
@@ -12,8 +12,9 @@ public class KafkaAdminConfig implements Config {

public KafkaAdminConfig(final Map<String, Object> configs) {
this.configs = configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(LH_CANARY_KAFKA_PREFIX))
.map(entry -> entry(entry.getKey().substring(LH_CANARY_KAFKA_PREFIX.length()), entry.getValue()))
.filter(entry -> entry.getKey().startsWith(CanaryConfig.LH_CANARY_KAFKA_PREFIX))
.map(entry ->
entry(entry.getKey().substring(CanaryConfig.LH_CANARY_KAFKA_PREFIX.length()), entry.getValue()))
.filter(entry -> AdminClientConfig.configNames().contains(entry.getKey()))
.collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue));
}
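The constructor keeps only lh.canary.kafka.* entries, strips that prefix, and then drops anything AdminClientConfig does not recognise; the producer and streams configs below repeat the same pattern against their own config names. An illustrative example (keys and values hypothetical):

    // Sketch only: how keys are filtered and renamed by KafkaAdminConfig.
    final KafkaAdminConfig adminConfig = new KafkaAdminConfig(Map.of(
            "lh.canary.kafka.bootstrap.servers", "localhost:9092", // kept as "bootstrap.servers"
            "lh.canary.kafka.not.a.real.option", "x",               // dropped: unknown to AdminClientConfig
            "lh.canary.topic.name", "canary-metrics"));             // dropped: missing the kafka prefix
    // adminConfig.toMap() -> {bootstrap.servers=localhost:9092}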
canary/src/main/java/io/littlehorse/canary/config/KafkaProducerConfig.java
@@ -11,8 +11,9 @@ public class KafkaProducerConfig implements Config {

public KafkaProducerConfig(final Map<String, Object> configs) {
this.configs = configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(LH_CANARY_KAFKA_PREFIX))
.map(entry -> entry(entry.getKey().substring(LH_CANARY_KAFKA_PREFIX.length()), entry.getValue()))
.filter(entry -> entry.getKey().startsWith(CanaryConfig.LH_CANARY_KAFKA_PREFIX))
.map(entry ->
entry(entry.getKey().substring(CanaryConfig.LH_CANARY_KAFKA_PREFIX.length()), entry.getValue()))
.filter(entry -> ProducerConfig.configNames().contains(entry.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}
canary/src/main/java/io/littlehorse/canary/config/KafkaStreamsConfig.java
@@ -11,8 +11,9 @@ public class KafkaStreamsConfig implements Config {

public KafkaStreamsConfig(final Map<String, Object> configs) {
this.configs = configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(LH_CANARY_KAFKA_PREFIX))
.map(entry -> entry(entry.getKey().substring(LH_CANARY_KAFKA_PREFIX.length()), entry.getValue()))
.filter(entry -> entry.getKey().startsWith(CanaryConfig.LH_CANARY_KAFKA_PREFIX))
.map(entry ->
entry(entry.getKey().substring(CanaryConfig.LH_CANARY_KAFKA_PREFIX.length()), entry.getValue()))
.filter(entry -> StreamsConfig.configDef().names().contains(entry.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}
canary/src/main/java/io/littlehorse/canary/config/LittleHorseConfig.java
@@ -11,10 +11,10 @@ public class LittleHorseConfig implements Config {

public LittleHorseConfig(final Map<String, Object> configs) {
this.configs = configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(LH_CANARY_PREFIX))
.filter(entry -> entry.getKey().startsWith(CanaryConfig.LH_CANARY_PREFIX))
.map(entry -> {
final String formattedKey = entry.getKey()
.substring(LH_CANARY_PREFIX.length())
.substring(CanaryConfig.LH_CANARY_PREFIX.length())
.toUpperCase()
.replace(".", "_")
.replace("-", "_");
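LittleHorseConfig rewrites every lh.canary.* key into an upper-case, underscore-separated form. A small illustrative example (key and value hypothetical):

    // Sketch only: prefix stripped, upper-cased, '.' and '-' replaced with '_'.
    final LittleHorseConfig lhConfig = new LittleHorseConfig(Map.of(
            "lh.canary.api-host", "localhost"));
    // lhConfig.toMap() -> {API_HOST=localhost}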
canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusExporterBootstrap.java
@@ -6,18 +6,23 @@
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.util.List;

public class PrometheusExporterBootstrap {

private final PrometheusMeterRegistry prometheusRegistry;
private final PrometheusExporterServer prometheusExporterServer;

public PrometheusExporterBootstrap(
final int webPort, final String webPath, final boolean metricsFilterEnabled, final String applicationId) {
final int webPort,
final String webPath,
final boolean filterMetrics,
final List<String> enabledMetrics,
final String applicationId) {
prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
prometheusRegistry.config().commonTags("application_id", applicationId);
if (metricsFilterEnabled) {
prometheusRegistry.config().meterFilter(new PrometheusMetricFilter());
if (filterMetrics) {
prometheusRegistry.config().meterFilter(new PrometheusMetricFilter(enabledMetrics));
}
Shutdown.addShutdownHook("Prometheus Exporter", prometheusRegistry::close);

canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusMetricFilter.java
@@ -6,7 +6,15 @@
import java.util.List;

public class PrometheusMetricFilter implements MeterFilter {
public static final List<String> RULES = List.of("jvm_memory_used_bytes");
private final List<String> rules;

public PrometheusMetricFilter(final List<String> rules) {
if (rules == null) {
this.rules = List.of();
} else {
this.rules = rules;
}
}

@Override
public MeterFilterReply accept(final Meter.Id id) {
@@ -16,7 +24,7 @@ public MeterFilterReply accept(final Meter.Id id) {

final String metricName = "%s_%s".formatted(id.getName().replace(".", "_"), id.getBaseUnit());

for (String rule : RULES) {
for (String rule : rules) {
if (rule.equals(metricName)) {
return MeterFilterReply.ACCEPT;
}
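accept() builds a comparison name from the meter id (dots replaced with underscores, base unit appended) and accepts it only if that name appears in the configured rules; the fallthrough for everything else is collapsed above. An illustrative use, with the rule value taken from the filter's previous default:

    // Sketch only: a meter named "jvm.memory.used" with base unit "bytes" is compared as
    // "jvm_memory_used_bytes", so it passes this filter; unlisted meters fall through to the
    // (collapsed) default reply.
    final PrometheusMetricFilter filter = new PrometheusMetricFilter(List.of("jvm_memory_used_bytes"));
    prometheusRegistry.config().meterFilter(filter);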