Skip to content

Commit

Permalink
fix: latency metric
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Feb 7, 2024
1 parent 3701cdd commit f790858
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 40 deletions.
2 changes: 1 addition & 1 deletion canary/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {

// Kafka
implementation 'org.apache.kafka:kafka-streams:3.6.1'
testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.6.1'

// Web API
implementation 'io.javalin:javalin:6.0.1'
Expand All @@ -44,7 +45,6 @@ dependencies {
testImplementation 'org.mockito:mockito-core:5.10.0'
testImplementation 'org.mockito:mockito-junit-jupiter:5.10.0'
testImplementation 'net.datafaker:datafaker:2.1.0'
testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.6.1'
}

test {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package io.littlehorse.canary.aggregator;

import io.littlehorse.canary.Bootstrap;
import io.littlehorse.canary.aggregator.internal.LatencyMetricExporter;
import io.littlehorse.canary.aggregator.internal.MetricTimeExtractor;
import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes;
import io.littlehorse.canary.aggregator.topology.TaskRunLatencyTopology;
import io.littlehorse.canary.config.CanaryConfig;
import io.littlehorse.canary.config.KafkaStreamsConfig;
import io.littlehorse.canary.prometheus.Measurable;
import io.littlehorse.canary.proto.Metric;
import io.littlehorse.canary.proto.MetricAverage;
import io.littlehorse.canary.util.Shutdown;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics;
import io.micrometer.core.instrument.binder.system.DiskSpaceMetrics;
import java.io.File;
Expand All @@ -20,11 +18,9 @@
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@Slf4j
public class AggregatorBootstrap extends Bootstrap implements Measurable {
public class AggregatorBootstrap extends Bootstrap implements MeterBinder {

private static final Consumed<String, Metric> SERDES =
Consumed.with(Serdes.String(), ProtobufSerdes.Metric()).withTimestampExtractor(new MetricTimeExtractor());
Expand Down Expand Up @@ -59,10 +55,13 @@ public void bindTo(final MeterRegistry registry) {
final DiskSpaceMetrics diskSpaceMetrics = new DiskSpaceMetrics(new File(kafkaStreamsConfigMap.getStateDir()));
diskSpaceMetrics.bindTo(registry);

final ReadOnlyKeyValueStore<String, MetricAverage> store = null;
kafkaStreams.store(
StoreQueryParameters.fromNameAndType("latency-metrics", QueryableStoreTypes.keyValueStore()));
final LatencyMetricExporter latencyMetricExporter = new LatencyMetricExporter(store);
latencyMetricExporter.bindTo(registry);
// kafkaStreams.metadataForAllStreamsClients().stream().flatMap(streamsMetadata ->
// streamsMetadata.stateStoreNames().stream())
// .forEach(s -> log.error(s));

// final ReadOnlyKeyValueStore<String, MetricAverage> store = kafkaStreams.store(
// StoreQueryParameters.fromNameAndType("latency-metrics", QueryableStoreTypes.keyValueStore()));
// final LatencyMetricExporter latencyMetricExporter = new LatencyMetricExporter(store);
// latencyMetricExporter.bindTo(registry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;

@Slf4j
public class TaskRunLatencyTopology {
Expand All @@ -18,19 +19,25 @@ public TaskRunLatencyTopology(final KStream<String, Metric> metricStream) {
metricStream
.filter((key, value) -> value.hasTaskRunLatency())
.groupByKey()
// this window resets the aggregator every minute
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(5)))
.aggregate(
() -> MetricAverage.newBuilder().build(),
(key, value, aggregate) -> aggregate(value, aggregate),
Named.as("test"),
Materialized.with(Serdes.String(), ProtobufSerdes.MetricAverage()))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
Materialized.<String, MetricAverage, WindowStore<Bytes, byte[]>>as("latency-metrics-windows")
.withKeySerde(Serdes.String())
.withValueSerde(ProtobufSerdes.MetricAverage()))
// we do not suppress because we want to have the value immediately
.toStream()
.map((key, value) -> KeyValue.pair(key.key(), value))
.peek((key, value) -> log.debug(
"server={}, count={}, sum={}, avg={}", key, value.getCount(), value.getSum(), value.getAvg()))
.toTable(Materialized.<String, MetricAverage, KeyValueStore<Bytes, byte[]>>as("latency-metrics")
.with(Serdes.String(), ProtobufSerdes.MetricAverage()));
// we create a table to get the last latency for every lh server
.toTable(
Named.as("latency-metrics"),
Materialized.<String, MetricAverage, KeyValueStore<Bytes, byte[]>>as("latency-metrics")
.withKeySerde(Serdes.String())
.withValueSerde(ProtobufSerdes.MetricAverage()));
}

private static MetricAverage aggregate(final Metric value, final MetricAverage aggregate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import io.littlehorse.canary.Bootstrap;
import io.littlehorse.canary.CanaryException;
import io.littlehorse.canary.config.CanaryConfig;
import io.littlehorse.canary.prometheus.Measurable;
import io.littlehorse.canary.util.Shutdown;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -15,7 +15,7 @@
import org.apache.kafka.common.errors.TopicExistsException;

@Slf4j
public class KafkaTopicBootstrap extends Bootstrap implements Measurable {
public class KafkaTopicBootstrap extends Bootstrap implements MeterBinder {

private final AdminClient adminClient;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.littlehorse.canary.kafka;

import io.littlehorse.canary.CanaryException;
import io.littlehorse.canary.prometheus.Measurable;
import io.littlehorse.canary.proto.Metric;
import io.littlehorse.canary.util.Shutdown;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand All @@ -17,7 +17,7 @@
import org.apache.kafka.common.utils.Bytes;

@Slf4j
public class MetricsEmitter implements Measurable {
public class MetricsEmitter implements MeterBinder {

private final Producer<String, Bytes> producer;
private final String topicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
import io.littlehorse.canary.Bootstrap;
import io.littlehorse.canary.config.CanaryConfig;
import io.littlehorse.canary.kafka.MetricsEmitter;
import io.littlehorse.canary.prometheus.Measurable;
import io.littlehorse.canary.util.Shutdown;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseBlockingStub;
import io.littlehorse.sdk.common.proto.ServerVersionResponse;
import io.littlehorse.sdk.worker.LHTaskWorker;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MetronomeBootstrap extends Bootstrap implements Measurable {
public class MetronomeBootstrap extends Bootstrap implements MeterBinder {

private final MetricsEmitter emitter;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.littlehorse.canary.Bootstrap;
import io.littlehorse.canary.config.CanaryConfig;
import io.littlehorse.canary.util.Shutdown;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
Expand All @@ -12,17 +13,23 @@
public class PrometheusExporterBootstrap extends Bootstrap {

private final PrometheusMeterRegistry prometheusRegistry;
private final PrometheusExporterServer prometheusExporterServer;

public PrometheusExporterBootstrap(final CanaryConfig config) {
super(config);

prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
prometheusRegistry = initializeRegistry();

new PrometheusExporterServer(config.getMetricsPort(), config.getMetricsPath(), prometheusRegistry);
}

private PrometheusMeterRegistry initializeRegistry() {
final PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
Shutdown.addShutdownHook("Prometheus Exporter", prometheusRegistry::close);

prometheusRegistry.config().commonTags("application_id", config.getId());
if (config.isMetricsFilterEnabled()) {
prometheusRegistry.config().meterFilter(new PrometheusMetricFilter(config.getEnabledMetrics()));
}
Shutdown.addShutdownHook("Prometheus Exporter", prometheusRegistry::close);

final JvmMemoryMetrics jvmMeter = new JvmMemoryMetrics();
jvmMeter.bindTo(prometheusRegistry);
Expand All @@ -33,11 +40,10 @@ public PrometheusExporterBootstrap(final CanaryConfig config) {
final ProcessorMetrics processorMetrics = new ProcessorMetrics();
processorMetrics.bindTo(prometheusRegistry);

prometheusExporterServer =
new PrometheusExporterServer(config.getMetricsPort(), config.getMetricsPath(), prometheusRegistry);
return prometheusRegistry;
}

public void addMesurable(final Measurable measurable) {
public void addMesurable(final MeterBinder measurable) {
measurable.bindTo(prometheusRegistry);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,79 @@
package io.littlehorse.canary.aggregator.topology;

import static org.junit.jupiter.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.kafka.streams.TopologyTestDriver;
import com.google.protobuf.util.Timestamps;
import io.littlehorse.canary.aggregator.internal.MetricTimeExtractor;
import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes;
import io.littlehorse.canary.proto.Metadata;
import io.littlehorse.canary.proto.Metric;
import io.littlehorse.canary.proto.MetricAverage;
import io.littlehorse.canary.proto.TaskRunLatency;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.state.KeyValueStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class TaskRunLatencyTopologyTest {
TopologyTestDriver testDriver;

private TopologyTestDriver testDriver;
private TestInputTopic<String, Metric> inputTopic;

/**
 * Builds the latency topology on top of a test driver and opens an input topic
 * that feeds it. State stores are placed in a freshly created temporary directory.
 *
 * @throws IOException if the temporary state directory cannot be created
 */
@BeforeEach
void beforeEach() throws IOException {
    final String metricsTopic = "canary-metric-beats";

    // Wire the topology under test to a stream that reads the metrics topic.
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    final Consumed<String, Metric> consumed = Consumed.with(Serdes.String(), ProtobufSerdes.Metric())
            .withTimestampExtractor(new MetricTimeExtractor());
    new TaskRunLatencyTopology(streamsBuilder.stream(metricsTopic, consumed));
    final Topology latencyTopology = streamsBuilder.build();

    // Point the driver at a throwaway state directory.
    final String stateDir = Files.createTempDirectory("canaryStreamUnitTest").toString();
    final Properties driverProperties = new Properties();
    driverProperties.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);

    testDriver = new TopologyTestDriver(latencyTopology, driverProperties);
    inputTopic = testDriver.createInputTopic(
            metricsTopic, Serdes.String().serializer(), ProtobufSerdes.Metric().serializer());
}

/**
 * Releases the test driver created in {@code beforeEach}.
 */
@AfterEach
void afterEach() {
    // Setup may have failed before the driver was assigned; closing a null driver would
    // throw a NullPointerException on top of the original setup failure.
    if (testDriver != null) {
        testDriver.close();
    }
}

/**
 * Pipes two latency metrics (20 and 40) for the same key and checks that the
 * "latency-metrics" store holds count=2, sum=60, avg=30 for that key.
 */
@Test
void calculateLatency() {
    String key = "localhost:2023";
    Metric metric1 = buildLatencyMetric(20);
    // Derive the second metric from the first so both carry the same metadata timestamp.
    // Two independent wall-clock reads could land on opposite sides of the topology's
    // one-minute window boundary, which would reset the aggregate and make this test flaky.
    // NOTE(review): assumes MetricTimeExtractor takes the record time from the metric
    // metadata - confirm.
    Metric metric2 = metric1.toBuilder()
            .setTaskRunLatency(TaskRunLatency.newBuilder().setLatency(40))
            .build();

    inputTopic.pipeInput(key, metric1);
    inputTopic.pipeInput(key, metric2);

    // Typed store instead of the raw KeyValueStore, so the lookup is checked by the compiler.
    KeyValueStore<String, MetricAverage> store = testDriver.getKeyValueStore("latency-metrics");

    assertThat(store.get(key))
            .isEqualTo(MetricAverage.newBuilder()
                    .setSum(60)
                    .setAvg(30)
                    .setCount(2)
                    .build());
}

/**
 * Creates a task-run latency metric stamped with the current time.
 *
 * @param latency latency value to store in the metric
 * @return a metric carrying metadata with the current timestamp and the given latency
 */
private static Metric buildLatencyMetric(int latency) {
    final Metadata.Builder metadata = Metadata.newBuilder().setTime(Timestamps.now());
    final TaskRunLatency.Builder taskRunLatency = TaskRunLatency.newBuilder().setLatency(latency);

    final Metric.Builder metric = Metric.newBuilder();
    metric.setMetadata(metadata);
    metric.setTaskRunLatency(taskRunLatency);
    return metric.build();
}
}

0 comments on commit f790858

Please sign in to comment.