Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add extra tags to metronome beats #1195

Merged
merged 4 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion canary/src/main/java/io/littlehorse/canary/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private static void initialize(final String[] args) throws IOException {
lhConfig.getApiBootstrapPort(),
lhClient.getServerVersion(),
canaryConfig.getTopicName(),
canaryConfig.toKafkaConfig().toMap());
canaryConfig.toKafkaConfig().toMap(),
canaryConfig.getMetronomeBeatExtraTags());

// start worker
if (canaryConfig.isMetronomeWorkerEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public Topology toTopology() {
.filterNot(MetricsTopology::isExhaustedRetries)
// remove the id
.groupBy(
MetricsTopology::cleanBeatKey,
Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
// reset aggregator every minute
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(5)))
// calculate average
Expand All @@ -62,8 +61,7 @@ public Topology toTopology() {
final KStream<MetricKey, MetricValue> countMetricStream = beatsStream
// remove the id
.groupBy(
MetricsTopology::cleanBeatKey,
Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
// count all
.count(initializeCountStore(COUNT_STORE))
.toStream()
Expand Down Expand Up @@ -181,6 +179,10 @@ private static MetricKey buildMetricKey(final BeatKey key, final String id) {
Tag.newBuilder().setKey("status").setValue(key.getStatus().toLowerCase()));
}

if (key.getTagsCount() > 0) {
builder.addAllTags(key.getTagsList());
}

return builder.build();
}

Expand All @@ -195,13 +197,14 @@ private static AverageAggregator initializeAverageAggregator() {
return AverageAggregator.newBuilder().build();
}

private static BeatKey cleanBeatKey(final BeatKey key, final BeatValue value) {
private static BeatKey removeWfId(final BeatKey key, final BeatValue value) {
return BeatKey.newBuilder()
.setType(key.getType())
.setServerVersion(key.getServerVersion())
.setServerHost(key.getServerHost())
.setServerPort(key.getServerPort())
.setStatus(key.getStatus())
.addAllTags(key.getTagsList())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ public class CanaryConfig implements Config {
public static final String METRONOME_GET_RETRIES = "metronome.get.retries";
public static final String METRONOME_WORKER_ENABLE = "metronome.worker.enable";
public static final String METRONOME_DATA_PATH = "metronome.data.path";
public static final String METRONOME_BEAT_EXTRA_TAGS = "metronome.beat.extra.tags";
public static final String METRONOME_BEAT_EXTRA_TAGS_PREFIX = "%s.".formatted(METRONOME_BEAT_EXTRA_TAGS);

public static final String AGGREGATOR_ENABLE = "aggregator.enable";
public static final String AGGREGATOR_STORE_RETENTION_MS = "aggregator.store.retention.ms";

public static final String METRICS_PORT = "metrics.port";
public static final String METRICS_PATH = "metrics.path";
public static final String AGGREGATOR_STORE_RETENTION_MS = "aggregator.store.retention.ms";
public static final String METRICS_COMMON_TAGS = "metrics.common.tags";
public static final String METRICS_COMMON_TAGS_PREFIX = "%s.".formatted(METRICS_COMMON_TAGS);

Expand Down Expand Up @@ -147,6 +150,14 @@ public Map<String, String> getCommonTags() {
entry -> entry.getValue().toString()));
}

public Map<String, String> getMetronomeBeatExtraTags() {
return configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(METRONOME_BEAT_EXTRA_TAGS_PREFIX))
.collect(Collectors.toMap(
entry -> entry.getKey().substring(METRONOME_BEAT_EXTRA_TAGS_PREFIX.length()),
entry -> entry.getValue().toString()));
}

public boolean isTopicCreationEnabled() {
return Boolean.parseBoolean(getConfig(TOPIC_CREATION_ENABLE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import io.littlehorse.canary.proto.BeatKey;
import io.littlehorse.canary.proto.BeatType;
import io.littlehorse.canary.proto.BeatValue;
import io.littlehorse.canary.proto.Tag;
import io.littlehorse.canary.util.ShutdownHook;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -18,6 +20,7 @@
public class BeatProducer {

private final Producer<Bytes, Bytes> producer;
private final Map<String, String> extraTags;
private final String lhServerHost;
private final int lhServerPort;
private final String lhServerVersion;
Expand All @@ -28,13 +31,15 @@ public BeatProducer(
final int lhServerPort,
final String lhServerVersion,
final String topicName,
final Map<String, Object> kafkaProducerConfigMap) {
final Map<String, Object> producerConfig,
final Map<String, String> extraTags) {
this.lhServerHost = lhServerHost;
this.lhServerPort = lhServerPort;
this.lhServerVersion = lhServerVersion;
this.topicName = topicName;
this.extraTags = extraTags;

producer = new KafkaProducer<>(kafkaProducerConfigMap);
producer = new KafkaProducer<>(producerConfig);
ShutdownHook.add("Beat Producer", producer);
}

Expand Down Expand Up @@ -91,6 +96,15 @@ private BeatKey buildKey(final String id, final BeatType type, final String stat
builder.setStatus(status);
}

final List<Tag> tags = extraTags.entrySet().stream()
.map(entry -> Tag.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
.toList();

builder.addAllTags(tags);

return builder.build();
}
}
2 changes: 2 additions & 0 deletions canary/src/main/proto/beats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ option java_multiple_files = true;
option java_package = "io.littlehorse.canary.proto";

import "google/protobuf/timestamp.proto";
import "metrics.proto";

enum BeatType {
WF_RUN_REQUEST = 0;
Expand All @@ -25,6 +26,7 @@ message BeatKey {
BeatType type = 4;
optional string status = 5;
optional string id = 6;
repeated Tag tags = 7;
}

message BeatValue {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -46,34 +48,59 @@ private static MetricKey newMetricKey(String id) {
}

private static MetricKey newMetricKey(String id, String status) {
return newMetricKey(HOST_1, PORT_1, id, status);
return newMetricKey(HOST_1, PORT_1, id, status, null);
}

private static MetricKey newMetricKey(String host, int port, String id) {
return newMetricKey(host, port, id, null);
return newMetricKey(host, port, id, null, null);
}

private static MetricKey newMetricKey(String host, int port, String id, String status) {
private static MetricKey newMetricKey(String id, String status, Map<String, String> tags) {
return newMetricKey(HOST_1, PORT_1, id, status, tags);
}

private static MetricKey newMetricKey(String host, int port, String id, String status, Map<String, String> tags) {
MetricKey.Builder builder =
MetricKey.newBuilder().setServerHost(host).setServerPort(port).setId(id);

if (status != null) {
builder.addTags(Tag.newBuilder().setKey("status").setValue(status).build());
}

if (tags != null) {
List<Tag> tagList = tags.entrySet().stream()
.map(entry -> Tag.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
.toList();
builder.addAllTags(tagList);
}

return builder.build();
}

private static TestRecord<BeatKey, BeatValue> newBeat(BeatType type, String id, Long latency) {
return newBeat(HOST_1, PORT_1, type, id, latency, null);
return newBeat(HOST_1, PORT_1, type, id, latency, null, null);
}

private static TestRecord<BeatKey, BeatValue> newBeat(BeatType type, String id, Long latency, String beatStatus) {
return newBeat(HOST_1, PORT_1, type, id, latency, beatStatus);
return newBeat(HOST_1, PORT_1, type, id, latency, beatStatus, null);
}

private static TestRecord<BeatKey, BeatValue> newBeat(
String host, int port, BeatType type, String id, Long latency, String beatStatus) {
BeatType type, String id, Long latency, String beatStatus, Map<String, String> tags) {
return newBeat(HOST_1, PORT_1, type, id, latency, beatStatus, tags);
}

private static TestRecord<BeatKey, BeatValue> newBeat(
String host,
int port,
BeatType type,
String id,
Long latency,
String beatStatus,
Map<String, String> tags) {
BeatKey.Builder keyBuilder = BeatKey.newBuilder()
.setServerHost(host)
.setServerPort(port)
Expand All @@ -85,6 +112,16 @@ private static TestRecord<BeatKey, BeatValue> newBeat(
keyBuilder.setStatus(beatStatus);
}

if (tags != null) {
List<Tag> tagList = tags.entrySet().stream()
.map(entry -> Tag.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
.toList();
keyBuilder.addAllTags(tagList);
}

if (latency != null) {
valueBuilder.setLatency(latency);
}
Expand Down Expand Up @@ -136,6 +173,19 @@ void calculateCountAndLatencyForWfRunRequest() {
.isEqualTo(newMetricValue(3.));
}

@Test
void includeBeatTagsIntoMetrics() {
BeatType expectedType = BeatType.WF_RUN_REQUEST;
String expectedTypeName = expectedType.name().toLowerCase();
Map<String, String> expectedTags = Map.of("my_tag", "value");

inputTopic.pipeInput(newBeat(expectedType, getRandomId(), 20L, "ok", expectedTags));

assertThat(getCount()).isEqualTo(3);
assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_avg", "ok", expectedTags)))
.isEqualTo(newMetricValue(20.));
}

@Test
void calculateCountForExhaustedRetries() {
BeatType expectedType = BeatType.GET_WF_RUN_EXHAUSTED_RETRIES;
Expand Down Expand Up @@ -270,9 +320,9 @@ void calculateCountAndLatencyForTaskRunWithDuplicatedAndTwoServers() {
inputTopic.pipeInput(newBeat(expectedType, expectedUniqueId, 10L));
inputTopic.pipeInput(newBeat(expectedType, expectedUniqueId, 30L));

inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 20L, null));
inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 10L, null));
inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 30L, null));
inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 20L, null, null));
inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 10L, null, null));
inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 30L, null, null));

assertThat(getCount()).isEqualTo(8);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,25 @@ void getCommonTags() {

assertThat(output).contains(entry("application_id", "my_id"), entry("extra", "extra_tag"));
}

@Test
void getMetronomeExtraTags() {
Map<String, Object> input = Map.of("lh.canary.metronome.beat.extra.tags.my_tag", "extra_tag");

CanaryConfig canaryConfig = new CanaryConfig(input);

Map<String, String> output = canaryConfig.getMetronomeBeatExtraTags();

assertThat(output).contains(entry("my_tag", "extra_tag"));
}

@Test
void getEmptyMetronomeExtraTags() {
CanaryConfig canaryConfig = new CanaryConfig(Map.of());

Map<String, String> output = canaryConfig.getMetronomeBeatExtraTags();

assertThat(output).isEmpty();
assertThat(output).isNotNull();
}
}
10 changes: 10 additions & 0 deletions docs/CANARY_CONFIGURATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [`lh.canary.metronome.get.threads`](#lhcanarymetronomegetthreads)
* [`lh.canary.metronome.get.retries`](#lhcanarymetronomegetretries)
* [`lh.canary.metronome.data.path`](#lhcanarymetronomedatapath)
* [`lh.canary.metronome.beat.extra.tags.<additional tag>`](#lhcanarymetronomebeatextratagsadditional-tag)
* [Kafka Configurations](#kafka-configurations)
* [LH Client Configurations](#lh-client-configurations)
* [Task Worker](#task-worker)
Expand Down Expand Up @@ -156,6 +157,15 @@ Local DB path.
- **Default:** /tmp/canaryMetronome
- **Importance:** medium

#### `lh.canary.metronome.beat.extra.tags.<additional tag>`

This config is useful to add more tags to prometheus metrics.
For example: `lh.canary.metronome.beat.extra.tags.my_tag=my-value`.

- **Type:** string
- **Default:** null
- **Importance:** low

### Kafka Configurations

LH Canary supports all kafka configurations. Use the prefix `lh.canary.kafka` and append the kafka config.
Expand Down
2 changes: 1 addition & 1 deletion sdk-python/examples/basic/example_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ async def main() -> None:


if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())
Loading
Loading