Skip to content

Commit

Permalink
Merge branch 'master' into dashboard-scheduledwfruns
Browse files Browse the repository at this point in the history
  • Loading branch information
HazimAr authored Dec 17, 2024
2 parents 768cca3 + 0486f18 commit ae5a552
Show file tree
Hide file tree
Showing 27 changed files with 416 additions and 198 deletions.
2 changes: 1 addition & 1 deletion canary/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies {
implementation 'io.javalin:javalin-micrometer:6.3.0'

// Prometheus
implementation 'io.micrometer:micrometer-registry-prometheus:1.12.2'
implementation 'io.micrometer:micrometer-registry-prometheus:1.14.2'

// RocksDB
implementation 'org.rocksdb:rocksdbjni:9.0.0'
Expand Down
1 change: 1 addition & 0 deletions canary/canary.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ lh.canary.metronome.frequency.ms=1000
lh.canary.metronome.run.threads=1
lh.canary.metronome.run.requests=300
lh.canary.metronome.run.sample.rate=1
lh.canary.metronome.server.id=lh
1 change: 1 addition & 0 deletions canary/src/main/java/io/littlehorse/canary/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private static void initialize(final String[] args) throws IOException {
lhConfig.getApiBootstrapHost(),
lhConfig.getApiBootstrapPort(),
lhClient.getServerVersion(),
canaryConfig.getMetronomeServerId(),
canaryConfig.getTopicName(),
canaryConfig.toKafkaConfig().toMap(),
canaryConfig.getMetronomeBeatExtraTags());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public Aggregator(

@Override
public void bindTo(final MeterRegistry registry) {
final MetricStoreExporter prometheusMetricStoreExporter = new MetricStoreExporter(kafkaStreams, METRICS_STORE);
final MetricStoreExporter prometheusMetricStoreExporter =
new MetricStoreExporter(kafkaStreams, METRICS_STORE, Duration.ofSeconds(30));
prometheusMetricStoreExporter.bindTo(registry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.MeterBinder;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -23,22 +24,25 @@
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@Slf4j
public class MetricStoreExporter implements MeterBinder {
public class MetricStoreExporter implements MeterBinder, AutoCloseable {

private final KafkaStreams kafkaStreams;
private final String storeName;
private final Map<MetricKey, PrometheusMetric> currentMeters;
private final Map<MetricKey, PrometheusMetric> currentMeters = new HashMap<>();
private final Duration refreshPeriod;
private ScheduledExecutorService mainExecutor;

public MetricStoreExporter(final KafkaStreams kafkaStreams, final String storeName) {
public MetricStoreExporter(final KafkaStreams kafkaStreams, final String storeName, final Duration refreshPeriod) {
this.kafkaStreams = kafkaStreams;
this.storeName = storeName;
currentMeters = new HashMap<>();
this.refreshPeriod = refreshPeriod;
}

private static List<Tag> toTags(final MetricKey key) {
final List<Tag> tags = new ArrayList<>();
tags.add(Tag.of("server", "%s:%s".formatted(key.getServerHost(), key.getServerPort())));
tags.add(Tag.of("server_version", key.getServerVersion()));
tags.add(Tag.of("server_id", key.getServerId()));
tags.addAll(key.getTagsList().stream()
.map(tag -> Tag.of(tag.getKey(), tag.getValue()))
.toList());
Expand All @@ -47,12 +51,15 @@ private static List<Tag> toTags(final MetricKey key) {

@Override
public void bindTo(final MeterRegistry registry) {
final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor();
ShutdownHook.add("Latency Metrics Exporter", () -> {
mainExecutor.shutdownNow();
mainExecutor.awaitTermination(1, TimeUnit.SECONDS);
});
mainExecutor.scheduleAtFixedRate(() -> updateMetrics(registry), 30, 30, TimeUnit.SECONDS);
mainExecutor = Executors.newSingleThreadScheduledExecutor();
ShutdownHook.add("Latency Metrics Exporter", this);
mainExecutor.scheduleAtFixedRate(
() -> updateMetrics(registry), 0, refreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}

public void close() throws InterruptedException {
mainExecutor.shutdownNow();
mainExecutor.awaitTermination(1, TimeUnit.SECONDS);
}

private void updateMetrics(final MeterRegistry registry) {
Expand All @@ -72,7 +79,6 @@ private void updateMetrics(final MeterRegistry registry) {
while (records.hasNext()) {
final KeyValue<MetricKey, MetricValue> record = records.next();
foundMetrics.add(record.key);

final PrometheusMetric current = currentMeters.get(record.key);
if (current == null) {
final AtomicDouble newMeter = new AtomicDouble(record.value.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ private static MetricKey buildMetricKey(final BeatKey key, final String id) {
.setServerVersion(key.getServerVersion())
.setServerPort(key.getServerPort())
.setServerHost(key.getServerHost())
.setServerId(key.getServerId())
.setId("canary_%s".formatted(id));

if (key.hasStatus() && !Strings.isNullOrEmpty(key.getStatus())) {
Expand Down Expand Up @@ -204,6 +205,7 @@ private static BeatKey removeWfId(final BeatKey key, final BeatValue value) {
.setServerHost(key.getServerHost())
.setServerPort(key.getServerPort())
.setStatus(key.getStatus())
.setServerId(key.getServerId())
.addAllTags(key.getTagsList())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class CanaryConfig implements Config {
public static final String METRONOME_GET_RETRIES = "metronome.get.retries";
public static final String METRONOME_WORKER_ENABLE = "metronome.worker.enable";
public static final String METRONOME_DATA_PATH = "metronome.data.path";
public static final String METRONOME_SERVER_ID = "metronome.server.id";
public static final String METRONOME_BEAT_EXTRA_TAGS = "metronome.beat.extra.tags";
public static final String METRONOME_BEAT_EXTRA_TAGS_PREFIX = "%s.".formatted(METRONOME_BEAT_EXTRA_TAGS);

Expand Down Expand Up @@ -71,7 +72,11 @@ public KafkaConfig toKafkaConfig() {
}

private String getConfig(final String configName) {
return configs.get(configName).toString();
final Object value = configs.get(configName);
if (value == null) {
throw new IllegalArgumentException("Configuration 'lh.canary." + configName + "' not found");
}
return value.toString();
}

public String getTopicName() {
Expand Down Expand Up @@ -181,4 +186,8 @@ public int getWorkflowVersion() {
public String getMetronomeDataPath() {
return getConfig(METRONOME_DATA_PATH);
}

public String getMetronomeServerId() {
return getConfig(METRONOME_SERVER_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ public class BeatProducer {
private final int lhServerPort;
private final String lhServerVersion;
private final String topicName;
private final String lhServerId;

public BeatProducer(
final String lhServerHost,
final int lhServerPort,
final String lhServerVersion,
final String lhServerId,
final String topicName,
final Map<String, Object> producerConfig,
final Map<String, String> extraTags) {
this.lhServerHost = lhServerHost;
this.lhServerPort = lhServerPort;
this.lhServerVersion = lhServerVersion;
this.lhServerId = lhServerId;
this.topicName = topicName;
this.extraTags = extraTags;

Expand Down Expand Up @@ -89,6 +92,7 @@ private BeatKey buildKey(final String id, final BeatType type, final String stat
.setServerHost(lhServerHost)
.setServerPort(lhServerPort)
.setServerVersion(lhServerVersion)
.setServerId(lhServerId)
.setId(id)
.setType(type);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import io.littlehorse.canary.util.ShutdownHook;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.prometheusmetrics.PrometheusConfig;
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
import java.util.Map;

public class PrometheusExporter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.littlehorse.canary.prometheus;

import io.javalin.Javalin;
import io.javalin.http.ContentType;
import io.javalin.http.Context;
import io.littlehorse.canary.util.ShutdownHook;
import io.prometheus.client.exporter.common.TextFormat;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -22,6 +22,6 @@ public PrometheusServerExporter(

private void printMetrics(final Context context) {
log.trace("Processing metrics request");
context.contentType(TextFormat.CONTENT_TYPE_004).result(prometheusExporter.scrape());
context.contentType(ContentType.PLAIN).result(prometheusExporter.scrape());
}
}
1 change: 1 addition & 0 deletions canary/src/main/proto/beats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ message BeatKey {
optional string status = 5;
optional string id = 6;
repeated Tag tags = 7;
string server_id = 8;
}

message BeatValue {
Expand Down
1 change: 1 addition & 0 deletions canary/src/main/proto/metrics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ message MetricKey {
string server_version = 3;
string id = 4;
repeated Tag tags = 5;
string server_id = 6;
}

message MetricValue {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package io.littlehorse.canary.aggregator.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

import io.littlehorse.canary.proto.MetricKey;
import io.littlehorse.canary.proto.MetricValue;
import io.littlehorse.canary.proto.Tag;
import io.micrometer.prometheusmetrics.PrometheusConfig;
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class MetricStoreExporterTest {

public static final String TEST_STORAGE = "testStorage";
public static final String HOST = "localhost";

@Mock
KafkaStreams kafkaStreams;

@Mock
ReadOnlyKeyValueStore<MetricKey, MetricValue> store;

@Mock
KeyValueIterator<MetricKey, MetricValue> records;

PrometheusMeterRegistry prometheusRegistry;
MetricStoreExporter metricExporter;

@BeforeEach
void setUp() {
metricExporter = new MetricStoreExporter(kafkaStreams, TEST_STORAGE, Duration.ofSeconds(10));
prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
}

@AfterEach
void tearDown() throws InterruptedException {
metricExporter.close();
prometheusRegistry.close();
}

@Test
public void shouldScrapeSimpleMetric() throws InterruptedException {
// metrics
MetricKey key = createMetricsKey(List.of(
Tag.newBuilder().setKey("custom_tag").setValue("custom_value").build()));
MetricKey key2 = createMetricsKey(List.of());
MetricValue value = MetricValue.newBuilder().setValue(1.0).build();

// records
when(records.hasNext()).thenReturn(true, false);
when(records.next()).thenReturn(KeyValue.pair(key, value));
doNothing().when(records).close();

// store
when(store.all()).thenReturn(records);

// kafka streams
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING);
when(kafkaStreams.store(any())).thenReturn(store);

metricExporter.bindTo(prometheusRegistry);

Thread.sleep(500);

assertThat(prometheusRegistry.scrape())
.contains(
"my_metric{custom_tag=\"custom_value\",server=\"localhost:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0");
}

private static MetricKey createMetricsKey(List<Tag> tags) {
return createMetricsKey(HOST, tags);
}

private static MetricKey createMetricsKey(String host, List<Tag> tags) {
return MetricKey.newBuilder()
.setServerHost(host)
.setServerPort(2023)
.setServerVersion("test")
.setId("my_metric")
.setServerId("my_server")
.addAllTags(tags)
.build();
}

@Test
void printMetricsWithTwoDifferentServers() throws InterruptedException {
// metrics
List<Tag> tags = List.of(
Tag.newBuilder().setKey("custom_tag").setValue("custom_value").build());
MetricKey key1 = createMetricsKey(tags);
MetricKey key2 = createMetricsKey("localhost2", tags);
MetricValue value = MetricValue.newBuilder().setValue(1.0).build();

// records
when(records.hasNext()).thenReturn(true, true, false);
when(records.next()).thenReturn(KeyValue.pair(key1, value), KeyValue.pair(key2, value));
doNothing().when(records).close();

// store
when(store.all()).thenReturn(records);

// kafka streams
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING);
when(kafkaStreams.store(any())).thenReturn(store);

metricExporter.bindTo(prometheusRegistry);

Thread.sleep(500);
System.out.printf(prometheusRegistry.scrape());
assertThat(prometheusRegistry.scrape())
.isEqualTo(
"# HELP my_metric \n" + "# TYPE my_metric gauge\n"
+ "my_metric{custom_tag=\"custom_value\",server=\"localhost2:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0\n"
+ "my_metric{custom_tag=\"custom_value\",server=\"localhost:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0\n");
}
}
Loading

0 comments on commit ae5a552

Please sign in to comment.