Skip to content

Commit

Permalink
- Added MetricsCollector interface
Browse files Browse the repository at this point in the history
- Added module with minimal Prometheus Collector
  • Loading branch information
jmigueprieto committed Sep 17, 2024
1 parent 027d895 commit a2818fd
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.client.automator.events.PollCompleted;
import com.netflix.conductor.client.automator.events.PollFailure;
import com.netflix.conductor.client.automator.events.PollStarted;
import com.netflix.conductor.client.automator.events.TaskExecutionCompleted;
import com.netflix.conductor.client.automator.events.TaskExecutionFailure;
import com.netflix.conductor.client.automator.events.TaskExecutionStarted;
import com.netflix.conductor.client.automator.events.TaskRunnerEvent;
import com.netflix.conductor.client.automator.filters.PollFilter;
import com.netflix.conductor.client.config.ConductorClientConfiguration;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.metrics.MetricsCollector;
import com.netflix.conductor.client.worker.Worker;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -321,5 +328,27 @@ public <T extends TaskRunnerEvent> Builder withListener(Class<T> eventType, Cons
listeners.computeIfAbsent(eventType, k -> new LinkedList<>()).add(listener);
return this;
}

public Builder withMetricsCollector(MetricsCollector metricsCollector) {
listeners.computeIfAbsent(PollFailure.class, k -> new LinkedList<>())
.add((Consumer<PollFailure>) metricsCollector::consume);

listeners.computeIfAbsent(PollCompleted.class, k -> new LinkedList<>())
.add((Consumer<PollCompleted>) metricsCollector::consume);

listeners.computeIfAbsent(PollStarted.class, k -> new LinkedList<>())
.add((Consumer<PollStarted>) metricsCollector::consume);

listeners.computeIfAbsent(TaskExecutionStarted.class, k -> new LinkedList<>())
.add((Consumer<TaskExecutionStarted>) metricsCollector::consume);

listeners.computeIfAbsent(TaskExecutionCompleted.class, k -> new LinkedList<>())
.add((Consumer<TaskExecutionCompleted>) metricsCollector::consume);

listeners.computeIfAbsent(TaskExecutionFailure.class, k -> new LinkedList<>())
.add((Consumer<TaskExecutionFailure>) metricsCollector::consume);

return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.metrics;

import com.netflix.conductor.client.automator.events.PollCompleted;
import com.netflix.conductor.client.automator.events.PollFailure;
import com.netflix.conductor.client.automator.events.PollStarted;
import com.netflix.conductor.client.automator.events.TaskExecutionCompleted;
import com.netflix.conductor.client.automator.events.TaskExecutionFailure;
import com.netflix.conductor.client.automator.events.TaskExecutionStarted;

public interface MetricsCollector {

void consume(PollFailure e);

void consume(PollCompleted e);

void consume(PollStarted e);

void consume(TaskExecutionStarted e);

void consume(TaskExecutionCompleted e);

void consume(TaskExecutionFailure e);
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=3.0.0-alpha15-SNAPSHOT
version=3.0.0-alpha16-SNAPSHOT
90 changes: 90 additions & 0 deletions conductor-clients/java/conductor-java-sdk/metrics/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
plugins {
id 'java-library'
id 'idea'
id 'maven-publish'
id 'signing'
}

dependencies {
implementation 'io.micrometer:micrometer-registry-prometheus:1.10.5'
implementation project(":conductor-client")

testImplementation 'org.mockito:mockito-core:5.4.0'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

java {
withSourcesJar()
withJavadocJar()
}

publishing {
publications {
mavenJava(MavenPublication) {
from components.java
pom {
name = 'Orkes Conductor Metrics module'
description = 'OSS & Orkes Conductor Metrics'
url = 'https://github.com/conductor-oss/conductor.git'
scm {
connection = 'scm:git:git://github.com/conductor-oss/conductor.git'
developerConnection = 'scm:git:ssh://github.com/conductor-oss/conductor.git'
url = 'https://github.com/conductor-oss/conductor.git'
}
licenses {
license {
name = 'The Apache License, Version 2.0'
url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
}
developers {
developer {
organization = 'Orkes'
organizationUrl = 'https://orkes.io'
name = 'Orkes Development Team'
email = '[email protected]'
}
}
}
}
}

repositories {
maven {
if (project.hasProperty("mavenCentral")) {
println "Publishing to Sonatype Repository"
url = "https://s01.oss.sonatype.org/${project.version.endsWith('-SNAPSHOT') ? "content/repositories/snapshots/" : "service/local/staging/deploy/maven2/"}"
credentials {
username project.properties.username
password project.properties.password
}
} else {
url = "s3://orkes-artifacts-repo/${project.version.endsWith('-SNAPSHOT') ? "snapshots" : "releases"}"
authentication {
awsIm(AwsImAuthentication)
}
}
}
}
}

signing {
def signingKeyId = findProperty('signingKeyId')
if (signingKeyId) {
println 'Signing the artifact with keys'
signing {
def signingKey = findProperty('signingKey')
def signingPassword = findProperty('signingPassword')
if (signingKeyId && signingKey && signingPassword) {
useInMemoryPgpKeys(signingKeyId, signingKey, signingPassword)
}

sign publishing.publications
}
}
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.metrics.prometheus;

import java.io.IOException;
import java.net.InetSocketAddress;

import com.netflix.conductor.client.automator.events.PollCompleted;
import com.netflix.conductor.client.automator.events.PollFailure;
import com.netflix.conductor.client.automator.events.PollStarted;
import com.netflix.conductor.client.automator.events.TaskExecutionCompleted;
import com.netflix.conductor.client.automator.events.TaskExecutionFailure;
import com.netflix.conductor.client.automator.events.TaskExecutionStarted;
import com.netflix.conductor.client.metrics.MetricsCollector;

import com.sun.net.httpserver.HttpServer;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;

public class PrometheusMetricsCollector implements MetricsCollector {

private static final PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

public void startServer() throws IOException {
startServer(9991, "/metrics");
}

public void startServer(int port, String endpoint) throws IOException {
var server = HttpServer.create(new InetSocketAddress(port), 0);
server.createContext(endpoint, (exchange -> {
var body = prometheusRegistry.scrape();
exchange.getResponseHeaders().set("Content-Type", "text/plain");
exchange.sendResponseHeaders(200, body.getBytes().length);
try (var os = exchange.getResponseBody()) {
os.write(body.getBytes());
}
}));
server.start();
}

@Override
public void consume(PollFailure e) {
var timer = prometheusRegistry.timer("poll_failure", "type", e.getTaskType());
timer.record(e.getDuration());
}

@Override
public void consume(PollCompleted e) {
var timer = prometheusRegistry.timer("poll_success", "type", e.getTaskType());
timer.record(e.getDuration());
}

@Override
public void consume(PollStarted e) {
var counter = prometheusRegistry.counter("poll_started", "type", e.getTaskType());
counter.increment();
}

@Override
public void consume(TaskExecutionStarted e) {
var counter = prometheusRegistry.counter("task_execution_started", "type", e.getTaskType());
counter.increment();
}

@Override
public void consume(TaskExecutionCompleted e) {
var timer = prometheusRegistry.timer("task_execution_completed", "type", e.getTaskType());
timer.record(e.getDuration());
}

@Override
public void consume(TaskExecutionFailure e) {
var timer = prometheusRegistry.timer("task_execution_failure", "type", e.getTaskType());
timer.record(e.getDuration());
}
}
2 changes: 1 addition & 1 deletion conductor-clients/java/conductor-java-sdk/settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ include 'examples'
include 'tests'
include 'orkes-spring'
include 'conductor-client-spring'

include 'metrics'

0 comments on commit a2818fd

Please sign in to comment.