Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Fix duplicate exception alert #3109

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) {

if (jobDataDto.isError()) {
builder.errorMsg(jobDataDto.getErrorMsg());
} else if (exceptions != null && ExceptionRule.isException(id, exceptions)) {
} else if (exceptions != null && ExceptionRule.isException(exceptions)) {
// The error message is too long to send an alarm,
// and only the first line of abnormal information is used
String err = Optional.ofNullable(exceptions.getRootException())
Expand All @@ -198,8 +198,8 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) {
}

if (checkpoints != null) {
builder.checkpointCostTime(CheckpointsRule.checkpointTime(id, checkpoints))
.isCheckpointFailed(CheckpointsRule.checkFailed(id, checkpoints));
builder.checkpointCostTime(CheckpointsRule.checkpointTime(checkpoints))
.isCheckpointFailed(CheckpointsRule.checkFailed(checkpoints));
if (checkpoints.getCounts() != null) {
builder.checkpointFailedCount(checkpoints.getCounts().getNumberFailedCheckpoints())
.checkpointCompleteCount(checkpoints.getCounts().getNumberCompletedCheckpoints());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,94 +20,24 @@
package org.dinky.alert.rules;

import org.dinky.data.flink.checkpoint.CheckPointOverView;
import org.dinky.utils.TimeUtil;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.time.Duration;
import java.time.LocalDateTime;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CheckpointsRule {

private static final Logger logger = LoggerFactory.getLogger(CheckpointsRule.class);

private static final LoadingCache<String, Object> checkpointsCache =
CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.SECONDS).build(CacheLoader.from(key -> null));

/**
* Checks if a checkpoint has expired.
*
* @param latest The latest checkpoint node.
* @param jobInstanceID The key used to identify the checkpoint.
* @param ckKey The checkpoint key to check for expiration.
* @return True if the checkpoint has expired, false otherwise.
*/
private static boolean isExpire(CheckPointOverView latest, String jobInstanceID, String ckKey) {
logger.debug("checkpointTime key: {} ,checkpoints: {}, key: {}", jobInstanceID, latest, ckKey);

CheckPointOverView his = (CheckPointOverView) checkpointsCache.getIfPresent(jobInstanceID);

switch (ckKey) {
case "completed":
if (his != null) {
CheckPointOverView.CompletedCheckpointStatistics completedCheckpointStatistics =
his.getLatestCheckpoints().getCompletedCheckpointStatistics();
if (completedCheckpointStatistics != null) {
return Objects.equals(completedCheckpointStatistics.getStatus(), "completed");
}
}
return false;
case "failed":
CheckPointOverView.FailedCheckpointStatistics failedCheckpointStatistics = null;
if (his != null) {
failedCheckpointStatistics = his.getLatestCheckpoints().getFailedCheckpointStatistics();
}
long failureTimestamp = 0;
CheckPointOverView.LatestCheckpoints latestLatestCheckpoints = latest.getLatestCheckpoints();
if (latestLatestCheckpoints != null
&& latestLatestCheckpoints.getFailedCheckpointStatistics() != null) {
failureTimestamp = latestLatestCheckpoints
.getFailedCheckpointStatistics()
.getTriggerTimestamp();
}
if (null == latestLatestCheckpoints || 0 == failureTimestamp) {
return true;
}
long latestTime =
latestLatestCheckpoints.getFailedCheckpointStatistics().getTriggerTimestamp();
checkpointsCache.put(jobInstanceID, latest);
if (his != null) {
long hisTime = 0;
if (failedCheckpointStatistics != null) {
hisTime = failedCheckpointStatistics.getTriggerTimestamp();
}
return hisTime == latestTime || System.currentTimeMillis() - latestTime > 60000;
}
return false;

default:
return false;
}
}

/**
* Retrieves the checkpoint time for a specific key.
*
* @param key The key used to identify the checkpoint.
* @param checkpoints The checkpoints object containing relevant data.
* @return The checkpoint time, or null if the checkpoint has expired.
*/
public static Long checkpointTime(String key, CheckPointOverView checkpoints) {
if (isExpire(checkpoints, key, "completed")) {
return -1L;
}
public static Long checkpointTime(CheckPointOverView checkpoints) {

CheckPointOverView.LatestCheckpoints checkpointsLatestCheckpoints = checkpoints.getLatestCheckpoints();
if (null == checkpointsLatestCheckpoints
|| null == checkpointsLatestCheckpoints.getCompletedCheckpointStatistics()) {
Expand All @@ -122,11 +52,19 @@ public static Long checkpointTime(String key, CheckPointOverView checkpoints) {
/**
* Checks if a specific checkpoint has failed.
*
* @param key The key used to identify the checkpoint.
* @param checkpoints The checkpoints object containing relevant data.
* @return True if the checkpoint has failed, null if it has expired.
*/
public static Boolean checkFailed(String key, CheckPointOverView checkpoints) {
return !isExpire(checkpoints, key, "failed");
public static Boolean checkFailed(CheckPointOverView checkpoints) {
CheckPointOverView.LatestCheckpoints latestLatestCheckpoints = checkpoints.getLatestCheckpoints();
if (latestLatestCheckpoints != null && latestLatestCheckpoints.getFailedCheckpointStatistics() != null) {
long failureTimestamp =
latestLatestCheckpoints.getFailedCheckpointStatistics().getTriggerTimestamp();
LocalDateTime localDateTime = TimeUtil.toLocalDateTime(failureTimestamp);
LocalDateTime now = LocalDateTime.now();
long diff = Duration.between(localDateTime, now).toMinutes();
return diff <= 2;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,33 @@
package org.dinky.alert.rules;

import org.dinky.data.flink.exceptions.FlinkJobExceptionsDetail;
import org.dinky.utils.TimeUtil;

import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.time.Duration;
import java.time.LocalDateTime;

public class ExceptionRule {

private static final LoadingCache<String, Long> hisTime =
CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.SECONDS).build(CacheLoader.from(key -> null));

/**
* Executes a certain operation based on the provided key and exceptions object.
* This method is stored within the database, is called through SPEL, and is not an executable method
* @param jobinstanceId The key used to identify the operation.
* @param exceptions The exceptions object containing relevant data.
* @return True if the operation should be executed, false otherwise.
*/
public static Boolean isException(String jobinstanceId, FlinkJobExceptionsDetail exceptions) {
public static Boolean isException(FlinkJobExceptionsDetail exceptions) {

// If the exception is the same as the previous one, it will not be reported again
if (exceptions.getTimestamp() == null) {
return false;
}
long timestamp = exceptions.getTimestamp();
Long hisTimeIfPresent = hisTime.getIfPresent(jobinstanceId);
if (hisTimeIfPresent != null && hisTimeIfPresent == timestamp) {
LocalDateTime localDateTime = TimeUtil.toLocalDateTime(timestamp);
LocalDateTime now = LocalDateTime.now();
long diff = Duration.between(localDateTime, now).toMinutes();

// If the exception is older than 2 minutes, we don't care about it anymore.
if (diff >= 2) {
return false;
}
hisTime.put(jobinstanceId, timestamp);
if (exceptions.getRootException() != null) {
return !exceptions.getRootException().isEmpty();
} else {
Expand Down
Loading