Skip to content

Commit

Permalink
feat: Instrument thread pools
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjayVas committed Sep 26, 2024
1 parent 019ee45 commit 7319d0e
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 22 deletions.
11 changes: 10 additions & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ maven.artifact(
artifact = "grpc-testing",
group = "io.grpc",
)
maven.artifact(
testonly = True,
artifact = "opentelemetry-sdk-testing",
group = "io.opentelemetry",
)
maven.artifact(
testonly = True, # Java agent should be used in non-test code.
artifact = "opentelemetry-sdk",
group = "io.opentelemetry",
)
maven.install(
name = "maven",
artifacts = [
Expand Down Expand Up @@ -227,7 +237,6 @@ maven.install(
"org.yaml:snakeyaml:2.2",
"org.liquibase:liquibase-core:4.26.0",
"com.google.cloudspannerecosystem:liquibase-spanner:4.25.1",
"com.google.cloud:google-cloud-spanner-jdbc:2.15.1",
"org.liquibase.ext:liquibase-postgresql:4.11.0",

# Math library.
Expand Down
6 changes: 6 additions & 0 deletions imports/java/io/opentelemetry/sdk/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package(default_visibility = ["//visibility:public"])

alias(
name = "sdk",
actual = "@maven//:io_opentelemetry_opentelemetry_sdk",
)
6 changes: 6 additions & 0 deletions imports/java/io/opentelemetry/sdk/testing/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package(default_visibility = ["//visibility:public"])

alias(
name = "testing",
actual = "@maven//:io_opentelemetry_opentelemetry_sdk_testing",
)
80 changes: 67 additions & 13 deletions maven_install.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": "THERE_IS_NO_DATA_ONLY_ZUUL",
"__INPUT_ARTIFACTS_HASH": 1644744982,
"__RESOLVED_ARTIFACTS_HASH": -1860436502,
"__INPUT_ARTIFACTS_HASH": -926653138,
"__RESOLVED_ARTIFACTS_HASH": -411593500,
"artifacts": {
"com.adobe.testing:s3mock": {
"shasums": {
Expand Down Expand Up @@ -552,10 +552,10 @@
},
"com.google.cloud:google-cloud-spanner-jdbc": {
"shasums": {
"jar": "8a7b562e89f2cb87a6b50f6ece06f2e93667dcf072b1d1e8194fdea1bd51e5f7",
"sources": "f319650eb71b5375f327919d3a4dd4465e06ee1caa279faf75ba5d56ef94d5b8"
"jar": "d009bf8fa28490b92b1b95bf789b6610569f16de603c7fbbec64d69e1a073f0a",
"sources": "a880fd80ed843d3d90e1e971352886c5aff1bdf0517b68998724d6b189ed089c"
},
"version": "2.15.1"
"version": "2.21.0"
},
"com.google.cloud:google-cloud-storage": {
"shasums": {
Expand Down Expand Up @@ -1233,6 +1233,13 @@
},
"version": "1.37.0"
},
"io.opentelemetry:opentelemetry-sdk-testing": {
"shasums": {
"jar": "88948d1f2b8213368be6014e3409a6e7356c934c2b8d04d31d31151a9dc3f850",
"sources": "022ae8aa78b9a96863fd4983e990a019ea56c718d0182aebfd3e7d42fd1f42e4"
},
"version": "1.37.0"
},
"io.opentelemetry:opentelemetry-sdk-trace": {
"shasums": {
"jar": "5743bc33f9f046a86c8d827e4c4e321c2224ec69f72e09153232148186f2c77b",
Expand Down Expand Up @@ -2377,7 +2384,7 @@
"com.fasterxml.jackson.core:jackson-databind:2.12.7.1": "com.fasterxml.jackson.core:jackson-databind:2.17.2",
"com.fasterxml.jackson.core:jackson-databind:2.15.3": "com.fasterxml.jackson.core:jackson-databind:2.17.2",
"com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.3": "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2",
"com.google.api.grpc:proto-google-cloud-spanner-executor-v1:6.56.0": "com.google.api.grpc:proto-google-cloud-spanner-executor-v1:6.74.0",
"com.google.api.grpc:proto-google-cloud-spanner-executor-v1:6.73.0": "com.google.api.grpc:proto-google-cloud-spanner-executor-v1:6.74.0",
"com.google.apis:google-api-services-storage:v1-rev20240809-2.0.0": "com.google.apis:google-api-services-storage:v1-rev20240706-2.0.0",
"com.google.errorprone:error_prone_annotations:2.16": "com.google.errorprone:error_prone_annotations:2.28.0",
"com.google.errorprone:error_prone_annotations:2.18.0": "com.google.errorprone:error_prone_annotations:2.28.0",
Expand All @@ -2386,7 +2393,6 @@
"com.google.errorprone:error_prone_annotations:2.23.0": "com.google.errorprone:error_prone_annotations:2.28.0",
"com.google.errorprone:error_prone_annotations:2.26.1": "com.google.errorprone:error_prone_annotations:2.28.0",
"com.google.errorprone:error_prone_annotations:2.30.0": "com.google.errorprone:error_prone_annotations:2.28.0",
"com.google.guava:failureaccess:1.0.1": "com.google.guava:failureaccess:1.0.2",
"com.google.j2objc:j2objc-annotations:3.0.0": "com.google.j2objc:j2objc-annotations:2.8",
"commons-codec:commons-codec:1.15": "commons-codec:commons-codec:1.17.1",
"commons-codec:commons-codec:1.16.0": "commons-codec:commons-codec:1.17.1",
Expand All @@ -2408,7 +2414,7 @@
"io.netty:netty-transport-native-unix-common:4.1.112.Final": "io.netty:netty-transport-native-unix-common:4.1.108.Final",
"io.netty:netty-transport:4.1.100.Final": "io.netty:netty-transport:4.1.108.Final",
"io.netty:netty-transport:4.1.112.Final": "io.netty:netty-transport:4.1.108.Final",
"io.perfmark:perfmark-api:0.26.0": "io.perfmark:perfmark-api:0.27.0",
"io.opentelemetry:opentelemetry-api-incubator:1.37.0-alpha": "io.opentelemetry:opentelemetry-api-incubator:1.41.0-alpha",
"net.bytebuddy:byte-buddy-agent:1.10.9": "net.bytebuddy:byte-buddy-agent:1.14.15",
"net.bytebuddy:byte-buddy:1.10.9": "net.bytebuddy:byte-buddy:1.14.15",
"net.java.dev.jna:jna:5.13.0": "net.java.dev.jna:jna:5.9.0",
Expand All @@ -2417,10 +2423,8 @@
"org.apache.httpcomponents:httpclient:4.5.13": "org.apache.httpcomponents:httpclient:4.5.14",
"org.apache.httpcomponents:httpcore:4.4.13": "org.apache.httpcomponents:httpcore:4.4.16",
"org.checkerframework:checker-qual:3.31.0": "org.checkerframework:checker-qual:3.37.0",
"org.checkerframework:checker-qual:3.40.0": "org.checkerframework:checker-qual:3.37.0",
"org.checkerframework:checker-qual:3.42.0": "org.checkerframework:checker-qual:3.37.0",
"org.checkerframework:checker-qual:3.46.0": "org.checkerframework:checker-qual:3.37.0",
"org.codehaus.mojo:animal-sniffer-annotations:1.23": "org.codehaus.mojo:animal-sniffer-annotations:1.24",
"org.ow2.asm:asm-analysis:9.2": "org.ow2.asm:asm-analysis:9.5",
"org.ow2.asm:asm-commons:9.6": "org.ow2.asm:asm-commons:9.2",
"org.ow2.asm:asm-tree:9.2": "org.ow2.asm:asm-tree:9.5",
Expand All @@ -2429,8 +2433,7 @@
"org.ow2.asm:asm:9.6": "org.ow2.asm:asm:9.7",
"org.slf4j:slf4j-api:1.7.25": "org.slf4j:slf4j-api:1.7.36",
"org.slf4j:slf4j-api:1.7.30": "org.slf4j:slf4j-api:1.7.36",
"org.slf4j:slf4j-api:2.0.9": "org.slf4j:slf4j-api:1.7.36",
"org.threeten:threetenbp:1.6.8": "org.threeten:threetenbp:1.6.9"
"org.slf4j:slf4j-api:2.0.9": "org.slf4j:slf4j-api:1.7.36"
},
"dependencies": {
"com.adobe.testing:s3mock": [
Expand Down Expand Up @@ -2911,6 +2914,7 @@
"com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1",
"com.google.api.grpc:grpc-google-cloud-spanner-v1",
"com.google.api.grpc:grpc-google-common-protos",
"com.google.api.grpc:proto-google-cloud-monitoring-v3",
"com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1",
"com.google.api.grpc:proto-google-cloud-spanner-admin-instance-v1",
"com.google.api.grpc:proto-google-cloud-spanner-executor-v1",
Expand All @@ -2926,6 +2930,7 @@
"com.google.auto.value:auto-value-annotations",
"com.google.cloud:google-cloud-core",
"com.google.cloud:google-cloud-core-grpc",
"com.google.cloud:google-cloud-monitoring",
"com.google.cloud:google-cloud-spanner",
"com.google.cloud:grpc-gcp",
"com.google.code.findbugs:jsr305",
Expand All @@ -2941,7 +2946,6 @@
"com.google.protobuf:protobuf-java-util",
"com.google.re2j:re2j",
"commons-codec:commons-codec",
"commons-logging:commons-logging",
"io.grpc:grpc-alts",
"io.grpc:grpc-api",
"io.grpc:grpc-auth",
Expand All @@ -2962,6 +2966,11 @@
"io.opencensus:opencensus-contrib-grpc-util",
"io.opencensus:opencensus-contrib-http-util",
"io.opencensus:opencensus-proto",
"io.opentelemetry:opentelemetry-api",
"io.opentelemetry:opentelemetry-api-incubator",
"io.opentelemetry:opentelemetry-context",
"io.opentelemetry:opentelemetry-sdk-common",
"io.opentelemetry:opentelemetry-sdk-metrics",
"io.perfmark:perfmark-api",
"javax.annotation:javax.annotation-api",
"org.apache.httpcomponents:httpclient",
Expand Down Expand Up @@ -3348,6 +3357,34 @@
"io.opentelemetry:opentelemetry-api": [
"io.opentelemetry:opentelemetry-context"
],
"io.opentelemetry:opentelemetry-sdk": [
"io.opentelemetry:opentelemetry-api",
"io.opentelemetry:opentelemetry-sdk-common",
"io.opentelemetry:opentelemetry-sdk-logs",
"io.opentelemetry:opentelemetry-sdk-metrics",
"io.opentelemetry:opentelemetry-sdk-trace"
],
"io.opentelemetry:opentelemetry-sdk-common": [
"io.opentelemetry:opentelemetry-api"
],
"io.opentelemetry:opentelemetry-sdk-logs": [
"io.opentelemetry:opentelemetry-api",
"io.opentelemetry:opentelemetry-api-incubator",
"io.opentelemetry:opentelemetry-sdk-common"
],
"io.opentelemetry:opentelemetry-sdk-metrics": [
"io.opentelemetry:opentelemetry-api",
"io.opentelemetry:opentelemetry-api-incubator",
"io.opentelemetry:opentelemetry-sdk-common"
],
"io.opentelemetry:opentelemetry-sdk-testing": [
"io.opentelemetry:opentelemetry-api",
"io.opentelemetry:opentelemetry-sdk"
],
"io.opentelemetry:opentelemetry-sdk-trace": [
"io.opentelemetry:opentelemetry-api",
"io.opentelemetry:opentelemetry-sdk-common"
],
"io.projectreactor.netty:reactor-netty-core": [
"io.netty:netty-handler",
"io.netty:netty-handler-proxy",
Expand Down Expand Up @@ -5677,6 +5714,16 @@
"io.opentelemetry.sdk.metrics.internal.state",
"io.opentelemetry.sdk.metrics.internal.view"
],
"io.opentelemetry:opentelemetry-sdk-testing": [
"io.opentelemetry.sdk.testing.assertj",
"io.opentelemetry.sdk.testing.context",
"io.opentelemetry.sdk.testing.exporter",
"io.opentelemetry.sdk.testing.junit4",
"io.opentelemetry.sdk.testing.junit5",
"io.opentelemetry.sdk.testing.logs",
"io.opentelemetry.sdk.testing.time",
"io.opentelemetry.sdk.testing.trace"
],
"io.opentelemetry:opentelemetry-sdk-trace": [
"io.opentelemetry.internal.shaded.jctools.counters",
"io.opentelemetry.internal.shaded.jctools.maps",
Expand Down Expand Up @@ -8838,6 +8885,8 @@
"io.opentelemetry:opentelemetry-sdk-logs:jar:sources",
"io.opentelemetry:opentelemetry-sdk-metrics",
"io.opentelemetry:opentelemetry-sdk-metrics:jar:sources",
"io.opentelemetry:opentelemetry-sdk-testing",
"io.opentelemetry:opentelemetry-sdk-testing:jar:sources",
"io.opentelemetry:opentelemetry-sdk-trace",
"io.opentelemetry:opentelemetry-sdk-trace:jar:sources",
"io.opentelemetry:opentelemetry-sdk:jar:sources",
Expand Down Expand Up @@ -9454,6 +9503,11 @@
"io.opentelemetry.contrib.gcp.resource.GCPResourceProvider"
]
},
"io.opentelemetry:opentelemetry-sdk-testing": {
"io.opentelemetry.context.ContextStorageProvider": [
"io.opentelemetry.sdk.testing.context.SettableContextStorageProvider"
]
},
"io.projectreactor.netty:reactor-netty-core": {
"io.micrometer.context.ContextAccessor": [
"reactor.netty.contextpropagation.ChannelContextAccessor"
Expand Down
1 change: 1 addition & 0 deletions src/main/kotlin/org/wfanet/measurement/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ kt_jvm_library(
"//imports/java/com/google/gson",
"//imports/java/com/google/protobuf",
"//imports/java/com/google/protobuf/util",
"//imports/java/io/opentelemetry/api",
"//imports/java/org/jetbrains/annotations",
"//imports/java/picocli",
"//imports/kotlin/com/google/protobuf/kotlin",
Expand Down
118 changes: 118 additions & 0 deletions src/main/kotlin/org/wfanet/measurement/common/Instrumentation.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2024 The Cross-Media Measurement Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wfanet.measurement.common

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadPoolExecutor

/** Common instrumentation for an application. */
object Instrumentation {
/** Root namespace. */
const val ROOT_NAMESPACE = "halo_cmm"

/** Singleton [OpenTelemetry] instance which may be initialized by the Java agent. */
val openTelemetry: OpenTelemetry by lazy {
// Using lazy delegate to avoid accessing global instance before it's initialized.
GlobalOpenTelemetry.get()
}

/** Common [Meter]. */
val meter: Meter by lazy { openTelemetry.getMeter(this::class.java.name) }

/** Instrumentation for thread pools. */
private val threadPools: ThreadPools by lazy { ThreadPools() }

/**
* Instruments the specified thread pool.
*
* @return the instrumented thread pool executor service
*/
fun instrumentThreadPool(poolName: String, threadPool: ThreadPoolExecutor): ExecutorService {
return threadPools.instrument(poolName, threadPool)
}

private class ThreadPools {
private val threadPoolsByName = ConcurrentHashMap<String, ThreadPoolExecutor>()

private val sizeCounter =
meter
.upDownCounterBuilder("$NAMESPACE.size")
.setDescription("Current number of threads")
.buildObserver()
private val activeCounter =
meter
.upDownCounterBuilder("$NAMESPACE.active_count")
.setDescription("Approximate number of threads that are actively executing tasks")
.buildObserver()

init {
meter.batchCallback(::record, sizeCounter, activeCounter)
}

/** Registers the specified thread pool for instrumentation. */
fun instrument(poolName: String, threadPool: ThreadPoolExecutor): ExecutorService {
val previousRegistration = threadPoolsByName.putIfAbsent(poolName, threadPool)
check(previousRegistration == null) { "Thread pool $poolName already instrumented" }
return InstrumentedExecutorService(poolName, threadPool)
}

private fun record() {
for ((poolName, threadPool) in threadPoolsByName) {
val attributes: Attributes = Attributes.of(THREAD_POOL_NAME_ATTRIBUTE_KEY, poolName)
sizeCounter.record(threadPool.poolSize.toLong(), attributes)
activeCounter.record(threadPool.activeCount.toLong(), attributes)
}
}

/** Instrumented [ExecutorService] */
private inner class InstrumentedExecutorService(
private val poolName: String,
private val delegate: ExecutorService,
) : ExecutorService by delegate {
override fun shutdown() {
threadPoolsByName.remove(poolName)
delegate.shutdown()
}

override fun shutdownNow(): MutableList<Runnable> {
threadPoolsByName.remove(poolName)
return delegate.shutdownNow()
}
}

companion object {
private const val NAMESPACE = "$ROOT_NAMESPACE.thread_pool"
/** Attribute key for thread pool name. */
private val THREAD_POOL_NAME_ATTRIBUTE_KEY: AttributeKey<String> =
AttributeKey.stringKey("$NAMESPACE.name")
}
}
}

/**
* Instruments the [ThreadPoolExecutor].
*
* @return the instrumented [ExecutorService]
*/
fun ThreadPoolExecutor.instrumented(poolName: String): ExecutorService =
Instrumentation.instrumentThreadPool(poolName, this)
Loading

0 comments on commit 7319d0e

Please sign in to comment.