diff --git a/src/main/kotlin/org/wfanet/measurement/common/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/common/BUILD.bazel index 4d6e061ea..75335a9af 100644 --- a/src/main/kotlin/org/wfanet/measurement/common/BUILD.bazel +++ b/src/main/kotlin/org/wfanet/measurement/common/BUILD.bazel @@ -17,6 +17,7 @@ kt_jvm_library( "//imports/java/com/google/gson", "//imports/java/com/google/protobuf", "//imports/java/com/google/protobuf/util", + "//imports/java/io/opentelemetry/api", "//imports/java/org/jetbrains/annotations", "//imports/java/picocli", "//imports/kotlin/com/google/protobuf/kotlin", diff --git a/src/main/kotlin/org/wfanet/measurement/common/Instrumentation.kt b/src/main/kotlin/org/wfanet/measurement/common/Instrumentation.kt new file mode 100644 index 000000000..95e5fc36c --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/common/Instrumentation.kt @@ -0,0 +1,118 @@ +/* + * Copyright 2024 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.common + +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.OpenTelemetry +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.metrics.Meter +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService +import java.util.concurrent.ThreadPoolExecutor + +/** Common instrumentation for an application. */ +object Instrumentation { + /** Root namespace. */ + const val ROOT_NAMESPACE = "halo_cmm" + + /** Singleton [OpenTelemetry] instance which may be initialized by the Java agent. */ + val openTelemetry: OpenTelemetry by lazy { + // Using lazy delegate to avoid accessing global instance before it's initialized. + GlobalOpenTelemetry.get() + } + + /** Common [Meter]. */ + val meter: Meter by lazy { openTelemetry.getMeter(this::class.java.name) } + + /** Instrumentation for thread pools. */ + private val threadPools: ThreadPools by lazy { ThreadPools() } + + /** + * Instruments the specified thread pool. + * + * @return the instrumented thread pool executor service + */ + fun instrumentThreadPool(poolName: String, threadPool: ThreadPoolExecutor): ExecutorService { + return threadPools.instrument(poolName, threadPool) + } + + private class ThreadPools { + private val threadPoolsByName = ConcurrentHashMap() + + private val sizeCounter = + meter + .upDownCounterBuilder("$NAMESPACE.size") + .setDescription("Current number of threads") + .buildObserver() + private val activeCounter = + meter + .upDownCounterBuilder("$NAMESPACE.active_count") + .setDescription("Approximate number of threads that are actively executing tasks") + .buildObserver() + + init { + meter.batchCallback(::record, sizeCounter, activeCounter) + } + + /** Registers the specified thread pool for instrumentation. */ + fun instrument(poolName: String, threadPool: ThreadPoolExecutor): ExecutorService { + val previousRegistration = threadPoolsByName.putIfAbsent(poolName, threadPool) + check(previousRegistration == null) { "Thread pool $poolName already instrumented" } + return InstrumentedExecutorService(poolName, threadPool) + } + + private fun record() { + for ((poolName, threadPool) in threadPoolsByName) { + val attributes: Attributes = Attributes.of(THREAD_POOL_NAME_ATTRIBUTE_KEY, poolName) + sizeCounter.record(threadPool.poolSize.toLong(), attributes) + activeCounter.record(threadPool.activeCount.toLong(), attributes) + } + } + + /** Instrumented [ExecutorService] */ + private inner class InstrumentedExecutorService( + private val poolName: String, + private val delegate: ExecutorService, + ) : ExecutorService by delegate { + override fun shutdown() { + threadPoolsByName.remove(poolName) + delegate.shutdown() + } + + override fun shutdownNow(): MutableList { + threadPoolsByName.remove(poolName) + return delegate.shutdownNow() + } + } + + companion object { + private const val NAMESPACE = "$ROOT_NAMESPACE.thread_pool" + /** Attribute key for thread pool name. */ + private val THREAD_POOL_NAME_ATTRIBUTE_KEY: AttributeKey = + AttributeKey.stringKey("$NAMESPACE.name") + } + } +} + +/** + * Instruments the [ThreadPoolExecutor]. + * + * @return the instrumented [ExecutorService] + */ +fun ThreadPoolExecutor.instrumented(poolName: String): ExecutorService = + Instrumentation.instrumentThreadPool(poolName, this) diff --git a/src/main/kotlin/org/wfanet/measurement/common/grpc/CommonServer.kt b/src/main/kotlin/org/wfanet/measurement/common/grpc/CommonServer.kt index 6f0c672cc..f7c47d1c0 100644 --- a/src/main/kotlin/org/wfanet/measurement/common/grpc/CommonServer.kt +++ b/src/main/kotlin/org/wfanet/measurement/common/grpc/CommonServer.kt @@ -33,6 +33,7 @@ import java.util.logging.Logger import kotlin.properties.Delegates import org.jetbrains.annotations.VisibleForTesting import org.wfanet.measurement.common.crypto.SigningCerts +import org.wfanet.measurement.common.instrumented import picocli.CommandLine class CommonServer @@ -48,8 +49,10 @@ private constructor( init { require(threadPoolSize > 1) } + private val executor: ExecutorService = ThreadPoolExecutor(1, threadPoolSize, 60L, TimeUnit.SECONDS, LinkedBlockingQueue()) + .instrumented(nameForLogging) private val healthStatusManager = HealthStatusManager() private val started = AtomicBoolean(false) @@ -158,7 +161,7 @@ private constructor( @set:CommandLine.Option( names = ["--require-client-auth"], description = ["Require client auth"], - defaultValue = "true" + defaultValue = "true", ) var clientAuthRequired by Delegates.notNull() private set @@ -181,7 +184,7 @@ private constructor( @set:CommandLine.Option( names = ["--debug-verbose-grpc-server-logging"], description = ["Debug mode: log ALL gRPC requests and responses"], - defaultValue = "false" + defaultValue = "false", ) var debugVerboseGrpcLogging by Delegates.notNull() private set @@ -201,7 +204,7 @@ private constructor( services: Iterable, port: Int = 0, healthPort: Int = 0, - threadPoolSize: Int = DEFAULT_THREAD_POOL_SIZE + threadPoolSize: Int = DEFAULT_THREAD_POOL_SIZE, ): CommonServer { return CommonServer( nameForLogging, @@ -218,7 +221,7 @@ private constructor( fun fromFlags( flags: Flags, nameForLogging: String, - services: Iterable + services: Iterable, ): CommonServer { return fromParameters( flags.debugVerboseGrpcLogging, @@ -228,7 +231,7 @@ private constructor( services, flags.port, flags.healthPort, - flags.threadPoolSize + flags.threadPoolSize, ) } @@ -236,21 +239,21 @@ private constructor( fun fromFlags( flags: Flags, nameForLogging: String, - vararg services: ServerServiceDefinition + vararg services: ServerServiceDefinition, ): CommonServer = fromFlags(flags, nameForLogging, services.asIterable()) /** Constructs a [CommonServer] from command-line flags. */ fun fromFlags( flags: Flags, nameForLogging: String, - services: Iterable + services: Iterable, ): CommonServer = fromFlags(flags, nameForLogging, services.map { it.bindService() }) /** Constructs a [CommonServer] from command-line flags. */ fun fromFlags( flags: Flags, nameForLogging: String, - vararg services: BindableService + vararg services: BindableService, ): CommonServer = fromFlags(flags, nameForLogging, services.map { it.bindService() }) } } diff --git a/src/main/kotlin/org/wfanet/measurement/gcloud/spanner/SpannerDatabaseConnector.kt b/src/main/kotlin/org/wfanet/measurement/gcloud/spanner/SpannerDatabaseConnector.kt index cb635e2e6..ecea09551 100644 --- a/src/main/kotlin/org/wfanet/measurement/gcloud/spanner/SpannerDatabaseConnector.kt +++ b/src/main/kotlin/org/wfanet/measurement/gcloud/spanner/SpannerDatabaseConnector.kt @@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit import java.util.logging.Logger import kotlinx.coroutines.TimeoutCancellationException +import org.wfanet.measurement.common.instrumented /** * Wraps a connection to a Spanner database for convenient access to an [AsyncDatabaseClient], the @@ -57,6 +58,7 @@ class SpannerDatabaseConnector( private val transactionExecutor: Lazy = lazy { if (emulatorHost == null) { ThreadPoolExecutor(1, maxTransactionThreads, 60L, TimeUnit.SECONDS, LinkedBlockingQueue()) + .instrumented(databaseId.name) } else { // Spanner emulator only supports a single read-write transaction at a time. Executors.newSingleThreadExecutor()