diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java b/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java index 11d1924ed..4fc00e2b4 100644 --- a/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java @@ -31,7 +31,8 @@ public Aggregator( @Override public void bindTo(final MeterRegistry registry) { - final MetricStoreExporter prometheusMetricStoreExporter = new MetricStoreExporter(kafkaStreams, METRICS_STORE); + final MetricStoreExporter prometheusMetricStoreExporter = + new MetricStoreExporter(kafkaStreams, METRICS_STORE, Duration.ofSeconds(30)); prometheusMetricStoreExporter.bindTo(registry); } } diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java index 842a19e21..c1c4f9d19 100644 --- a/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java @@ -9,6 +9,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.binder.MeterBinder; +import java.time.Duration; import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -23,16 +24,18 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @Slf4j -public class MetricStoreExporter implements MeterBinder { +public class MetricStoreExporter implements MeterBinder, AutoCloseable { private final KafkaStreams kafkaStreams; private final String storeName; - private final Map currentMeters; + private final Map currentMeters = new HashMap<>(); + private final Duration refreshPeriod; + private ScheduledExecutorService mainExecutor; - public MetricStoreExporter(final KafkaStreams kafkaStreams, final String storeName) { + public MetricStoreExporter(final KafkaStreams kafkaStreams, final String storeName, final Duration refreshPeriod) { this.kafkaStreams = kafkaStreams; this.storeName = storeName; - currentMeters = new HashMap<>(); + this.refreshPeriod = refreshPeriod; } private static List toTags(final MetricKey key) { @@ -47,12 +50,15 @@ private static List toTags(final MetricKey key) { @Override public void bindTo(final MeterRegistry registry) { - final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor(); - ShutdownHook.add("Latency Metrics Exporter", () -> { - mainExecutor.shutdownNow(); - mainExecutor.awaitTermination(1, TimeUnit.SECONDS); - }); - mainExecutor.scheduleAtFixedRate(() -> updateMetrics(registry), 30, 30, TimeUnit.SECONDS); + mainExecutor = Executors.newSingleThreadScheduledExecutor(); + ShutdownHook.add("Latency Metrics Exporter", this); + mainExecutor.scheduleAtFixedRate( + () -> updateMetrics(registry), 0, refreshPeriod.toMillis(), TimeUnit.MILLISECONDS); + } + + public void close() throws InterruptedException { + mainExecutor.shutdownNow(); + mainExecutor.awaitTermination(1, TimeUnit.SECONDS); } private void updateMetrics(final MeterRegistry registry) { diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java new file mode 100644 index 000000000..2dede7718 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java @@ -0,0 +1,90 @@ +package io.littlehorse.canary.aggregator.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +import io.littlehorse.canary.proto.MetricKey; +import io.littlehorse.canary.proto.MetricValue; +import io.littlehorse.canary.proto.Tag; +import io.micrometer.prometheus.PrometheusConfig; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import java.time.Duration; +import java.util.List; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class MetricStoreExporterTest { + + public static final String TEST_STORAGE = "testStorage"; + public static final String HOST = "localhost"; + + @Mock + KafkaStreams kafkaStreams; + + @Mock + ReadOnlyKeyValueStore store; + + @Mock + KeyValueIterator records; + + PrometheusMeterRegistry prometheusRegistry; + MetricStoreExporter metricExporter; + + @BeforeEach + void setUp() { + metricExporter = new MetricStoreExporter(kafkaStreams, TEST_STORAGE, Duration.ofSeconds(10)); + prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); + } + + @AfterEach + void tearDown() throws InterruptedException { + metricExporter.close(); + prometheusRegistry.close(); + } + + @Test + public void shouldScrapeSimpleMetric() throws InterruptedException { + // metrics + List tags = List.of( + Tag.newBuilder().setKey("custom_tag").setValue("custom_value").build()); + MetricKey key = MetricKey.newBuilder() + .setServerHost(HOST) + .setServerPort(2023) + .setServerVersion("test") + .setId("my_metric") + .addAllTags(tags) + .build(); + MetricValue value = MetricValue.newBuilder().setValue(1.0).build(); + + // records + when(records.hasNext()).thenReturn(true, false); + when(records.next()).thenReturn(KeyValue.pair(key, value)); + doNothing().when(records).close(); + + // store + when(store.all()).thenReturn(records); + + // kafka streams + when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING); + when(kafkaStreams.store(any())).thenReturn(store); + + metricExporter.bindTo(prometheusRegistry); + + Thread.sleep(500); + + assertThat(prometheusRegistry.scrape()) + .contains( + "my_metric{custom_tag=\"custom_value\",server=\"localhost:2023\",server_version=\"test\",} 1.0"); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java index afb90b3ed..0865547b3 100644 --- a/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java @@ -180,10 +180,22 @@ void includeBeatTagsIntoMetrics() { Map expectedTags = Map.of("my_tag", "value"); inputTopic.pipeInput(newBeat(expectedType, getRandomId(), 20L, "ok", expectedTags)); + inputTopic.pipeInput(newBeat(expectedType, getRandomId(), 20L, "ok")); - assertThat(getCount()).isEqualTo(3); + assertThat(getCount()).isEqualTo(6); assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_avg", "ok", expectedTags))) .isEqualTo(newMetricValue(20.)); + assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_max", "ok", expectedTags))) + .isEqualTo(newMetricValue(20.)); + assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_count", "ok", expectedTags))) + .isEqualTo(newMetricValue(1.)); + + assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_avg", "ok"))) + .isEqualTo(newMetricValue(20.)); + assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_max", "ok"))) + .isEqualTo(newMetricValue(20.)); + assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_count", "ok"))) + .isEqualTo(newMetricValue(1.)); } @Test