Skip to content

Commit

Permalink
feat: Instrument thread pools
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjayVas committed Sep 25, 2024
1 parent 019ee45 commit e44b867
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/main/kotlin/org/wfanet/measurement/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ kt_jvm_library(
"//imports/java/com/google/gson",
"//imports/java/com/google/protobuf",
"//imports/java/com/google/protobuf/util",
"//imports/java/io/opentelemetry/api",
"//imports/java/org/jetbrains/annotations",
"//imports/java/picocli",
"//imports/kotlin/com/google/protobuf/kotlin",
Expand Down
118 changes: 118 additions & 0 deletions src/main/kotlin/org/wfanet/measurement/common/Instrumentation.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2024 The Cross-Media Measurement Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wfanet.measurement.common

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadPoolExecutor

/** Common instrumentation for an application. */
object Instrumentation {
/** Root namespace. */
const val ROOT_NAMESPACE = "halo_cmm"

/** Singleton [OpenTelemetry] instance which may be initialized by the Java agent. */
val openTelemetry: OpenTelemetry by lazy {
// Using lazy delegate to avoid accessing global instance before it's initialized.
GlobalOpenTelemetry.get()
}

/** Common [Meter]. */
val meter: Meter by lazy { openTelemetry.getMeter(this::class.java.name) }

/** Instrumentation for thread pools. */
private val threadPools: ThreadPools by lazy { ThreadPools() }

/**
* Instruments the specified thread pool.
*
* @return the instrumented thread pool executor service
*/
fun instrumentThreadPool(poolName: String, threadPool: ThreadPoolExecutor): ExecutorService {
return threadPools.instrument(poolName, threadPool)
}

private class ThreadPools {
private val threadPoolsByName = ConcurrentHashMap<String, ThreadPoolExecutor>()

private val sizeCounter =
meter
.upDownCounterBuilder("$NAMESPACE.size")
.setDescription("Current number of threads")
.buildObserver()
private val activeCounter =
meter
.upDownCounterBuilder("$NAMESPACE.active_count")
.setDescription("Approximate number of threads that are actively executing tasks")
.buildObserver()

init {
meter.batchCallback(::record, sizeCounter, activeCounter)
}

/** Registers the specified thread pool for instrumentation. */
fun instrument(poolName: String, threadPool: ThreadPoolExecutor): ExecutorService {
val previousRegistration = threadPoolsByName.putIfAbsent(poolName, threadPool)
check(previousRegistration == null) { "Thread pool $poolName already instrumented" }
return InstrumentedExecutorService(poolName, threadPool)
}

private fun record() {
for ((poolName, threadPool) in threadPoolsByName) {
val attributes: Attributes = Attributes.of(THREAD_POOL_NAME_ATTRIBUTE_KEY, poolName)
sizeCounter.record(threadPool.poolSize.toLong(), attributes)
activeCounter.record(threadPool.activeCount.toLong(), attributes)
}
}

/** Instrumented [ExecutorService] */
private inner class InstrumentedExecutorService(
private val poolName: String,
private val delegate: ExecutorService,
) : ExecutorService by delegate {
override fun shutdown() {
threadPoolsByName.remove(poolName)
delegate.shutdown()
}

override fun shutdownNow(): MutableList<Runnable> {
threadPoolsByName.remove(poolName)
return delegate.shutdownNow()
}
}

companion object {
private const val NAMESPACE = "$ROOT_NAMESPACE.thread_pool"
/** Attribute key for thread pool name. */
private val THREAD_POOL_NAME_ATTRIBUTE_KEY: AttributeKey<String> =
AttributeKey.stringKey("$NAMESPACE.name")
}
}
}

/**
* Instruments the [ThreadPoolExecutor].
*
* @return the instrumented [ExecutorService]
*/
fun ThreadPoolExecutor.instrumented(poolName: String): ExecutorService =
Instrumentation.instrumentThreadPool(poolName, this)
19 changes: 11 additions & 8 deletions src/main/kotlin/org/wfanet/measurement/common/grpc/CommonServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import java.util.logging.Logger
import kotlin.properties.Delegates
import org.jetbrains.annotations.VisibleForTesting
import org.wfanet.measurement.common.crypto.SigningCerts
import org.wfanet.measurement.common.instrumented
import picocli.CommandLine

class CommonServer
Expand All @@ -48,8 +49,10 @@ private constructor(
init {
require(threadPoolSize > 1)
}

private val executor: ExecutorService =
ThreadPoolExecutor(1, threadPoolSize, 60L, TimeUnit.SECONDS, LinkedBlockingQueue())
.instrumented(nameForLogging)
private val healthStatusManager = HealthStatusManager()
private val started = AtomicBoolean(false)

Expand Down Expand Up @@ -158,7 +161,7 @@ private constructor(
@set:CommandLine.Option(
names = ["--require-client-auth"],
description = ["Require client auth"],
defaultValue = "true"
defaultValue = "true",
)
var clientAuthRequired by Delegates.notNull<Boolean>()
private set
Expand All @@ -181,7 +184,7 @@ private constructor(
@set:CommandLine.Option(
names = ["--debug-verbose-grpc-server-logging"],
description = ["Debug mode: log ALL gRPC requests and responses"],
defaultValue = "false"
defaultValue = "false",
)
var debugVerboseGrpcLogging by Delegates.notNull<Boolean>()
private set
Expand All @@ -201,7 +204,7 @@ private constructor(
services: Iterable<ServerServiceDefinition>,
port: Int = 0,
healthPort: Int = 0,
threadPoolSize: Int = DEFAULT_THREAD_POOL_SIZE
threadPoolSize: Int = DEFAULT_THREAD_POOL_SIZE,
): CommonServer {
return CommonServer(
nameForLogging,
Expand All @@ -218,7 +221,7 @@ private constructor(
fun fromFlags(
flags: Flags,
nameForLogging: String,
services: Iterable<ServerServiceDefinition>
services: Iterable<ServerServiceDefinition>,
): CommonServer {
return fromParameters(
flags.debugVerboseGrpcLogging,
Expand All @@ -228,29 +231,29 @@ private constructor(
services,
flags.port,
flags.healthPort,
flags.threadPoolSize
flags.threadPoolSize,
)
}

/** Constructs a [CommonServer] from command-line flags. */
fun fromFlags(
flags: Flags,
nameForLogging: String,
vararg services: ServerServiceDefinition
vararg services: ServerServiceDefinition,
): CommonServer = fromFlags(flags, nameForLogging, services.asIterable())

/** Constructs a [CommonServer] from command-line flags. */
fun fromFlags(
flags: Flags,
nameForLogging: String,
services: Iterable<BindableService>
services: Iterable<BindableService>,
): CommonServer = fromFlags(flags, nameForLogging, services.map { it.bindService() })

/** Constructs a [CommonServer] from command-line flags. */
fun fromFlags(
flags: Flags,
nameForLogging: String,
vararg services: BindableService
vararg services: BindableService,
): CommonServer = fromFlags(flags, nameForLogging, services.map { it.bindService() })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.logging.Logger
import kotlinx.coroutines.TimeoutCancellationException
import org.wfanet.measurement.common.instrumented

/**
* Wraps a connection to a Spanner database for convenient access to an [AsyncDatabaseClient], the
Expand Down Expand Up @@ -57,6 +58,7 @@ class SpannerDatabaseConnector(
private val transactionExecutor: Lazy<ExecutorService> = lazy {
if (emulatorHost == null) {
ThreadPoolExecutor(1, maxTransactionThreads, 60L, TimeUnit.SECONDS, LinkedBlockingQueue())
.instrumented(databaseId.name)
} else {
// Spanner emulator only supports a single read-write transaction at a time.
Executors.newSingleThreadExecutor()
Expand Down

0 comments on commit e44b867

Please sign in to comment.