Skip to content

Commit

Permalink
feat: add threads and runs
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Feb 1, 2024
1 parent e6901fd commit 5b3c909
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 24 deletions.
13 changes: 0 additions & 13 deletions canary/canary.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,3 @@ lh.canary.client.id=dev
lh.canary.topic.creation.enable=true
lh.canary.topic.creation.replicas=1
lh.canary.topic.creation.partitions=1

lh.canary.api.host=localhost
lh.canary.api.port=3023

lh.canary.metronome.enable=true
lh.canary.aggregator.enable=true

# Kafka settings
lh.canary.kafka.bootstrap.servers=localhost:9092

# LH settings
lh.canary.lhc.api.host=localhost
lh.canary.lhc.api.port=2023
4 changes: 3 additions & 1 deletion canary/src/main/java/io/littlehorse/canary/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ private static void initializeBootstraps(final CanaryConfig config) {
config.getTopicName(),
config.toKafkaProducerConfig().toMap(),
config.toLittleHorseConfig().toMap(),
config.getMetronomeFrequency());
config.getMetronomeFrequency(),
config.getMetronomeThreads(),
config.getMetronomeRuns());
}

if (config.isAggregatorEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,12 @@ public boolean isAggregatorEnabled() {
public long getMetronomeFrequency() {
return Long.parseLong(configs.get(LH_CANARY_METRONOME_FREQUENCY_MS).toString());
}

public int getMetronomeThreads() {
return Integer.parseInt(configs.get(LH_CANARY_METRONOME_THREADS).toString());
}

public int getMetronomeRuns() {
return Integer.parseInt(configs.get(LH_CANARY_METRONOME_RUNS).toString());
}
}
2 changes: 2 additions & 0 deletions canary/src/main/java/io/littlehorse/canary/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public interface Config {
String LH_CANARY_METRONOME_ENABLE = LH_CANARY_PREFIX + "metronome.enable";
String LH_CANARY_AGGREGATOR_ENABLE = LH_CANARY_PREFIX + "aggregator.enable";
String LH_CANARY_METRONOME_FREQUENCY_MS = LH_CANARY_PREFIX + "metronome.frequency.ms";
String LH_CANARY_METRONOME_THREADS = LH_CANARY_PREFIX + "metronome.threads";
String LH_CANARY_METRONOME_RUNS = LH_CANARY_PREFIX + "metronome.runs";

Map<String, Object> toMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,45 @@

import io.littlehorse.canary.kafka.MetricsEmitter;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Metronome implements Closeable, Runnable {
public class Metronome implements Closeable {

private final MetricsEmitter emitter;
private final ScheduledExecutorService executor;
private final ScheduledExecutorService mainExecutor;
private final ExecutorService requestsExecutor;
private final int runs;

public Metronome(final MetricsEmitter emitter, final long frequency) {
public Metronome(final MetricsEmitter emitter, final long frequency, final int threads, final int runs) {
this.emitter = emitter;

executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(this, 0, frequency, TimeUnit.MILLISECONDS);
this.runs = runs;

mainExecutor = Executors.newScheduledThreadPool(1);
mainExecutor.scheduleAtFixedRate(this::scheduledRun, 0, frequency, TimeUnit.MILLISECONDS);

requestsExecutor = Executors.newFixedThreadPool(threads);
}

@Override
public void run() {
private static void executeRuns() {
log.trace("Executing runs");
}

private void scheduledRun() {
log.trace("Executing metronome");
for (int i = 0; i < runs; i++) {
requestsExecutor.submit(Metronome::executeRuns);
}
}

@Override
public void close() {
executor.shutdown();
mainExecutor.shutdown();
requestsExecutor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ public MetronomeBootstrap(
final String metricsTopicName,
final Map<String, Object> kafkaProducerConfigMap,
final Map<String, Object> littleHorseConfigMap,
final long frequency) {
final long frequency,
final int threads,
final int runs) {

final LHConfig lhConfig = new LHConfig(littleHorseConfigMap);
final MetricsEmitter emitter = new MetricsEmitter(metricsTopicName, kafkaProducerConfigMap);
Expand All @@ -30,7 +32,7 @@ public MetronomeBootstrap(
initializeWorker(emitter, lhConfig);
initializeWorkflow(lhConfig);

final Metronome metronome = new Metronome(emitter, frequency);
final Metronome metronome = new Metronome(emitter, frequency, threads, runs);
Shutdown.addShutdownHook(metronome);

log.trace("Initialized");
Expand Down
19 changes: 19 additions & 0 deletions canary/src/main/resources/META-INF/microprofile-config.properties
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
# DEFAULT SETTINGS

# Canary settings
lh.canary.client.id=default

lh.canary.api.host=localhost
lh.canary.api.port=3023
lh.canary.metrics.host=localhost
lh.canary.metrics.port=9090

lh.canary.topic.name=canary-metric-beats
lh.canary.topic.creation.enable=false
lh.canary.topic.creation.replicas=3
lh.canary.topic.creation.partitions=12

lh.canary.aggregator.enable=true

lh.canary.metronome.enable=true
lh.canary.metronome.frequency.ms=1000
lh.canary.metronome.threads=4
lh.canary.metronome.runs=4

# LH settings
lh.canary.lhw.task-worker.id=${lh.canary.client.id}
lh.canary.lhc.api.host=localhost
lh.canary.lhc.api.port=2023

# Kafka settings
lh.canary.kafka.bootstrap.servers=localhost:9092

# Kafka producer settings
lh.canary.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
Expand Down

0 comments on commit 5b3c909

Please sign in to comment.