From 5b3c9096734096b322b660dc719f481da3ff261b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sa=C3=BAl=20Pi=C3=B1a?= Date: Thu, 1 Feb 2024 09:40:53 -0500 Subject: [PATCH] feat: add threads and runs --- canary/canary.properties | 13 -------- .../main/java/io/littlehorse/canary/Main.java | 4 ++- .../canary/config/CanaryConfig.java | 8 +++++ .../io/littlehorse/canary/config/Config.java | 2 ++ .../canary/metronome/Metronome.java | 30 ++++++++++++++----- .../canary/metronome/MetronomeBootstrap.java | 6 ++-- .../META-INF/microprofile-config.properties | 19 ++++++++++++ 7 files changed, 58 insertions(+), 24 deletions(-) diff --git a/canary/canary.properties b/canary/canary.properties index b28b22831..28aecf0ef 100644 --- a/canary/canary.properties +++ b/canary/canary.properties @@ -4,16 +4,3 @@ lh.canary.client.id=dev lh.canary.topic.creation.enable=true lh.canary.topic.creation.replicas=1 lh.canary.topic.creation.partitions=1 - -lh.canary.api.host=localhost -lh.canary.api.port=3023 - -lh.canary.metronome.enable=true -lh.canary.aggregator.enable=true - -# Kafka settings -lh.canary.kafka.bootstrap.servers=localhost:9092 - -# LH settings -lh.canary.lhc.api.host=localhost -lh.canary.lhc.api.port=2023 diff --git a/canary/src/main/java/io/littlehorse/canary/Main.java b/canary/src/main/java/io/littlehorse/canary/Main.java index c01cf6adc..c151702b2 100644 --- a/canary/src/main/java/io/littlehorse/canary/Main.java +++ b/canary/src/main/java/io/littlehorse/canary/Main.java @@ -44,7 +44,9 @@ private static void initializeBootstraps(final CanaryConfig config) { config.getTopicName(), config.toKafkaProducerConfig().toMap(), config.toLittleHorseConfig().toMap(), - config.getMetronomeFrequency()); + config.getMetronomeFrequency(), + config.getMetronomeThreads(), + config.getMetronomeRuns()); } if (config.isAggregatorEnabled()) { diff --git a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java index da03f5887..c8c3dcf9d 100644 --- a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java +++ b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java @@ -63,4 +63,12 @@ public boolean isAggregatorEnabled() { public long getMetronomeFrequency() { return Long.parseLong(configs.get(LH_CANARY_METRONOME_FREQUENCY_MS).toString()); } + + public int getMetronomeThreads() { + return Integer.parseInt(configs.get(LH_CANARY_METRONOME_THREADS).toString()); + } + + public int getMetronomeRuns() { + return Integer.parseInt(configs.get(LH_CANARY_METRONOME_RUNS).toString()); + } } diff --git a/canary/src/main/java/io/littlehorse/canary/config/Config.java b/canary/src/main/java/io/littlehorse/canary/config/Config.java index f5f270e44..2e4b29104 100644 --- a/canary/src/main/java/io/littlehorse/canary/config/Config.java +++ b/canary/src/main/java/io/littlehorse/canary/config/Config.java @@ -12,6 +12,8 @@ public interface Config { String LH_CANARY_METRONOME_ENABLE = LH_CANARY_PREFIX + "metronome.enable"; String LH_CANARY_AGGREGATOR_ENABLE = LH_CANARY_PREFIX + "aggregator.enable"; String LH_CANARY_METRONOME_FREQUENCY_MS = LH_CANARY_PREFIX + "metronome.frequency.ms"; + String LH_CANARY_METRONOME_THREADS = LH_CANARY_PREFIX + "metronome.threads"; + String LH_CANARY_METRONOME_RUNS = LH_CANARY_PREFIX + "metronome.runs"; Map toMap(); } diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java b/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java index 72b3296fd..57efeb1a9 100644 --- a/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java +++ b/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java @@ -2,31 +2,45 @@ import io.littlehorse.canary.kafka.MetricsEmitter; import java.io.Closeable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @Slf4j -public class Metronome implements Closeable, Runnable { +public class Metronome implements Closeable { private final MetricsEmitter emitter; - private final ScheduledExecutorService executor; + private final ScheduledExecutorService mainExecutor; + private final ExecutorService requestsExecutor; + private final int runs; - public Metronome(final MetricsEmitter emitter, final long frequency) { + public Metronome(final MetricsEmitter emitter, final long frequency, final int threads, final int runs) { this.emitter = emitter; - executor = Executors.newScheduledThreadPool(1); - executor.scheduleAtFixedRate(this, 0, frequency, TimeUnit.MILLISECONDS); + this.runs = runs; + + mainExecutor = Executors.newScheduledThreadPool(1); + mainExecutor.scheduleAtFixedRate(this::scheduledRun, 0, frequency, TimeUnit.MILLISECONDS); + + requestsExecutor = Executors.newFixedThreadPool(threads); } - @Override - public void run() { + private static void executeRuns() { + log.trace("Executing runs"); + } + + private void scheduledRun() { log.trace("Executing metronome"); + for (int i = 0; i < runs; i++) { + requestsExecutor.submit(Metronome::executeRuns); + } } @Override public void close() { - executor.shutdown(); + mainExecutor.shutdown(); + requestsExecutor.shutdown(); } } diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java index 9608a2e47..30e0185b6 100644 --- a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java +++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java @@ -21,7 +21,9 @@ public MetronomeBootstrap( final String metricsTopicName, final Map kafkaProducerConfigMap, final Map littleHorseConfigMap, - final long frequency) { + final long frequency, + final int threads, + final int runs) { final LHConfig lhConfig = new LHConfig(littleHorseConfigMap); final MetricsEmitter emitter = new MetricsEmitter(metricsTopicName, kafkaProducerConfigMap); @@ -30,7 +32,7 @@ public MetronomeBootstrap( initializeWorker(emitter, lhConfig); initializeWorkflow(lhConfig); - final Metronome metronome = new Metronome(emitter, frequency); + final Metronome metronome = new Metronome(emitter, frequency, threads, runs); Shutdown.addShutdownHook(metronome); log.trace("Initialized"); diff --git a/canary/src/main/resources/META-INF/microprofile-config.properties b/canary/src/main/resources/META-INF/microprofile-config.properties index 5d9d7cc44..d644e53be 100644 --- a/canary/src/main/resources/META-INF/microprofile-config.properties +++ b/canary/src/main/resources/META-INF/microprofile-config.properties @@ -1,13 +1,32 @@ +# DEFAULT SETTINGS + # Canary settings +lh.canary.client.id=default + +lh.canary.api.host=localhost +lh.canary.api.port=3023 +lh.canary.metrics.host=localhost +lh.canary.metrics.port=9090 + lh.canary.topic.name=canary-metric-beats lh.canary.topic.creation.enable=false lh.canary.topic.creation.replicas=3 lh.canary.topic.creation.partitions=12 +lh.canary.aggregator.enable=true + +lh.canary.metronome.enable=true lh.canary.metronome.frequency.ms=1000 +lh.canary.metronome.threads=4 +lh.canary.metronome.runs=4 # LH settings lh.canary.lhw.task-worker.id=${lh.canary.client.id} +lh.canary.lhc.api.host=localhost +lh.canary.lhc.api.port=2023 + +# Kafka settings +lh.canary.kafka.bootstrap.servers=localhost:9092 # Kafka producer settings lh.canary.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer