Skip to content

Commit

Permalink
test: add unit test to prom metric exporter
Browse files Browse the repository at this point in the history
Co-authored-by: Mijail Rondon <[email protected]>
  • Loading branch information
sauljabin and mijailr committed Dec 16, 2024
1 parent fd8bee4 commit 6b78aee
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public Aggregator(

@Override
public void bindTo(final MeterRegistry registry) {
final MetricStoreExporter prometheusMetricStoreExporter = new MetricStoreExporter(kafkaStreams, METRICS_STORE);
final MetricStoreExporter prometheusMetricStoreExporter =
new MetricStoreExporter(kafkaStreams, METRICS_STORE, Duration.ofSeconds(30));
prometheusMetricStoreExporter.bindTo(registry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.MeterBinder;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -23,16 +24,18 @@
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@Slf4j
public class MetricStoreExporter implements MeterBinder {
public class MetricStoreExporter implements MeterBinder, AutoCloseable {

private final KafkaStreams kafkaStreams;
private final String storeName;
private final Map<MetricKey, PrometheusMetric> currentMeters;
private final Map<MetricKey, PrometheusMetric> currentMeters = new HashMap<>();
private final Duration refreshPeriod;
private ScheduledExecutorService mainExecutor;

public MetricStoreExporter(final KafkaStreams kafkaStreams, final String storeName) {
public MetricStoreExporter(final KafkaStreams kafkaStreams, final String storeName, final Duration refreshPeriod) {
this.kafkaStreams = kafkaStreams;
this.storeName = storeName;
currentMeters = new HashMap<>();
this.refreshPeriod = refreshPeriod;
}

private static List<Tag> toTags(final MetricKey key) {
Expand All @@ -47,12 +50,15 @@ private static List<Tag> toTags(final MetricKey key) {

@Override
public void bindTo(final MeterRegistry registry) {
final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor();
ShutdownHook.add("Latency Metrics Exporter", () -> {
mainExecutor.shutdownNow();
mainExecutor.awaitTermination(1, TimeUnit.SECONDS);
});
mainExecutor.scheduleAtFixedRate(() -> updateMetrics(registry), 30, 30, TimeUnit.SECONDS);
mainExecutor = Executors.newSingleThreadScheduledExecutor();
ShutdownHook.add("Latency Metrics Exporter", this);
mainExecutor.scheduleAtFixedRate(
() -> updateMetrics(registry), 0, refreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}

public void close() throws InterruptedException {
mainExecutor.shutdownNow();
mainExecutor.awaitTermination(1, TimeUnit.SECONDS);
}

private void updateMetrics(final MeterRegistry registry) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.littlehorse.canary.aggregator.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

import io.littlehorse.canary.proto.MetricKey;
import io.littlehorse.canary.proto.MetricValue;
import io.littlehorse.canary.proto.Tag;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class MetricStoreExporterTest {

public static final String TEST_STORAGE = "testStorage";
public static final String HOST = "localhost";

@Mock
KafkaStreams kafkaStreams;

@Mock
ReadOnlyKeyValueStore<MetricKey, MetricValue> store;

@Mock
KeyValueIterator<MetricKey, MetricValue> records;

PrometheusMeterRegistry prometheusRegistry;
MetricStoreExporter metricExporter;

@BeforeEach
void setUp() {
metricExporter = new MetricStoreExporter(kafkaStreams, TEST_STORAGE, Duration.ofSeconds(10));
prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
}

@AfterEach
void tearDown() throws InterruptedException {
metricExporter.close();
prometheusRegistry.close();
}

@Test
public void shouldScrapeSimpleMetric() throws InterruptedException {
// metrics
List<Tag> tags = List.of(
Tag.newBuilder().setKey("custom_tag").setValue("custom_value").build());
MetricKey key = MetricKey.newBuilder()
.setServerHost(HOST)
.setServerPort(2023)
.setServerVersion("test")
.setId("my_metric")
.addAllTags(tags)
.build();
MetricValue value = MetricValue.newBuilder().setValue(1.0).build();

// records
when(records.hasNext()).thenReturn(true, false);
when(records.next()).thenReturn(KeyValue.pair(key, value));
doNothing().when(records).close();

// store
when(store.all()).thenReturn(records);

// kafka streams
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING);
when(kafkaStreams.store(any())).thenReturn(store);

metricExporter.bindTo(prometheusRegistry);

Thread.sleep(500);

assertThat(prometheusRegistry.scrape())
.contains(
"my_metric{custom_tag=\"custom_value\",server=\"localhost:2023\",server_version=\"test\",} 1.0");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,22 @@ void includeBeatTagsIntoMetrics() {
Map<String, String> expectedTags = Map.of("my_tag", "value");

inputTopic.pipeInput(newBeat(expectedType, getRandomId(), 20L, "ok", expectedTags));
inputTopic.pipeInput(newBeat(expectedType, getRandomId(), 20L, "ok"));

assertThat(getCount()).isEqualTo(3);
assertThat(getCount()).isEqualTo(6);
assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_avg", "ok", expectedTags)))
.isEqualTo(newMetricValue(20.));
assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_max", "ok", expectedTags)))
.isEqualTo(newMetricValue(20.));
assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_count", "ok", expectedTags)))
.isEqualTo(newMetricValue(1.));

assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_avg", "ok")))
.isEqualTo(newMetricValue(20.));
assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_max", "ok")))
.isEqualTo(newMetricValue(20.));
assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_count", "ok")))
.isEqualTo(newMetricValue(1.));
}

@Test
Expand Down

0 comments on commit 6b78aee

Please sign in to comment.