Skip to content

Commit

Permalink
feat: add extra tags to metronome beats
Browse files Browse the repository at this point in the history
Co-authored-by: Mijail Rondon <[email protected]>
  • Loading branch information
sauljabin and mijailr committed Dec 13, 2024
1 parent 87aac48 commit 0abe27e
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 120 deletions.
3 changes: 2 additions & 1 deletion canary/src/main/java/io/littlehorse/canary/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private static void initialize(final String[] args) throws IOException {
lhConfig.getApiBootstrapPort(),
lhClient.getServerVersion(),
canaryConfig.getTopicName(),
canaryConfig.toKafkaConfig().toMap());
canaryConfig.toKafkaConfig().toMap(),
canaryConfig.getMetronomeBeatExtraTags());

// start worker
if (canaryConfig.isMetronomeWorkerEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public Topology toTopology() {
.filterNot(MetricsTopology::isExhaustedRetries)
// remove the id
.groupBy(
MetricsTopology::cleanBeatKey,
Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
// reset aggregator every minute
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(5)))
// calculate average
Expand All @@ -62,8 +61,7 @@ public Topology toTopology() {
final KStream<MetricKey, MetricValue> countMetricStream = beatsStream
// remove the id
.groupBy(
MetricsTopology::cleanBeatKey,
Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
MetricsTopology::removeWfId, Grouped.with(ProtobufSerdes.BeatKey(), ProtobufSerdes.BeatValue()))
// count all
.count(initializeCountStore(COUNT_STORE))
.toStream()
Expand Down Expand Up @@ -181,6 +179,10 @@ private static MetricKey buildMetricKey(final BeatKey key, final String id) {
Tag.newBuilder().setKey("status").setValue(key.getStatus().toLowerCase()));
}

if (key.getTagsCount() > 0) {
builder.addAllTags(key.getTagsList());
}

return builder.build();
}

Expand All @@ -195,13 +197,14 @@ private static AverageAggregator initializeAverageAggregator() {
return AverageAggregator.newBuilder().build();
}

private static BeatKey cleanBeatKey(final BeatKey key, final BeatValue value) {
private static BeatKey removeWfId(final BeatKey key, final BeatValue value) {
return BeatKey.newBuilder()
.setType(key.getType())
.setServerVersion(key.getServerVersion())
.setServerHost(key.getServerHost())
.setServerPort(key.getServerPort())
.setStatus(key.getStatus())
.addAllTags(key.getTagsList())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ public class CanaryConfig implements Config {
public static final String METRONOME_GET_RETRIES = "metronome.get.retries";
public static final String METRONOME_WORKER_ENABLE = "metronome.worker.enable";
public static final String METRONOME_DATA_PATH = "metronome.data.path";
public static final String METRONOME_BEAT_EXTRA_TAGS = "metronome.beat.extra.tags";
public static final String METRONOME_BEAT_EXTRA_TAGS_PREFIX = "%s.".formatted(METRONOME_BEAT_EXTRA_TAGS);

public static final String AGGREGATOR_ENABLE = "aggregator.enable";
public static final String AGGREGATOR_STORE_RETENTION_MS = "aggregator.store.retention.ms";

public static final String METRICS_PORT = "metrics.port";
public static final String METRICS_PATH = "metrics.path";
public static final String AGGREGATOR_STORE_RETENTION_MS = "aggregator.store.retention.ms";
public static final String METRICS_COMMON_TAGS = "metrics.common.tags";
public static final String METRICS_COMMON_TAGS_PREFIX = "%s.".formatted(METRICS_COMMON_TAGS);

Expand Down Expand Up @@ -147,6 +150,14 @@ public Map<String, String> getCommonTags() {
entry -> entry.getValue().toString()));
}

public Map<String, String> getMetronomeBeatExtraTags() {
return configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(METRONOME_BEAT_EXTRA_TAGS_PREFIX))
.collect(Collectors.toMap(
entry -> entry.getKey().substring(METRONOME_BEAT_EXTRA_TAGS_PREFIX.length()),
entry -> entry.getValue().toString()));
}

public boolean isTopicCreationEnabled() {
return Boolean.parseBoolean(getConfig(TOPIC_CREATION_ENABLE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import io.littlehorse.canary.proto.BeatKey;
import io.littlehorse.canary.proto.BeatType;
import io.littlehorse.canary.proto.BeatValue;
import io.littlehorse.canary.proto.Tag;
import io.littlehorse.canary.util.ShutdownHook;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -18,6 +20,7 @@
public class BeatProducer {

private final Producer<Bytes, Bytes> producer;
private final Map<String, String> extraTags;
private final String lhServerHost;
private final int lhServerPort;
private final String lhServerVersion;
Expand All @@ -28,13 +31,15 @@ public BeatProducer(
final int lhServerPort,
final String lhServerVersion,
final String topicName,
final Map<String, Object> kafkaProducerConfigMap) {
final Map<String, Object> producerConfig,
final Map<String, String> extraTags) {
this.lhServerHost = lhServerHost;
this.lhServerPort = lhServerPort;
this.lhServerVersion = lhServerVersion;
this.topicName = topicName;
this.extraTags = extraTags;

producer = new KafkaProducer<>(kafkaProducerConfigMap);
producer = new KafkaProducer<>(producerConfig);
ShutdownHook.add("Beat Producer", producer);
}

Expand Down Expand Up @@ -91,6 +96,15 @@ private BeatKey buildKey(final String id, final BeatType type, final String stat
builder.setStatus(status);
}

final List<Tag> tags = extraTags.entrySet().stream()
.map(entry -> Tag.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
.toList();

builder.addAllTags(tags);

return builder.build();
}
}
2 changes: 2 additions & 0 deletions canary/src/main/proto/beats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ option java_multiple_files = true;
option java_package = "io.littlehorse.canary.proto";

import "google/protobuf/timestamp.proto";
import "metrics.proto";

enum BeatType {
WF_RUN_REQUEST = 0;
Expand All @@ -25,6 +26,7 @@ message BeatKey {
BeatType type = 4;
optional string status = 5;
optional string id = 6;
repeated Tag tags = 7;
}

message BeatValue {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -46,34 +48,59 @@ private static MetricKey newMetricKey(String id) {
}

private static MetricKey newMetricKey(String id, String status) {
return newMetricKey(HOST_1, PORT_1, id, status);
return newMetricKey(HOST_1, PORT_1, id, status, null);
}

private static MetricKey newMetricKey(String host, int port, String id) {
return newMetricKey(host, port, id, null);
return newMetricKey(host, port, id, null, null);
}

private static MetricKey newMetricKey(String host, int port, String id, String status) {
private static MetricKey newMetricKey(String id, String status, Map<String, String> tags) {
return newMetricKey(HOST_1, PORT_1, id, status, tags);
}

private static MetricKey newMetricKey(String host, int port, String id, String status, Map<String, String> tags) {
MetricKey.Builder builder =
MetricKey.newBuilder().setServerHost(host).setServerPort(port).setId(id);

if (status != null) {
builder.addTags(Tag.newBuilder().setKey("status").setValue(status).build());
}

if (tags != null) {
List<Tag> tagList = tags.entrySet().stream()
.map(entry -> Tag.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
.toList();
builder.addAllTags(tagList);
}

return builder.build();
}

private static TestRecord<BeatKey, BeatValue> newBeat(BeatType type, String id, Long latency) {
return newBeat(HOST_1, PORT_1, type, id, latency, null);
return newBeat(HOST_1, PORT_1, type, id, latency, null, null);
}

private static TestRecord<BeatKey, BeatValue> newBeat(BeatType type, String id, Long latency, String beatStatus) {
return newBeat(HOST_1, PORT_1, type, id, latency, beatStatus);
return newBeat(HOST_1, PORT_1, type, id, latency, beatStatus, null);
}

private static TestRecord<BeatKey, BeatValue> newBeat(
String host, int port, BeatType type, String id, Long latency, String beatStatus) {
BeatType type, String id, Long latency, String beatStatus, Map<String, String> tags) {
return newBeat(HOST_1, PORT_1, type, id, latency, beatStatus, tags);
}

private static TestRecord<BeatKey, BeatValue> newBeat(
String host,
int port,
BeatType type,
String id,
Long latency,
String beatStatus,
Map<String, String> tags) {
BeatKey.Builder keyBuilder = BeatKey.newBuilder()
.setServerHost(host)
.setServerPort(port)
Expand All @@ -85,6 +112,16 @@ private static TestRecord<BeatKey, BeatValue> newBeat(
keyBuilder.setStatus(beatStatus);
}

if (tags != null) {
List<Tag> tagList = tags.entrySet().stream()
.map(entry -> Tag.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
.toList();
keyBuilder.addAllTags(tagList);
}

if (latency != null) {
valueBuilder.setLatency(latency);
}
Expand Down Expand Up @@ -136,6 +173,18 @@ void calculateCountAndLatencyForWfRunRequest() {
.isEqualTo(newMetricValue(3.));
}

@Test
void includeBeatTagsIntoMetrics() {
BeatType expectedType = BeatType.WF_RUN_REQUEST;
String expectedTypeName = expectedType.name().toLowerCase();

inputTopic.pipeInput(newBeat(expectedType, getRandomId(), 20L, "ok", Map.of("my_tag", "value")));

assertThat(getCount()).isEqualTo(3);
assertThat(store.get(newMetricKey("canary_" + expectedTypeName + "_avg", "ok", Map.of("my_tag", "value"))))
.isEqualTo(newMetricValue(20.));
}

@Test
void calculateCountForExhaustedRetries() {
BeatType expectedType = BeatType.GET_WF_RUN_EXHAUSTED_RETRIES;
Expand Down Expand Up @@ -270,9 +319,9 @@ void calculateCountAndLatencyForTaskRunWithDuplicatedAndTwoServers() {
inputTopic.pipeInput(newBeat(expectedType, expectedUniqueId, 10L));
inputTopic.pipeInput(newBeat(expectedType, expectedUniqueId, 30L));

inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 20L, null));
inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 10L, null));
inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 30L, null));
inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 20L, null, null));
inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 10L, null, null));
inputTopic.pipeInput(newBeat(HOST_2, PORT_2, expectedType, expectedUniqueId, 30L, null, null));

assertThat(getCount()).isEqualTo(8);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,25 @@ void getCommonTags() {

assertThat(output).contains(entry("application_id", "my_id"), entry("extra", "extra_tag"));
}

@Test
void getMetronomeExtraTags() {
Map<String, Object> input = Map.of("lh.canary.metronome.beat.extra.tags.my_tag", "extra_tag");

CanaryConfig canaryConfig = new CanaryConfig(input);

Map<String, String> output = canaryConfig.getMetronomeBeatExtraTags();

assertThat(output).contains(entry("my_tag", "extra_tag"));
}

@Test
void getEmptyMetronomeExtraTags() {
CanaryConfig canaryConfig = new CanaryConfig(Map.of());

Map<String, String> output = canaryConfig.getMetronomeBeatExtraTags();

assertThat(output).isEmpty();
assertThat(output).isNotNull();
}
}
2 changes: 1 addition & 1 deletion sdk-python/examples/basic/example_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ async def main() -> None:


if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())
36 changes: 22 additions & 14 deletions sdk-python/examples/user_tasks/user_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ def get_config() -> LHConfig:
config.load(config_path)
return config


async def get_user_task_def() -> PutUserTaskDefRequest:
return PutUserTaskDefRequest(
name="person-details",
fields=[
UserTaskField(
name="PersonDetails",
description="Person complementary information",
display_name="Other Details",
required=True,
type=VariableType.STR
)
]
)
name="person-details",
fields=[
UserTaskField(
name="PersonDetails",
description="Person complementary information",
display_name="Other Details",
required=True,
type=VariableType.STR,
)
],
)


def get_workflow() -> Workflow:
def my_entrypoint(wf: WorkflowThread) -> None:
Expand All @@ -42,16 +44,22 @@ def my_entrypoint(wf: WorkflowThread) -> None:
arg1 = "Sam"
arg2 = {"identification": "1258796641-4", "Address": "NA-Street", "Age": 28}

wf.schedule_reminder_task(user_task_output, delay_in_seconds, task_def_name, arg1, arg2)
wf.schedule_reminder_task(
user_task_output, delay_in_seconds, task_def_name, arg1, arg2
)

return Workflow("example-user-tasks", my_entrypoint)

async def greeting(name: str, person_details: dict[str, Any], ctx: WorkerContext) -> str:

async def greeting(
name: str, person_details: dict[str, Any], ctx: WorkerContext
) -> str:
msg = f"Hello {name}!. WfRun {ctx.wf_run_id.id} Person: {person_details}"
print(msg)
await asyncio.sleep(random.uniform(0.5, 1.5))
return msg


async def main() -> None:
config = get_config()
wf = get_workflow()
Expand All @@ -66,4 +74,4 @@ async def main() -> None:


if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())
Loading

0 comments on commit 0abe27e

Please sign in to comment.