diff --git a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java index a83755069..016d8ed1b 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java +++ b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java @@ -164,6 +164,13 @@ public void refreshEventQueues() { } }); + Map> eventToQueueSize = getQueueSizes(); + eventToQueueSize.forEach( + (event, queueMap) -> { + Map.Entry queueSize = queueMap.entrySet().iterator().next(); + Monitors.recordEventQueueDepth(queueSize.getKey(), queueSize.getValue()); + }); + LOGGER.debug("Event queues: {}", eventToQueueMap.keySet()); LOGGER.debug("Stored queue: {}", events); LOGGER.debug("Removed queue: {}", removed); diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index cbe529ae4..097537739 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -235,6 +235,10 @@ public static void recordQueueDepth(String taskType, long size, String ownerApp) StringUtils.defaultIfBlank(ownerApp, "unknown")); } + public static void recordEventQueueDepth(String queueType, long size) { + gauge(classQualifier, "event_queue_depth", size, "queueType", queueType); + } + public static void recordTaskInProgress(String taskType, long size, String ownerApp) { gauge( classQualifier,