From 421e287385bc025c85c68dfe1168ab0ac1194138 Mon Sep 17 00:00:00 2001 From: Min Huang Date: Tue, 14 Nov 2023 17:30:10 -0800 Subject: [PATCH] AsyncGauge returns error code for when measurement breaches max timeout (#28) By default, the error code is -1, but it will be configurable. --- gradle.properties | 2 +- .../io/tehuti/metrics/AsyncGaugeConfig.java | 17 +++++ .../io/tehuti/metrics/stats/AsyncGauge.java | 3 +- .../tehuti/metrics/stats/AsyncGaugeTest.java | 66 +++++++++++++++++++ 4 files changed, 86 insertions(+), 2 deletions(-) create mode 100644 src/test/java/io/tehuti/metrics/stats/AsyncGaugeTest.java diff --git a/gradle.properties b/gradle.properties index c464b9e..728edae 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group=io.tehuti archivesBaseName=tehuti -version=0.11.0 +version=0.11.1 signing.enabled=false signing.keyId= diff --git a/src/main/java/io/tehuti/metrics/AsyncGaugeConfig.java b/src/main/java/io/tehuti/metrics/AsyncGaugeConfig.java index 1606052..896405d 100644 --- a/src/main/java/io/tehuti/metrics/AsyncGaugeConfig.java +++ b/src/main/java/io/tehuti/metrics/AsyncGaugeConfig.java @@ -7,6 +7,8 @@ * Configuration for AsyncGauge */ public class AsyncGaugeConfig { + public static final long DEFAULT_MAX_TIMEOUT_ERROR_CODE = -1L; + // Thread pool for metrics measurement; ideally these threads should be daemon threads private final ExecutorService metricsMeasurementExecutor; // The max time to wait for metrics measurement to complete @@ -16,9 +18,19 @@ public class AsyncGaugeConfig { // if this limit is exceeded, AsyncGauge will return the cached value private final long initialMetricsMeasurementTimeoutInMs; + // The error code returned by the measurement task when it is cancelled due to the max timeout limit + private final long maxTimeoutErrorCode; + public AsyncGaugeConfig(ExecutorService metricsMeasurementExecutor, long maxMetricsMeasurementTimeoutInMs, long initialMetricsMeasurementTimeoutInMs) { + this(metricsMeasurementExecutor, maxMetricsMeasurementTimeoutInMs, initialMetricsMeasurementTimeoutInMs, DEFAULT_MAX_TIMEOUT_ERROR_CODE); + } + + public AsyncGaugeConfig(ExecutorService metricsMeasurementExecutor, + long maxMetricsMeasurementTimeoutInMs, + long initialMetricsMeasurementTimeoutInMs, + long maxTimeoutErrorCode) { Utils.notNull(metricsMeasurementExecutor); if (maxMetricsMeasurementTimeoutInMs <= 0) { throw new IllegalArgumentException("maxMetricsMeasurementTimeoutInMs must be positive"); @@ -29,6 +41,7 @@ public AsyncGaugeConfig(ExecutorService metricsMeasurementExecutor, this.metricsMeasurementExecutor = metricsMeasurementExecutor; this.maxMetricsMeasurementTimeoutInMs = maxMetricsMeasurementTimeoutInMs; this.initialMetricsMeasurementTimeoutInMs = initialMetricsMeasurementTimeoutInMs; + this.maxTimeoutErrorCode = maxTimeoutErrorCode; } public ExecutorService getMetricsMeasurementExecutor() { @@ -42,4 +55,8 @@ public long getMaxMetricsMeasurementTimeoutInMs() { public long getInitialMetricsMeasurementTimeoutInMs() { return initialMetricsMeasurementTimeoutInMs; } + + public long getMaxTimeoutErrorCode() { + return maxTimeoutErrorCode; + } } diff --git a/src/main/java/io/tehuti/metrics/stats/AsyncGauge.java b/src/main/java/io/tehuti/metrics/stats/AsyncGauge.java index 097a893..007e44b 100644 --- a/src/main/java/io/tehuti/metrics/stats/AsyncGauge.java +++ b/src/main/java/io/tehuti/metrics/stats/AsyncGauge.java @@ -105,12 +105,13 @@ public double measure(MetricConfig config, long now) { if (System.currentTimeMillis() - lastMeasurementStartTimeInMs < asyncGaugeConfig.getMaxMetricsMeasurementTimeoutInMs()) { return cachedMeasurement; } else { + cachedMeasurement = asyncGaugeConfig.getMaxTimeoutErrorCode(); lastMeasurementFuture.cancel(true); String warningMessagePrefix = String.format( "The last measurement for metric %s is still running. " + "Cancel it to prevent OutOfMemory issue.", metricName); if (!REDUNDANT_LOG_FILTER.isRedundantLog(warningMessagePrefix)) { - LOGGER.warn(String.format("%s Return the cached value: %f", warningMessagePrefix, cachedMeasurement)); + LOGGER.warn(String.format("%s Return the error code: %f", warningMessagePrefix, cachedMeasurement)); } return submitNewMeasurementTask(config, now, asyncGaugeConfig); } diff --git a/src/test/java/io/tehuti/metrics/stats/AsyncGaugeTest.java b/src/test/java/io/tehuti/metrics/stats/AsyncGaugeTest.java new file mode 100644 index 0000000..fb8ea6b --- /dev/null +++ b/src/test/java/io/tehuti/metrics/stats/AsyncGaugeTest.java @@ -0,0 +1,66 @@ +package io.tehuti.metrics.stats; + +import static org.junit.Assert.assertEquals; + +import io.tehuti.metrics.AsyncGaugeConfig; +import io.tehuti.metrics.MetricConfig; +import java.util.concurrent.Executors; +import org.junit.Assert; +import org.junit.Test; + + +public class AsyncGaugeTest { + @Test + public void testTimeoutMeasurementReturnsErrorCode() throws InterruptedException { + // Create a AsyncGauge metric whose measurement will block forever + AsyncGauge gauge = new AsyncGauge((c, t) -> { + try { + // Block forever + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + // Interrupt will unblock the measurement + } + return 0.0; + }, "testMetric"); + // Set AsyncGaugeConfig with max timeout of 100 ms and initial timeout of 10 ms + AsyncGaugeConfig asyncGaugeConfig = new AsyncGaugeConfig(Executors.newSingleThreadExecutor(), 100, 10); + MetricConfig metricConfig = new MetricConfig(asyncGaugeConfig); + // The first measurement should return 0 because the measurement task will not complete after the initial timeout + assertEquals("The first measurement should return 0 because the measurement task will not complete after the initial timeout", + 0.0, gauge.measure(metricConfig, System.currentTimeMillis()), 0.01); + // Wait for the max timeout + Thread.sleep(101); + /** + * The second measurement should return {@link AsyncGaugeConfig#DEFAULT_MAX_TIMEOUT_ERROR_CODE} because the + * measurement task will not complete after the max timeout + */ + assertEquals("The second measurement should return error code", + AsyncGaugeConfig.DEFAULT_MAX_TIMEOUT_ERROR_CODE, gauge.measure(metricConfig, System.currentTimeMillis()), 0.01); + } + + @Test(timeout = 10000) + public void testCallerOfAsyncGaugeWillNeverBeBlocked() { + // Create a AsyncGauge metric whose measurement will block forever even if it's interrupted + AsyncGauge gauge = new AsyncGauge((c, t) -> { + while (true) { + try { + // Block forever + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + // Continue the endless loop + } + } + }, "testMetric"); + // Set AsyncGaugeConfig with max timeout of 100 ms, initial timeout of 10 ms and a daemon thread pool + AsyncGaugeConfig asyncGaugeConfig = new AsyncGaugeConfig(Executors.newSingleThreadExecutor(r -> { + Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + }), 100, 10); + MetricConfig metricConfig = new MetricConfig(asyncGaugeConfig); + // Test that caller of AsyncGauge.measure() will never be blocked + gauge.measure(metricConfig, System.currentTimeMillis()); + // If the caller is blocked, the following line will never be executed + System.out.println("Caller of AsyncGauge.measure() is not blocked"); + } +}