From 1e998ff357a1962ddeed8340afdc1cc7d24649ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sa=C3=BAl=20Pi=C3=B1a?= Date: Mon, 5 Feb 2024 09:24:58 -0500 Subject: [PATCH] feat: add lh canary first version (#637) Included: - Metronome (synthetic monitoring simulator) - Aggregator (kafka stream topology) for calculating metrics - TaskRunLatency metric - Configurations - do-canary.sh script - Log level improvements - Protobuf classes - Docker build Future: - Finish DuplicatedTaskRun metric - Kafka Stream tests - gRPC server - gRPC health check - Prometheus integration --------- Co-authored-by: Mijail Rondon Refers: #637 --- .dockerignore | 1 + .github/workflows/branch.yml | 45 +++++++-- .github/workflows/publish-docker.yml | 37 ++++++- .github/workflows/tests.yml | 13 +++ .pre-commit-config.yaml | 26 +---- canary/README.md | 19 ++++ canary/build.gradle | 96 +++++++++++++++++++ canary/canary.properties | 14 +++ canary/checkstyle.xml | 9 ++ .../java/io/littlehorse/canary/Bootstrap.java | 3 + .../littlehorse/canary/CanaryException.java | 8 ++ .../main/java/io/littlehorse/canary/Main.java | 58 +++++++++++ .../aggregator/AggregatorBootstrap.java | 40 ++++++++ .../internal/MetricTimeExtractor.java | 29 ++++++ .../ProtobufDeserializationException.java | 7 ++ ...otobufDeserializationExceptionHandler.java | 29 ++++++ .../serdes/MetricAverageDeserializer.java | 24 +++++ .../serdes/MetricAverageSerializer.java | 14 +++ .../aggregator/serdes/MetricDeserializer.java | 24 +++++ .../aggregator/serdes/MetricSerializer.java | 14 +++ .../aggregator/serdes/ProtobufSerdes.java | 18 ++++ .../topology/TaskRunLatencyTopology.java | 44 +++++++++ .../canary/config/CanaryConfig.java | 74 ++++++++++++++ .../io/littlehorse/canary/config/Config.java | 19 ++++ .../canary/config/ConfigLoader.java | 81 ++++++++++++++++ .../canary/config/KafkaAdminConfig.java | 30 ++++++ .../canary/config/KafkaProducerConfig.java | 29 ++++++ .../canary/config/KafkaStreamsConfig.java | 29 ++++++ .../canary/config/LittleHorseConfig.java | 37 +++++++ .../canary/kafka/KafkaTopicBootstrap.java | 41 ++++++++ .../canary/kafka/MetricsEmitter.java | 64 +++++++++++++ .../canary/metronome/Metronome.java | 83 ++++++++++++++++ .../canary/metronome/MetronomeBootstrap.java | 74 ++++++++++++++ .../canary/metronome/MetronomeTask.java | 67 +++++++++++++ .../canary/metronome/MetronomeWorkflow.java | 24 +++++ .../io/littlehorse/canary/util/Shutdown.java | 32 +++++++ canary/src/main/proto/canary.proto | 36 +++++++ .../META-INF/microprofile-config.properties | 42 ++++++++ canary/src/main/resources/log4j2.properties | 58 +++++++++++ canary/src/main/resources/logback.xml | 16 ++++ .../internal/MetricTimeExtractorTest.java | 58 +++++++++++ .../serdes/MetricAverageDeserializerTest.java | 37 +++++++ .../serdes/MetricAverageSerializerTest.java | 34 +++++++ .../serdes/MetricDeserializerTest.java | 37 +++++++ .../serdes/MetricSerializerTest.java | 34 +++++++ .../topology/TaskRunLatencyTopologyTest.java | 9 ++ .../canary/config/CanaryConfigTest.java | 33 +++++++ .../canary/config/ConfigLoaderTest.java | 72 ++++++++++++++ .../canary/config/KafkaAdminConfigTest.java | 37 +++++++ .../config/KafkaProducerConfigTest.java | 59 ++++++++++++ .../canary/config/KafkaStreamsConfigTest.java | 52 ++++++++++ .../canary/config/LittleHorseConfigTest.java | 65 +++++++++++++ docker/canary/Dockerfile | 7 ++ docker/canary/docker-entrypoint.sh | 10 ++ docker/canary/logback.xml | 16 ++++ docker/server/Dockerfile | 2 +- .../driver/TestDriverExternal.java | 7 +- .../driver/TestDriverStandalone.java | 7 +- .../main/java/io/littlehorse/tests/Test.java | 2 +- .../cases/lifecycle/ADTaskDefDeleted.java | 3 +- .../cases/lifecycle/AEImproperTaskNode.java | 3 +- .../io/littlehorse/examples/BasicExample.java | 2 +- .../examples/ChildThreadExample.java | 2 +- .../examples/ChildWorkflowExample.java | 2 +- .../examples/ConditionalsWhileExample.java | 2 +- .../examples/ConditionalsExample.java | 2 +- .../examples/ExceptionHandlerExample.java | 2 +- .../examples/ExternalEventExample.java | 2 +- .../io/littlehorse/examples/HundredTasks.java | 2 +- .../examples/InterruptHandlerExample.java | 2 +- .../io/littlehorse/examples/JsonExample.java | 2 +- .../littlehorse/examples/MutationExample.java | 2 +- .../examples/ParallelApprovalExample.java | 2 +- .../io/littlehorse/examples/RunWfExample.java | 2 +- .../io/littlehorse/examples/SagaExample.java | 2 +- .../examples/SpawnThreadForEachExample.java | 2 +- .../examples/UserTasksExample.java | 2 +- .../examples/VariablesExample.java | 2 +- .../examples/WorkerContextExample.java | 2 +- local-dev/README.md | 23 +---- local-dev/build.sh | 39 +------- local-dev/do-canary.sh | 11 +++ local-dev/do-server.sh | 19 +--- local-dev/setup.sh | 2 +- .../sdk/common/config/LHConfig.java | 43 +++++---- .../littlehorse/sdk/worker/LHTaskWorker.java | 13 +-- .../worker/internal/LHServerConnection.java | 7 +- .../internal/LHServerConnectionManager.java | 24 ++--- .../sdk/worker/LHTaskWorkerTest.java | 8 +- .../LHServerConnectionManagerTest.java | 14 +-- server/src/main/java/io/littlehorse/App.java | 3 +- .../server/KafkaStreamsServerImpl.java | 2 +- .../server/streams/BackendInternalComms.java | 4 +- .../test/java/e2e/TaskDefLifecycleTest.java | 3 +- settings.gradle | 49 +++++----- .../java/io/littlehorse/test/LHExtension.java | 11 +-- .../internal/ExternalTestBootstrapper.java | 11 +-- .../test/internal/TestContext.java | 3 +- 98 files changed, 2060 insertions(+), 257 deletions(-) create mode 100644 canary/README.md create mode 100644 canary/build.gradle create mode 100644 canary/canary.properties create mode 100644 canary/checkstyle.xml create mode 100644 canary/src/main/java/io/littlehorse/canary/Bootstrap.java create mode 100644 canary/src/main/java/io/littlehorse/canary/CanaryException.java create mode 100644 canary/src/main/java/io/littlehorse/canary/Main.java create mode 100644 canary/src/main/java/io/littlehorse/canary/aggregator/AggregatorBootstrap.java create mode 100644 canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricTimeExtractor.java create mode 100644 canary/src/main/java/io/littlehorse/canary/aggregator/internal/ProtobufDeserializationException.java create mode 100644 canary/src/main/java/io/littlehorse/canary/aggregator/internal/ProtobufDeserializationExceptionHandler.java create mode 100644 canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricAverageDeserializer.java create mode 100644 canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricAverageSerializer.java create mode 100644 canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricDeserializer.java create mode 100644 canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricSerializer.java create mode 100644 canary/src/main/java/io/littlehorse/canary/aggregator/serdes/ProtobufSerdes.java create mode 100644 canary/src/main/java/io/littlehorse/canary/aggregator/topology/TaskRunLatencyTopology.java create mode 100644 canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java create mode 100644 canary/src/main/java/io/littlehorse/canary/config/Config.java create mode 100644 canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java create mode 100644 canary/src/main/java/io/littlehorse/canary/config/KafkaAdminConfig.java create mode 100644 canary/src/main/java/io/littlehorse/canary/config/KafkaProducerConfig.java create mode 100644 canary/src/main/java/io/littlehorse/canary/config/KafkaStreamsConfig.java create mode 100644 canary/src/main/java/io/littlehorse/canary/config/LittleHorseConfig.java create mode 100644 canary/src/main/java/io/littlehorse/canary/kafka/KafkaTopicBootstrap.java create mode 100644 canary/src/main/java/io/littlehorse/canary/kafka/MetricsEmitter.java create mode 100644 canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java create mode 100644 canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java create mode 100644 canary/src/main/java/io/littlehorse/canary/metronome/MetronomeTask.java create mode 100644 canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorkflow.java create mode 100644 canary/src/main/java/io/littlehorse/canary/util/Shutdown.java create mode 100644 canary/src/main/proto/canary.proto create mode 100644 canary/src/main/resources/META-INF/microprofile-config.properties create mode 100644 canary/src/main/resources/log4j2.properties create mode 100644 canary/src/main/resources/logback.xml create mode 100644 canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricTimeExtractorTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricAverageDeserializerTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricAverageSerializerTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricDeserializerTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricSerializerTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/aggregator/topology/TaskRunLatencyTopologyTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/config/ConfigLoaderTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/config/KafkaAdminConfigTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/config/KafkaProducerConfigTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/config/KafkaStreamsConfigTest.java create mode 100644 canary/src/test/java/io/littlehorse/canary/config/LittleHorseConfigTest.java create mode 100644 docker/canary/Dockerfile create mode 100755 docker/canary/docker-entrypoint.sh create mode 100644 docker/canary/logback.xml create mode 100755 local-dev/do-canary.sh diff --git a/.dockerignore b/.dockerignore index a86e25d4b..60ee29f30 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,5 +1,6 @@ # Ignore everything * +!canary/ !server/ !sdk-java/ !sdk-go/ diff --git a/.github/workflows/branch.yml b/.github/workflows/branch.yml index 9d9b22e1b..08e6d7d95 100644 --- a/.github/workflows/branch.yml +++ b/.github/workflows/branch.yml @@ -6,11 +6,14 @@ on: branches-ignore: - "master" paths: - - dashboard/** + - .github/workflows/branch.yml - docker/dashboard/** - - server/** - docker/server/** - - .github/workflows/branch.yml + - docker/standalone/** + - docker/canary/** + - server/** + - dashboard/** + - canary/** permissions: id-token: write contents: read @@ -28,14 +31,14 @@ jobs: java-version: 17 - name: Tests and Build - run: ./gradlew server:test server:shadowJar + run: ./gradlew server:build - uses: actions/upload-artifact@v4 with: name: server-jar path: server/build/libs/server-*-all.jar - - name: Build and publish + - name: Build and Publish uses: ./.github/actions/publish-image with: image-name: lh-server @@ -43,6 +46,34 @@ jobs: registry: ecr prefix: branch- + lh-canary: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Setup Java + uses: actions/setup-java@v3 + with: + distribution: "corretto" + java-version: 17 + + - name: Tests and Build + run: ./gradlew canary:build + + - uses: actions/upload-artifact@v4 + with: + name: canary-jar + path: canary/build/libs/canary-*-all.jar + + - name: Build and Publish + uses: ./.github/actions/publish-image + with: + image-name: lh-canary + dockerfile: docker/canary/Dockerfile + registry: ecr + prefix: branch- + lh-dashboard: runs-on: ubuntu-latest steps: @@ -66,7 +97,7 @@ jobs: pnpm install pnpm build - - name: Build and publish + - name: Build and Publish uses: ./.github/actions/publish-image with: image-name: lh-dashboard @@ -105,7 +136,7 @@ jobs: name: server-jar path: server/build/libs/ - - name: Build and publish + - name: Build and Publish uses: ./.github/actions/publish-image with: image-name: lh-standalone diff --git a/.github/workflows/publish-docker.yml b/.github/workflows/publish-docker.yml index 1001886fe..e35fe3ee9 100644 --- a/.github/workflows/publish-docker.yml +++ b/.github/workflows/publish-docker.yml @@ -33,19 +33,46 @@ jobs: - name: Checkout uses: actions/checkout@v3 - - name: Dowload Server Jar artifact + - name: Download Server Jar artifact uses: actions/download-artifact@v4 with: name: server-jar path: server/build/libs/ - - name: Build and publish + - name: Build and Publish uses: ./.github/actions/publish-image with: image-name: lh-server dockerfile: docker/server/Dockerfile github-token: ${{ secrets.GITHUB_TOKEN }} + lh-canary: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Setup Java + uses: actions/setup-java@v3 + with: + distribution: "corretto" + java-version: 17 + + - name: Tests and Build + run: ./gradlew canary:build + + - uses: actions/upload-artifact@v4 + with: + name: canary-jar + path: canary/build/libs/canary-*-all.jar + + - name: Build and Publish + uses: ./.github/actions/publish-image + with: + image-name: lh-canary + dockerfile: docker/canary/Dockerfile + github-token: ${{ secrets.GITHUB_TOKEN }} + lhctl: runs-on: ubuntu-latest needs: @@ -54,7 +81,7 @@ jobs: - name: Checkout uses: actions/checkout@v3 - - name: Build and publish + - name: Build and Publish uses: ./.github/actions/publish-image with: image-name: lhctl @@ -84,7 +111,7 @@ jobs: pnpm install pnpm build - - name: Build and publish + - name: Build and Publish uses: ./.github/actions/publish-image with: image-name: lh-dashboard @@ -122,7 +149,7 @@ jobs: name: server-jar path: server/build/libs/ - - name: Build and publish + - name: Build and Publish uses: ./.github/actions/publish-image with: image-name: lh-standalone diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 04888a762..85f928a5b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -65,6 +65,19 @@ jobs: - name: Tests working-directory: ./sdk-python run: poetry run python -m unittest -v + tests-canary: + if: ${{ !contains(github.event.head_commit.message, '[skip main]') }} + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Setup Java + uses: actions/setup-java@v3 + with: + distribution: "corretto" + java-version: 17 + - name: Tests + run: ./gradlew canary:test canary:build tests-server: if: ${{ !contains(github.event.head_commit.message, '[skip main]') }} runs-on: ubuntu-latest diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 78d70c22f..4c947a56e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks -define: &exclude | +exclude: | (?x)( ^server/src/main/java/io/littlehorse/common/proto/| ^sdk-java/src/main/java/io/littlehorse/sdk/common/proto/| @@ -10,17 +10,15 @@ define: &exclude | ^dashboard/apps/web/littlehorse-public-api/| ^docs/images/ ) - +fail_fast: true repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.5.0 hooks: - id: trailing-whitespace name: Trailing Whitespace - exclude: *exclude - id: end-of-file-fixer name: Check End of Files - exclude: *exclude - id: check-added-large-files name: Check For Any Large Added File - repo: https://github.com/shellcheck-py/shellcheck-py @@ -89,26 +87,12 @@ repos: # Tests - id: java-build - name: Tests Build + name: Tests Java Projects language: system always_run: true pass_filenames: false entry: ./gradlew build - - id: server-tests - name: Running Server Tests - language: system - always_run: true - pass_filenames: false - entry: ./gradlew server:test - - - id: java-tests - name: Running SDK Java Tests - language: system - always_run: true - pass_filenames: false - entry: ./gradlew sdk-java:test - - id: javadoc name: Running JavaDoc language: system @@ -130,8 +114,8 @@ repos: pass_filenames: false entry: poetry -C ./sdk-python run python -m unittest discover -s sdk-python/ - - id: js-linting - name: Verify Build & Linting Dashboard Code + - id: js-build + name: Verify Build language: system always_run: true pass_filenames: false diff --git a/canary/README.md b/canary/README.md new file mode 100644 index 000000000..19157d572 --- /dev/null +++ b/canary/README.md @@ -0,0 +1,19 @@ +# LittleHorse Canary App + +Start a local kafka cluster: + +``` +../local-dev/setup.sh +``` + +Start LH Server: + +``` +../local-dev/do-server.sh +``` + +Start LH Canary: + +``` +../local-dev/do-canary.sh +``` diff --git a/canary/build.gradle b/canary/build.gradle new file mode 100644 index 000000000..51776ae05 --- /dev/null +++ b/canary/build.gradle @@ -0,0 +1,96 @@ +plugins { + id 'java' + id 'application' + id 'checkstyle' + id 'com.diffplug.spotless' version '6.25.0' + id 'com.github.johnrengelman.shadow' version '8.1.1' + id 'io.freefair.lombok' version '8.4' + id 'com.google.protobuf' version '0.9.4' +} + +repositories { + mavenCentral() +} + +dependencies { + // LittleHorse + implementation project(':sdk-java') + + // Tools + implementation 'com.google.guava:guava:33.0.0-jre' + + // Configurations + implementation 'io.smallrye.config:smallrye-config:3.5.2' + + // Logging + implementation 'org.slf4j:slf4j-api:2.0.11' + // implementation 'ch.qos.logback:logback-classic:1.4.14' + implementation 'org.apache.logging.log4j:log4j-slf4j2-impl:2.22.1' + + // Kafka + implementation 'org.apache.kafka:kafka-streams:3.6.1' + + // Tests + testImplementation platform('org.junit:junit-bom:5.9.1') + testImplementation 'org.junit.jupiter:junit-jupiter' + testImplementation 'org.assertj:assertj-core:3.11.1' + testImplementation 'org.junit-pioneer:junit-pioneer:2.2.0' + testImplementation 'org.mockito:mockito-core:5.10.0' + testImplementation 'org.mockito:mockito-junit-jupiter:5.10.0' + testImplementation 'net.datafaker:datafaker:2.1.0' + testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.6.1' +} + +test { + useJUnitPlatform() + testLogging { + events 'passed', 'skipped', 'failed' + exceptionFormat = 'full' + } + jvmArgs = ['--add-opens', 'java.base/java.lang=ALL-UNNAMED', '--add-opens', 'java.base/java.util=ALL-UNNAMED'] +} + +spotless { + java { + target('**/*.java') + palantirJavaFormat() + } +} + +application { + mainClass = 'io.littlehorse.canary.Main' +} + +shadowJar { + mergeServiceFiles() +} + +java { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} + +compileJava { + options.encoding = 'UTF-8' + options.compilerArgs << '-parameters' +} + +protobuf { + protoc { + artifact = 'com.google.protobuf:protoc:3.25.2' + } + plugins { + grpc { + artifact = 'io.grpc:protoc-gen-grpc-java:1.61.0' + } + } +} + +checkstyle { + configFile = file("${rootDir}/canary/checkstyle.xml") + checkstyleTest.enabled = false +} + +tasks.withType(Checkstyle).configureEach { + exclude('**io/littlehorse/canary/proto**') +} diff --git a/canary/canary.properties b/canary/canary.properties new file mode 100644 index 000000000..54f7dcb78 --- /dev/null +++ b/canary/canary.properties @@ -0,0 +1,14 @@ +# Canary settings +lh.canary.client.id=dev +lh.canary.topic.creation.enable=true +lh.canary.topic.creation.replicas=1 +lh.canary.topic.creation.partitions=1 + +# Aggregator settings +lh.canary.aggregator.enable=true + +# Metronome settings +lh.canary.metronome.enable=true +lh.canary.metronome.frequency.ms=1000 +lh.canary.metronome.threads=1 +lh.canary.metronome.runs=1 diff --git a/canary/checkstyle.xml b/canary/checkstyle.xml new file mode 100644 index 000000000..bd8444dc5 --- /dev/null +++ b/canary/checkstyle.xml @@ -0,0 +1,9 @@ + + + + + + + diff --git a/canary/src/main/java/io/littlehorse/canary/Bootstrap.java b/canary/src/main/java/io/littlehorse/canary/Bootstrap.java new file mode 100644 index 000000000..88bcf33cc --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/Bootstrap.java @@ -0,0 +1,3 @@ +package io.littlehorse.canary; + +public interface Bootstrap {} diff --git a/canary/src/main/java/io/littlehorse/canary/CanaryException.java b/canary/src/main/java/io/littlehorse/canary/CanaryException.java new file mode 100644 index 000000000..b8bf28b6d --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/CanaryException.java @@ -0,0 +1,8 @@ +package io.littlehorse.canary; + +public class CanaryException extends RuntimeException { + + public CanaryException(final Throwable cause) { + super(cause); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/Main.java b/canary/src/main/java/io/littlehorse/canary/Main.java new file mode 100644 index 000000000..d51d8ab17 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/Main.java @@ -0,0 +1,58 @@ +package io.littlehorse.canary; + +import io.littlehorse.canary.aggregator.AggregatorBootstrap; +import io.littlehorse.canary.config.CanaryConfig; +import io.littlehorse.canary.config.ConfigLoader; +import io.littlehorse.canary.kafka.KafkaTopicBootstrap; +import io.littlehorse.canary.metronome.MetronomeBootstrap; +import io.littlehorse.canary.util.Shutdown; +import java.io.IOException; +import java.nio.file.Paths; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class Main { + + public static void main(final String[] args) throws IOException, InterruptedException { + final CanaryConfig config = args.length > 0 ? ConfigLoader.load(Paths.get(args[0])) : ConfigLoader.load(); + + log.info("Canary configurations: {}", config); + log.info("KafkaAdmin configurations: {}", config.toKafkaAdminConfig()); + log.info("KafkaProducer configurations: {}", config.toKafkaProducerConfig()); + log.info("KafkaStreams configurations: {}", config.toKafkaStreamsConfig()); + log.info("LittleHorse configurations: {}", config.toLittleHorseConfig()); + + try { + initializeBootstraps(config); + } catch (Exception e) { + log.error("Error starting application", e); + System.exit(-1); + } + + log.info("Canary started"); + Shutdown.block(); + } + + private static void initializeBootstraps(final CanaryConfig config) { + final KafkaTopicBootstrap kafkaTopicBootstrap = new KafkaTopicBootstrap( + config.getTopicName(), + config.getTopicPartitions(), + config.getTopicReplicas(), + config.toKafkaAdminConfig().toMap()); + + if (config.isMetronomeEnabled()) { + final MetronomeBootstrap metronomeBootstrap = new MetronomeBootstrap( + config.getTopicName(), + config.toKafkaProducerConfig().toMap(), + config.toLittleHorseConfig().toMap(), + config.getMetronomeFrequency(), + config.getMetronomeThreads(), + config.getMetronomeRuns()); + } + + if (config.isAggregatorEnabled()) { + final AggregatorBootstrap aggregatorBootstrap = new AggregatorBootstrap( + config.getTopicName(), config.toKafkaStreamsConfig().toMap()); + } + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/AggregatorBootstrap.java b/canary/src/main/java/io/littlehorse/canary/aggregator/AggregatorBootstrap.java new file mode 100644 index 000000000..71d9979c5 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/AggregatorBootstrap.java @@ -0,0 +1,40 @@ +package io.littlehorse.canary.aggregator; + +import io.littlehorse.canary.Bootstrap; +import io.littlehorse.canary.aggregator.internal.MetricTimeExtractor; +import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes; +import io.littlehorse.canary.aggregator.topology.TaskRunLatencyTopology; +import io.littlehorse.canary.proto.Metric; +import io.littlehorse.canary.util.Shutdown; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; + +@Slf4j +public class AggregatorBootstrap implements Bootstrap { + + private static final Consumed SERDES = + Consumed.with(Serdes.String(), ProtobufSerdes.Metric()).withTimestampExtractor(new MetricTimeExtractor()); + + public AggregatorBootstrap(final String metricsTopicName, final Map kafkaStreamsConfigMap) { + final KafkaStreams kafkaStreams = + new KafkaStreams(buildTopology(metricsTopicName), new StreamsConfig(kafkaStreamsConfigMap)); + Shutdown.addShutdownHook(kafkaStreams); + kafkaStreams.start(); + + log.trace("Initialized"); + } + + private static Topology buildTopology(final String metricsTopicName) { + final StreamsBuilder builder = new StreamsBuilder(); + final KStream metricStream = builder.stream(metricsTopicName, SERDES); + new TaskRunLatencyTopology(metricStream); + return builder.build(); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricTimeExtractor.java b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricTimeExtractor.java new file mode 100644 index 000000000..5dfbd6393 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricTimeExtractor.java @@ -0,0 +1,29 @@ +package io.littlehorse.canary.aggregator.internal; + +import com.google.protobuf.util.Timestamps; +import io.littlehorse.canary.proto.Metric; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.TimestampExtractor; + +@Slf4j +public class MetricTimeExtractor implements TimestampExtractor { + @Override + public long extract(final ConsumerRecord record, final long partitionTime) { + if (!Metric.class.isInstance(record.value())) { + log.warn( + "It's not possible to extract timestamp for class {}, using default timestamp", + record.value().getClass()); + return partitionTime; + } + + final Metric metric = (Metric) record.value(); + + if (!metric.getMetadata().hasTime()) { + log.warn("Metadata is was not provided for record {}, using default timestamp", record.key()); + return partitionTime; + } + + return Timestamps.toMillis(metric.getMetadata().getTime()); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/internal/ProtobufDeserializationException.java b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/ProtobufDeserializationException.java new file mode 100644 index 000000000..4533fd654 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/ProtobufDeserializationException.java @@ -0,0 +1,7 @@ +package io.littlehorse.canary.aggregator.internal; + +public class ProtobufDeserializationException extends RuntimeException { + public ProtobufDeserializationException(final Throwable cause) { + super(cause); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/internal/ProtobufDeserializationExceptionHandler.java b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/ProtobufDeserializationExceptionHandler.java new file mode 100644 index 000000000..6454ff846 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/ProtobufDeserializationExceptionHandler.java @@ -0,0 +1,29 @@ +package io.littlehorse.canary.aggregator.internal; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.processor.ProcessorContext; + +@Slf4j +public class ProtobufDeserializationExceptionHandler implements DeserializationExceptionHandler { + + private AtomicInteger errorsCounter; + + @Override + public DeserializationHandlerResponse handle( + final ProcessorContext context, final ConsumerRecord record, final Exception exception) { + + if (ProtobufDeserializationException.class.isInstance(exception)) { + return DeserializationHandlerResponse.CONTINUE; + } + + log.error("Unexpected error when deserializing", exception); + return DeserializationHandlerResponse.FAIL; + } + + @Override + public void configure(final Map configs) {} +} diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricAverageDeserializer.java b/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricAverageDeserializer.java new file mode 100644 index 000000000..868547fc1 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricAverageDeserializer.java @@ -0,0 +1,24 @@ +package io.littlehorse.canary.aggregator.serdes; + +import com.google.protobuf.InvalidProtocolBufferException; +import io.littlehorse.canary.aggregator.internal.ProtobufDeserializationException; +import io.littlehorse.canary.proto.MetricAverage; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Deserializer; + +@Slf4j +public class MetricAverageDeserializer implements Deserializer { + + @Override + public MetricAverage deserialize(final String topic, final byte[] data) { + if (data == null) { + return null; + } + + try { + return MetricAverage.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new ProtobufDeserializationException(e); + } + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricAverageSerializer.java b/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricAverageSerializer.java new file mode 100644 index 000000000..24bcb486e --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricAverageSerializer.java @@ -0,0 +1,14 @@ +package io.littlehorse.canary.aggregator.serdes; + +import io.littlehorse.canary.proto.MetricAverage; +import org.apache.kafka.common.serialization.Serializer; + +public class MetricAverageSerializer implements Serializer { + @Override + public byte[] serialize(final String topic, final MetricAverage data) { + if (data == null) { + return null; + } + return data.toByteArray(); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricDeserializer.java b/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricDeserializer.java new file mode 100644 index 000000000..16a2da4e8 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricDeserializer.java @@ -0,0 +1,24 @@ +package io.littlehorse.canary.aggregator.serdes; + +import com.google.protobuf.InvalidProtocolBufferException; +import io.littlehorse.canary.aggregator.internal.ProtobufDeserializationException; +import io.littlehorse.canary.proto.Metric; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Deserializer; + +@Slf4j +public class MetricDeserializer implements Deserializer { + + @Override + public Metric deserialize(final String topic, final byte[] data) { + if (data == null) { + return null; + } + + try { + return Metric.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new ProtobufDeserializationException(e); + } + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricSerializer.java b/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricSerializer.java new file mode 100644 index 000000000..2841b2bec --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/MetricSerializer.java @@ -0,0 +1,14 @@ +package io.littlehorse.canary.aggregator.serdes; + +import io.littlehorse.canary.proto.Metric; +import org.apache.kafka.common.serialization.Serializer; + +public class MetricSerializer implements Serializer { + @Override + public byte[] serialize(final String topic, final Metric data) { + if (data == null) { + return null; + } + return data.toByteArray(); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/ProtobufSerdes.java b/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/ProtobufSerdes.java new file mode 100644 index 000000000..392911294 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/serdes/ProtobufSerdes.java @@ -0,0 +1,18 @@ +package io.littlehorse.canary.aggregator.serdes; + +import io.littlehorse.canary.proto.Metric; +import io.littlehorse.canary.proto.MetricAverage; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; + +public class ProtobufSerdes { + private ProtobufSerdes() {} + + public static Serde Metric() { + return Serdes.serdeFrom(new MetricSerializer(), new MetricDeserializer()); + } + + public static Serde MetricAverage() { + return Serdes.serdeFrom(new MetricAverageSerializer(), new MetricAverageDeserializer()); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/topology/TaskRunLatencyTopology.java b/canary/src/main/java/io/littlehorse/canary/aggregator/topology/TaskRunLatencyTopology.java new file mode 100644 index 000000000..a70ed2ce3 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/topology/TaskRunLatencyTopology.java @@ -0,0 +1,44 @@ +package io.littlehorse.canary.aggregator.topology; + +import io.littlehorse.canary.aggregator.serdes.ProtobufSerdes; +import io.littlehorse.canary.proto.Metric; +import io.littlehorse.canary.proto.MetricAverage; +import java.time.Duration; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.TimeWindows; + +@Slf4j +public class TaskRunLatencyTopology { + + public TaskRunLatencyTopology(final KStream metricStream) { + metricStream + .filter((key, value) -> value.hasTaskRunLatency()) + .groupByKey() + .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(30))) + .aggregate( + () -> MetricAverage.newBuilder().build(), + (key, value, aggregate) -> aggregate(value, aggregate), + Materialized.with(Serdes.String(), ProtobufSerdes.MetricAverage())) + .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) + .toStream() + .map((key, value) -> KeyValue.pair(key.key(), value)) + .peek((key, value) -> log.info( + "server={}, count={}, sum={}, avg={}", key, value.getCount(), value.getSum(), value.getAvg())); + } + + private static MetricAverage aggregate(final Metric value, final MetricAverage aggregate) { + final long count = aggregate.getCount() + 1; + final double sum = aggregate.getSum() + value.getTaskRunLatency().getLatency(); + final double avg = sum / count; + return MetricAverage.newBuilder() + .setCount(count) + .setSum(sum) + .setAvg(avg) + .build(); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java new file mode 100644 index 000000000..c8c3dcf9d --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java @@ -0,0 +1,74 @@ +package io.littlehorse.canary.config; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +public class CanaryConfig implements Config { + + private final Map configs; + + public CanaryConfig(final Map configs) { + this.configs = configs.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(LH_CANARY_PREFIX)) + .collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue)); + } + + @Override + public Map toMap() { + return configs; + } + + @Override + public String toString() { + return configs.toString(); + } + + public KafkaAdminConfig toKafkaAdminConfig() { + return new KafkaAdminConfig(configs); + } + + public LittleHorseConfig toLittleHorseConfig() { + return new LittleHorseConfig(configs); + } + + public KafkaProducerConfig toKafkaProducerConfig() { + return new KafkaProducerConfig(configs); + } + + public KafkaStreamsConfig toKafkaStreamsConfig() { + return new KafkaStreamsConfig(configs); + } + + public String getTopicName() { + return configs.get(LH_CANARY_TOPIC_NAME).toString(); + } + + public int getTopicPartitions() { + return Integer.parseInt(configs.get(LH_CANARY_TOPIC_CREATION_PARTITIONS).toString()); + } + + public short getTopicReplicas() { + return Short.parseShort(configs.get(LH_CANARY_TOPIC_CREATION_REPLICAS).toString()); + } + + public boolean isMetronomeEnabled() { + return Boolean.parseBoolean(configs.get(LH_CANARY_METRONOME_ENABLE).toString()); + } + + public boolean isAggregatorEnabled() { + return Boolean.parseBoolean(configs.get(LH_CANARY_AGGREGATOR_ENABLE).toString()); + } + + public long getMetronomeFrequency() { + return Long.parseLong(configs.get(LH_CANARY_METRONOME_FREQUENCY_MS).toString()); + } + + public int getMetronomeThreads() { + return Integer.parseInt(configs.get(LH_CANARY_METRONOME_THREADS).toString()); + } + + public int getMetronomeRuns() { + return Integer.parseInt(configs.get(LH_CANARY_METRONOME_RUNS).toString()); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/config/Config.java b/canary/src/main/java/io/littlehorse/canary/config/Config.java new file mode 100644 index 000000000..2e4b29104 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/config/Config.java @@ -0,0 +1,19 @@ +package io.littlehorse.canary.config; + +import java.util.Map; + +public interface Config { + String LH_CANARY_PREFIX = "lh.canary."; + String LH_CANARY_KAFKA_PREFIX = LH_CANARY_PREFIX + "kafka."; + + String LH_CANARY_TOPIC_NAME = LH_CANARY_PREFIX + "topic.name"; + String LH_CANARY_TOPIC_CREATION_PARTITIONS = LH_CANARY_PREFIX + "topic.creation.partitions"; + String LH_CANARY_TOPIC_CREATION_REPLICAS = LH_CANARY_PREFIX + "topic.creation.replicas"; + String LH_CANARY_METRONOME_ENABLE = LH_CANARY_PREFIX + "metronome.enable"; + String LH_CANARY_AGGREGATOR_ENABLE = LH_CANARY_PREFIX + "aggregator.enable"; + String LH_CANARY_METRONOME_FREQUENCY_MS = LH_CANARY_PREFIX + "metronome.frequency.ms"; + String LH_CANARY_METRONOME_THREADS = LH_CANARY_PREFIX + "metronome.threads"; + String LH_CANARY_METRONOME_RUNS = LH_CANARY_PREFIX + "metronome.runs"; + + Map toMap(); +} diff --git a/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java b/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java new file mode 100644 index 000000000..25275a18b --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/config/ConfigLoader.java @@ -0,0 +1,81 @@ +package io.littlehorse.canary.config; + +import io.smallrye.config.PropertiesConfigSource; +import io.smallrye.config.SmallRyeConfig; +import io.smallrye.config.SmallRyeConfigBuilder; +import io.smallrye.config.common.utils.ConfigSourceUtil; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; +import org.eclipse.microprofile.config.Config; + +public class ConfigLoader { + + private ConfigLoader() {} + + /** + * Loads properties in this order: + * 1. MicroProfile Config configuration file META-INF/microprofile-config.properties in the classpath + * 2. Environment variables + * 3. System properties + * + * @return A CanaryConfig with all system configuration + */ + public static CanaryConfig load() { + final SmallRyeConfig config = new SmallRyeConfigBuilder() + .addDefaultInterceptors() + .addDefaultSources() + .build(); + return new CanaryConfig(toMap(config)); + } + + /** + * Loads properties in this order: + * 1. MicroProfile Config configuration file META-INF/microprofile-config.properties in the classpath + * 2. The given external properties file + * 3. Environment variables + * 4. System properties + * + * @param path Properties location + * @return A CanaryConfig with all system configuration + * @throws IOException If properties file was not found + */ + public static CanaryConfig load(final Path path) throws IOException { + final SmallRyeConfig config = new SmallRyeConfigBuilder() + .addDefaultInterceptors() + .addDefaultSources() + .withSources(new PropertiesConfigSource(path.toUri().toURL(), 200)) + .build(); + return new CanaryConfig(toMap(config)); + } + + /** + * Loads properties in this order: + * 1. MicroProfile Config configuration file META-INF/microprofile-config.properties in the classpath + * 2. The given Properties object + * 3. Environment variables + * 4. System properties + * + * @param properties Properties object + * @return A CanaryConfig with all system configuration + */ + public static CanaryConfig load(final Properties properties) { + final SmallRyeConfig config = new SmallRyeConfigBuilder() + .addDefaultInterceptors() + .addDefaultSources() + .withSources(new PropertiesConfigSource( + ConfigSourceUtil.propertiesToMap(properties), "PropertiesConfigSource[source=Properties]", 200)) + .build(); + return new CanaryConfig(toMap(config)); + } + + private static Map toMap(final Config config) { + final Map configs = new TreeMap<>(); + for (String key : config.getPropertyNames()) { + configs.put(key, config.getOptionalValue(key, String.class).orElse("")); + } + return configs; + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/config/KafkaAdminConfig.java b/canary/src/main/java/io/littlehorse/canary/config/KafkaAdminConfig.java new file mode 100644 index 000000000..d493182f4 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/config/KafkaAdminConfig.java @@ -0,0 +1,30 @@ +package io.littlehorse.canary.config; + +import static java.util.Map.entry; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.AdminClientConfig; + +public class KafkaAdminConfig implements Config { + private final Map configs; + + public KafkaAdminConfig(final Map configs) { + this.configs = configs.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(LH_CANARY_KAFKA_PREFIX)) + .map(entry -> entry(entry.getKey().substring(LH_CANARY_KAFKA_PREFIX.length()), entry.getValue())) + .filter(entry -> AdminClientConfig.configNames().contains(entry.getKey())) + .collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue)); + } + + @Override + public Map toMap() { + return configs; + } + + @Override + public String toString() { + return configs.toString(); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/config/KafkaProducerConfig.java b/canary/src/main/java/io/littlehorse/canary/config/KafkaProducerConfig.java new file mode 100644 index 000000000..b6729ba8a --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/config/KafkaProducerConfig.java @@ -0,0 +1,29 @@ +package io.littlehorse.canary.config; + +import static java.util.Map.entry; + +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.clients.producer.ProducerConfig; + +public class KafkaProducerConfig implements Config { + private final Map configs; + + public KafkaProducerConfig(final Map configs) { + this.configs = configs.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(LH_CANARY_KAFKA_PREFIX)) + .map(entry -> entry(entry.getKey().substring(LH_CANARY_KAFKA_PREFIX.length()), entry.getValue())) + .filter(entry -> ProducerConfig.configNames().contains(entry.getKey())) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public Map toMap() { + return configs; + } + + @Override + public String toString() { + return configs.toString(); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/config/KafkaStreamsConfig.java b/canary/src/main/java/io/littlehorse/canary/config/KafkaStreamsConfig.java new file mode 100644 index 000000000..6b440c476 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/config/KafkaStreamsConfig.java @@ -0,0 +1,29 @@ +package io.littlehorse.canary.config; + +import static java.util.Map.entry; + +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.streams.StreamsConfig; + +public class KafkaStreamsConfig implements Config { + private final Map configs; + + public KafkaStreamsConfig(final Map configs) { + this.configs = configs.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(LH_CANARY_KAFKA_PREFIX)) + .map(entry -> entry(entry.getKey().substring(LH_CANARY_KAFKA_PREFIX.length()), entry.getValue())) + .filter(entry -> StreamsConfig.configDef().names().contains(entry.getKey())) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public Map toMap() { + return configs; + } + + @Override + public String toString() { + return configs.toString(); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/config/LittleHorseConfig.java b/canary/src/main/java/io/littlehorse/canary/config/LittleHorseConfig.java new file mode 100644 index 000000000..20817839e --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/config/LittleHorseConfig.java @@ -0,0 +1,37 @@ +package io.littlehorse.canary.config; + +import static java.util.Map.entry; + +import io.littlehorse.sdk.common.config.LHConfig; +import java.util.Map; +import java.util.stream.Collectors; + +public class LittleHorseConfig implements Config { + private final Map configs; + + public LittleHorseConfig(final Map configs) { + this.configs = configs.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(LH_CANARY_PREFIX)) + .map(entry -> { + final String formattedKey = entry.getKey() + .substring(LH_CANARY_PREFIX.length()) + .toUpperCase() + .replace(".", "_") + .replace("-", "_"); + + return entry(formattedKey, entry.getValue()); + }) + .filter(entry -> LHConfig.configNames().contains(entry.getKey())) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public Map toMap() { + return configs; + } + + @Override + public String toString() { + return configs.toString(); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/kafka/KafkaTopicBootstrap.java b/canary/src/main/java/io/littlehorse/canary/kafka/KafkaTopicBootstrap.java new file mode 100644 index 000000000..c9c523e57 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/kafka/KafkaTopicBootstrap.java @@ -0,0 +1,41 @@ +package io.littlehorse.canary.kafka; + +import io.littlehorse.canary.Bootstrap; +import io.littlehorse.canary.CanaryException; +import io.littlehorse.canary.util.Shutdown; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.errors.TopicExistsException; + +@Slf4j +public class KafkaTopicBootstrap implements Bootstrap { + + public KafkaTopicBootstrap( + final String metricsTopicName, + final int topicPartitions, + final short topicReplicas, + final Map kafkaAdminConfigMap) { + + final AdminClient adminClient = KafkaAdminClient.create(kafkaAdminConfigMap); + Shutdown.addShutdownHook(adminClient); + + try { + final NewTopic canaryTopic = new NewTopic(metricsTopicName, topicPartitions, topicReplicas); + + adminClient.createTopics(List.of(canaryTopic)).all().get(); + log.info("Topics {} created", metricsTopicName); + } catch (Exception e) { + if (e.getCause() instanceof TopicExistsException) { + log.warn(e.getMessage()); + } else { + throw new CanaryException(e); + } + } + + log.trace("Initialized"); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/kafka/MetricsEmitter.java b/canary/src/main/java/io/littlehorse/canary/kafka/MetricsEmitter.java new file mode 100644 index 000000000..baa4de144 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/kafka/MetricsEmitter.java @@ -0,0 +1,64 @@ +package io.littlehorse.canary.kafka; + +import io.littlehorse.canary.CanaryException; +import io.littlehorse.canary.proto.Metric; +import java.io.Closeable; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.utils.Bytes; + +@Slf4j +public class MetricsEmitter implements Closeable { + + private final Producer producer; + private final String topicName; + + public MetricsEmitter(final String topicName, final Map kafkaProducerConfigMap) { + this.producer = new KafkaProducer<>(kafkaProducerConfigMap); + this.topicName = topicName; + } + + /** + * Asynchronous method that produces a metric to kafka but does not block the current thread + * @param key + * @param metric + * @return Future + */ + public Future future(final String key, final Metric metric) { + final ProducerRecord record = + new ProducerRecord<>(topicName, key, Bytes.wrap(metric.toByteArray())); + + return producer.send(record, (metadata, exception) -> { + if (exception == null) { + log.debug("Emitting message {} {}", metric.getMetricCase(), key); + } else { + log.error("Emitting message {} {}", metric.getMetricCase(), key, exception); + } + }); + } + + /** + * Blocking method that produces a metric and waits until kafka acknowledges + * @param key + * @param metric + * @return RecordMetadata + */ + public RecordMetadata emit(final String key, final Metric metric) { + try { + return future(key, metric).get(); + } catch (InterruptedException | ExecutionException e) { + throw new CanaryException(e); + } + } + + @Override + public void close() { + producer.close(); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java b/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java new file mode 100644 index 000000000..936bd071b --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/metronome/Metronome.java @@ -0,0 +1,83 @@ +package io.littlehorse.canary.metronome; + +import static io.littlehorse.canary.metronome.MetronomeWorkflow.CANARY_WORKFLOW; +import static io.littlehorse.canary.metronome.MetronomeWorkflow.VARIABLE_NAME; +import static io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseBlockingStub; + +import io.littlehorse.canary.kafka.MetricsEmitter; +import io.littlehorse.sdk.common.proto.RunWfRequest; +import io.littlehorse.sdk.common.proto.VariableValue; +import java.io.Closeable; +import java.time.Instant; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class Metronome implements Closeable { + + private final MetricsEmitter emitter; + private final ScheduledExecutorService mainExecutor; + private final ExecutorService requestsExecutor; + private final int runs; + private final LittleHorseBlockingStub lhClient; + + public Metronome( + final MetricsEmitter emitter, + final LittleHorseBlockingStub lhClient, + final long frequency, + final int threads, + final int runs) { + this.emitter = emitter; + this.runs = runs; + this.lhClient = lhClient; + + mainExecutor = Executors.newScheduledThreadPool(1); + mainExecutor.scheduleAtFixedRate(this::scheduledRun, 0, frequency, TimeUnit.MILLISECONDS); + + requestsExecutor = Executors.newFixedThreadPool(threads); + } + + private void executeRun() { + final String wfId = UUID.randomUUID().toString().replace("-", ""); + log.trace("Executing run {}", wfId); + + lhClient.runWf(RunWfRequest.newBuilder() + .setWfSpecName(CANARY_WORKFLOW) + .setId(wfId) + .putVariables( + VARIABLE_NAME, + VariableValue.newBuilder() + .setInt(Instant.now().toEpochMilli()) + .build()) + .build()); + } + + private void scheduledRun() { + log.trace("Executing metronome"); + for (int i = 0; i < runs; i++) { + requestsExecutor.submit(this::executeRun); + } + } + + @Override + public void close() { + mainExecutor.shutdown(); + requestsExecutor.shutdown(); + + try { + mainExecutor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Error on terminating main executor {}", e.getMessage(), e); + } + + try { + requestsExecutor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Error on terminating requests executor {}", e.getMessage(), e); + } + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java new file mode 100644 index 000000000..d4664df98 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeBootstrap.java @@ -0,0 +1,74 @@ +package io.littlehorse.canary.metronome; + +import com.google.protobuf.Empty; +import io.littlehorse.canary.Bootstrap; +import io.littlehorse.canary.kafka.MetricsEmitter; +import io.littlehorse.canary.util.Shutdown; +import io.littlehorse.sdk.common.config.LHConfig; +import io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseBlockingStub; +import io.littlehorse.sdk.common.proto.ServerVersionResponse; +import io.littlehorse.sdk.worker.LHTaskWorker; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MetronomeBootstrap implements Bootstrap { + + public MetronomeBootstrap( + final String metricsTopicName, + final Map kafkaProducerConfigMap, + final Map littleHorseConfigMap, + final long frequency, + final int threads, + final int runs) { + + final LHConfig lhConfig = new LHConfig(littleHorseConfigMap); + final LittleHorseBlockingStub lhClient = lhConfig.getBlockingStub(); + + final MetricsEmitter emitter = new MetricsEmitter(metricsTopicName, kafkaProducerConfigMap); + Shutdown.addShutdownHook(emitter); + + initializeWorker(emitter, lhConfig); + initializeWorkflow(lhClient); + initializeMetronome(emitter, lhClient, frequency, threads, runs); + + log.trace("Initialized"); + } + + private static void initializeMetronome( + final MetricsEmitter emitter, + final LittleHorseBlockingStub lhClient, + final long frequency, + final int threads, + final int runs) { + final Metronome metronome = new Metronome(emitter, lhClient, frequency, threads, runs); + Shutdown.addShutdownHook(metronome); + } + + private static void initializeWorkflow(final LittleHorseBlockingStub lhClient) { + final MetronomeWorkflow workflow = new MetronomeWorkflow(lhClient); + workflow.register(); + } + + private static void initializeWorker(final MetricsEmitter emitter, final LHConfig lhConfig) { + final MetronomeTask executable = new MetronomeTask( + emitter, lhConfig.getApiBootstrapHost(), lhConfig.getApiBootstrapPort(), getServerVersion(lhConfig)); + final LHTaskWorker worker = new LHTaskWorker(executable, MetronomeWorkflow.TASK_NAME, lhConfig); + Shutdown.addShutdownHook(worker); + worker.registerTaskDef(); + worker.start(); + } + + private static String getServerVersion(final LHConfig lhConfig) { + final ServerVersionResponse serverVersionResponse = + lhConfig.getBlockingStub().getServerVersion(Empty.getDefaultInstance()); + return String.format( + "%s.%s.%s%s", + serverVersionResponse.getMajorVersion(), + serverVersionResponse.getMinorVersion(), + serverVersionResponse.getPatchVersion(), + serverVersionResponse.hasPreReleaseIdentifier() + ? "-" + serverVersionResponse.getPreReleaseIdentifier() + : ""); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeTask.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeTask.java new file mode 100644 index 000000000..e0d9713fc --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeTask.java @@ -0,0 +1,67 @@ +package io.littlehorse.canary.metronome; + +import com.google.protobuf.util.Timestamps; +import io.littlehorse.canary.kafka.MetricsEmitter; +import io.littlehorse.canary.proto.DuplicatedTaskRun; +import io.littlehorse.canary.proto.Metadata; +import io.littlehorse.canary.proto.Metric; +import io.littlehorse.canary.proto.TaskRunLatency; +import io.littlehorse.sdk.worker.LHTaskMethod; +import io.littlehorse.sdk.worker.WorkerContext; +import java.time.Duration; +import java.time.Instant; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +class MetronomeTask { + + private final MetricsEmitter emitter; + private final String serverHost; + private final int serverPort; + private final String serverVersion; + + public MetronomeTask( + final MetricsEmitter emitter, final String serverHost, final int serverPort, final String serverVersion) { + this.emitter = emitter; + this.serverHost = serverHost; + this.serverPort = serverPort; + this.serverVersion = serverVersion; + } + + private Metric.Builder getMetricBuilder() { + return Metric.newBuilder() + .setMetadata(Metadata.newBuilder() + .setTime(Timestamps.fromMillis(System.currentTimeMillis())) + .setServerHost(serverHost) + .setServerPort(serverPort) + .setServerVersion(serverVersion)); + } + + @LHTaskMethod(MetronomeWorkflow.TASK_NAME) + public void executeTask(final long startTime, final WorkerContext context) { + log.trace("Executing task {}", MetronomeWorkflow.TASK_NAME); + emitTaskRunLatencyMetric(startTime, context); + emitDuplicatedTaskRunMetric(context); + } + + private void emitTaskRunLatencyMetric(final long startTime, final WorkerContext context) { + final Duration latency = Duration.between(Instant.ofEpochMilli(startTime), Instant.now()); + final String key = String.format("%s:%s", serverHost, serverPort); + + final Metric metric = getMetricBuilder() + .setTaskRunLatency(TaskRunLatency.newBuilder().setLatency(latency.toMillis())) + .build(); + + emitter.future(key, metric); + } + + private void emitDuplicatedTaskRunMetric(final WorkerContext context) { + final String key = String.format("%s/%s", context.getIdempotencyKey(), context.getAttemptNumber()); + + final Metric metric = getMetricBuilder() + .setDuplicatedTaskRun(DuplicatedTaskRun.newBuilder().setUniqueTaskScheduleId(key)) + .build(); + + emitter.emit(key, metric); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorkflow.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorkflow.java new file mode 100644 index 000000000..67916a269 --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorkflow.java @@ -0,0 +1,24 @@ +package io.littlehorse.canary.metronome; + +import io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseBlockingStub; +import io.littlehorse.sdk.common.proto.VariableType; +import io.littlehorse.sdk.wfsdk.Workflow; + +public class MetronomeWorkflow { + public static final String TASK_NAME = "canary-worker-task"; + public static final String VARIABLE_NAME = "start-time"; + public static final String CANARY_WORKFLOW = "canary-workflow"; + private final Workflow workflow; + private final LittleHorseBlockingStub lhClient; + + public MetronomeWorkflow(final LittleHorseBlockingStub lhClient) { + workflow = Workflow.newWorkflow( + CANARY_WORKFLOW, + thread -> thread.execute(TASK_NAME, thread.addVariable(VARIABLE_NAME, VariableType.INT))); + this.lhClient = lhClient; + } + + public void register() { + workflow.registerWfSpec(lhClient); + } +} diff --git a/canary/src/main/java/io/littlehorse/canary/util/Shutdown.java b/canary/src/main/java/io/littlehorse/canary/util/Shutdown.java new file mode 100644 index 000000000..2d380624d --- /dev/null +++ b/canary/src/main/java/io/littlehorse/canary/util/Shutdown.java @@ -0,0 +1,32 @@ +package io.littlehorse.canary.util; + +import java.util.concurrent.CountDownLatch; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class Shutdown { + private Shutdown() {} + + public static void addShutdownHook(final AutoCloseable closeable) { + addShutdownHook(null, closeable); + } + + public static void addShutdownHook(final String name, final AutoCloseable closeable) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + final String nameToPresent = + name != null ? name : closeable.getClass().getSimpleName(); + try { + closeable.close(); + } catch (Exception e) { + log.error("Error in ShutdownHook '{}'", nameToPresent, e); + } + log.trace("{} shutdown process completed", nameToPresent); + })); + } + + public static void block() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + addShutdownHook("Main", latch::countDown); + latch.await(); + } +} diff --git a/canary/src/main/proto/canary.proto b/canary/src/main/proto/canary.proto new file mode 100644 index 000000000..76aa7aec1 --- /dev/null +++ b/canary/src/main/proto/canary.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; +package littlehorse; + +option java_multiple_files = true; +option java_package = "io.littlehorse.canary.proto"; + +import "google/protobuf/timestamp.proto"; + +message MetricAverage { + int64 count=1; + double sum=2; + double avg=3; +} + +message Metric { + Metadata metadata = 1; + oneof metric { + DuplicatedTaskRun duplicated_task_run = 2; + TaskRunLatency task_run_latency = 3; + } +} + +message Metadata { + google.protobuf.Timestamp time = 1; + string server_host = 2; + int32 server_port = 3; + string server_version = 4; +} + +message DuplicatedTaskRun { + string unique_task_schedule_id = 1; +} + +message TaskRunLatency { + int64 latency = 1; +} diff --git a/canary/src/main/resources/META-INF/microprofile-config.properties b/canary/src/main/resources/META-INF/microprofile-config.properties new file mode 100644 index 000000000..661d2c49e --- /dev/null +++ b/canary/src/main/resources/META-INF/microprofile-config.properties @@ -0,0 +1,42 @@ +# DEFAULT SETTINGS + +# Canary settings +lh.canary.client.id=default +lh.canary.api.host=localhost +lh.canary.api.port=3023 +lh.canary.metrics.host=localhost +lh.canary.metrics.port=9090 +lh.canary.topic.name=canary-metric-beats +lh.canary.topic.creation.enable=false +lh.canary.topic.creation.replicas=3 +lh.canary.topic.creation.partitions=12 + +# Aggregator settings +lh.canary.aggregator.enable=true + +# Metronome settings +lh.canary.metronome.enable=true +lh.canary.metronome.frequency.ms=1000 +lh.canary.metronome.threads=1 +lh.canary.metronome.runs=1 + +# LH settings +lh.canary.lhw.task-worker.id=${lh.canary.client.id} +lh.canary.lhc.api.host=localhost +lh.canary.lhc.api.port=2023 + +# Kafka settings +lh.canary.kafka.bootstrap.servers=localhost:9092 + +# Kafka producer settings +lh.canary.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer +lh.canary.kafka.value.serializer=org.apache.kafka.common.serialization.BytesSerializer +lh.canary.kafka.acks=all +lh.canary.kafka.client.id=${lh.canary.client.id} +lh.canary.kafka.enable.idempotence=true + +# Kafka streams settings +lh.canary.kafka.application.id=${lh.canary.client.id} +lh.canary.kafka.state.dir=/tmp/canaryState +lh.canary.kafka.application.server=${lh.canary.api.host}:${lh.canary.api.port} +lh.canary.kafka.default.deserialization.exception.handler=io.littlehorse.canary.aggregator.internal.ProtobufDeserializationExceptionHandler diff --git a/canary/src/main/resources/log4j2.properties b/canary/src/main/resources/log4j2.properties new file mode 100644 index 000000000..15ad69dc8 --- /dev/null +++ b/canary/src/main/resources/log4j2.properties @@ -0,0 +1,58 @@ +# Set to debug or trace if log4j initialization is failing +status = ERROR + +# Name of the configuration +name = LHCanary + +# Console root error appender configuration +appender.error.type = Console +appender.error.name = error +appender.error.layout.type = PatternLayout +appender.error.layout.pattern = %d{HH:mm:ss} %highlight{%-5p} %c - %m%n + +# Console kafka appender configuration +appender.kafka.type = Console +appender.kafka.name = kafka +appender.kafka.layout.type = PatternLayout +appender.kafka.layout.pattern = %d{HH:mm:ss} %highlight{%-5p} [KAFKA] %c - %m%n + +# Console canary appender configuration +appender.canary.type = Console +appender.canary.name = canary +appender.canary.layout.type = PatternLayout +appender.canary.layout.pattern = %d{HH:mm:ss} %highlight{%-5p} [CANARY] %c - %m%n + +# Console lh appender configuration +appender.lh.type = Console +appender.lh.name = lh +appender.lh.layout.type = PatternLayout +appender.lh.layout.pattern = %d{HH:mm:ss} %highlight{%-5p} [LH] %c - %m%n + +# Console grpc appender configuration +appender.grpc.type = Console +appender.grpc.name = grpc +appender.grpc.layout.type = PatternLayout +appender.grpc.layout.pattern = %d{HH:mm:ss} %highlight{%-5p} [GRPC] %c - %m%n + +# Root logger level +rootLogger = ERROR, error + +# Kafka logger +logger.kafka = WARN, kafka +logger.kafka.name = org.apache.kafka +logger.kafka.additivity = false + +# canary logger +logger.canary = DEBUG, canary +logger.canary.name = io.littlehorse.canary +logger.canary.additivity = false + +# lh logger +logger.lh = INFO, lh +logger.lh.name = io.littlehorse.sdk +logger.lh.additivity = false + +# gRPC logger +logger.grpc = WARN, grpc +logger.grpc.name = io.grpc +logger.grpc.additivity = false diff --git a/canary/src/main/resources/logback.xml b/canary/src/main/resources/logback.xml new file mode 100644 index 000000000..c9b475b3f --- /dev/null +++ b/canary/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + %d{HH:mm:ss.SSS} %highlight(%-5level) %logger{36} - %msg%n + + + + + + + + + + + + diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricTimeExtractorTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricTimeExtractorTest.java new file mode 100644 index 000000000..ec6653f8c --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricTimeExtractorTest.java @@ -0,0 +1,58 @@ +package io.littlehorse.canary.aggregator.internal; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.protobuf.util.Timestamps; +import io.littlehorse.canary.proto.Metadata; +import io.littlehorse.canary.proto.Metric; +import net.datafaker.Faker; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Test; + +class MetricTimeExtractorTest { + + Faker faker = new Faker(); + + private static ConsumerRecord newRecord(Object value) { + return new ConsumerRecord<>("my-topic", 0, 0, "my-key", value); + } + + @Test + void returnDefaultTimeIfMetadataIsNotPresent() { + MetricTimeExtractor extractor = new MetricTimeExtractor(); + + long expectedTime = faker.number().randomNumber(); + ConsumerRecord record = newRecord(Metric.newBuilder().build()); + long result = extractor.extract(record, expectedTime); + + assertThat(result).isEqualTo(expectedTime); + } + + @Test + void returnTheRightTimestamp() { + MetricTimeExtractor extractor = new MetricTimeExtractor(); + + long expectedTime = faker.number().randomNumber(); + long notExpectedTime = faker.number().randomNumber(); + Metric metric = Metric.newBuilder() + .setMetadata(Metadata.newBuilder() + .setTime(Timestamps.fromMillis(expectedTime)) + .build()) + .build(); + ConsumerRecord record = newRecord(metric); + long result = extractor.extract(record, notExpectedTime); + + assertThat(result).isEqualTo(expectedTime); + } + + @Test + void returnDefaultTimeIfWrongClassWassPassed() { + MetricTimeExtractor extractor = new MetricTimeExtractor(); + + long expectedTime = faker.number().randomNumber(); + ConsumerRecord record = newRecord("Another class"); + long result = extractor.extract(record, expectedTime); + + assertThat(result).isEqualTo(expectedTime); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricAverageDeserializerTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricAverageDeserializerTest.java new file mode 100644 index 000000000..8f751872d --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricAverageDeserializerTest.java @@ -0,0 +1,37 @@ +package io.littlehorse.canary.aggregator.serdes; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.littlehorse.canary.proto.MetricAverage; +import net.datafaker.Faker; +import org.junit.jupiter.api.Test; + +class MetricAverageDeserializerTest { + + Faker faker = new Faker(); + + @Test + void returnNullIfReceivesNull() { + MetricAverageDeserializer deserializer = new MetricAverageDeserializer(); + + assertNull(deserializer.deserialize(null, null)); + } + + @Test + void deserialize() { + MetricAverageDeserializer deserializer = new MetricAverageDeserializer(); + + MetricAverage expected = MetricAverage.newBuilder() + .setAvg(faker.number().randomNumber()) + .setCount(faker.number().randomNumber()) + .setAvg(faker.number().randomNumber()) + .build(); + + byte[] input = expected.toByteArray(); + + MetricAverage actual = deserializer.deserialize(null, input); + + assertThat(expected).isEqualTo(actual); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricAverageSerializerTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricAverageSerializerTest.java new file mode 100644 index 000000000..c9f364b13 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricAverageSerializerTest.java @@ -0,0 +1,34 @@ +package io.littlehorse.canary.aggregator.serdes; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.littlehorse.canary.proto.MetricAverage; +import net.datafaker.Faker; +import org.junit.jupiter.api.Test; + +class MetricAverageSerializerTest { + + Faker faker = new Faker(); + + @Test + void returnNullInCaseOfNull() { + MetricAverageSerializer serializer = new MetricAverageSerializer(); + assertNull(serializer.serialize(null, null)); + } + + @Test + void rightSerialization() { + MetricAverageSerializer serializer = new MetricAverageSerializer(); + MetricAverage metric = MetricAverage.newBuilder() + .setAvg(faker.number().randomNumber()) + .setCount(faker.number().randomNumber()) + .setAvg(faker.number().randomNumber()) + .build(); + + byte[] expected = metric.toByteArray(); + byte[] actual = serializer.serialize(null, metric); + + assertThat(actual).isEqualTo(expected); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricDeserializerTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricDeserializerTest.java new file mode 100644 index 000000000..7aa6fd199 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricDeserializerTest.java @@ -0,0 +1,37 @@ +package io.littlehorse.canary.aggregator.serdes; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; + +import com.google.protobuf.util.Timestamps; +import io.littlehorse.canary.proto.Metadata; +import io.littlehorse.canary.proto.Metric; +import org.junit.jupiter.api.Test; + +class MetricDeserializerTest { + + @Test + void returnNullIfReceivesNull() { + MetricDeserializer deserializer = new MetricDeserializer(); + + assertNull(deserializer.deserialize(null, null)); + } + + @Test + void deserialize() { + MetricDeserializer deserializer = new MetricDeserializer(); + + Metric expected = Metric.newBuilder() + .setMetadata(Metadata.newBuilder() + .setTime(Timestamps.now()) + .setServerVersion("my-version") + .setServerHost("my-server")) + .build(); + + byte[] input = expected.toByteArray(); + + Metric actual = deserializer.deserialize(null, input); + + assertThat(expected).isEqualTo(actual); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricSerializerTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricSerializerTest.java new file mode 100644 index 000000000..794c06251 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/serdes/MetricSerializerTest.java @@ -0,0 +1,34 @@ +package io.littlehorse.canary.aggregator.serdes; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; + +import com.google.protobuf.util.Timestamps; +import io.littlehorse.canary.proto.Metadata; +import io.littlehorse.canary.proto.Metric; +import org.junit.jupiter.api.Test; + +class MetricSerializerTest { + + @Test + void returnNullInCaseOfNull() { + MetricSerializer serializer = new MetricSerializer(); + assertNull(serializer.serialize(null, null)); + } + + @Test + void rightSerialization() { + MetricSerializer serializer = new MetricSerializer(); + Metric metric = Metric.newBuilder() + .setMetadata(Metadata.newBuilder() + .setTime(Timestamps.now()) + .setServerVersion("my-version") + .setServerHost("my-server")) + .build(); + + byte[] expected = metric.toByteArray(); + byte[] actual = serializer.serialize(null, metric); + + assertThat(actual).isEqualTo(expected); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/topology/TaskRunLatencyTopologyTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/topology/TaskRunLatencyTopologyTest.java new file mode 100644 index 000000000..43bd0a7d5 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/topology/TaskRunLatencyTopologyTest.java @@ -0,0 +1,9 @@ +package io.littlehorse.canary.aggregator.topology; + +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.kafka.streams.TopologyTestDriver; + +class TaskRunLatencyTopologyTest { + TopologyTestDriver testDriver; +} diff --git a/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java b/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java new file mode 100644 index 000000000..98335ae87 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java @@ -0,0 +1,33 @@ +package io.littlehorse.canary.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; + +import java.util.Map; +import org.junit.jupiter.api.Test; + +class CanaryConfigTest { + + public static final String EXPECTED_KEY = "lh.canary.test"; + public static final String EXPECTED_VALUE = "test"; + + @Test + void toMapMustCreateCopy() { + Map input = Map.of(EXPECTED_KEY, EXPECTED_VALUE); + CanaryConfig canaryConfig = new CanaryConfig(input); + + Map output = canaryConfig.toMap(); + + assertThat(output).isNotSameAs(input); + } + + @Test + void filterMap() { + Map input = Map.of(EXPECTED_KEY, EXPECTED_VALUE, "not.a.valid.key", "to be filtered"); + CanaryConfig canaryConfig = new CanaryConfig(input); + + Map output = canaryConfig.toMap(); + + assertThat(output).containsExactly(entry(EXPECTED_KEY, EXPECTED_VALUE)); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/config/ConfigLoaderTest.java b/canary/src/test/java/io/littlehorse/canary/config/ConfigLoaderTest.java new file mode 100644 index 000000000..9b8961999 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/config/ConfigLoaderTest.java @@ -0,0 +1,72 @@ +package io.littlehorse.canary.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; + +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Properties; +import org.junit.jupiter.api.Test; +import org.junitpioneer.jupiter.ClearEnvironmentVariable; +import org.junitpioneer.jupiter.SetEnvironmentVariable; + +class ConfigLoaderTest { + + public static final String EXPECTED_KEY = "lh.canary.test"; + public static final String ENV_VARIABLE_NAME = "LH_CANARY_TEST"; + public static final String EXPECTED_FILE_VALUE = "test from file"; + public static final String EXPECTED_ENV_VALUE = "test from env"; + public static final String EXPECTED_PROPERTIES_VALUE = "test from properties"; + + @Test + @ClearEnvironmentVariable(key = ENV_VARIABLE_NAME) + void loadFromFile() throws IOException { + Path configPath = createTemporaryProperties(); + + CanaryConfig canaryConfig = ConfigLoader.load(configPath); + + assertThat(canaryConfig.toMap()).contains(entry(EXPECTED_KEY, EXPECTED_FILE_VALUE)); + } + + @Test + @SetEnvironmentVariable(key = ENV_VARIABLE_NAME, value = EXPECTED_ENV_VALUE) + void overwriteFileConfigWithEnvConfigs() throws IOException { + Path configPath = createTemporaryProperties(); + + CanaryConfig canaryConfig = ConfigLoader.load(configPath); + + assertThat(canaryConfig.toMap()).contains(entry(EXPECTED_KEY, EXPECTED_ENV_VALUE)); + } + + @Test + @ClearEnvironmentVariable(key = ENV_VARIABLE_NAME) + void loadFromPropertiesObject() { + Properties properties = new Properties(); + properties.put(EXPECTED_KEY, EXPECTED_PROPERTIES_VALUE); + + CanaryConfig canaryConfig = ConfigLoader.load(properties); + + assertThat(canaryConfig.toMap()).contains(entry(EXPECTED_KEY, EXPECTED_PROPERTIES_VALUE)); + } + + @Test + @SetEnvironmentVariable(key = ENV_VARIABLE_NAME, value = EXPECTED_ENV_VALUE) + void overwritePropertiesWithEnvConfigs() { + Properties properties = new Properties(); + properties.put(EXPECTED_KEY, EXPECTED_PROPERTIES_VALUE); + + CanaryConfig canaryConfig = ConfigLoader.load(properties); + + assertThat(canaryConfig.toMap()).contains(entry(EXPECTED_KEY, EXPECTED_ENV_VALUE)); + } + + private static Path createTemporaryProperties() throws IOException { + Path tmpFile = Files.createTempFile("canaryUnitTests", "properties"); + Properties tmpProperties = new Properties(); + tmpProperties.put(EXPECTED_KEY, EXPECTED_FILE_VALUE); + tmpProperties.store(new FileWriter(tmpFile.toFile()), null); + return tmpFile; + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/config/KafkaAdminConfigTest.java b/canary/src/test/java/io/littlehorse/canary/config/KafkaAdminConfigTest.java new file mode 100644 index 000000000..26b077712 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/config/KafkaAdminConfigTest.java @@ -0,0 +1,37 @@ +package io.littlehorse.canary.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; + +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +@Slf4j +class KafkaAdminConfigTest { + + public static final String PREFIX = "lh.canary.kafka."; + public static final String EXPECTED_KEY = "bootstrap.servers"; + public static final String EXPECTED_VALUE = "localhost:9092"; + + @Test + void toMapMustCreateCopy() { + Map input = Map.of(PREFIX + EXPECTED_KEY, EXPECTED_VALUE); + KafkaAdminConfig kafkaAdminConfig = new KafkaAdminConfig(input); + + Map output = kafkaAdminConfig.toMap(); + log.info("Configs: {}", output); + + assertThat(output).isNotSameAs(input); + } + + @Test + void filterMap() { + Map input = Map.of(PREFIX + EXPECTED_KEY, EXPECTED_VALUE, "not.a.valid.key", "To be filtered"); + KafkaAdminConfig kafkaAdminConfig = new KafkaAdminConfig(input); + + Map output = kafkaAdminConfig.toMap(); + + assertThat(output).containsExactly(entry(EXPECTED_KEY, EXPECTED_VALUE)); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/config/KafkaProducerConfigTest.java b/canary/src/test/java/io/littlehorse/canary/config/KafkaProducerConfigTest.java new file mode 100644 index 000000000..31423cead --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/config/KafkaProducerConfigTest.java @@ -0,0 +1,59 @@ +package io.littlehorse.canary.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; + +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +@Slf4j +class KafkaProducerConfigTest { + + public static final String PREFIX = "lh.canary.kafka."; + public static final String EXPECTED_KEY = "bootstrap.servers"; + public static final String EXPECTED_VALUE = "localhost:9092"; + + @Test + void toMapMustCreateCopy() { + Map input = Map.of(PREFIX + EXPECTED_KEY, EXPECTED_VALUE); + KafkaProducerConfig kafkaAdminConfig = new KafkaProducerConfig(input); + + Map output = kafkaAdminConfig.toMap(); + log.info("Configs: {}", output); + + assertThat(output).isNotSameAs(input); + } + + @Test + void filterMap() { + Map input = Map.of(PREFIX + EXPECTED_KEY, EXPECTED_VALUE, "not.a.valid.key", "To be filtered"); + KafkaProducerConfig kafkaAdminConfig = new KafkaProducerConfig(input); + + Map output = kafkaAdminConfig.toMap(); + + assertThat(output).containsExactly(entry(EXPECTED_KEY, EXPECTED_VALUE)); + } + + @Test + void mustKeepProducerConfigs() { + Map input = Map.of( + "lh.canary.kafka.key.serializer", "org.apache.kafka.common.serialization.StringSerializer", + "lh.canary.kafka.value.serializer", "org.apache.kafka.common.serialization.BytesSerializer", + "lh.canary.kafka.acks", "all", + "lh.canary.kafka.client.id", "id", + "lh.canary.kafka.enable.idempotence", "true"); + Map expected = Map.of( + "key.serializer", "org.apache.kafka.common.serialization.StringSerializer", + "value.serializer", "org.apache.kafka.common.serialization.BytesSerializer", + "acks", "all", + "client.id", "id", + "enable.idempotence", "true"); + KafkaProducerConfig kafkaAdminConfig = new KafkaProducerConfig(input); + + Map output = kafkaAdminConfig.toMap(); + log.info("Configs: {}", output); + + assertThat(output).isEqualTo(expected); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/config/KafkaStreamsConfigTest.java b/canary/src/test/java/io/littlehorse/canary/config/KafkaStreamsConfigTest.java new file mode 100644 index 000000000..de91b2056 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/config/KafkaStreamsConfigTest.java @@ -0,0 +1,52 @@ +package io.littlehorse.canary.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; + +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +@Slf4j +class KafkaStreamsConfigTest { + public static final String PREFIX = "lh.canary.kafka."; + public static final String EXPECTED_KEY = "bootstrap.servers"; + public static final String EXPECTED_VALUE = "localhost:9092"; + + @Test + void toMapMustCreateCopy() { + Map input = Map.of(PREFIX + EXPECTED_KEY, EXPECTED_VALUE); + KafkaStreamsConfig kafkaAdminConfig = new KafkaStreamsConfig(input); + + Map output = kafkaAdminConfig.toMap(); + log.info("Configs: {}", output); + + assertThat(output).isNotSameAs(input); + } + + @Test + void filterMap() { + Map input = Map.of(PREFIX + EXPECTED_KEY, EXPECTED_VALUE, "not.a.valid.key", "To be filtered"); + KafkaStreamsConfig kafkaAdminConfig = new KafkaStreamsConfig(input); + + Map output = kafkaAdminConfig.toMap(); + + assertThat(output).containsExactly(entry(EXPECTED_KEY, EXPECTED_VALUE)); + } + + @Test + void mustKeepProducerConfigs() { + Map input = Map.of( + "lh.canary.kafka.application.id", "id", + "lh.canary.kafka.state.dir", "/tmp/canaryState"); + Map expected = Map.of( + "application.id", "id", + "state.dir", "/tmp/canaryState"); + KafkaStreamsConfig kafkaAdminConfig = new KafkaStreamsConfig(input); + + Map output = kafkaAdminConfig.toMap(); + log.info("Configs: {}", output); + + assertThat(output).isEqualTo(expected); + } +} diff --git a/canary/src/test/java/io/littlehorse/canary/config/LittleHorseConfigTest.java b/canary/src/test/java/io/littlehorse/canary/config/LittleHorseConfigTest.java new file mode 100644 index 000000000..c32b67921 --- /dev/null +++ b/canary/src/test/java/io/littlehorse/canary/config/LittleHorseConfigTest.java @@ -0,0 +1,65 @@ +package io.littlehorse.canary.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; + +import java.util.Map; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +class LittleHorseConfigTest { + + @Nested + class WorkerConfigs { + public static final String INPUT_LHW_KEY = "lh.canary.lhw.task-worker.id"; + public static final String EXPECTED_LHW_KEY = "LHW_TASK_WORKER_ID"; + public static final String EXPECTED_LHW_VALUE = "MY-ID-123"; + + @Test + void toMapMustCreateCopy() { + Map input = Map.of(INPUT_LHW_KEY, EXPECTED_LHW_VALUE); + LittleHorseConfig littleHorseConfig = new LittleHorseConfig(input); + + Map output = littleHorseConfig.toMap(); + + assertThat(output).isNotSameAs(input); + } + + @Test + void filterMapForLHW() { + Map input = Map.of(INPUT_LHW_KEY, EXPECTED_LHW_VALUE, "not.a.valid.key", "To be filtered"); + LittleHorseConfig littleHorseConfig = new LittleHorseConfig(input); + + Map output = littleHorseConfig.toMap(); + + assertThat(output).containsExactly(entry(EXPECTED_LHW_KEY, EXPECTED_LHW_VALUE)); + } + } + + @Nested + class ClientConfigs { + public static final String INPUT_LHC_KEY = "lh.canary.lhc.api.host"; + public static final String EXPECTED_LHC_KEY = "LHC_API_HOST"; + public static final String EXPECTED_LHC_VALUE = "localhost"; + + @Test + void toMapMustCreateCopy() { + Map input = Map.of(INPUT_LHC_KEY, EXPECTED_LHC_VALUE); + LittleHorseConfig littleHorseConfig = new LittleHorseConfig(input); + + Map output = littleHorseConfig.toMap(); + + assertThat(output).isNotSameAs(input); + } + + @Test + void filterMapForLHC() { + Map input = Map.of(INPUT_LHC_KEY, EXPECTED_LHC_VALUE, "not.a.valid.key", "To be filtered"); + LittleHorseConfig littleHorseConfig = new LittleHorseConfig(input); + + Map output = littleHorseConfig.toMap(); + + assertThat(output).containsExactly(entry(EXPECTED_LHC_KEY, EXPECTED_LHC_VALUE)); + } + } +} diff --git a/docker/canary/Dockerfile b/docker/canary/Dockerfile new file mode 100644 index 000000000..3a0c9ada4 --- /dev/null +++ b/docker/canary/Dockerfile @@ -0,0 +1,7 @@ +FROM amazoncorretto:17 +WORKDIR /lh +COPY ./docker/canary/docker-entrypoint.sh /lh +COPY ./docker/canary/logback.xml /lh +COPY ./canary/build/libs/canary-*-all.jar /lh/canary.jar +ENTRYPOINT ["/lh/docker-entrypoint.sh"] +CMD ["canary"] diff --git a/docker/canary/docker-entrypoint.sh b/docker/canary/docker-entrypoint.sh new file mode 100755 index 000000000..e87c45625 --- /dev/null +++ b/docker/canary/docker-entrypoint.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +set -e + +if [ "$1" = 'canary' ]; then + shift + exec java -Dlogback.configurationFile=/lh/logback.xml -jar /lh/canary.jar "$@" +fi + +exec "$@" diff --git a/docker/canary/logback.xml b/docker/canary/logback.xml new file mode 100644 index 000000000..da12eb419 --- /dev/null +++ b/docker/canary/logback.xml @@ -0,0 +1,16 @@ + + + + %d{HH:mm:ss.SSS} %highlight(%-5level) %logger{36} - %msg%n + + + + + + + + + + + + diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile index 57e354320..fb00e45bc 100644 --- a/docker/server/Dockerfile +++ b/docker/server/Dockerfile @@ -1,5 +1,5 @@ FROM amazoncorretto:17 -RUN mkdir /lh +WORKDIR /lh COPY ./docker/server/docker-entrypoint.sh /lh COPY ./docker/server/log4j2.properties /lh COPY ./server/build/libs/server-*-all.jar /lh/server.jar diff --git a/e2e-tests/src/main/java/io/littlehorse/driver/TestDriverExternal.java b/e2e-tests/src/main/java/io/littlehorse/driver/TestDriverExternal.java index dad161f7b..8d5793d7f 100644 --- a/e2e-tests/src/main/java/io/littlehorse/driver/TestDriverExternal.java +++ b/e2e-tests/src/main/java/io/littlehorse/driver/TestDriverExternal.java @@ -1,7 +1,6 @@ package io.littlehorse.driver; import io.littlehorse.sdk.common.config.LHConfig; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Set; @@ -21,10 +20,6 @@ public void setup() { if (Files.exists(configPath)) { workerConfig = new LHConfig(configPath.toString()); } - try { - client = workerConfig.getBlockingStub(); - } catch (IOException exn) { - throw new RuntimeException(exn); - } + client = workerConfig.getBlockingStub(); } } diff --git a/e2e-tests/src/main/java/io/littlehorse/driver/TestDriverStandalone.java b/e2e-tests/src/main/java/io/littlehorse/driver/TestDriverStandalone.java index e70f11e10..20655f5b2 100644 --- a/e2e-tests/src/main/java/io/littlehorse/driver/TestDriverStandalone.java +++ b/e2e-tests/src/main/java/io/littlehorse/driver/TestDriverStandalone.java @@ -32,12 +32,7 @@ public void setup() throws Exception { kafka.start(); startServer(); workerConfig = new LHConfig(); - - try { - client = workerConfig.getBlockingStub(); - } catch (IOException exn) { - throw new RuntimeException(exn); - } + client = workerConfig.getBlockingStub(); } private void startServer() throws Exception { diff --git a/e2e-tests/src/main/java/io/littlehorse/tests/Test.java b/e2e-tests/src/main/java/io/littlehorse/tests/Test.java index c36b766d3..c29ba7834 100644 --- a/e2e-tests/src/main/java/io/littlehorse/tests/Test.java +++ b/e2e-tests/src/main/java/io/littlehorse/tests/Test.java @@ -218,7 +218,7 @@ public TaskRun getTaskRun(LittleHorseBlockingStub client, TaskRunId taskRunId) t } public NodeRun getNodeRun(LittleHorseBlockingStub client, String wfRunId, int threadRunNumber, int nodeRunPosition) - throws TestFailure, IOException { + throws TestFailure { NodeRun result; try { result = client.getNodeRun(NodeRunId.newBuilder() diff --git a/e2e-tests/src/main/java/io/littlehorse/tests/cases/lifecycle/ADTaskDefDeleted.java b/e2e-tests/src/main/java/io/littlehorse/tests/cases/lifecycle/ADTaskDefDeleted.java index a53803646..538f6cafd 100644 --- a/e2e-tests/src/main/java/io/littlehorse/tests/cases/lifecycle/ADTaskDefDeleted.java +++ b/e2e-tests/src/main/java/io/littlehorse/tests/cases/lifecycle/ADTaskDefDeleted.java @@ -15,7 +15,6 @@ import io.littlehorse.sdk.worker.LHTaskMethod; import io.littlehorse.sdk.worker.LHTaskWorker; import io.littlehorse.tests.Test; -import java.io.IOException; /* * This test involves deploying a WfSpec, then deleting a TaskDef, then @@ -48,7 +47,7 @@ public String getDescription() { """; } - public void test() throws InterruptedException, IOException { + public void test() throws InterruptedException { worker1 = new LHTaskWorker(new TaskWfSpecLifecycleWorker(), TASK_DEF_1, workerConfig); worker1.registerTaskDef(); worker2 = new LHTaskWorker(new TaskWfSpecLifecycleWorker(), TASK_DEF_2, workerConfig); diff --git a/e2e-tests/src/main/java/io/littlehorse/tests/cases/lifecycle/AEImproperTaskNode.java b/e2e-tests/src/main/java/io/littlehorse/tests/cases/lifecycle/AEImproperTaskNode.java index a8edbd3ca..c7108a0c5 100644 --- a/e2e-tests/src/main/java/io/littlehorse/tests/cases/lifecycle/AEImproperTaskNode.java +++ b/e2e-tests/src/main/java/io/littlehorse/tests/cases/lifecycle/AEImproperTaskNode.java @@ -26,7 +26,6 @@ import io.littlehorse.sdk.worker.LHTaskMethod; import io.littlehorse.sdk.worker.LHTaskWorker; import io.littlehorse.tests.Test; -import java.io.IOException; import java.util.Map; public class AEImproperTaskNode extends Test { @@ -52,7 +51,7 @@ public String getDescription() { """; } - public void test() throws InterruptedException, IOException { + public void test() throws InterruptedException { worker = new LHTaskWorker(new AETaskNodeValidationWorker(), TASK_DEF_NAME, workerConfig); worker.registerTaskDef(); diff --git a/examples/basic/src/main/java/io/littlehorse/examples/BasicExample.java b/examples/basic/src/main/java/io/littlehorse/examples/BasicExample.java index 9750d8c40..9eeb06552 100644 --- a/examples/basic/src/main/java/io/littlehorse/examples/BasicExample.java +++ b/examples/basic/src/main/java/io/littlehorse/examples/BasicExample.java @@ -44,7 +44,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static LHTaskWorker getTaskWorker(LHConfig config) throws IOException { + public static LHTaskWorker getTaskWorker(LHConfig config) { MyWorker executable = new MyWorker(); LHTaskWorker worker = new LHTaskWorker(executable, "greet", config); diff --git a/examples/child-thread/src/main/java/io/littlehorse/examples/ChildThreadExample.java b/examples/child-thread/src/main/java/io/littlehorse/examples/ChildThreadExample.java index 3006b534b..7634c4e5f 100644 --- a/examples/child-thread/src/main/java/io/littlehorse/examples/ChildThreadExample.java +++ b/examples/child-thread/src/main/java/io/littlehorse/examples/ChildThreadExample.java @@ -72,7 +72,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static List getTaskWorkers(LHConfig config) throws IOException { + public static List getTaskWorkers(LHConfig config) { ChildThreadWorker executable = new ChildThreadWorker(); List workers = List.of( new LHTaskWorker(executable, "parent-task-1", config), diff --git a/examples/child-workflow/src/main/java/io/littlehorse/examples/ChildWorkflowExample.java b/examples/child-workflow/src/main/java/io/littlehorse/examples/ChildWorkflowExample.java index 378747949..814cf274c 100644 --- a/examples/child-workflow/src/main/java/io/littlehorse/examples/ChildWorkflowExample.java +++ b/examples/child-workflow/src/main/java/io/littlehorse/examples/ChildWorkflowExample.java @@ -63,7 +63,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static LHTaskWorker getTaskWorker(LHConfig config) throws IOException { + public static LHTaskWorker getTaskWorker(LHConfig config) { MyWorker executable = new MyWorker(); LHTaskWorker worker = new LHTaskWorker(executable, "greet", config); diff --git a/examples/conditionals-while/src/main/java/io/littlehorse/examples/ConditionalsWhileExample.java b/examples/conditionals-while/src/main/java/io/littlehorse/examples/ConditionalsWhileExample.java index 81238fbb6..4df5ccce7 100644 --- a/examples/conditionals-while/src/main/java/io/littlehorse/examples/ConditionalsWhileExample.java +++ b/examples/conditionals-while/src/main/java/io/littlehorse/examples/ConditionalsWhileExample.java @@ -55,7 +55,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static LHTaskWorker getTaskWorker(LHConfig config) throws IOException { + public static LHTaskWorker getTaskWorker(LHConfig config) { ConditionalWhileTaskWorker executable = new ConditionalWhileTaskWorker(); LHTaskWorker worker = new LHTaskWorker(executable, "eating-donut", config); diff --git a/examples/conditionals/src/main/java/io/littlehorse/examples/ConditionalsExample.java b/examples/conditionals/src/main/java/io/littlehorse/examples/ConditionalsExample.java index 773cc8d1b..18bf66dbb 100644 --- a/examples/conditionals/src/main/java/io/littlehorse/examples/ConditionalsExample.java +++ b/examples/conditionals/src/main/java/io/littlehorse/examples/ConditionalsExample.java @@ -68,7 +68,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static List getTaskWorkers(LHConfig config) throws IOException { + public static List getTaskWorkers(LHConfig config) { ConditionalsTaskWorker executable = new ConditionalsTaskWorker(); List workers = List.of( new LHTaskWorker(executable, "task-a", config), diff --git a/examples/exception-handler/src/main/java/io/littlehorse/examples/ExceptionHandlerExample.java b/examples/exception-handler/src/main/java/io/littlehorse/examples/ExceptionHandlerExample.java index 719b80492..93f4a56e6 100644 --- a/examples/exception-handler/src/main/java/io/littlehorse/examples/ExceptionHandlerExample.java +++ b/examples/exception-handler/src/main/java/io/littlehorse/examples/ExceptionHandlerExample.java @@ -58,7 +58,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static List getTaskWorkers(LHConfig config) throws IOException { + public static List getTaskWorkers(LHConfig config) { ExceptionHandlerWorker executable = new ExceptionHandlerWorker(); List workers = List.of( new LHTaskWorker(executable, "fail", config), diff --git a/examples/external-event/src/main/java/io/littlehorse/examples/ExternalEventExample.java b/examples/external-event/src/main/java/io/littlehorse/examples/ExternalEventExample.java index d1e0fdd31..d68ca9fe6 100644 --- a/examples/external-event/src/main/java/io/littlehorse/examples/ExternalEventExample.java +++ b/examples/external-event/src/main/java/io/littlehorse/examples/ExternalEventExample.java @@ -62,7 +62,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static List getTaskWorkers(LHConfig config) throws IOException { + public static List getTaskWorkers(LHConfig config) { WaitForExternalEventWorker executable = new WaitForExternalEventWorker(); List workers = List.of( new LHTaskWorker(executable, "ask-for-name", config), diff --git a/examples/hundred-tasks/src/main/java/io/littlehorse/examples/HundredTasks.java b/examples/hundred-tasks/src/main/java/io/littlehorse/examples/HundredTasks.java index ce9c5076f..fafa40499 100644 --- a/examples/hundred-tasks/src/main/java/io/littlehorse/examples/HundredTasks.java +++ b/examples/hundred-tasks/src/main/java/io/littlehorse/examples/HundredTasks.java @@ -47,7 +47,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static List getTaskWorkers(LHConfig config) throws IOException { + public static List getTaskWorkers(LHConfig config) { MyWorker executable = new MyWorker(); List out = new ArrayList<>(); diff --git a/examples/interrupt-handler/src/main/java/io/littlehorse/examples/InterruptHandlerExample.java b/examples/interrupt-handler/src/main/java/io/littlehorse/examples/InterruptHandlerExample.java index 824e839f3..023db305e 100644 --- a/examples/interrupt-handler/src/main/java/io/littlehorse/examples/InterruptHandlerExample.java +++ b/examples/interrupt-handler/src/main/java/io/littlehorse/examples/InterruptHandlerExample.java @@ -58,7 +58,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static List getTaskWorkers(LHConfig config) throws IOException { + public static List getTaskWorkers(LHConfig config) { InterruptHandlerWorker executable = new InterruptHandlerWorker(); List workers = List.of( new LHTaskWorker(executable, "my-task", config), diff --git a/examples/json/src/main/java/io/littlehorse/examples/JsonExample.java b/examples/json/src/main/java/io/littlehorse/examples/JsonExample.java index 730c4d7a7..fc3246cb8 100644 --- a/examples/json/src/main/java/io/littlehorse/examples/JsonExample.java +++ b/examples/json/src/main/java/io/littlehorse/examples/JsonExample.java @@ -56,7 +56,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static List getTaskWorkers(LHConfig config) throws IOException { + public static List getTaskWorkers(LHConfig config) { CarTaskWorker executable = new CarTaskWorker(); List workers = List.of( new LHTaskWorker(executable, "greet", config), diff --git a/examples/mutation/src/main/java/io/littlehorse/examples/MutationExample.java b/examples/mutation/src/main/java/io/littlehorse/examples/MutationExample.java index 777dc31d1..e099bc91e 100644 --- a/examples/mutation/src/main/java/io/littlehorse/examples/MutationExample.java +++ b/examples/mutation/src/main/java/io/littlehorse/examples/MutationExample.java @@ -47,7 +47,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static LHTaskWorker getTaskWorker(LHConfig config) throws IOException { + public static LHTaskWorker getTaskWorker(LHConfig config) { SpiderManMutator executable = new SpiderManMutator(); LHTaskWorker worker = new LHTaskWorker(executable, "spider-bite", config); diff --git a/examples/parallel-approval/src/main/java/io/littlehorse/examples/ParallelApprovalExample.java b/examples/parallel-approval/src/main/java/io/littlehorse/examples/ParallelApprovalExample.java index 66526b1ad..cd1ba1c83 100644 --- a/examples/parallel-approval/src/main/java/io/littlehorse/examples/ParallelApprovalExample.java +++ b/examples/parallel-approval/src/main/java/io/littlehorse/examples/ParallelApprovalExample.java @@ -193,7 +193,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static List getTaskWorkers(LHConfig config) throws IOException { + public static List getTaskWorkers(LHConfig config) { Notifier executable = new Notifier(); List workers = List.of( new LHTaskWorker(executable, "calculate-next-notification", config), diff --git a/examples/run-wf/src/main/java/io/littlehorse/examples/RunWfExample.java b/examples/run-wf/src/main/java/io/littlehorse/examples/RunWfExample.java index f8acf47f3..77f9ec3a0 100644 --- a/examples/run-wf/src/main/java/io/littlehorse/examples/RunWfExample.java +++ b/examples/run-wf/src/main/java/io/littlehorse/examples/RunWfExample.java @@ -57,7 +57,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static LHTaskWorker getTaskWorker(LHConfig config) throws IOException { + public static LHTaskWorker getTaskWorker(LHConfig config) { MyWorker executable = new MyWorker(); LHTaskWorker worker = new LHTaskWorker( executable, diff --git a/examples/saga/src/main/java/io/littlehorse/examples/SagaExample.java b/examples/saga/src/main/java/io/littlehorse/examples/SagaExample.java index 8486c6bb3..ad3287999 100644 --- a/examples/saga/src/main/java/io/littlehorse/examples/SagaExample.java +++ b/examples/saga/src/main/java/io/littlehorse/examples/SagaExample.java @@ -102,7 +102,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static List getTaskWorkers(LHConfig config) throws IOException { + public static List getTaskWorkers(LHConfig config) { ReservationBooker executable = new ReservationBooker(); List workers = List.of( new LHTaskWorker(executable, "book-flight", config), diff --git a/examples/spawn-thread-foreach/src/main/java/io/littlehorse/examples/SpawnThreadForEachExample.java b/examples/spawn-thread-foreach/src/main/java/io/littlehorse/examples/SpawnThreadForEachExample.java index 76b3b2eba..202cde23d 100644 --- a/examples/spawn-thread-foreach/src/main/java/io/littlehorse/examples/SpawnThreadForEachExample.java +++ b/examples/spawn-thread-foreach/src/main/java/io/littlehorse/examples/SpawnThreadForEachExample.java @@ -57,7 +57,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static LHTaskWorker getTaskWorker(LHConfig config) throws IOException { + public static LHTaskWorker getTaskWorker(LHConfig config){ SpawnThreadForEachWorker executable = new SpawnThreadForEachWorker(); LHTaskWorker worker = new LHTaskWorker(executable, "task-executor", config); diff --git a/examples/user-tasks/src/main/java/io/littlehorse/examples/UserTasksExample.java b/examples/user-tasks/src/main/java/io/littlehorse/examples/UserTasksExample.java index 3926a08f9..88949bd5c 100644 --- a/examples/user-tasks/src/main/java/io/littlehorse/examples/UserTasksExample.java +++ b/examples/user-tasks/src/main/java/io/littlehorse/examples/UserTasksExample.java @@ -133,7 +133,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public LHTaskWorker getTaskWorker(LHConfig config) throws IOException { + public LHTaskWorker getTaskWorker(LHConfig config) { EmailSender executable = new EmailSender(); LHTaskWorker worker = new LHTaskWorker(executable, "send-email", config); diff --git a/examples/variables/src/main/java/io/littlehorse/examples/VariablesExample.java b/examples/variables/src/main/java/io/littlehorse/examples/VariablesExample.java index 388998788..488dfd358 100644 --- a/examples/variables/src/main/java/io/littlehorse/examples/VariablesExample.java +++ b/examples/variables/src/main/java/io/littlehorse/examples/VariablesExample.java @@ -87,7 +87,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static List getTaskWorker(LHConfig config) throws IOException { + public static List getTaskWorker(LHConfig config) { MyWorker executable = new MyWorker(); List workers = List.of( new LHTaskWorker(executable, "sentiment-analysis", config), diff --git a/examples/worker-context/src/main/java/io/littlehorse/examples/WorkerContextExample.java b/examples/worker-context/src/main/java/io/littlehorse/examples/WorkerContextExample.java index 280778519..b955a4ebc 100644 --- a/examples/worker-context/src/main/java/io/littlehorse/examples/WorkerContextExample.java +++ b/examples/worker-context/src/main/java/io/littlehorse/examples/WorkerContextExample.java @@ -45,7 +45,7 @@ public static Properties getConfigProps() throws IOException { return props; } - public static LHTaskWorker getTaskWorker(LHConfig config) throws IOException { + public static LHTaskWorker getTaskWorker(LHConfig config) { MyWorker executable = new MyWorker(); LHTaskWorker worker = new LHTaskWorker(executable, "task", config); diff --git a/local-dev/README.md b/local-dev/README.md index 9c03ab409..86ea62557 100644 --- a/local-dev/README.md +++ b/local-dev/README.md @@ -79,31 +79,10 @@ Upgrade to a new version: ## Building the Docker Image -You can build the `littlehorse-server` and `littlehorse-standalone` docker images by running: - -``` -./local-dev/build.sh -``` - To build the `littlehorse-server` image for local development utilizing the local gradle cache, you can run: ``` -./local-dev/build.sh --quick -``` - -Run server with docker (default config `local-dev/server-1.config`): - -``` -./local-dev/do-server.sh --docker -``` - -Run server with docker and specific config: - -``` -./local-dev/do-server.sh --docker - -# Example -./local-dev/do-server.sh --docker server-2 +./local-dev/build.sh ``` ## Compile Schemas diff --git a/local-dev/build.sh b/local-dev/build.sh index c5d501cb0..ba92b9b88 100755 --- a/local-dev/build.sh +++ b/local-dev/build.sh @@ -3,43 +3,10 @@ set -e SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd) -LH_SERVER_WORK_DIR=$(cd "$SCRIPT_DIR/../docker/server" && pwd) -LH_DASHBOARD_WORK_DIR=$(cd "$SCRIPT_DIR/../docker/dashboard" && pwd) -LH_STANDALONE_WORK_DIR=$(cd "$SCRIPT_DIR/../docker/standalone" && pwd) CONTEXT_DIR=$(cd "$SCRIPT_DIR/.." && pwd) cd "${CONTEXT_DIR}" -if [ "$1" == "--quick" ]; then - # The quick build compiles the jar using gradle on the host machine, which - # enables usage of the gradle cache. This is much faster than building from - # scratch in a fresh container, and is suitable for local development. - echo "Building server image using host machine's gradle cache" - ./gradlew server:shadowJar -x test - - docker build --tag littlehorse/lh-server:latest -f- . < configNames = Collections.unmodifiableSet(Set.of( + private static final Set configNames = Set.of( LHConfig.API_HOST_KEY, LHConfig.API_PORT_KEY, LHConfig.API_PROTOCOL_KEY, @@ -86,7 +86,7 @@ public class LHConfig extends ConfigBase { LHConfig.OAUTH_CLIENT_SECRET_KEY, LHConfig.NUM_WORKER_THREADS_KEY, LHConfig.SERVER_CONNECT_LISTENER_KEY, - LHConfig.TASK_WORKER_VERSION_KEY)); + LHConfig.TASK_WORKER_VERSION_KEY); /** * Returns a set of all config names. @@ -144,9 +144,8 @@ public LHConfig(Map configs) { * generally the loadbalancer url. * * @return a blocking gRPC stub for the configured host/port. - * @throws IOException if stub creation fails. */ - public LittleHorseBlockingStub getBlockingStub() throws IOException { + public LittleHorseBlockingStub getBlockingStub() { return getBlockingStub(getApiBootstrapHost(), getApiBootstrapPort()); } @@ -155,9 +154,8 @@ public LittleHorseBlockingStub getBlockingStub() throws IOException { * the loadbalancer url. * * @return an async gRPC stub for the configured host/port. - * @throws IOException if stub creation fails. */ - public LittleHorseStub getAsyncStub() throws IOException { + public LittleHorseStub getAsyncStub() { return getAsyncStub(getApiBootstrapHost(), getApiBootstrapPort()); } @@ -167,7 +165,7 @@ public LittleHorseStub getAsyncStub() throws IOException { * @param taskDefName is the TaskDef's name. * @return the specified TaskDefPb. */ - public TaskDef getTaskDef(String taskDefName) throws IOException { + public TaskDef getTaskDef(String taskDefName) { return getBlockingStub() .getTaskDef(TaskDefId.newBuilder().setName(taskDefName).build()); } @@ -198,10 +196,8 @@ public String getConnectListener() { * @param host is the host that the LH Server lives on. * @param port is the port that the LH Server lives on. * @return an async gRPC stub for that host/port combo. - * @throws IOException if stub creation fails. */ - public LittleHorseStub getAsyncStub(String host, int port) throws IOException { - + public LittleHorseStub getAsyncStub(String host, int port) { if (isOauth()) { return getBaseAsyncStub(host, port).withCallCredentials(oauthCredentialsProvider); } @@ -216,9 +212,8 @@ public LittleHorseStub getAsyncStub(String host, int port) throws IOException { * @param host is the host that the LH Server lives on. * @param port is the port that the LH Server lives on. * @return a blocking gRPC stub for that host/port combo. - * @throws IOException if stub creation fails. */ - public LittleHorseBlockingStub getBlockingStub(String host, int port) throws IOException { + public LittleHorseBlockingStub getBlockingStub(String host, int port) { if (isOauth()) { return getBaseBlockingStub(host, port).withCallCredentials(oauthCredentialsProvider); } @@ -231,9 +226,8 @@ public LittleHorseBlockingStub getBlockingStub(String host, int port) throws IOE * @param host The host to connect to. * @param port the port to connect to. * @return a gRPC channel for that specified host/port combo. - * @throws IOException if we can't connect. */ - private Channel getChannel(String host, int port) throws IOException { + private Channel getChannel(String host, int port) { String hostKey = host + ":" + port; if (createdChannels.containsKey(hostKey)) { return createdChannels.get(hostKey); @@ -253,12 +247,20 @@ private Channel getChannel(String host, int port) throws IOException { TlsChannelCredentials.Builder tlsBuilder = TlsChannelCredentials.newBuilder(); if (caCertFile != null) { - tlsBuilder.trustManager(new File(caCertFile)); + try { + tlsBuilder.trustManager(new File(caCertFile)); + } catch (IOException e) { + throw new LHMisconfigurationException("Error accessing to certificate", e); + } } if (clientCertFile != null && clientKeyFile != null) { log.info("Using mtls!"); - tlsBuilder.keyManager(new File(clientCertFile), new File(clientKeyFile)); + try { + tlsBuilder.keyManager(new File(clientCertFile), new File(clientKeyFile)); + } catch (IOException e) { + throw new LHMisconfigurationException("Error accessing to certificate", e); + } } builder = Grpc.newChannelBuilderForAddress(host, port, tlsBuilder.build()); @@ -275,7 +277,7 @@ private Channel getChannel(String host, int port) throws IOException { /** * Get a blocking stub with the application defaults */ - private LittleHorseBlockingStub getBaseBlockingStub(String host, int port) throws IOException { + private LittleHorseBlockingStub getBaseBlockingStub(String host, int port) { String tenantId = getTenantId(); LittleHorseBlockingStub blockingStub = LittleHorseGrpc.newBlockingStub(getChannel(host, port)); if (tenantId != null) { @@ -287,7 +289,7 @@ private LittleHorseBlockingStub getBaseBlockingStub(String host, int port) throw /** * Get a async stub with the application defaults */ - private LittleHorseStub getBaseAsyncStub(String host, int port) throws IOException { + private LittleHorseStub getBaseAsyncStub(String host, int port) { String tenantId = getTenantId(); LittleHorseStub asyncStub = LittleHorseGrpc.newStub(getChannel(host, port)); if (tenantId != null) { @@ -335,7 +337,7 @@ public boolean isOauth() { String tokenEndpointUrl = getOrSetDefault(OAUTH_ACCESS_TOKEN_URL, null); if (clientId == null && clientSecret == null && tokenEndpointUrl == null) { - log.info("OAuth is disable"); + log.warn("OAuth is disable"); return false; } @@ -361,7 +363,6 @@ public boolean isOauth() { oauthCredentialsProvider = new OAuthCredentialsProvider(oauthClient); } - log.info("OAuth initialized"); return true; } diff --git a/sdk-java/src/main/java/io/littlehorse/sdk/worker/LHTaskWorker.java b/sdk-java/src/main/java/io/littlehorse/sdk/worker/LHTaskWorker.java index d9af9120a..8ae36df35 100644 --- a/sdk-java/src/main/java/io/littlehorse/sdk/worker/LHTaskWorker.java +++ b/sdk-java/src/main/java/io/littlehorse/sdk/worker/LHTaskWorker.java @@ -15,7 +15,6 @@ import io.littlehorse.sdk.worker.internal.LHServerConnectionManager; import io.littlehorse.sdk.worker.internal.util.VariableMapping; import java.io.Closeable; -import java.io.IOException; import java.lang.reflect.Method; import java.lang.reflect.Parameter; import java.time.Duration; @@ -62,9 +61,8 @@ public class LHTaskWorker implements Closeable { * That method will be used to execute the tasks. * @param taskDefName is the name of the `TaskDef` to execute. * @param config is a valid LHConfig. - * @throws IOException */ - public LHTaskWorker(Object executable, String taskDefName, LHConfig config) throws IOException { + public LHTaskWorker(Object executable, String taskDefName, LHConfig config) { this.config = config; this.executable = executable; this.mappings = new ArrayList<>(); @@ -72,8 +70,7 @@ public LHTaskWorker(Object executable, String taskDefName, LHConfig config) thro this.grpcClient = config.getBlockingStub(); } - public LHTaskWorker(Object executable, String taskDefName, LHConfig config, LHServerConnectionManager manager) - throws IOException { + public LHTaskWorker(Object executable, String taskDefName, LHConfig config, LHServerConnectionManager manager) { this(executable, taskDefName, config); this.manager = manager; } @@ -87,7 +84,7 @@ public String getTaskDefName() { return taskDefName; } - private void createManager() throws IOException { + private void createManager() { validateTaskDefAndExecutable(); if (this.manager == null) { this.manager = new LHServerConnectionManager( @@ -187,10 +184,8 @@ private void validateTaskDefAndExecutable() throws TaskSchemaMismatchError { /** * Starts polling for and executing tasks. - * - * @throws IOException if unexpected error occurs opening connections. */ - public void start() throws IOException { + public void start() { createManager(); manager.start(); } diff --git a/sdk-java/src/main/java/io/littlehorse/sdk/worker/internal/LHServerConnection.java b/sdk-java/src/main/java/io/littlehorse/sdk/worker/internal/LHServerConnection.java index 2a9340aa9..387e341ff 100644 --- a/sdk-java/src/main/java/io/littlehorse/sdk/worker/internal/LHServerConnection.java +++ b/sdk-java/src/main/java/io/littlehorse/sdk/worker/internal/LHServerConnection.java @@ -8,7 +8,6 @@ import io.littlehorse.sdk.common.proto.PollTaskResponse; import io.littlehorse.sdk.common.proto.ScheduledTask; import java.io.Closeable; -import java.io.IOException; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -21,7 +20,7 @@ public class LHServerConnection implements Closeable, StreamObserver pollClient; private LittleHorseStub stub; - public LHServerConnection(LHServerConnectionManager manager, LHHostInfo host) throws IOException { + public LHServerConnection(LHServerConnectionManager manager, LHHostInfo host) { stillRunning = true; this.manager = manager; this.host = host; @@ -53,11 +52,11 @@ public void onNext(PollTaskResponse taskToDo) { if (taskToDo.hasResult()) { ScheduledTask scheduledTask = taskToDo.getResult(); String wfRunId = LHLibUtil.getWfRunId(scheduledTask.getSource()).getId(); - log.info("Received task schedule request for wfRun {}", wfRunId); + log.debug("Received task schedule request for wfRun {}", wfRunId); manager.submitTaskForExecution(scheduledTask, this.stub); - log.info("Scheduled task on threadpool for wfRun {}", wfRunId); + log.debug("Scheduled task on threadpool for wfRun {}", wfRunId); } else { log.error("Didn't successfully claim task, likely due to server restart."); } diff --git a/sdk-java/src/main/java/io/littlehorse/sdk/worker/internal/LHServerConnectionManager.java b/sdk-java/src/main/java/io/littlehorse/sdk/worker/internal/LHServerConnectionManager.java index 2ca4560a2..96db7d3b8 100644 --- a/sdk-java/src/main/java/io/littlehorse/sdk/worker/internal/LHServerConnectionManager.java +++ b/sdk-java/src/main/java/io/littlehorse/sdk/worker/internal/LHServerConnectionManager.java @@ -22,7 +22,6 @@ import io.littlehorse.sdk.worker.internal.util.ReportTaskObserver; import io.littlehorse.sdk.worker.internal.util.VariableMapping; import java.io.Closeable; -import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; @@ -62,8 +61,7 @@ public LHServerConnectionManager( LHConfig config, List mappings, Object executable, - ConnectionManagerLivenessController livenessController) - throws IOException { + ConnectionManagerLivenessController livenessController) { this.executable = executable; this.taskMethod = taskMethod; taskMethod.setAccessible(true); @@ -122,17 +120,12 @@ public void onNext(RegisterTaskWorkerResponse next) { for (LHHostInfo host : next.getYourHostsList()) { if (!isAlreadyRunning(host)) { - try { - runningConnections.add(new LHServerConnection(this, host)); - log.info( - "Adding connection to: {}:{} for taskdef {}", - host.getHost(), - host.getPort(), - taskDef.getId().getName()); - } catch (IOException exn) { - log.error("Yikes, caught IOException in onNext", exn); - throw new RuntimeException(exn); - } + runningConnections.add(new LHServerConnection(this, host)); + log.info( + "Adding connection to: {}:{} for taskdef {}", + host.getHost(), + host.getPort(), + taskDef.getId().getName()); } } @@ -288,8 +281,7 @@ private ReportTaskRun executeTask(ScheduledTask scheduledTask, Date scheduleTime return taskResult.build(); } - private Object invoke(ScheduledTask scheduledTask, WorkerContext context) - throws InputVarSubstitutionError, Exception { + private Object invoke(ScheduledTask scheduledTask, WorkerContext context) throws Exception { List inputs = new ArrayList<>(); for (VariableMapping mapping : this.mappings) { inputs.add(mapping.assign(scheduledTask, context)); diff --git a/sdk-java/src/test/java/io/littlehorse/sdk/worker/LHTaskWorkerTest.java b/sdk-java/src/test/java/io/littlehorse/sdk/worker/LHTaskWorkerTest.java index 0a06ccb46..5d1479798 100644 --- a/sdk-java/src/test/java/io/littlehorse/sdk/worker/LHTaskWorkerTest.java +++ b/sdk-java/src/test/java/io/littlehorse/sdk/worker/LHTaskWorkerTest.java @@ -10,7 +10,7 @@ public class LHTaskWorkerTest { private final LHServerConnectionManager manager = mock(); @Test - public void theWorkerIsHealthyIfNoCallFailureHasBeenNotifiedAndClusterIsHealthy() throws Exception { + public void theWorkerIsHealthyIfNoCallFailureHasBeenNotifiedAndClusterIsHealthy() { final LHTaskWorker worker = new LHTaskWorker(new GreetWorker(), "test_task", mock(), manager); when(manager.wasThereAnyFailure()).thenReturn(false); @@ -23,7 +23,7 @@ public void theWorkerIsHealthyIfNoCallFailureHasBeenNotifiedAndClusterIsHealthy( } @Test - public void theWorkerIsUnhealthyIfAFailureHasBeenNotifiedEvenIfClusterIsHealthy() throws Exception { + public void theWorkerIsUnhealthyIfAFailureHasBeenNotifiedEvenIfClusterIsHealthy() { final LHTaskWorker worker = new LHTaskWorker(new GreetWorker(), "test_task", mock(), manager); when(manager.wasThereAnyFailure()).thenReturn(true); @@ -34,7 +34,7 @@ public void theWorkerIsUnhealthyIfAFailureHasBeenNotifiedEvenIfClusterIsHealthy( } @Test - public void theWorkerIsUnhealthyIfNoFailureOnCallsButClusterIsUnhealthy() throws Exception { + public void theWorkerIsUnhealthyIfNoFailureOnCallsButClusterIsUnhealthy() { final LHTaskWorker worker = new LHTaskWorker(new GreetWorker(), "test_task", mock(), manager); when(manager.wasThereAnyFailure()).thenReturn(false); @@ -45,7 +45,7 @@ public void theWorkerIsUnhealthyIfNoFailureOnCallsButClusterIsUnhealthy() throws } @Test - public void theWorkerIsUnhealthyIfFailureOnCallsAndClusterIsUnhealthy() throws Exception { + public void theWorkerIsUnhealthyIfFailureOnCallsAndClusterIsUnhealthy() { final LHTaskWorker worker = new LHTaskWorker(new GreetWorker(), "test_task", mock(), manager); when(manager.wasThereAnyFailure()).thenReturn(true); diff --git a/sdk-java/src/test/java/io/littlehorse/sdk/worker/internal/LHServerConnectionManagerTest.java b/sdk-java/src/test/java/io/littlehorse/sdk/worker/internal/LHServerConnectionManagerTest.java index 0348279ff..141236215 100644 --- a/sdk-java/src/test/java/io/littlehorse/sdk/worker/internal/LHServerConnectionManagerTest.java +++ b/sdk-java/src/test/java/io/littlehorse/sdk/worker/internal/LHServerConnectionManagerTest.java @@ -46,7 +46,7 @@ public void connectionManagerKeepsRunningWhenAnErrorOccurredButTheTimeoutHasNotB @Test @Timeout(value = 20) - public void shouldStopManagerWhenRetriesWhereExhaustedAfterErrorsHasOccurred() throws Exception { + public void shouldStopManagerWhenRetriesWhereExhaustedAfterErrorsHasOccurred() { final long timeout = 15_000; final ConnectionManagerLivenessController livenessController = new ConnectionManagerLivenessController(timeout); @@ -79,7 +79,7 @@ public void connectionManagerKeepsAlive() throws Exception { } @Test - public void connectionManagerIsNotHealthyIfAFailureHasBeenNotified() throws Exception { + public void connectionManagerIsNotHealthyIfAFailureHasBeenNotified() { final long timeout = 100L; ConnectionManagerLivenessController livenessController = new ConnectionManagerLivenessController(timeout); @@ -91,7 +91,7 @@ public void connectionManagerIsNotHealthyIfAFailureHasBeenNotified() throws Exce } @Test - public void connectionManagerRecoverHealthWhenASuccessHeartbeatIsNotified() throws Exception { + public void connectionManagerRecoverHealthWhenASuccessHeartbeatIsNotified() { final long timeout = 100L; ConnectionManagerLivenessController livenessController = new ConnectionManagerLivenessController(timeout); @@ -105,7 +105,7 @@ public void connectionManagerRecoverHealthWhenASuccessHeartbeatIsNotified() thro } @Test - public void clusterIsHealthyWhenResponseIndicatesThat() throws Exception { + public void clusterIsHealthyWhenResponseIndicatesThat() { final long timeout = 100L; ConnectionManagerLivenessController livenessController = new ConnectionManagerLivenessController(timeout); @@ -120,7 +120,7 @@ public void clusterIsHealthyWhenResponseIndicatesThat() throws Exception { } @Test - public void establishClusterAsHealthyWhenTheResponseIndicatesThat() throws IOException { + public void establishClusterAsHealthyWhenTheResponseIndicatesThat() { ConnectionManagerLivenessController livenessController = new ConnectionManagerLivenessController(100); RegisterTaskWorkerResponse responseIndicatingClusterIsHealthy = RegisterTaskWorkerResponse.newBuilder() .setIsClusterHealthy(true) @@ -135,7 +135,7 @@ public void establishClusterAsHealthyWhenTheResponseIndicatesThat() throws IOExc } @Test - public void establishClusterAsUnhealthyWhenTheResponseIndicatesThat() throws IOException { + public void establishClusterAsUnhealthyWhenTheResponseIndicatesThat() { ConnectionManagerLivenessController livenessController = new ConnectionManagerLivenessController(100); RegisterTaskWorkerResponse responseIndicatingClusterIsUnhealthy = RegisterTaskWorkerResponse.newBuilder() .setIsClusterHealthy(false) @@ -150,7 +150,7 @@ public void establishClusterAsUnhealthyWhenTheResponseIndicatesThat() throws IOE } @Test - public void establishClusterAsHealthyWhenTheResponseDoesNotHaveThatMetadata() throws IOException { + public void establishClusterAsHealthyWhenTheResponseDoesNotHaveThatMetadata() { ConnectionManagerLivenessController livenessController = new ConnectionManagerLivenessController(100); RegisterTaskWorkerResponse responseWithoutClusterHealthMetadata = RegisterTaskWorkerResponse.newBuilder().build(); diff --git a/server/src/main/java/io/littlehorse/App.java b/server/src/main/java/io/littlehorse/App.java index 6260638b9..dfb93bc92 100644 --- a/server/src/main/java/io/littlehorse/App.java +++ b/server/src/main/java/io/littlehorse/App.java @@ -2,7 +2,6 @@ import io.littlehorse.common.LHServerConfig; import io.littlehorse.server.KafkaStreamsServerImpl; -import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -32,7 +31,7 @@ public static void doIdempotentSetup(LHServerConfig config) throws InterruptedEx } } - public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { + public static void main(String[] args) throws InterruptedException, ExecutionException { LHServerConfig config; if (args.length > 0) { diff --git a/server/src/main/java/io/littlehorse/server/KafkaStreamsServerImpl.java b/server/src/main/java/io/littlehorse/server/KafkaStreamsServerImpl.java index 348a38e5d..c651c71c6 100644 --- a/server/src/main/java/io/littlehorse/server/KafkaStreamsServerImpl.java +++ b/server/src/main/java/io/littlehorse/server/KafkaStreamsServerImpl.java @@ -970,7 +970,7 @@ public void close() { } } - public static void doMain(LHServerConfig config) throws IOException, InterruptedException { + public static void doMain(LHServerConfig config) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); KafkaStreamsServerImpl server = new KafkaStreamsServerImpl(config); Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/server/src/main/java/io/littlehorse/server/streams/BackendInternalComms.java b/server/src/main/java/io/littlehorse/server/streams/BackendInternalComms.java index 645b1b494..c83cb2d47 100644 --- a/server/src/main/java/io/littlehorse/server/streams/BackendInternalComms.java +++ b/server/src/main/java/io/littlehorse/server/streams/BackendInternalComms.java @@ -19,7 +19,6 @@ import io.littlehorse.common.LHServerConfig; import io.littlehorse.common.Storeable; import io.littlehorse.common.exceptions.LHApiException; -import io.littlehorse.common.exceptions.LHBadRequestError; import io.littlehorse.common.model.AbstractCommand; import io.littlehorse.common.model.AbstractGetable; import io.littlehorse.common.model.getable.ObjectIdModel; @@ -259,8 +258,7 @@ public LHHostInfo getAdvertisedHost( return desiredHost; } - public List getAllAdvertisedHosts(String listenerName) - throws LHBadRequestError { + public List getAllAdvertisedHosts(String listenerName) { Set hosts = getAllInternalHosts(); List out = new ArrayList<>(); diff --git a/server/src/test/java/e2e/TaskDefLifecycleTest.java b/server/src/test/java/e2e/TaskDefLifecycleTest.java index 2fb4f06c3..1d4619527 100644 --- a/server/src/test/java/e2e/TaskDefLifecycleTest.java +++ b/server/src/test/java/e2e/TaskDefLifecycleTest.java @@ -22,7 +22,6 @@ import io.littlehorse.sdk.worker.LHTaskWorker; import io.littlehorse.test.LHTest; import io.littlehorse.test.exception.LHTestExceptionUtil; -import java.io.IOException; import java.time.Duration; import java.util.UUID; import org.awaitility.Awaitility; @@ -91,7 +90,7 @@ void shouldBeAbleToReadTaskDefImmediatelyAfterCreation() { } @Test - void workerShouldWaitForTaskDef() throws IOException { + void workerShouldWaitForTaskDef() { String taskDefName = "only-to-use-with-wait-for-taskdef"; PutTaskDefRequest req = PutTaskDefRequest.newBuilder().setName(taskDefName).build(); diff --git a/settings.gradle b/settings.gradle index 466b37790..edc8aecc7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,33 +1,34 @@ rootProject.name = 'littlehorse' include( - 'server', - 'sdk-java', - 'e2e-tests', - 'test-utils' + 'server', + 'sdk-java', + 'e2e-tests', + 'test-utils', + 'canary' ) [ - 'basic', - 'variables', - 'child-thread', - 'child-workflow', - 'conditionals', - 'wf-versions', - 'worker-context', - 'run-wf', - 'conditionals-while', - 'exception-handler', - 'external-event', - 'interrupt-handler', - 'json', - 'saga', - 'parallel-approval', - 'mutation', - 'user-tasks', - 'hundred-tasks', - 'spawn-thread-foreach', - 'shared-variables' + 'basic', + 'variables', + 'child-thread', + 'child-workflow', + 'conditionals', + 'wf-versions', + 'worker-context', + 'run-wf', + 'conditionals-while', + 'exception-handler', + 'external-event', + 'interrupt-handler', + 'json', + 'saga', + 'parallel-approval', + 'mutation', + 'user-tasks', + 'hundred-tasks', + 'spawn-thread-foreach', + 'shared-variables' ].each { example -> include "example-$example" project(":example-$example").projectDir = new File(rootProject.projectDir, "examples/$example") diff --git a/test-utils/src/main/java/io/littlehorse/test/LHExtension.java b/test-utils/src/main/java/io/littlehorse/test/LHExtension.java index 372209559..709376a04 100644 --- a/test-utils/src/main/java/io/littlehorse/test/LHExtension.java +++ b/test-utils/src/main/java/io/littlehorse/test/LHExtension.java @@ -18,7 +18,6 @@ import io.littlehorse.test.exception.LHTestInitializationException; import io.littlehorse.test.internal.StandaloneTestBootstrapper; import io.littlehorse.test.internal.TestContext; -import java.io.IOException; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.List; @@ -67,19 +66,15 @@ public void postProcessTestInstance(Object testInstance, ExtensionContext contex .ignoreExceptionsMatching(exn -> LHTestExceptionUtil.isNotFoundException(exn)) .until(() -> testContext.getLhClient().getTaskDef(taskDefId), taskDef -> taskDef != null); Awaitility.await().until(() -> { - try { - worker.start(); - return true; - } catch (IOException e) { - throw new IllegalStateException(e); - } + worker.start(); + return true; }); } testContext.registerUserTaskSchemas(testInstance); List externalEventDefinitions = testContext.discoverExternalEventDefinitions(testInstance); externalEventDefinitions.forEach(testContext::registerExternalEventDef); - } catch (IOException | IllegalAccessException e) { + } catch (IllegalAccessException e) { throw new LHTestInitializationException("Something went wrong registering task workers", e); } testContext.instrument(testInstance); diff --git a/test-utils/src/main/java/io/littlehorse/test/internal/ExternalTestBootstrapper.java b/test-utils/src/main/java/io/littlehorse/test/internal/ExternalTestBootstrapper.java index 6f10488b1..1083f7897 100644 --- a/test-utils/src/main/java/io/littlehorse/test/internal/ExternalTestBootstrapper.java +++ b/test-utils/src/main/java/io/littlehorse/test/internal/ExternalTestBootstrapper.java @@ -2,29 +2,22 @@ import io.littlehorse.sdk.common.config.LHConfig; import io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseBlockingStub; -import io.littlehorse.test.exception.LHTestInitializationException; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; public class ExternalTestBootstrapper implements TestBootstrapper { private static final String LH_CONFIG_FILE = ".config/littlehorse.config"; - private Path configPath = Path.of(System.getProperty("user.home"), LH_CONFIG_FILE); - private final LHConfig workerConfig; private final LittleHorseBlockingStub lhClient; + private Path configPath = Path.of(System.getProperty("user.home"), LH_CONFIG_FILE); public ExternalTestBootstrapper() { if (Files.notExists(configPath)) { throw new IllegalStateException(String.format("Configuration file %s doesn't exist", LH_CONFIG_FILE)); } workerConfig = new LHConfig(configPath.toString()); - try { - lhClient = workerConfig.getBlockingStub(); - } catch (IOException e) { - throw new LHTestInitializationException(e); - } + lhClient = workerConfig.getBlockingStub(); } @Override diff --git a/test-utils/src/main/java/io/littlehorse/test/internal/TestContext.java b/test-utils/src/main/java/io/littlehorse/test/internal/TestContext.java index 45ab541ed..822b5e547 100644 --- a/test-utils/src/main/java/io/littlehorse/test/internal/TestContext.java +++ b/test-utils/src/main/java/io/littlehorse/test/internal/TestContext.java @@ -19,7 +19,6 @@ import io.littlehorse.test.LHWorkflow; import io.littlehorse.test.WorkflowVerifier; import io.littlehorse.test.exception.LHTestExceptionUtil; -import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; @@ -52,7 +51,7 @@ public TestContext(TestBootstrapper bootstrapper) { this.wfSpecStoreLock = new ReentrantLock(); } - public List discoverTaskWorkers(Object testInstance) throws IOException { + public List discoverTaskWorkers(Object testInstance) { List workers = new ArrayList<>(); List annotatedMethods = ReflectionUtil.findAnnotatedMethods(testInstance.getClass(), LHTaskMethod.class);