diff --git a/dinky-admin/src/main/java/org/dinky/data/model/ext/JobAlertData.java b/dinky-admin/src/main/java/org/dinky/data/model/ext/JobAlertData.java index c261056282..34b84065c3 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/ext/JobAlertData.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/ext/JobAlertData.java @@ -185,7 +185,7 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) { if (jobDataDto.isError()) { builder.errorMsg(jobDataDto.getErrorMsg()); - } else if (exceptions != null && ExceptionRule.isException(id, exceptions)) { + } else if (exceptions != null && ExceptionRule.isException(exceptions)) { // The error message is too long to send an alarm, // and only the first line of abnormal information is used String err = Optional.ofNullable(exceptions.getRootException()) @@ -198,8 +198,8 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) { } if (checkpoints != null) { - builder.checkpointCostTime(CheckpointsRule.checkpointTime(id, checkpoints)) - .isCheckpointFailed(CheckpointsRule.checkFailed(id, checkpoints)); + builder.checkpointCostTime(CheckpointsRule.checkpointTime(checkpoints)) + .isCheckpointFailed(CheckpointsRule.checkFailed(checkpoints)); if (checkpoints.getCounts() != null) { builder.checkpointFailedCount(checkpoints.getCounts().getNumberFailedCheckpoints()) .checkpointCompleteCount(checkpoints.getCounts().getNumberCompletedCheckpoints()); diff --git a/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/rules/CheckpointsRule.java b/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/rules/CheckpointsRule.java index 6a2150bf4a..9fe75993da 100644 --- a/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/rules/CheckpointsRule.java +++ b/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/rules/CheckpointsRule.java @@ -20,94 +20,24 @@ package org.dinky.alert.rules; import org.dinky.data.flink.checkpoint.CheckPointOverView; +import org.dinky.utils.TimeUtil; -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import java.time.Duration; +import java.time.LocalDateTime; import lombok.extern.slf4j.Slf4j; @Slf4j public class CheckpointsRule { - private static final Logger logger = LoggerFactory.getLogger(CheckpointsRule.class); - - private static final LoadingCache checkpointsCache = - CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.SECONDS).build(CacheLoader.from(key -> null)); - - /** - * Checks if a checkpoint has expired. - * - * @param latest The latest checkpoint node. - * @param jobInstanceID The key used to identify the checkpoint. - * @param ckKey The checkpoint key to check for expiration. - * @return True if the checkpoint has expired, false otherwise. - */ - private static boolean isExpire(CheckPointOverView latest, String jobInstanceID, String ckKey) { - logger.debug("checkpointTime key: {} ,checkpoints: {}, key: {}", jobInstanceID, latest, ckKey); - - CheckPointOverView his = (CheckPointOverView) checkpointsCache.getIfPresent(jobInstanceID); - - switch (ckKey) { - case "completed": - if (his != null) { - CheckPointOverView.CompletedCheckpointStatistics completedCheckpointStatistics = - his.getLatestCheckpoints().getCompletedCheckpointStatistics(); - if (completedCheckpointStatistics != null) { - return Objects.equals(completedCheckpointStatistics.getStatus(), "completed"); - } - } - return false; - case "failed": - CheckPointOverView.FailedCheckpointStatistics failedCheckpointStatistics = null; - if (his != null) { - failedCheckpointStatistics = his.getLatestCheckpoints().getFailedCheckpointStatistics(); - } - long failureTimestamp = 0; - CheckPointOverView.LatestCheckpoints latestLatestCheckpoints = latest.getLatestCheckpoints(); - if (latestLatestCheckpoints != null - && latestLatestCheckpoints.getFailedCheckpointStatistics() != null) { - failureTimestamp = latestLatestCheckpoints - .getFailedCheckpointStatistics() - .getTriggerTimestamp(); - } - if (null == latestLatestCheckpoints || 0 == failureTimestamp) { - return true; - } - long latestTime = - latestLatestCheckpoints.getFailedCheckpointStatistics().getTriggerTimestamp(); - checkpointsCache.put(jobInstanceID, latest); - if (his != null) { - long hisTime = 0; - if (failedCheckpointStatistics != null) { - hisTime = failedCheckpointStatistics.getTriggerTimestamp(); - } - return hisTime == latestTime || System.currentTimeMillis() - latestTime > 60000; - } - return false; - - default: - return false; - } - } - /** * Retrieves the checkpoint time for a specific key. * - * @param key The key used to identify the checkpoint. * @param checkpoints The checkpoints object containing relevant data. * @return The checkpoint time, or null if the checkpoint has expired. */ - public static Long checkpointTime(String key, CheckPointOverView checkpoints) { - if (isExpire(checkpoints, key, "completed")) { - return -1L; - } + public static Long checkpointTime(CheckPointOverView checkpoints) { + CheckPointOverView.LatestCheckpoints checkpointsLatestCheckpoints = checkpoints.getLatestCheckpoints(); if (null == checkpointsLatestCheckpoints || null == checkpointsLatestCheckpoints.getCompletedCheckpointStatistics()) { @@ -122,11 +52,19 @@ public static Long checkpointTime(String key, CheckPointOverView checkpoints) { /** * Checks if a specific checkpoint has failed. * - * @param key The key used to identify the checkpoint. * @param checkpoints The checkpoints object containing relevant data. * @return True if the checkpoint has failed, null if it has expired. */ - public static Boolean checkFailed(String key, CheckPointOverView checkpoints) { - return !isExpire(checkpoints, key, "failed"); + public static Boolean checkFailed(CheckPointOverView checkpoints) { + CheckPointOverView.LatestCheckpoints latestLatestCheckpoints = checkpoints.getLatestCheckpoints(); + if (latestLatestCheckpoints != null && latestLatestCheckpoints.getFailedCheckpointStatistics() != null) { + long failureTimestamp = + latestLatestCheckpoints.getFailedCheckpointStatistics().getTriggerTimestamp(); + LocalDateTime localDateTime = TimeUtil.toLocalDateTime(failureTimestamp); + LocalDateTime now = LocalDateTime.now(); + long diff = Duration.between(localDateTime, now).toMinutes(); + return diff <= 2; + } + return false; } } diff --git a/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/rules/ExceptionRule.java b/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/rules/ExceptionRule.java index fbc397ffec..cc0eeef11a 100644 --- a/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/rules/ExceptionRule.java +++ b/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/rules/ExceptionRule.java @@ -20,37 +20,33 @@ package org.dinky.alert.rules; import org.dinky.data.flink.exceptions.FlinkJobExceptionsDetail; +import org.dinky.utils.TimeUtil; -import java.util.concurrent.TimeUnit; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import java.time.Duration; +import java.time.LocalDateTime; public class ExceptionRule { - private static final LoadingCache hisTime = - CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.SECONDS).build(CacheLoader.from(key -> null)); - /** * Executes a certain operation based on the provided key and exceptions object. * This method is stored within the database, is called through SPEL, and is not an executable method - * @param jobinstanceId The key used to identify the operation. * @param exceptions The exceptions object containing relevant data. * @return True if the operation should be executed, false otherwise. */ - public static Boolean isException(String jobinstanceId, FlinkJobExceptionsDetail exceptions) { + public static Boolean isException(FlinkJobExceptionsDetail exceptions) { - // If the exception is the same as the previous one, it will not be reported again if (exceptions.getTimestamp() == null) { return false; } long timestamp = exceptions.getTimestamp(); - Long hisTimeIfPresent = hisTime.getIfPresent(jobinstanceId); - if (hisTimeIfPresent != null && hisTimeIfPresent == timestamp) { + LocalDateTime localDateTime = TimeUtil.toLocalDateTime(timestamp); + LocalDateTime now = LocalDateTime.now(); + long diff = Duration.between(localDateTime, now).toMinutes(); + + // If the exception is older than 2 minutes, we don't care about it anymore. + if (diff >= 2) { return false; } - hisTime.put(jobinstanceId, timestamp); if (exceptions.getRootException() != null) { return !exceptions.getRootException().isEmpty(); } else {