Skip to content

Commit

Permalink
AsyncGauge returns error code for when measurement breaches max timeo…
Browse files Browse the repository at this point in the history
…ut (#28)

By default, the error code is -1, but it will be configurable.
  • Loading branch information
huangminchn authored Nov 15, 2023
1 parent 826119e commit 421e287
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 2 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
group=io.tehuti
archivesBaseName=tehuti
version=0.11.0
version=0.11.1

signing.enabled=false
signing.keyId=
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/io/tehuti/metrics/AsyncGaugeConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* Configuration for AsyncGauge
*/
public class AsyncGaugeConfig {
public static final long DEFAULT_MAX_TIMEOUT_ERROR_CODE = -1L;

// Thread pool for metrics measurement; ideally these threads should be daemon threads
private final ExecutorService metricsMeasurementExecutor;
// The max time to wait for metrics measurement to complete
Expand All @@ -16,9 +18,19 @@ public class AsyncGaugeConfig {
// if this limit is exceeded, AsyncGauge will return the cached value
private final long initialMetricsMeasurementTimeoutInMs;

// The error code returned by the measurement task when it is cancelled due to the max timeout limit
private final long maxTimeoutErrorCode;

public AsyncGaugeConfig(ExecutorService metricsMeasurementExecutor,
long maxMetricsMeasurementTimeoutInMs,
long initialMetricsMeasurementTimeoutInMs) {
this(metricsMeasurementExecutor, maxMetricsMeasurementTimeoutInMs, initialMetricsMeasurementTimeoutInMs, DEFAULT_MAX_TIMEOUT_ERROR_CODE);
}

public AsyncGaugeConfig(ExecutorService metricsMeasurementExecutor,
long maxMetricsMeasurementTimeoutInMs,
long initialMetricsMeasurementTimeoutInMs,
long maxTimeoutErrorCode) {
Utils.notNull(metricsMeasurementExecutor);
if (maxMetricsMeasurementTimeoutInMs <= 0) {
throw new IllegalArgumentException("maxMetricsMeasurementTimeoutInMs must be positive");
Expand All @@ -29,6 +41,7 @@ public AsyncGaugeConfig(ExecutorService metricsMeasurementExecutor,
this.metricsMeasurementExecutor = metricsMeasurementExecutor;
this.maxMetricsMeasurementTimeoutInMs = maxMetricsMeasurementTimeoutInMs;
this.initialMetricsMeasurementTimeoutInMs = initialMetricsMeasurementTimeoutInMs;
this.maxTimeoutErrorCode = maxTimeoutErrorCode;
}

public ExecutorService getMetricsMeasurementExecutor() {
Expand All @@ -42,4 +55,8 @@ public long getMaxMetricsMeasurementTimeoutInMs() {
public long getInitialMetricsMeasurementTimeoutInMs() {
return initialMetricsMeasurementTimeoutInMs;
}

public long getMaxTimeoutErrorCode() {
return maxTimeoutErrorCode;
}
}
3 changes: 2 additions & 1 deletion src/main/java/io/tehuti/metrics/stats/AsyncGauge.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,13 @@ public double measure(MetricConfig config, long now) {
if (System.currentTimeMillis() - lastMeasurementStartTimeInMs < asyncGaugeConfig.getMaxMetricsMeasurementTimeoutInMs()) {
return cachedMeasurement;
} else {
cachedMeasurement = asyncGaugeConfig.getMaxTimeoutErrorCode();
lastMeasurementFuture.cancel(true);
String warningMessagePrefix = String.format(
"The last measurement for metric %s is still running. " + "Cancel it to prevent OutOfMemory issue.",
metricName);
if (!REDUNDANT_LOG_FILTER.isRedundantLog(warningMessagePrefix)) {
LOGGER.warn(String.format("%s Return the cached value: %f", warningMessagePrefix, cachedMeasurement));
LOGGER.warn(String.format("%s Return the error code: %f", warningMessagePrefix, cachedMeasurement));
}
return submitNewMeasurementTask(config, now, asyncGaugeConfig);
}
Expand Down
66 changes: 66 additions & 0 deletions src/test/java/io/tehuti/metrics/stats/AsyncGaugeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.tehuti.metrics.stats;

import static org.junit.Assert.assertEquals;

import io.tehuti.metrics.AsyncGaugeConfig;
import io.tehuti.metrics.MetricConfig;
import java.util.concurrent.Executors;
import org.junit.Assert;
import org.junit.Test;


public class AsyncGaugeTest {
@Test
public void testTimeoutMeasurementReturnsErrorCode() throws InterruptedException {
// Create a AsyncGauge metric whose measurement will block forever
AsyncGauge gauge = new AsyncGauge((c, t) -> {
try {
// Block forever
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
// Interrupt will unblock the measurement
}
return 0.0;
}, "testMetric");
// Set AsyncGaugeConfig with max timeout of 100 ms and initial timeout of 10 ms
AsyncGaugeConfig asyncGaugeConfig = new AsyncGaugeConfig(Executors.newSingleThreadExecutor(), 100, 10);
MetricConfig metricConfig = new MetricConfig(asyncGaugeConfig);
// The first measurement should return 0 because the measurement task will not complete after the initial timeout
assertEquals("The first measurement should return 0 because the measurement task will not complete after the initial timeout",
0.0, gauge.measure(metricConfig, System.currentTimeMillis()), 0.01);
// Wait for the max timeout
Thread.sleep(101);
/**
* The second measurement should return {@link AsyncGaugeConfig#DEFAULT_MAX_TIMEOUT_ERROR_CODE} because the
* measurement task will not complete after the max timeout
*/
assertEquals("The second measurement should return error code",
AsyncGaugeConfig.DEFAULT_MAX_TIMEOUT_ERROR_CODE, gauge.measure(metricConfig, System.currentTimeMillis()), 0.01);
}

@Test(timeout = 10000)
public void testCallerOfAsyncGaugeWillNeverBeBlocked() {
// Create a AsyncGauge metric whose measurement will block forever even if it's interrupted
AsyncGauge gauge = new AsyncGauge((c, t) -> {
while (true) {
try {
// Block forever
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
// Continue the endless loop
}
}
}, "testMetric");
// Set AsyncGaugeConfig with max timeout of 100 ms, initial timeout of 10 ms and a daemon thread pool
AsyncGaugeConfig asyncGaugeConfig = new AsyncGaugeConfig(Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}), 100, 10);
MetricConfig metricConfig = new MetricConfig(asyncGaugeConfig);
// Test that caller of AsyncGauge.measure() will never be blocked
gauge.measure(metricConfig, System.currentTimeMillis());
// If the caller is blocked, the following line will never be executed
System.out.println("Caller of AsyncGauge.measure() is not blocked");
}
}

0 comments on commit 421e287

Please sign in to comment.