From a50499f2be2d8fffbf0e653cada28b48a5262b19 Mon Sep 17 00:00:00 2001
From: Gaojie Liu
Date: Wed, 13 Mar 2024 17:03:50 -0700
Subject: [PATCH] Optimization for slow AsyncGauge execution (#31)

* Optimization for slow AsyncGauge execution

This PR introduces a dynamic way to track slow AsyncGauge metric execution and
avoids blocking the caller thread as much as possible.

At a high level, this PR introduces an `AsyncGaugeExecutor`, which implements the
following strategy:
1. There are two executors: one for regular metrics and one for slow metrics.
2. All metric evaluations are triggered by the caller.
3. If the actual metric execution time exceeds the configured slow metric threshold,
   the metric is moved into the slow metric tracking map, which results in the
   following behaviors:
   a. The next measurement call returns the cached value immediately.
   b. The submitted measurable is executed asynchronously.
   c. If the actual measurement latency drops back below the slow metric threshold,
      the metric is moved out of the slow metric tracking map.
4. If the actual metric execution time is below the configured slow metric threshold,
   the following behaviors apply:
   a. After submitting the measurable to the regular executor, the caller waits up to
      the configured {@link AsyncGaugeExecutor#initialMetricsMeasurementTimeoutInMs}
      to collect the latest result.
   b. If the latest value can't be collected in step #a, the next call examines the
      previous execution to decide whether the metric should be put into the slow
      metric tracking map.
5. An async thread cleans up inactive metrics from the slow metric tracking map to
   avoid accumulating garbage after metric deletion.

`AsyncGaugeExecutor` exposes several config params that can be tuned to the actual
load pattern; the caller can construct a global `AsyncGaugeExecutor` and pass it to
`MetricsRepository` via `MetricConfig`, as sketched below.
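Minimal wiring sketch (illustrative only, not part of this diff). It assumes
`MetricsRepository` accepts a `MetricConfig` in its constructor; the builder
setters exist in this patch, and the values shown mirror the Builder defaults
introduced here:

    import io.tehuti.metrics.MetricConfig;
    import io.tehuti.metrics.MetricsRepository;
    import io.tehuti.metrics.stats.AsyncGauge;

    public class AsyncGaugeExecutorWiring {
      public static void main(String[] args) {
        // Build one global executor; thread counts and timeouts below match the
        // Builder defaults from this patch and can be tuned per load pattern.
        AsyncGauge.AsyncGaugeExecutor asyncGaugeExecutor =
            new AsyncGauge.AsyncGaugeExecutor.Builder()
                .setMetricMeasurementThreadCount(3)       // regular-metric executor threads
                .setSlowMetricMeasurementThreadCount(7)   // slow-metric executor threads
                .setInitialMetricsMeasurementTimeoutInMs(500)
                .setSlowMetricThresholdMs(100)
                .build();

        // Pass the executor to the repository via MetricConfig
        // (MetricsRepository(MetricConfig) constructor assumed here).
        MetricsRepository metricsRepository =
            new MetricsRepository(new MetricConfig(asyncGaugeExecutor));

        // ... register sensors and AsyncGauge metrics as usual ...

        // Closing the repository also shuts down the executors
        // (see MetricsRepository#close() in this diff).
        metricsRepository.close();
      }
    }

Sharing one executor across repositories keeps the total number of measurement
threads bounded regardless of how many AsyncGauge metrics are registered.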
* Addressed comments * Addressed new comments --- gradle.properties | 2 +- .../java/io/tehuti/metrics/MetricConfig.java | 12 +- .../io/tehuti/metrics/MetricsRepository.java | 13 +- .../io/tehuti/metrics/stats/AsyncGauge.java | 388 ++++++++++++++---- .../tehuti/metrics/stats/AsyncGaugeTest.java | 106 ++++- src/test/resources/log4j.properties | 2 +- 6 files changed, 431 insertions(+), 92 deletions(-) diff --git a/gradle.properties b/gradle.properties index 1c796d6..cc85612 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group=io.tehuti archivesBaseName=tehuti -version=0.11.3 +version=0.11.4 signing.enabled=false signing.keyId= diff --git a/src/main/java/io/tehuti/metrics/MetricConfig.java b/src/main/java/io/tehuti/metrics/MetricConfig.java index 2348fa2..77c6e42 100644 --- a/src/main/java/io/tehuti/metrics/MetricConfig.java +++ b/src/main/java/io/tehuti/metrics/MetricConfig.java @@ -29,19 +29,19 @@ public class MetricConfig { private long timeWindowMs; private TimeUnit unit; private long expirationAge; - private AsyncGaugeConfig asyncGaugeConfig; + private AsyncGauge.AsyncGaugeExecutor asyncGaugeExecutor; public MetricConfig() { - this(AsyncGauge.DEFAULT_ASYNC_GAUGE_CONFIG); + this(AsyncGauge.DEFAULT_ASYNC_GAUGE_EXECUTOR); } - public MetricConfig(AsyncGaugeConfig asyncGaugeConfig) { + public MetricConfig(AsyncGauge.AsyncGaugeExecutor asyncGaugeExecutor) { super(); this.quota = null; this.samples = 2; this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS); this.unit = TimeUnit.SECONDS; - this.asyncGaugeConfig = asyncGaugeConfig; + this.asyncGaugeExecutor = asyncGaugeExecutor; updateExpirationAge(); } @@ -93,7 +93,7 @@ public long expirationAge() { return this.expirationAge; } - public AsyncGaugeConfig getAsyncGaugeConfig() { - return asyncGaugeConfig; + public AsyncGauge.AsyncGaugeExecutor getAsyncGaugeExecutor() { + return this.asyncGaugeExecutor; } } diff --git a/src/main/java/io/tehuti/metrics/MetricsRepository.java b/src/main/java/io/tehuti/metrics/MetricsRepository.java index 5bdc109..40312d3 100644 --- a/src/main/java/io/tehuti/metrics/MetricsRepository.java +++ b/src/main/java/io/tehuti/metrics/MetricsRepository.java @@ -13,6 +13,7 @@ package io.tehuti.metrics; import io.tehuti.metrics.stats.AsyncGauge; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -248,8 +249,8 @@ public Metric getMetric(String name) { return this.metrics.get(name); } - public AsyncGaugeConfig getAsyncGaugeConfig() { - return this.config.getAsyncGaugeConfig(); + public AsyncGauge.AsyncGaugeExecutor getAsyncGaugeExecutor() { + return this.config.getAsyncGaugeExecutor(); } /** @@ -261,9 +262,11 @@ public AsyncGaugeConfig getAsyncGaugeConfig() { public void close() { for (MetricsReporter reporter : this.reporters) reporter.close(); - AsyncGaugeConfig asyncGaugeConfig = getAsyncGaugeConfig(); - if (asyncGaugeConfig != null) { - asyncGaugeConfig.getMetricsMeasurementExecutor().shutdownNow(); + AsyncGauge.AsyncGaugeExecutor asyncGaugeExecutor = getAsyncGaugeExecutor(); + try { + asyncGaugeExecutor.close(); + } catch (IOException e) { + throw new RuntimeException(e); } } } diff --git a/src/main/java/io/tehuti/metrics/stats/AsyncGauge.java b/src/main/java/io/tehuti/metrics/stats/AsyncGauge.java index f921e41..5e25718 100644 --- a/src/main/java/io/tehuti/metrics/stats/AsyncGauge.java +++ b/src/main/java/io/tehuti/metrics/stats/AsyncGauge.java @@ -6,10 +6,22 @@ import io.tehuti.metrics.NamedMeasurableStat; import 
io.tehuti.utils.DaemonThreadFactory; import io.tehuti.utils.RedundantLogFilter; -import java.util.concurrent.CompletableFuture; +import io.tehuti.utils.SystemTime; +import io.tehuti.utils.Time; +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.log4j.LogManager; @@ -21,20 +33,16 @@ * return the cached value. This is useful for metrics that are expensive to measure. */ public class AsyncGauge implements NamedMeasurableStat { - private static final Logger LOGGER = LogManager.getLogger(AsyncGauge.class); - private static final RedundantLogFilter REDUNDANT_LOG_FILTER = RedundantLogFilter.getRedundantLogFilter(); private final String metricName; private double cachedMeasurement = 0.0; - private long lastMeasurementStartTimeInMs = System.currentTimeMillis(); - private CompletableFuture lastMeasurementFuture = null; + private long lastMeasurementStartTimeInMs = -1; + private long lastMeasurementLatencyInMs = -1; + private Future lastMeasurementFuture = null; private final Measurable measurable; - public static final AsyncGaugeConfig DEFAULT_ASYNC_GAUGE_CONFIG = - new AsyncGaugeConfig(Executors.newFixedThreadPool(10, - new DaemonThreadFactory("Default_Async_Gauge_Executor")), - TimeUnit.MINUTES.toMillis(1), - 500); + public static final AsyncGaugeExecutor DEFAULT_ASYNC_GAUGE_EXECUTOR = + new AsyncGaugeExecutor.Builder().build(); public AsyncGauge(Measurable measurable, String metricName) { this.measurable = measurable; @@ -60,89 +68,329 @@ public void record(double value, long now) { */ @Override public double measure(MetricConfig config, long now) { - AsyncGaugeConfig asyncGaugeConfig = config.getAsyncGaugeConfig(); - if (asyncGaugeConfig == null) { - asyncGaugeConfig = DEFAULT_ASYNC_GAUGE_CONFIG; + AsyncGaugeExecutor asyncGaugeExecutor = config.getAsyncGaugeExecutor(); + if (asyncGaugeExecutor == null) { + asyncGaugeExecutor = DEFAULT_ASYNC_GAUGE_EXECUTOR; } - // If the thread pool is shutdown, return the cached value - if (asyncGaugeConfig.getMetricsMeasurementExecutor().isShutdown()) { - return cachedMeasurement; + return asyncGaugeExecutor.measure(this, config, now); + } + + /** + * In the high-level, {@link AsyncGaugeExecutor} works as follows: + * 1. There are two executors and one for regular metrics and the other one is for slow metrics. + * 2. All the metric evaluations are triggered by the caller. + * 3. If the actual metric execution time exceeds the configured slow metric threshold, it will be moved to slow metric tracking map, + * which indicates the following behaviors: + * a. The next metric measurement call will return the cached value immediately. + * b. The submitted measurable will be executed asynchronously. + * c. If the actual measurement runtime latency becomes lower than the slow metric threshold, it will be moved out + * of slow metric tracking map. + * 4. 
If the actual metric execution time belows the configured slow metric threshold, the following behaviors will be observed: + * a. After submitting the measurable to the regular executor, it will wait up to configured {@link AsyncGaugeExecutor#initialMetricsMeasurementTimeoutInMs} + * to collect the latest result. + * b. If it can't collect the latest value in step #a, the next call will examine the previous execution to decide + * whether it should be put into the slow metric tracking map or not. + * + * 5. There is an async thread to clean up inactive metrics from slow metric tracking map to avoid the accumulation of garbage + * because of metric deletion. + */ + public static class AsyncGaugeExecutor implements Closeable { + + public static class Builder { + private int metricMeasurementThreadCount = 3; + private int slowMetricMeasurementThreadCount = 7; + private long initialMetricsMeasurementTimeoutInMs = 500; + private int slowMetricThresholdMs = 100; + private long maxMetricsMeasurementTimeoutInMs = TimeUnit.MINUTES.toMillis(1); + private int maxTimeoutErrorCode = -1; + private long inactiveSlowMetricCleanupThresholdInMs = TimeUnit.MINUTES.toMillis(30); + private Time time = new SystemTime(); + private long inactiveSlowMetricCleanupIntervalInMs = TimeUnit.MINUTES.toMillis(5); + + public Builder setMetricMeasurementThreadCount(int metricMeasurementThreadCount) { + this.metricMeasurementThreadCount = metricMeasurementThreadCount; + return this; + } + + public Builder setSlowMetricMeasurementThreadCount(int slowMetricMeasurementThreadCount) { + this.slowMetricMeasurementThreadCount = slowMetricMeasurementThreadCount; + return this; + } + + public Builder setInitialMetricsMeasurementTimeoutInMs(long initialMetricsMeasurementTimeoutInMs) { + this.initialMetricsMeasurementTimeoutInMs = initialMetricsMeasurementTimeoutInMs; + return this; + } + + public Builder setMaxMetricsMeasurementTimeoutInMs(long maxMetricsMeasurementTimeoutInMs) { + this.maxMetricsMeasurementTimeoutInMs = maxMetricsMeasurementTimeoutInMs; + return this; + } + + public Builder setMaxTimeoutErrorCode(int maxTimeoutErrorCode) { + this.maxTimeoutErrorCode = maxTimeoutErrorCode; + return this; + } + + public Builder setSlowMetricThresholdMs(int slowMetricThresholdMs) { + this.slowMetricThresholdMs = slowMetricThresholdMs; + return this; + } + + public Builder setInactiveSlowMetricCleanupThresholdInMs(long inactiveSlowMetricCleanupThresholdInMs) { + this.inactiveSlowMetricCleanupThresholdInMs = inactiveSlowMetricCleanupThresholdInMs; + return this; + } + + public Builder setInactiveSlowMetricCleanupIntervalInMs(long inactiveSlowMetricCleanupIntervalInMs) { + this.inactiveSlowMetricCleanupIntervalInMs = inactiveSlowMetricCleanupIntervalInMs; + return this; + } + + public Builder setTime(Time time) { + this.time = time; + return this; + } + + public AsyncGaugeExecutor build() { + return new AsyncGaugeExecutor(this); + } + } + + + private static final RedundantLogFilter REDUNDANT_LOG_FILTER = RedundantLogFilter.getRedundantLogFilter(); + + private static final Logger LOGGER = LogManager.getLogger(AsyncGaugeExecutor.class); + + + // Thread pool for the execution of regular metrics + private final ExecutorService metricsMeasurementExecutor; + // Thread pool for the execution of slow metrics + private final ExecutorService slowMetricsMeasurementExecutor; + // The max time to wait for metrics measurement to complete + // If this limit is exceeded, the measurement task in the thread pool will be cancelled + private final long 
maxMetricsMeasurementTimeoutInMs; + // After the metrics measurement task is submitted to the thread pool, it will try to wait for this amount of time; + // if this limit is exceeded, AsyncGauge will return the cached value + private final long initialMetricsMeasurementTimeoutInMs; + + // The error code returned by the measurement task when it is cancelled due to the max timeout limit + private final int maxTimeoutErrorCode; + + // If the execution time of the metric exceeds the threshold, the metric will be moved to slow metric set + private final int slowMetricThresholdMs; + + // This is used to decide the threshold to clean up inactive slow metrics + private final long inactiveSlowMetricCleanupThresholdInMs; + + private final ConcurrentHashMap slowAsyncGaugeAccessMap = new ConcurrentHashMap<>(); + private final Time time; + + private final ScheduledExecutorService inactiveSlowMetricCleanupExecutor = + Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("Inactive_Slow_AsyncGauge_Cleanup")); + + public AsyncGaugeExecutor(Builder builder) { + if (builder.metricMeasurementThreadCount <= 0) { + throw new IllegalArgumentException("metricMeasurementThreadCount must be positive"); + } + if (builder.slowMetricMeasurementThreadCount <= 0) { + throw new IllegalArgumentException("slowMetricMeasurementThreadCount must be positive"); + } + if (builder.maxMetricsMeasurementTimeoutInMs <= 0) { + throw new IllegalArgumentException("maxMetricsMeasurementTimeoutInMs must be positive"); + } + if (builder.initialMetricsMeasurementTimeoutInMs <= 0) { + throw new IllegalArgumentException("initialMetricsMeasurementTimeoutInMs must be positive"); + } + if (builder.slowMetricThresholdMs <= 0) { + throw new IllegalArgumentException("slowMetricThresholdMs must be positive"); + } + if (builder.slowMetricThresholdMs > builder.maxMetricsMeasurementTimeoutInMs) { + throw new IllegalArgumentException("slowMetricThresholdMs: " + builder.slowMetricThresholdMs + + " must be smaller than or equal to maxMetricsMeasurementTimeoutInMs: "+ builder.maxMetricsMeasurementTimeoutInMs); + } + if (builder.inactiveSlowMetricCleanupThresholdInMs <= 0) { + throw new IllegalArgumentException("slowMetricCleanupThresholdInMs must be positive"); + } + if (builder.inactiveSlowMetricCleanupIntervalInMs <= 0) { + throw new IllegalArgumentException("inactiveSlowMetricCleanupIntervalInMs must be positive"); + } + this.metricsMeasurementExecutor = new ThreadPoolExecutor(builder.metricMeasurementThreadCount, + builder.metricMeasurementThreadCount, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(10000), // 10k items to avoid OOM + new DaemonThreadFactory("Async_Gauge_Executor"));; + this.slowMetricsMeasurementExecutor = new ThreadPoolExecutor(builder.slowMetricMeasurementThreadCount, + builder.slowMetricMeasurementThreadCount, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(10000), // 10k items to avoid OOM + new DaemonThreadFactory("Slow_Async_Gauge_Executor"));; + this.maxMetricsMeasurementTimeoutInMs = builder.maxMetricsMeasurementTimeoutInMs; + this.initialMetricsMeasurementTimeoutInMs = builder.initialMetricsMeasurementTimeoutInMs; + this.maxTimeoutErrorCode = builder.maxTimeoutErrorCode; + this.slowMetricThresholdMs = builder.slowMetricThresholdMs; + this.inactiveSlowMetricCleanupThresholdInMs = builder.inactiveSlowMetricCleanupThresholdInMs; + this.time = builder.time; + + // Schedule the inactive metrics cleanup + inactiveSlowMetricCleanupExecutor.scheduleWithFixedDelay( + 
getInActiveSlowMetricCleanupRunnable(), + builder.inactiveSlowMetricCleanupIntervalInMs, + builder.inactiveSlowMetricCleanupIntervalInMs, + TimeUnit.MILLISECONDS + ); } - if (lastMeasurementFuture == null) { - // First time running measurement; or previous measurement finished fast enough - return submitNewMeasurementTask(config, now, asyncGaugeConfig); - } else { + public Map getSlowAsyncGaugeAccessMap() { + return Collections.unmodifiableMap(slowAsyncGaugeAccessMap); + } + + private Runnable getInActiveSlowMetricCleanupRunnable() { + return () -> { + Iterator> iterator = slowAsyncGaugeAccessMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (time.milliseconds() - entry.getValue() >= inactiveSlowMetricCleanupThresholdInMs) { + LOGGER.info("Removing inactive slow async gauge metric from slow metric tracking: " + entry.getKey().metricName); + iterator.remove(); + } + } + }; + } + + public synchronized double measure(AsyncGauge asyncGauge, MetricConfig config, long now) { + boolean isSlowMetric = slowAsyncGaugeAccessMap.containsKey(asyncGauge); + + if (isSlowMetric && slowMetricsMeasurementExecutor.isShutdown() + || !isSlowMetric && metricsMeasurementExecutor.isShutdown()) { + return asyncGauge.cachedMeasurement; + } + + if (asyncGauge.lastMeasurementFuture == null) { + // First time running measurement; or previous measurement finished fast enough + return submitNewMeasurementTask(asyncGauge, config, now, isSlowMetric); + } // If the last measurement future exists, meaning the last measurement didn't finish fast enough. In this case: // 1. If the last measurement future is done, update the cached value, log which metric measurement is slow. // 2. If the last measurement future is still running, cancel it to prevent OutOfMemory issue, and log. - if (lastMeasurementFuture.isDone()) { + if (asyncGauge.lastMeasurementFuture.isDone()) { try { - cachedMeasurement = lastMeasurementFuture.get(); - long measurementTimeInMs = System.currentTimeMillis() - lastMeasurementStartTimeInMs; - String warningMessagePrefix = String.format("The measurement for metric %s", metricName); - if (!REDUNDANT_LOG_FILTER.isRedundantLog(warningMessagePrefix)) { - LOGGER.warn(String.format("%s took %d ms; the metric value is %f", warningMessagePrefix, measurementTimeInMs, - cachedMeasurement)); + /** + * Get the measurement duration of last execution to see whether we should move the metric to {@link #slowAsyncGaugeSet} or not. + */ + if (asyncGauge.lastMeasurementLatencyInMs > slowMetricThresholdMs) { + if (!isSlowMetric) { + LOGGER.warn(String.format("The measurement for metric %s took %d ms; the metric value is %f, moved this metric to slow metric tracking", + asyncGauge.metricName, asyncGauge.lastMeasurementLatencyInMs, asyncGauge.cachedMeasurement)); + isSlowMetric = true; + slowAsyncGaugeAccessMap.put(asyncGauge, time.milliseconds()); + } + } else if (isSlowMetric) { + slowAsyncGaugeAccessMap.remove(asyncGauge); + isSlowMetric = false; + LOGGER.info("The measurement for metric: " + asyncGauge.metricName + " took " + + asyncGauge.lastMeasurementLatencyInMs + " ms, moved this metric out of slow metric tracking"); } + asyncGauge.cachedMeasurement = asyncGauge.lastMeasurementFuture.get(); } catch (ExecutionException e) { - String errorMessage = String.format("Failed to get a done measurement future for metric %s. ", metricName); + String errorMessage = String.format("Failed to get a done measurement future for metric %s. 
", asyncGauge.metricName); if (!REDUNDANT_LOG_FILTER.isRedundantLog(errorMessage)) { LOGGER.error(errorMessage, e); } // Update the cached value to a negative value to indicate the measurement failure - cachedMeasurement = -1.0; + asyncGauge.cachedMeasurement = -1.0; } catch (InterruptedException e) { - throw new RuntimeException("Metric measurement is interrupted for metric " + metricName, e); + throw new RuntimeException("Metric measurement is interrupted for metric " + asyncGauge.metricName, e); } // Always try to get the freshest measurement value // Reason: let's say the initial wait time is 500ms, and the previous measurement took 600ms. In this case, if we // return the previous measurement value, it would be 59 seconds stale, assuming the measurement interval is 1 minute. - return submitNewMeasurementTask(config, now, asyncGaugeConfig); - } else { - // If the last measurement future is still running but hasn't exceeded the max timeout, keep it running and return the cached value. - // Otherwise, cancel the last measurement future to prevent OutOfMemory issue, and submit a new measurement task. - if (System.currentTimeMillis() - lastMeasurementStartTimeInMs < asyncGaugeConfig.getMaxMetricsMeasurementTimeoutInMs()) { - return cachedMeasurement; - } else { - cachedMeasurement = asyncGaugeConfig.getMaxTimeoutErrorCode(); - lastMeasurementFuture.cancel(true); - String warningMessagePrefix = String.format( - "The last measurement for metric %s is still running. " + "Cancel it to prevent OutOfMemory issue.", - metricName); - if (!REDUNDANT_LOG_FILTER.isRedundantLog(warningMessagePrefix)) { - LOGGER.warn(String.format("%s Return the error code: %f", warningMessagePrefix, cachedMeasurement)); + return submitNewMeasurementTask(asyncGauge, config, now, isSlowMetric); + } + // If the last measurement future is still running but hasn't exceeded the max timeout, keep it running and return the cached value. + // Otherwise, cancel the last measurement future to prevent OutOfMemory issue, and submit a new measurement task. + if (asyncGauge.lastMeasurementStartTimeInMs < 0 // Measurable hasn't been executed yet + || time.milliseconds() - asyncGauge.lastMeasurementStartTimeInMs < maxMetricsMeasurementTimeoutInMs) { + return asyncGauge.cachedMeasurement; + } + asyncGauge.cachedMeasurement = maxTimeoutErrorCode; + asyncGauge.lastMeasurementFuture.cancel(true); + String warningMessagePrefix = String.format( + "The last measurement for metric %s is still running. " + "Cancel it to prevent OutOfMemory issue.", + asyncGauge.metricName); + if (!REDUNDANT_LOG_FILTER.isRedundantLog(warningMessagePrefix)) { + LOGGER.warn(String.format("%s Return the error code: %f, and put it in slow metric set", + warningMessagePrefix, asyncGauge.cachedMeasurement)); + } + if (!isSlowMetric) { + slowAsyncGaugeAccessMap.put(asyncGauge, time.milliseconds()); + isSlowMetric = true; + } + return submitNewMeasurementTask(asyncGauge, config, now, isSlowMetric); + } + + private double submitNewMeasurementTask(AsyncGauge asyncGauge, MetricConfig config, long now, boolean isSlowMetric) { + try { + // Reset the tracking for the new measurement task + asyncGauge.lastMeasurementStartTimeInMs = -1; + asyncGauge.lastMeasurementLatencyInMs = -1; + asyncGauge.lastMeasurementFuture = null; + asyncGauge.lastMeasurementFuture = (isSlowMetric ? 
slowMetricsMeasurementExecutor : metricsMeasurementExecutor).submit( + () -> { + try { + asyncGauge.lastMeasurementStartTimeInMs = time.milliseconds(); + return asyncGauge.measurable.measure(config, now); + } finally { + asyncGauge.lastMeasurementLatencyInMs = time.milliseconds() - asyncGauge.lastMeasurementStartTimeInMs; + } + } + ); + + if (isSlowMetric) { + // No wait for slow metrics + return asyncGauge.cachedMeasurement; + } + /** + * Try to wait for the CompletableFuture for {@link AsyncGaugeConfig#initialMetricsMeasurementTimeoutInMs}. + * If it times out, return the cached value; otherwise, update the cached value and return the latest result. + */ + asyncGauge.cachedMeasurement = asyncGauge.lastMeasurementFuture.get(initialMetricsMeasurementTimeoutInMs, TimeUnit.MILLISECONDS); + asyncGauge.lastMeasurementFuture = null; + return asyncGauge.cachedMeasurement; + } catch (RejectedExecutionException e) { + // The queue is saturated. + return -1; + } catch (TimeoutException e) { + // If the thread pool is shutdown or the measurement takes longer than 500ms, return the cached value + return asyncGauge.cachedMeasurement; + } catch (ExecutionException e) { + String errorMessage = String.format("Error when measuring value for metric %s.", asyncGauge.metricName); + if (!REDUNDANT_LOG_FILTER.isRedundantLog(errorMessage)) { + LOGGER.error(errorMessage, e); + } + asyncGauge.lastMeasurementFuture = null; + asyncGauge.cachedMeasurement = -1; + return asyncGauge.cachedMeasurement; + } catch (InterruptedException e) { + throw new RuntimeException("Metric measurement is interrupted for metric " + asyncGauge.metricName, e); + } finally { + if (asyncGauge.lastMeasurementFuture == null) { + /** + * The latest execution finished and check whether we should put it into slow metric set or not. + */ + if (asyncGauge.lastMeasurementLatencyInMs > slowMetricThresholdMs && !isSlowMetric) { + LOGGER.warn("Putting metric: " + asyncGauge.metricName + " into slow metric tracking"); + slowAsyncGaugeAccessMap.put(asyncGauge, time.milliseconds()); } - return submitNewMeasurementTask(config, now, asyncGaugeConfig); } } } - } - private double submitNewMeasurementTask(MetricConfig config, long now, AsyncGaugeConfig asyncGaugeConfig) { - try { - // Submit a new measurement task for the current minute - lastMeasurementStartTimeInMs = System.currentTimeMillis(); - lastMeasurementFuture = - CompletableFuture.supplyAsync(() -> this.measurable.measure(config, now), asyncGaugeConfig.getMetricsMeasurementExecutor()); - /** - * Try to wait for the CompletableFuture for {@link AsyncGaugeConfig#initialMetricsMeasurementTimeoutInMs}. - * If it times out, return the cached value; otherwise, update the cached value and return the latest result. 
- */ - cachedMeasurement = lastMeasurementFuture.get(asyncGaugeConfig.getInitialMetricsMeasurementTimeoutInMs(), TimeUnit.MILLISECONDS); - lastMeasurementFuture = null; - return cachedMeasurement; - } catch (RejectedExecutionException | TimeoutException e) { - // If the thread pool is shutdown or the measurement takes longer than 500ms, return the cached value - return cachedMeasurement; - } catch (ExecutionException e) { - String errorMessage = String.format("Error when measuring value for metric %s.", metricName); - if (!REDUNDANT_LOG_FILTER.isRedundantLog(errorMessage)) { - LOGGER.error(errorMessage, e); - } - return cachedMeasurement; - } catch (InterruptedException e) { - throw new RuntimeException("Metric measurement is interrupted for metric " + metricName, e); + @Override + public void close() throws IOException { + this.metricsMeasurementExecutor.shutdownNow(); + this.slowMetricsMeasurementExecutor.shutdownNow(); + this.inactiveSlowMetricCleanupExecutor.shutdownNow(); } } } diff --git a/src/test/java/io/tehuti/metrics/stats/AsyncGaugeTest.java b/src/test/java/io/tehuti/metrics/stats/AsyncGaugeTest.java index fb8ea6b..96ec715 100644 --- a/src/test/java/io/tehuti/metrics/stats/AsyncGaugeTest.java +++ b/src/test/java/io/tehuti/metrics/stats/AsyncGaugeTest.java @@ -4,7 +4,9 @@ import io.tehuti.metrics.AsyncGaugeConfig; import io.tehuti.metrics.MetricConfig; -import java.util.concurrent.Executors; +import io.tehuti.utils.MockTime; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; @@ -23,8 +25,14 @@ public void testTimeoutMeasurementReturnsErrorCode() throws InterruptedException return 0.0; }, "testMetric"); // Set AsyncGaugeConfig with max timeout of 100 ms and initial timeout of 10 ms - AsyncGaugeConfig asyncGaugeConfig = new AsyncGaugeConfig(Executors.newSingleThreadExecutor(), 100, 10); - MetricConfig metricConfig = new MetricConfig(asyncGaugeConfig); + AsyncGauge.AsyncGaugeExecutor asyncGaugeExecutor = new AsyncGauge.AsyncGaugeExecutor.Builder() + .setMaxMetricsMeasurementTimeoutInMs(100) + .setInitialMetricsMeasurementTimeoutInMs(10) + .setMetricMeasurementThreadCount(1) + .setSlowMetricThresholdMs(50) + .setSlowMetricMeasurementThreadCount(1) + .build(); + MetricConfig metricConfig = new MetricConfig(asyncGaugeExecutor); // The first measurement should return 0 because the measurement task will not complete after the initial timeout assertEquals("The first measurement should return 0 because the measurement task will not complete after the initial timeout", 0.0, gauge.measure(metricConfig, System.currentTimeMillis()), 0.01); @@ -52,15 +60,95 @@ public void testCallerOfAsyncGaugeWillNeverBeBlocked() { } }, "testMetric"); // Set AsyncGaugeConfig with max timeout of 100 ms, initial timeout of 10 ms and a daemon thread pool - AsyncGaugeConfig asyncGaugeConfig = new AsyncGaugeConfig(Executors.newSingleThreadExecutor(r -> { - Thread thread = new Thread(r); - thread.setDaemon(true); - return thread; - }), 100, 10); - MetricConfig metricConfig = new MetricConfig(asyncGaugeConfig); + AsyncGauge.AsyncGaugeExecutor asyncGaugeExecutor = new AsyncGauge.AsyncGaugeExecutor.Builder() + .setMaxMetricsMeasurementTimeoutInMs(100) + .setInitialMetricsMeasurementTimeoutInMs(10) + .setMetricMeasurementThreadCount(1) + .setSlowMetricThresholdMs(50) + .setSlowMetricMeasurementThreadCount(1) + .build(); + MetricConfig metricConfig = new MetricConfig(asyncGaugeExecutor); // Test that caller of 
AsyncGauge.measure() will never be blocked gauge.measure(metricConfig, System.currentTimeMillis()); // If the caller is blocked, the following line will never be executed System.out.println("Caller of AsyncGauge.measure() is not blocked"); } + + @Test + public void testAsyncGaugeExecutorWithFastSlowMetrics() throws InterruptedException { + AtomicInteger callTimes = new AtomicInteger(0); + MockTime mockTime = new MockTime(); + AtomicInteger sleepTimeInMs = new AtomicInteger(0); + + AsyncGauge fastMetric = new AsyncGauge( + (ignored1, ignored2) -> { + mockTime.sleep(sleepTimeInMs.get()); + return callTimes.incrementAndGet(); + } , + "fast_metric"); + AsyncGauge.AsyncGaugeExecutor asyncGaugeExecutor = new AsyncGauge.AsyncGaugeExecutor.Builder() + .setMaxMetricsMeasurementTimeoutInMs(100) + .setInitialMetricsMeasurementTimeoutInMs(10) + .setMetricMeasurementThreadCount(1) + .setSlowMetricThresholdMs(50) + .setSlowMetricMeasurementThreadCount(1) + .setInactiveSlowMetricCleanupThresholdInMs(100) + .setInactiveSlowMetricCleanupIntervalInMs(100) + .setTime(mockTime) + .build(); + + MetricConfig config = new MetricConfig(asyncGaugeExecutor); + Assert.assertEquals(1.0d, fastMetric.measure(config, System.currentTimeMillis()), 0.0001d); + // Intentionally slow down the metric collection to mark this metric as a slow metric + sleepTimeInMs.set(200); + Assert.assertEquals(2.0d, fastMetric.measure(config, System.currentTimeMillis()), 0.0001d); + Assert.assertEquals(1, asyncGaugeExecutor.getSlowAsyncGaugeAccessMap().size()); + // Even the metric is being marked as slow, we should be able to collect the value. + // Next try, we should get the cached value and the following retry should get a more recent value + Assert.assertEquals(2.0d, fastMetric.measure(config, System.currentTimeMillis()), 0.0001d); + // Sleep for some time to allow the async task to finish + Thread.sleep(100); + Assert.assertEquals(3.0d, fastMetric.measure(config, System.currentTimeMillis()), 0.0001d); + Assert.assertEquals(1, asyncGaugeExecutor.getSlowAsyncGaugeAccessMap().size()); + // Reduce the metric collection time + sleepTimeInMs.set(10); + Thread.sleep(100); + Assert.assertEquals(5.0d, fastMetric.measure(config, System.currentTimeMillis()), 0.0001d); + Thread.sleep(100); + Assert.assertEquals(6.0d, fastMetric.measure(config, System.currentTimeMillis()), 0.0001d); + Assert.assertTrue(asyncGaugeExecutor.getSlowAsyncGaugeAccessMap().isEmpty()); + + + sleepTimeInMs.set(200); + Assert.assertEquals(7.0d, fastMetric.measure(config, System.currentTimeMillis()), 0.0001d); + Assert.assertEquals(1, asyncGaugeExecutor.getSlowAsyncGaugeAccessMap().size()); + mockTime.sleep(1000); + // Wait for the metric cleanup thread to clear inactive slow metric + Thread.sleep(1000); + Assert.assertTrue(asyncGaugeExecutor.getSlowAsyncGaugeAccessMap().isEmpty()); + + // Cancel long-running metric + AtomicBoolean longRunningMetricInterrupted = new AtomicBoolean(false); + AsyncGauge longRunningMetric = new AsyncGauge( + (ignored1, ignored2) -> { + try { + Thread.sleep(100000); // 100s + } catch (InterruptedException e) { + longRunningMetricInterrupted.set(true); + return 0; + } + return 1; + } , + "long_running_metric" + ); + mockTime.sleep(1000); + // Return cached value + Assert.assertEquals(0, longRunningMetric.measure(config, System.currentTimeMillis()), 0.0001d); + // The long-running metric should time out in next invocation + mockTime.sleep(1000); + // Return error code because of timeout + Assert.assertEquals(-1, longRunningMetric.measure(config, 
System.currentTimeMillis()), 0.0001d); + Assert.assertTrue("Long running metric should be interrupted after hitting timeout", longRunningMetricInterrupted.get()); + Assert.assertEquals(1, asyncGaugeExecutor.getSlowAsyncGaugeAccessMap().size()); + } } diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties index 0e79852..03c8626 100644 --- a/src/test/resources/log4j.properties +++ b/src/test/resources/log4j.properties @@ -18,4 +18,4 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.io.tehuti=ERROR +log4j.logger.io.tehuti=INFO