diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0dd77a642..6dc7284e2 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -87,33 +87,12 @@ repos:
# Tests
- id: java-build
- name: Tests Build
+ name: Tests Java Projects
language: system
always_run: true
pass_filenames: false
entry: ./gradlew build
- - id: server-tests
- name: Running Server Tests
- language: system
- always_run: true
- pass_filenames: false
- entry: ./gradlew server:test
-
- - id: canary-tests
- name: Running Canary Java Tests
- language: system
- always_run: true
- pass_filenames: false
- entry: ./gradlew canary:test
-
- - id: java-tests
- name: Running SDK Java Tests
- language: system
- always_run: true
- pass_filenames: false
- entry: ./gradlew sdk-java:test
-
- id: javadoc
name: Running JavaDoc
language: system
diff --git a/canary/build.gradle b/canary/build.gradle
index d8d15cb99..0ea8f3334 100644
--- a/canary/build.gradle
+++ b/canary/build.gradle
@@ -1,6 +1,7 @@
plugins {
id 'java'
id 'application'
+ id 'checkstyle'
id 'com.diffplug.spotless' version '6.25.0'
id 'com.github.johnrengelman.shadow' version '8.1.1'
id 'io.freefair.lombok' version '8.4'
@@ -82,3 +83,12 @@ protobuf {
}
}
}
+
+checkstyle {
+ configFile = file("${rootDir}/canary/checkstyle.xml")
+ checkstyleTest.enabled = false
+}
+
+tasks.withType(Checkstyle).configureEach {
+ exclude('**io/littlehorse/canary/proto**')
+}
diff --git a/canary/checkstyle.xml b/canary/checkstyle.xml
new file mode 100644
index 000000000..bd8444dc5
--- /dev/null
+++ b/canary/checkstyle.xml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
diff --git a/canary/src/main/java/io/littlehorse/canary/CanaryException.java b/canary/src/main/java/io/littlehorse/canary/CanaryException.java
index cceb74544..b8bf28b6d 100644
--- a/canary/src/main/java/io/littlehorse/canary/CanaryException.java
+++ b/canary/src/main/java/io/littlehorse/canary/CanaryException.java
@@ -1,17 +1,8 @@
package io.littlehorse.canary;
public class CanaryException extends RuntimeException {
- public CanaryException() {}
- public CanaryException(String message) {
- super(message);
- }
-
- public CanaryException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public CanaryException(Throwable cause) {
+ public CanaryException(final Throwable cause) {
super(cause);
}
}
diff --git a/canary/src/main/java/io/littlehorse/canary/Main.java b/canary/src/main/java/io/littlehorse/canary/Main.java
index 72797738c..21407b903 100644
--- a/canary/src/main/java/io/littlehorse/canary/Main.java
+++ b/canary/src/main/java/io/littlehorse/canary/Main.java
@@ -13,8 +13,8 @@
@Slf4j
public class Main {
- public static void main(String[] args) throws IOException, InterruptedException {
- CanaryConfig config = args.length > 0 ? ConfigLoader.load(Paths.get(args[0])) : ConfigLoader.load();
+ public static void main(final String[] args) throws IOException, InterruptedException {
+ final CanaryConfig config = args.length > 0 ? ConfigLoader.load(Paths.get(args[0])) : ConfigLoader.load();
log.debug("Canary configurations: {}", config);
log.debug("KafkaAdmin configurations: {}", config.toKafkaAdminConfig());
@@ -22,8 +22,8 @@ public static void main(String[] args) throws IOException, InterruptedException
log.debug("KafkaStreams configurations: {}", config.toKafkaStreamsConfig());
log.debug("LittleHorse configurations: {}", config.toLittleHorseConfig());
- int latchSize = 1 + (config.isMetronomeEnabled() ? 1 : 0) + (config.isAggregatorEnabled() ? 1 : 0);
- CountDownLatch latch = new CountDownLatch(latchSize);
+ final int latchSize = 1 + (config.isMetronomeEnabled() ? 1 : 0) + (config.isAggregatorEnabled() ? 1 : 0);
+ final CountDownLatch latch = new CountDownLatch(latchSize);
try {
initiateKafkaTopicBootstrap(config, latch);
@@ -45,22 +45,22 @@ public static void main(String[] args) throws IOException, InterruptedException
log.info("Stopped");
}
- private static void initiateAggregatorBootstrap(CanaryConfig config, CountDownLatch latch) {
- AggregatorBootstrap aggregatorBootstrap = new AggregatorBootstrap(
+ private static void initiateAggregatorBootstrap(final CanaryConfig config, final CountDownLatch latch) {
+ final AggregatorBootstrap aggregatorBootstrap = new AggregatorBootstrap(
config.getTopicName(), config.toKafkaStreamsConfig().toMap());
addShutdownHook(aggregatorBootstrap, latch);
}
- private static void initiateMetronomeBootstrap(CanaryConfig config, CountDownLatch latch) {
- MetronomeBootstrap metronomeBootstrap = new MetronomeBootstrap(
+ private static void initiateMetronomeBootstrap(final CanaryConfig config, final CountDownLatch latch) {
+ final MetronomeBootstrap metronomeBootstrap = new MetronomeBootstrap(
config.getTopicName(),
config.toKafkaProducerConfig().toMap(),
config.toLittleHorseConfig().toMap());
addShutdownHook(metronomeBootstrap, latch);
}
- private static void initiateKafkaTopicBootstrap(CanaryConfig config, CountDownLatch latch) {
- KafkaTopicBootstrap kafkaTopicBootstrap = new KafkaTopicBootstrap(
+ private static void initiateKafkaTopicBootstrap(final CanaryConfig config, final CountDownLatch latch) {
+ final KafkaTopicBootstrap kafkaTopicBootstrap = new KafkaTopicBootstrap(
config.getTopicName(),
config.getTopicPartitions(),
config.getTopicReplicas(),
@@ -68,7 +68,7 @@ private static void initiateKafkaTopicBootstrap(CanaryConfig config, CountDownLa
addShutdownHook(kafkaTopicBootstrap, latch);
}
- private static void addShutdownHook(Bootstrap bootstrap, CountDownLatch latch) {
+ private static void addShutdownHook(final Bootstrap bootstrap, final CountDownLatch latch) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.trace("{} shutdown process started", bootstrap.getClass().getSimpleName());
try {
diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/AggregatorBootstrap.java b/canary/src/main/java/io/littlehorse/canary/aggregator/AggregatorBootstrap.java
index 4425c1917..6f6dab6fc 100644
--- a/canary/src/main/java/io/littlehorse/canary/aggregator/AggregatorBootstrap.java
+++ b/canary/src/main/java/io/littlehorse/canary/aggregator/AggregatorBootstrap.java
@@ -21,14 +21,14 @@ public class AggregatorBootstrap implements Bootstrap {
private static final Consumed SERDES = Consumed.with(Serdes.String(), Serdes.Bytes());
private final KafkaStreams kafkaStreams;
- public AggregatorBootstrap(String topicName, Map kafkaStreamsConfigMap) {
- kafkaStreams = new KafkaStreams(buildTopology(topicName), new StreamsConfig(kafkaStreamsConfigMap));
+ public AggregatorBootstrap(final String metricsTopicName, final Map kafkaStreamsConfigMap) {
+ kafkaStreams = new KafkaStreams(buildTopology(metricsTopicName), new StreamsConfig(kafkaStreamsConfigMap));
kafkaStreams.start();
log.trace("Initialized");
}
- private static Metric toMetric(Bytes value) {
+ private static Metric toMetric(final Bytes value) {
try {
return Metric.parseFrom(value.get());
} catch (InvalidProtocolBufferException e) {
@@ -39,11 +39,11 @@ private static Metric toMetric(Bytes value) {
}
}
- private Topology buildTopology(String topicName) {
- StreamsBuilder builder = new StreamsBuilder();
+ private Topology buildTopology(final String metricsTopicName) {
+ final StreamsBuilder builder = new StreamsBuilder();
- KStream metricStream =
- builder.stream(topicName, SERDES).mapValues(AggregatorBootstrap::toMetric);
+ final KStream metricStream =
+ builder.stream(metricsTopicName, SERDES).mapValues(AggregatorBootstrap::toMetric);
return builder.build();
}
diff --git a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java
index 2bd209751..9497b3320 100644
--- a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java
+++ b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java
@@ -8,7 +8,7 @@ public class CanaryConfig implements Config {
private final Map configs;
- public CanaryConfig(Map configs) {
+ public CanaryConfig(final Map configs) {
this.configs = configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(LH_CANARY_PREFIX))
.collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue));
diff --git a/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java b/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java
index fbf5a1699..25275a18b 100644
--- a/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java
+++ b/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java
@@ -24,7 +24,7 @@ private ConfigLoader() {}
* @return A CanaryConfig with all system configuration
*/
public static CanaryConfig load() {
- SmallRyeConfig config = new SmallRyeConfigBuilder()
+ final SmallRyeConfig config = new SmallRyeConfigBuilder()
.addDefaultInterceptors()
.addDefaultSources()
.build();
@@ -42,8 +42,8 @@ public static CanaryConfig load() {
* @return A CanaryConfig with all system configuration
* @throws IOException If properties file was not found
*/
- public static CanaryConfig load(Path path) throws IOException {
- SmallRyeConfig config = new SmallRyeConfigBuilder()
+ public static CanaryConfig load(final Path path) throws IOException {
+ final SmallRyeConfig config = new SmallRyeConfigBuilder()
.addDefaultInterceptors()
.addDefaultSources()
.withSources(new PropertiesConfigSource(path.toUri().toURL(), 200))
@@ -61,8 +61,8 @@ public static CanaryConfig load(Path path) throws IOException {
* @param properties Properties object
* @return A CanaryConfig with all system configuration
*/
- public static CanaryConfig load(Properties properties) {
- SmallRyeConfig config = new SmallRyeConfigBuilder()
+ public static CanaryConfig load(final Properties properties) {
+ final SmallRyeConfig config = new SmallRyeConfigBuilder()
.addDefaultInterceptors()
.addDefaultSources()
.withSources(new PropertiesConfigSource(
@@ -71,8 +71,8 @@ public static CanaryConfig load(Properties properties) {
return new CanaryConfig(toMap(config));
}
- private static Map toMap(Config config) {
- Map configs = new TreeMap<>();
+ private static Map toMap(final Config config) {
+ final Map configs = new TreeMap<>();
for (String key : config.getPropertyNames()) {
configs.put(key, config.getOptionalValue(key, String.class).orElse(""));
}
diff --git a/canary/src/main/java/io/littlehorse/canary/config/KafkaAdminConfig.java b/canary/src/main/java/io/littlehorse/canary/config/KafkaAdminConfig.java
index 7ea66524c..d493182f4 100644
--- a/canary/src/main/java/io/littlehorse/canary/config/KafkaAdminConfig.java
+++ b/canary/src/main/java/io/littlehorse/canary/config/KafkaAdminConfig.java
@@ -10,7 +10,7 @@
public class KafkaAdminConfig implements Config {
private final Map configs;
- public KafkaAdminConfig(Map configs) {
+ public KafkaAdminConfig(final Map configs) {
this.configs = configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(LH_CANARY_KAFKA_PREFIX))
.map(entry -> entry(entry.getKey().substring(LH_CANARY_KAFKA_PREFIX.length()), entry.getValue()))
diff --git a/canary/src/main/java/io/littlehorse/canary/config/KafkaProducerConfig.java b/canary/src/main/java/io/littlehorse/canary/config/KafkaProducerConfig.java
index 785f7f6be..b6729ba8a 100644
--- a/canary/src/main/java/io/littlehorse/canary/config/KafkaProducerConfig.java
+++ b/canary/src/main/java/io/littlehorse/canary/config/KafkaProducerConfig.java
@@ -9,7 +9,7 @@
public class KafkaProducerConfig implements Config {
private final Map configs;
- public KafkaProducerConfig(Map configs) {
+ public KafkaProducerConfig(final Map configs) {
this.configs = configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(LH_CANARY_KAFKA_PREFIX))
.map(entry -> entry(entry.getKey().substring(LH_CANARY_KAFKA_PREFIX.length()), entry.getValue()))
diff --git a/canary/src/main/java/io/littlehorse/canary/config/KafkaStreamsConfig.java b/canary/src/main/java/io/littlehorse/canary/config/KafkaStreamsConfig.java
index 3a712b44a..6b440c476 100644
--- a/canary/src/main/java/io/littlehorse/canary/config/KafkaStreamsConfig.java
+++ b/canary/src/main/java/io/littlehorse/canary/config/KafkaStreamsConfig.java
@@ -9,7 +9,7 @@
public class KafkaStreamsConfig implements Config {
private final Map configs;
- public KafkaStreamsConfig(Map configs) {
+ public KafkaStreamsConfig(final Map configs) {
this.configs = configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(LH_CANARY_KAFKA_PREFIX))
.map(entry -> entry(entry.getKey().substring(LH_CANARY_KAFKA_PREFIX.length()), entry.getValue()))
diff --git a/canary/src/main/java/io/littlehorse/canary/config/LittleHorseConfig.java b/canary/src/main/java/io/littlehorse/canary/config/LittleHorseConfig.java
index 7ffd2f580..20817839e 100644
--- a/canary/src/main/java/io/littlehorse/canary/config/LittleHorseConfig.java
+++ b/canary/src/main/java/io/littlehorse/canary/config/LittleHorseConfig.java
@@ -9,11 +9,11 @@
public class LittleHorseConfig implements Config {
private final Map configs;
- public LittleHorseConfig(Map configs) {
+ public LittleHorseConfig(final Map configs) {
this.configs = configs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(LH_CANARY_PREFIX))
.map(entry -> {
- String formattedKey = entry.getKey()
+ final String formattedKey = entry.getKey()
.substring(LH_CANARY_PREFIX.length())
.toUpperCase()
.replace(".", "_")
diff --git a/canary/src/main/java/io/littlehorse/canary/kafka/KafkaTopicBootstrap.java b/canary/src/main/java/io/littlehorse/canary/kafka/KafkaTopicBootstrap.java
index 6faca5c0f..2b1316efc 100644
--- a/canary/src/main/java/io/littlehorse/canary/kafka/KafkaTopicBootstrap.java
+++ b/canary/src/main/java/io/littlehorse/canary/kafka/KafkaTopicBootstrap.java
@@ -16,14 +16,17 @@ public class KafkaTopicBootstrap implements Bootstrap {
private final AdminClient adminClient;
public KafkaTopicBootstrap(
- String topicName, int topicPartitions, short topicReplicas, Map kafkaAdminConfigMap) {
+ final String metricsTopicName,
+ final int topicPartitions,
+ final short topicReplicas,
+ final Map kafkaAdminConfigMap) {
adminClient = KafkaAdminClient.create(kafkaAdminConfigMap);
try {
- NewTopic canaryTopic = new NewTopic(topicName, topicPartitions, topicReplicas);
+ final NewTopic canaryTopic = new NewTopic(metricsTopicName, topicPartitions, topicReplicas);
adminClient.createTopics(List.of(canaryTopic)).all().get();
- log.info("Topics {} created", topicName);
+ log.info("Topics {} created", metricsTopicName);
} catch (Exception e) {
if (e.getCause() instanceof TopicExistsException) {
log.warn(e.getMessage());
diff --git a/canary/src/main/java/io/littlehorse/canary/kafka/MetricsEmitter.java b/canary/src/main/java/io/littlehorse/canary/kafka/MetricsEmitter.java
index ddea00dad..5fe6229bb 100644
--- a/canary/src/main/java/io/littlehorse/canary/kafka/MetricsEmitter.java
+++ b/canary/src/main/java/io/littlehorse/canary/kafka/MetricsEmitter.java
@@ -19,13 +19,14 @@ public class MetricsEmitter implements Closeable {
private final Producer producer;
private final String topicName;
- public MetricsEmitter(String topicName, Map kafkaProducerConfigMap) {
+ public MetricsEmitter(final String topicName, final Map kafkaProducerConfigMap) {
this.producer = new KafkaProducer<>(kafkaProducerConfigMap);
this.topicName = topicName;
}
- public Future future(String key, Metric metric) {
- ProducerRecord record = new ProducerRecord<>(topicName, key, Bytes.wrap(metric.toByteArray()));
+ public Future future(final String key, final Metric metric) {
+ final ProducerRecord record =
+ new ProducerRecord<>(topicName, key, Bytes.wrap(metric.toByteArray()));
return producer.send(record, (metadata, exception) -> {
if (exception == null) {
@@ -36,9 +37,9 @@ public Future future(String key, Metric metric) {
});
}
- public void emit(String key, Metric metric) {
+ public RecordMetadata emit(final String key, final Metric metric) {
try {
- future(key, metric).get();
+ return future(key, metric).get();
} catch (InterruptedException | ExecutionException e) {
throw new CanaryException(e);
}
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java b/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java
index 1e78d5f5f..fac5606bb 100644
--- a/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java
@@ -13,7 +13,7 @@ public class Metronome implements Closeable, Runnable {
private final ScheduledExecutorService executor;
private final MetricsEmitter emitter;
- public Metronome(MetricsEmitter emitter) {
+ public Metronome(final MetricsEmitter emitter) {
this.emitter = emitter;
executor = Executors.newScheduledThreadPool(1);
}
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java
index 4405a663f..508ba6663 100644
--- a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java
@@ -20,14 +20,16 @@ public class MetronomeBootstrap implements Bootstrap {
private final Metronome metronome;
public MetronomeBootstrap(
- String topicName, Map kafkaProducerConfigMap, Map littleHorseConfigMap) {
+ final String metricsTopicName,
+ final Map kafkaProducerConfigMap,
+ final Map littleHorseConfigMap) {
// Initialize kafka producer
- emitter = new MetricsEmitter(topicName, kafkaProducerConfigMap);
+ emitter = new MetricsEmitter(metricsTopicName, kafkaProducerConfigMap);
metronome = new Metronome(emitter);
metronome.start();
// Initialize task worker
- LHConfig lhConfig = new LHConfig(littleHorseConfigMap);
+ final LHConfig lhConfig = new LHConfig(littleHorseConfigMap);
try {
worker = new LHTaskWorker(new MetronomeTask(emitter), TASK_NAME, lhConfig);
worker.registerTaskDef();
@@ -37,7 +39,7 @@ public MetronomeBootstrap(
}
// Initialize workflow
- Workflow workflow = Workflow.newWorkflow(
+ final Workflow workflow = Workflow.newWorkflow(
"canary-workflow",
thread -> thread.execute(TASK_NAME, thread.addVariable(VARIABLE_NAME, VariableType.INT)));
try {
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeTask.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeTask.java
index 28d63ca54..1743de6d4 100644
--- a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeTask.java
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeTask.java
@@ -16,32 +16,32 @@ class MetronomeTask {
private final MetricsEmitter emitter;
- public MetronomeTask(MetricsEmitter emitter) {
+ public MetronomeTask(final MetricsEmitter emitter) {
this.emitter = emitter;
}
@LHTaskMethod(MetronomeBootstrap.TASK_NAME)
- public void executeTask(long startTime, WorkerContext context) {
+ public void executeTask(final long startTime, final WorkerContext context) {
log.trace("Executing task {}", MetronomeBootstrap.TASK_NAME);
emitTaskRunLatencyMetric(startTime, context);
emitDuplicatedTaskRunMetric(context);
}
- private void emitTaskRunLatencyMetric(long startTime, WorkerContext context) {
- long latency =
+ private void emitTaskRunLatencyMetric(final long startTime, final WorkerContext context) {
+ final long latency =
Duration.between(Instant.ofEpochMilli(startTime), Instant.now()).toMillis();
log.debug("Latency {}ms", latency);
- Metric metric = Metric.newBuilder()
+ final Metric metric = Metric.newBuilder()
.setTime(Timestamps.fromMillis(System.currentTimeMillis()))
.setTaskRunLatency(TaskRunLatency.newBuilder().setLatency(latency))
.build();
emitter.future(context.getIdempotencyKey(), metric);
}
- private void emitDuplicatedTaskRunMetric(WorkerContext context) {
- String key = String.format("%s/%s", context.getIdempotencyKey(), context.getAttemptNumber());
+ private void emitDuplicatedTaskRunMetric(final WorkerContext context) {
+ final String key = String.format("%s/%s", context.getIdempotencyKey(), context.getAttemptNumber());
log.debug("Key {}", key);
- Metric metric = Metric.newBuilder()
+ final Metric metric = Metric.newBuilder()
.setTime(Timestamps.fromMillis(System.currentTimeMillis()))
.setDuplicatedTaskRun(DuplicatedTaskRun.newBuilder().setUniqueTaskScheduleId(key))
.build();
diff --git a/local-dev/do-canary.sh b/local-dev/do-canary.sh
index deca63ac0..394adb509 100755
--- a/local-dev/do-canary.sh
+++ b/local-dev/do-canary.sh
@@ -7,5 +7,5 @@ WORK_DIR=$(cd "$SCRIPT_DIR/.." && pwd)
cd "$WORK_DIR"
-./gradlew canary:installDist -x shadowJar -x test
+./gradlew canary:installDist
./canary/build/install/canary/bin/canary canary/canary.properties