Skip to content

Commit

Permalink
feat: add frequency config
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Feb 1, 2024
1 parent 7eea94a commit e6901fd
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 14 deletions.
3 changes: 2 additions & 1 deletion canary/src/main/java/io/littlehorse/canary/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ private static void initializeBootstraps(final CanaryConfig config) {
final MetronomeBootstrap metronomeBootstrap = new MetronomeBootstrap(
config.getTopicName(),
config.toKafkaProducerConfig().toMap(),
config.toLittleHorseConfig().toMap());
config.toLittleHorseConfig().toMap(),
config.getMetronomeFrequency());
}

if (config.isAggregatorEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ private static Topology buildTopology(final String metricsTopicName) {
final KStream<String, Metric> metricStream =
builder.stream(metricsTopicName, SERDES).mapValues(AggregatorBootstrap::toMetric);

metricStream.foreach((key, value) -> log.debug("Aggregating {} {}", value.getMetricCase(), key));

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,8 @@ public boolean isMetronomeEnabled() {
public boolean isAggregatorEnabled() {
return Boolean.parseBoolean(configs.get(LH_CANARY_AGGREGATOR_ENABLE).toString());
}

public long getMetronomeFrequency() {
return Long.parseLong(configs.get(LH_CANARY_METRONOME_FREQUENCY_MS).toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public interface Config {
String LH_CANARY_TOPIC_CREATION_REPLICAS = LH_CANARY_PREFIX + "topic.creation.replicas";
String LH_CANARY_METRONOME_ENABLE = LH_CANARY_PREFIX + "metronome.enable";
String LH_CANARY_AGGREGATOR_ENABLE = LH_CANARY_PREFIX + "aggregator.enable";
String LH_CANARY_METRONOME_FREQUENCY_MS = LH_CANARY_PREFIX + "metronome.frequency.ms";

Map<String, Object> toMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public Future<RecordMetadata> future(final String key, final Metric metric) {

return producer.send(record, (metadata, exception) -> {
if (exception == null) {
log.debug("Emitting message {} [key={}]", metric.getMetricCase(), key);
log.debug("Emitting message {} {}", metric.getMetricCase(), key);
} else {
log.error("Emitting message {} [key={}]", metric.getMetricCase(), key, exception);
log.error("Emitting message {} {}", metric.getMetricCase(), key, exception);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@

import io.littlehorse.canary.kafka.MetricsEmitter;
import java.io.Closeable;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Metronome extends TimerTask implements Closeable {
public class Metronome implements Closeable, Runnable {

private final MetricsEmitter emitter;
private final Timer executor;
private final ScheduledExecutorService executor;

public Metronome(final MetricsEmitter emitter) {
public Metronome(final MetricsEmitter emitter, final long frequency) {
this.emitter = emitter;
executor = new Timer(true);
executor.schedule(this, 0, TimeUnit.SECONDS.toMillis(1));

executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(this, 0, frequency, TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -26,6 +27,6 @@ public void run() {

@Override
public void close() {
executor.cancel();
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public class MetronomeBootstrap implements Bootstrap {
public MetronomeBootstrap(
final String metricsTopicName,
final Map<String, Object> kafkaProducerConfigMap,
final Map<String, Object> littleHorseConfigMap) {
final Map<String, Object> littleHorseConfigMap,
final long frequency) {

final LHConfig lhConfig = new LHConfig(littleHorseConfigMap);
final MetricsEmitter emitter = new MetricsEmitter(metricsTopicName, kafkaProducerConfigMap);
Expand All @@ -29,7 +30,7 @@ public MetronomeBootstrap(
initializeWorker(emitter, lhConfig);
initializeWorkflow(lhConfig);

final Metronome metronome = new Metronome(emitter);
final Metronome metronome = new Metronome(emitter, frequency);
Shutdown.addShutdownHook(metronome);

log.trace("Initialized");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private void emitTaskRunLatencyMetric(final long startTime, final WorkerContext
.setTime(Timestamps.fromMillis(System.currentTimeMillis()))
.setTaskRunLatency(TaskRunLatency.newBuilder().setLatency(latency))
.build();
emitter.future(context.getIdempotencyKey(), metric);
emitter.future(context.getWfRunId().getId(), metric);
}

private void emitDuplicatedTaskRunMetric(final WorkerContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ lh.canary.topic.creation.enable=false
lh.canary.topic.creation.replicas=3
lh.canary.topic.creation.partitions=12

lh.canary.metronome.frequency.ms=1000

# LH settings
lh.canary.lhw.task-worker.id=${lh.canary.client.id}

Expand Down

0 comments on commit e6901fd

Please sign in to comment.