diff --git a/canary/build.gradle b/canary/build.gradle index 830220269..d15cd4077 100644 --- a/canary/build.gradle +++ b/canary/build.gradle @@ -35,7 +35,7 @@ dependencies { implementation 'io.javalin:javalin-micrometer:6.3.0' // Prometheus - implementation 'io.micrometer:micrometer-registry-prometheus:1.12.2' + implementation 'io.micrometer:micrometer-registry-prometheus:1.14.2' // RocksDB implementation 'org.rocksdb:rocksdbjni:9.0.0' diff --git a/canary/canary.properties b/canary/canary.properties index 64d6086db..f08680df3 100644 --- a/canary/canary.properties +++ b/canary/canary.properties @@ -14,3 +14,4 @@ lh.canary.metronome.frequency.ms=1000 lh.canary.metronome.run.threads=1 lh.canary.metronome.run.requests=300 lh.canary.metronome.run.sample.rate=1 +lh.canary.metronome.server.id=lh diff --git a/canary/src/main/java/io/littlehorse/canary/Main.java b/canary/src/main/java/io/littlehorse/canary/Main.java index 55085d54c..c79188837 100644 --- a/canary/src/main/java/io/littlehorse/canary/Main.java +++ b/canary/src/main/java/io/littlehorse/canary/Main.java @@ -67,6 +67,7 @@ private static void initialize(final String[] args) throws IOException { lhConfig.getApiBootstrapHost(), lhConfig.getApiBootstrapPort(), lhClient.getServerVersion(), + canaryConfig.getMetronomeServerId(), canaryConfig.getTopicName(), canaryConfig.toKafkaConfig().toMap(), canaryConfig.getMetronomeBeatExtraTags()); diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java b/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java index 11d1924ed..4fc00e2b4 100644 --- a/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/Aggregator.java @@ -31,7 +31,8 @@ public Aggregator( @Override public void bindTo(final MeterRegistry registry) { - final MetricStoreExporter prometheusMetricStoreExporter = new MetricStoreExporter(kafkaStreams, METRICS_STORE); + final MetricStoreExporter 
prometheusMetricStoreExporter = + new MetricStoreExporter(kafkaStreams, METRICS_STORE, Duration.ofSeconds(30)); prometheusMetricStoreExporter.bindTo(registry); } } diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java index 842a19e21..a9cef273e 100644 --- a/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java @@ -9,6 +9,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.binder.MeterBinder; +import java.time.Duration; import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -23,22 +24,25 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @Slf4j -public class MetricStoreExporter implements MeterBinder { +public class MetricStoreExporter implements MeterBinder, AutoCloseable { private final KafkaStreams kafkaStreams; private final String storeName; - private final Map currentMeters; + private final Map currentMeters = new HashMap<>(); + private final Duration refreshPeriod; + private ScheduledExecutorService mainExecutor; - public MetricStoreExporter(final KafkaStreams kafkaStreams, final String storeName) { + public MetricStoreExporter(final KafkaStreams kafkaStreams, final String storeName, final Duration refreshPeriod) { this.kafkaStreams = kafkaStreams; this.storeName = storeName; - currentMeters = new HashMap<>(); + this.refreshPeriod = refreshPeriod; } private static List toTags(final MetricKey key) { final List tags = new ArrayList<>(); tags.add(Tag.of("server", "%s:%s".formatted(key.getServerHost(), key.getServerPort()))); tags.add(Tag.of("server_version", key.getServerVersion())); + tags.add(Tag.of("server_id", key.getServerId())); 
tags.addAll(key.getTagsList().stream() .map(tag -> Tag.of(tag.getKey(), tag.getValue())) .toList()); @@ -47,12 +51,15 @@ private static List toTags(final MetricKey key) { @Override public void bindTo(final MeterRegistry registry) { - final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor(); - ShutdownHook.add("Latency Metrics Exporter", () -> { - mainExecutor.shutdownNow(); - mainExecutor.awaitTermination(1, TimeUnit.SECONDS); - }); - mainExecutor.scheduleAtFixedRate(() -> updateMetrics(registry), 30, 30, TimeUnit.SECONDS); + mainExecutor = Executors.newSingleThreadScheduledExecutor(); + ShutdownHook.add("Latency Metrics Exporter", this); + mainExecutor.scheduleAtFixedRate( + () -> updateMetrics(registry), 0, refreshPeriod.toMillis(), TimeUnit.MILLISECONDS); + } + + public void close() throws InterruptedException { + mainExecutor.shutdownNow(); + mainExecutor.awaitTermination(1, TimeUnit.SECONDS); } private void updateMetrics(final MeterRegistry registry) { @@ -72,7 +79,6 @@ private void updateMetrics(final MeterRegistry registry) { while (records.hasNext()) { final KeyValue record = records.next(); foundMetrics.add(record.key); - final PrometheusMetric current = currentMeters.get(record.key); if (current == null) { final AtomicDouble newMeter = new AtomicDouble(record.value.getValue()); diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java b/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java index d89fa61e1..2f2b930ea 100644 --- a/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java @@ -172,6 +172,7 @@ private static MetricKey buildMetricKey(final BeatKey key, final String id) { .setServerVersion(key.getServerVersion()) .setServerPort(key.getServerPort()) .setServerHost(key.getServerHost()) + .setServerId(key.getServerId()) 
.setId("canary_%s".formatted(id)); if (key.hasStatus() && !Strings.isNullOrEmpty(key.getStatus())) { @@ -204,6 +205,7 @@ private static BeatKey removeWfId(final BeatKey key, final BeatValue value) { .setServerHost(key.getServerHost()) .setServerPort(key.getServerPort()) .setStatus(key.getStatus()) + .setServerId(key.getServerId()) .addAllTags(key.getTagsList()) .build(); } diff --git a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java index f70e85d12..4607d26a1 100644 --- a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java +++ b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java @@ -32,6 +32,7 @@ public class CanaryConfig implements Config { public static final String METRONOME_GET_RETRIES = "metronome.get.retries"; public static final String METRONOME_WORKER_ENABLE = "metronome.worker.enable"; public static final String METRONOME_DATA_PATH = "metronome.data.path"; + public static final String METRONOME_SERVER_ID = "metronome.server.id"; public static final String METRONOME_BEAT_EXTRA_TAGS = "metronome.beat.extra.tags"; public static final String METRONOME_BEAT_EXTRA_TAGS_PREFIX = "%s.".formatted(METRONOME_BEAT_EXTRA_TAGS); @@ -71,7 +72,11 @@ public KafkaConfig toKafkaConfig() { } private String getConfig(final String configName) { - return configs.get(configName).toString(); + final Object value = configs.get(configName); + if (value == null) { + throw new IllegalArgumentException("Configuration 'lh.canary." 
+ configName + "' not found"); + } + return value.toString(); } public String getTopicName() { @@ -181,4 +186,8 @@ public int getWorkflowVersion() { public String getMetronomeDataPath() { return getConfig(METRONOME_DATA_PATH); } + + public String getMetronomeServerId() { + return getConfig(METRONOME_SERVER_ID); + } } diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java b/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java index 96671335c..4520a102f 100644 --- a/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java +++ b/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java @@ -25,17 +25,20 @@ public class BeatProducer { private final int lhServerPort; private final String lhServerVersion; private final String topicName; + private final String lhServerId; public BeatProducer( final String lhServerHost, final int lhServerPort, final String lhServerVersion, + final String lhServerId, final String topicName, final Map producerConfig, final Map extraTags) { this.lhServerHost = lhServerHost; this.lhServerPort = lhServerPort; this.lhServerVersion = lhServerVersion; + this.lhServerId = lhServerId; this.topicName = topicName; this.extraTags = extraTags; @@ -89,6 +92,7 @@ private BeatKey buildKey(final String id, final BeatType type, final String stat .setServerHost(lhServerHost) .setServerPort(lhServerPort) .setServerVersion(lhServerVersion) + .setServerId(lhServerId) .setId(id) .setType(type); diff --git a/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusExporter.java b/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusExporter.java index d2f055580..4e8e4707e 100644 --- a/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusExporter.java +++ b/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusExporter.java @@ -3,8 +3,8 @@ import io.littlehorse.canary.util.ShutdownHook; import 
io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.binder.MeterBinder; -import io.micrometer.prometheus.PrometheusConfig; -import io.micrometer.prometheus.PrometheusMeterRegistry; +import io.micrometer.prometheusmetrics.PrometheusConfig; +import io.micrometer.prometheusmetrics.PrometheusMeterRegistry; import java.util.Map; public class PrometheusExporter { diff --git a/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusServerExporter.java b/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusServerExporter.java index 1bb6f18c8..740e8e613 100644 --- a/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusServerExporter.java +++ b/canary/src/main/java/io/littlehorse/canary/prometheus/PrometheusServerExporter.java @@ -1,9 +1,9 @@ package io.littlehorse.canary.prometheus; import io.javalin.Javalin; +import io.javalin.http.ContentType; import io.javalin.http.Context; import io.littlehorse.canary.util.ShutdownHook; -import io.prometheus.client.exporter.common.TextFormat; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -22,6 +22,6 @@ public PrometheusServerExporter( private void printMetrics(final Context context) { log.trace("Processing metrics request"); - context.contentType(TextFormat.CONTENT_TYPE_004).result(prometheusExporter.scrape()); + context.contentType(ContentType.PLAIN).result(prometheusExporter.scrape()); } } diff --git a/canary/src/main/proto/beats.proto b/canary/src/main/proto/beats.proto index b0a2e7261..043ed4042 100644 --- a/canary/src/main/proto/beats.proto +++ b/canary/src/main/proto/beats.proto @@ -27,6 +27,7 @@ message BeatKey { optional string status = 5; optional string id = 6; repeated Tag tags = 7; + string server_id = 8; } message BeatValue { diff --git a/canary/src/main/proto/metrics.proto b/canary/src/main/proto/metrics.proto index 90abbe8fc..276da2d81 100644 --- a/canary/src/main/proto/metrics.proto +++ b/canary/src/main/proto/metrics.proto @@ -12,6 +12,7 @@ message MetricKey { string 
server_version = 3; string id = 4; repeated Tag tags = 5; + string server_id = 6; } message MetricValue { diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java new file mode 100644 index 000000000..8f20872d0 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java @@ -0,0 +1,131 @@ +package io.littlehorse.canary.aggregator.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +import io.littlehorse.canary.proto.MetricKey; +import io.littlehorse.canary.proto.MetricValue; +import io.littlehorse.canary.proto.Tag; +import io.micrometer.prometheusmetrics.PrometheusConfig; +import io.micrometer.prometheusmetrics.PrometheusMeterRegistry; +import java.time.Duration; +import java.util.List; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class MetricStoreExporterTest { + + public static final String TEST_STORAGE = "testStorage"; + public static final String HOST = "localhost"; + + @Mock + KafkaStreams kafkaStreams; + + @Mock + ReadOnlyKeyValueStore store; + + @Mock + KeyValueIterator records; + + PrometheusMeterRegistry prometheusRegistry; + MetricStoreExporter metricExporter; + + @BeforeEach + void setUp() { + metricExporter = new MetricStoreExporter(kafkaStreams, TEST_STORAGE, 
Duration.ofSeconds(10)); + prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); + } + + @AfterEach + void tearDown() throws InterruptedException { + metricExporter.close(); + prometheusRegistry.close(); + } + + @Test + public void shouldScrapeSimpleMetric() throws InterruptedException { + // metrics + MetricKey key = createMetricsKey(List.of( + Tag.newBuilder().setKey("custom_tag").setValue("custom_value").build())); + MetricKey key2 = createMetricsKey(List.of()); + MetricValue value = MetricValue.newBuilder().setValue(1.0).build(); + + // records + when(records.hasNext()).thenReturn(true, false); + when(records.next()).thenReturn(KeyValue.pair(key, value)); + doNothing().when(records).close(); + + // store + when(store.all()).thenReturn(records); + + // kafka streams + when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING); + when(kafkaStreams.store(any())).thenReturn(store); + + metricExporter.bindTo(prometheusRegistry); + + Thread.sleep(500); + + assertThat(prometheusRegistry.scrape()) + .contains( + "my_metric{custom_tag=\"custom_value\",server=\"localhost:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0"); + } + + private static MetricKey createMetricsKey(List tags) { + return createMetricsKey(HOST, tags); + } + + private static MetricKey createMetricsKey(String host, List tags) { + return MetricKey.newBuilder() + .setServerHost(host) + .setServerPort(2023) + .setServerVersion("test") + .setId("my_metric") + .setServerId("my_server") + .addAllTags(tags) + .build(); + } + + @Test + void printMetricsWithTwoDifferentServers() throws InterruptedException { + // metrics + List tags = List.of( + Tag.newBuilder().setKey("custom_tag").setValue("custom_value").build()); + MetricKey key1 = createMetricsKey(tags); + MetricKey key2 = createMetricsKey("localhost2", tags); + MetricValue value = MetricValue.newBuilder().setValue(1.0).build(); + + // records + when(records.hasNext()).thenReturn(true, true, false); + 
when(records.next()).thenReturn(KeyValue.pair(key1, value), KeyValue.pair(key2, value)); + doNothing().when(records).close(); + + // store + when(store.all()).thenReturn(records); + + // kafka streams + when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING); + when(kafkaStreams.store(any())).thenReturn(store); + + metricExporter.bindTo(prometheusRegistry); + + Thread.sleep(500); + System.out.printf(prometheusRegistry.scrape()); + assertThat(prometheusRegistry.scrape()) + .isEqualTo( + "# HELP my_metric \n" + "# TYPE my_metric gauge\n" + + "my_metric{custom_tag=\"custom_value\",server=\"localhost2:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0\n" + + "my_metric{custom_tag=\"custom_value\",server=\"localhost:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0\n"); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java index afb90b3ed..752198d7e 100644 --- a/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java @@ -30,6 +30,7 @@ class MetricsTopologyTest { public static final String HOST_2 = "localhost2"; public static final int PORT_2 = 2024; + public static final String SERVER_ID = "LH"; private TopologyTestDriver testDriver; private TestInputTopic inputTopic; @@ -60,8 +61,11 @@ private static MetricKey newMetricKey(String id, String status, Map tags) { - MetricKey.Builder builder = - MetricKey.newBuilder().setServerHost(host).setServerPort(port).setId(id); + MetricKey.Builder builder = MetricKey.newBuilder() + .setServerHost(host) + .setServerPort(port) + .setId(id) + .setServerId(SERVER_ID); if (status != null) { builder.addTags(Tag.newBuilder().setKey("status").setValue(status).build()); @@ -105,6 +109,7 @@ private static TestRecord newBeat( .setServerHost(host) 
.setServerPort(port) .setType(type) + .setServerId(SERVER_ID) .setId(id); BeatValue.Builder valueBuilder = BeatValue.newBuilder().setTime(Timestamps.now()); @@ -180,10 +185,22 @@ void includeBeatTagsIntoMetrics() { Map expectedTags = Map.of("my_tag", "value"); inputTopic.pipeInput(newBeat(expectedType, getRandomId(), 20L, "ok", expectedTags)); + inputTopic.pipeInput(newBeat(expectedType, getRandomId(), 20L, "ok")); - assertThat(getCount()).isEqualTo(3); + assertThat(getCount()).isEqualTo(6); assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_avg", "ok", expectedTags))) .isEqualTo(newMetricValue(20.)); + assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_max", "ok", expectedTags))) + .isEqualTo(newMetricValue(20.)); + assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_count", "ok", expectedTags))) + .isEqualTo(newMetricValue(1.)); + + assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_avg", "ok"))) + .isEqualTo(newMetricValue(20.)); + assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_max", "ok"))) + .isEqualTo(newMetricValue(20.)); + assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_count", "ok"))) + .isEqualTo(newMetricValue(1.)); } @Test diff --git a/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java b/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java index 07a679da0..f34fa3fa5 100644 --- a/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java +++ b/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java @@ -2,6 +2,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.Map; import org.junit.jupiter.api.Test; @@ -65,4 +66,14 @@ void getEmptyMetronomeExtraTags() { assertThat(output).isEmpty(); assertThat(output).isNotNull(); } + + @Test + void 
throwsExceptionIfConfigurationIsNotFound() { + CanaryConfig canaryConfig = new CanaryConfig(Map.of()); + + IllegalArgumentException result = + assertThrows(IllegalArgumentException.class, canaryConfig::getMetronomeServerId); + + assertThat(result.getMessage()).isEqualTo("Configuration 'lh.canary.metronome.server.id' not found"); + } } diff --git a/docs/CANARY_CONFIGURATIONS.md b/docs/CANARY_CONFIGURATIONS.md index 8d7f167b8..8b906103e 100644 --- a/docs/CANARY_CONFIGURATIONS.md +++ b/docs/CANARY_CONFIGURATIONS.md @@ -15,6 +15,7 @@ * [`lh.canary.metronome.get.retries`](#lhcanarymetronomegetretries) * [`lh.canary.metronome.data.path`](#lhcanarymetronomedatapath) * [`lh.canary.metronome.beat.extra.tags.`](#lhcanarymetronomebeatextratagsadditional-tag) + * [`lh.canary.metronome.server.id`](#lhcanarymetronomeserverid) * [Kafka Configurations](#kafka-configurations) * [LH Client Configurations](#lh-client-configurations) * [Task Worker](#task-worker) @@ -166,6 +167,15 @@ For example: `lh.canary.metronome.beat.extra.tags.my_tag=my-value`. - **Default:** null - **Importance:** low +#### `lh.canary.metronome.server.id` + +Adds the `server_id` tag to the Prometheus metrics (**mandatory**). +For example: `lh.canary.metronome.server.id=lh`. + +- **Type:** string +- **Default:** null +- **Importance:** high + ### Kafka Configurations LH Canary supports all kafka configurations. Use the prefix `lh.canary.kafka` and append the kafka config. diff --git a/lhctl/internal/wf_run.go b/lhctl/internal/wf_run.go index 7042f680a..66e5831bd 100644 --- a/lhctl/internal/wf_run.go +++ b/lhctl/internal/wf_run.go @@ -230,7 +230,7 @@ var scheduleWfCmd = &cobra.Command{ // Now parse variables if len(args) > 2 { - if len(args)%2 != 1 { + if len(args)%2 == 1 { log.Fatal(` If you provide variables, you must provide pairs of . Therefore, you must have an even number of args after the WfSpec Id, for an @@ -245,7 +245,7 @@ odd total number of args. 
See 'lhctl run --help' for details.`) wfSpec, err = getGlobalClient(cmd).GetLatestWfSpec( requestContext(cmd), &lhproto.GetLatestWfSpecRequest{ - Name: args[0], + Name: wfSpecName, MajorVersion: majorVersion, }, ) @@ -253,7 +253,7 @@ odd total number of args. See 'lhctl run --help' for details.`) wfSpec, err = getGlobalClient(cmd).GetWfSpec( requestContext(cmd), &lhproto.WfSpecId{ - Name: args[0], + Name: wfSpecName, MajorVersion: *majorVersion, Revision: *revision, }) @@ -266,7 +266,7 @@ odd total number of args. See 'lhctl run --help' for details.`) scheduleWfReq.Variables = make(map[string]*lhproto.VariableValue) varDefs := littlehorse.GetInputVarDefs(wfSpec) - for i := 1; i+1 < len(args); i += 2 { + for i := 2; i+1 < len(args); i += 2 { varName := args[i] varValStr := args[i+1]