fix: add out-of-incident expansion for new alert batches
GlebBerjoskin committed Sep 16, 2024
1 parent f291959 commit c712acb
Showing 2 changed files with 191 additions and 75 deletions.
264 changes: 190 additions & 74 deletions ee/experimental/incident_utils.py
@@ -156,53 +156,121 @@ def get_component_first_seen_time(component: Set[str], alerts: List[Alert]) -> d
return min(alert.timestamp for alert in alerts if alert.fingerprint in component)
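
As a reading aid, a minimal sketch of what this helper computes, using a hypothetical Alert stand-in (the real model is defined elsewhere in the codebase):

# Illustrative only: the helper returns the earliest timestamp among the
# alerts whose fingerprints belong to the component.
from collections import namedtuple
from datetime import datetime

FakeAlert = namedtuple("FakeAlert", ["fingerprint", "timestamp"])
alerts = [
    FakeAlert("fp-a", datetime(2024, 9, 16, 10, 0)),
    FakeAlert("fp-b", datetime(2024, 9, 16, 9, 30)),
    FakeAlert("fp-c", datetime(2024, 9, 16, 11, 0)),  # not in the component
]
# get_component_first_seen_time({"fp-a", "fp-b"}, alerts)
# -> datetime(2024, 9, 16, 9, 30)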


def process_component(component: Set[str], incidents: List[Incident], alerts: List[Alert],
tenant_id: str, incident_similarity_threshold: float, min_incident_size: int,
incident_validity_threshold: timedelta) -> Tuple[str, bool]:
logger.info(
f"Processing alert graph component with {len(component)} nodes. Min incident size: {min_incident_size}",
extra={"tenant_id": tenant_id, "algorithm": NAME_GENERATOR_VERBOSE_NAME})

if len(component) < min_incident_size:
return None, False

for incident in incidents:
if is_similar_incident(incident, component, incident_similarity_threshold):
current_time = get_component_first_seen_time(component, alerts)
if is_incident_accepting_updates(incident, current_time, incident_validity_threshold):
logger.info(
f"Incident {incident.id} is similar to the alert graph component. Merging.",
extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})
# def process_component(component: Set[str], incidents: List[Incident], alerts: List[Alert],
# tenant_id: str, incident_similarity_threshold: float, min_incident_size: int,
# incident_validity_threshold: timedelta) -> Tuple[str, bool]:
# logger.info(
# f"Processing alert graph component with {len(component)} nodes. Min incident size: {min_incident_size}",
# extra={"tenant_id": tenant_id, "algorithm": NAME_GENERATOR_VERBOSE_NAME})

# if len(component) < min_incident_size:
# return None, False

# for incident in incidents:
# if is_similar_incident(incident, component, incident_similarity_threshold):
# current_time = get_component_first_seen_time(component, alerts)
# if is_incident_accepting_updates(incident, current_time, incident_validity_threshold):
# logger.info(
# f"Incident {incident.id} is similar to the alert graph component. Merging.",
# extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})

# existing_alert_ids = set([alert.id for alert in incident.alerts])
# return update_existing_incident_inmem(
# incident, [alert for alert in alerts if alert.fingerprint in component and not alert.id in existing_alert_ids])

# logger.info(
# f"No incident is similar to the alert graph component. Creating new incident.",
# extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})
# return create_new_incident_inmem(component, alerts, tenant_id)

def process_graph_component(component: Set[str], batch_incidents: List[Incident], batch_alerts: List[Alert], batch_fingerprints: Set[str],
tenant_id: str, min_incident_size: int, incident_validity_threshold: timedelta) -> Tuple[str, bool]:
is_component_merged = False
for incident in batch_incidents:
incident_fingerprints = set(alert.fingerprint for alert in incident.alerts)
if incident_fingerprints.issubset(component):
if not incident_fingerprints.intersection(batch_fingerprints):
continue
logger.info(f"Found possible extension for incident {incident.id}",
extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})

amendment_time = get_component_first_seen_time(component, batch_alerts)
if is_incident_accepting_updates(incident, amendment_time, incident_validity_threshold):
logger.info(f"Incident {incident.id} is accepting updates.",
extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})

existing_alert_ids = set([alert.id for alert in incident.alerts])
return update_existing_incident_inmem(
incident, [alert for alert in alerts if alert.fingerprint in component and not alert.id in existing_alert_ids])

logger.info(
f"No incident is similar to the alert graph component. Creating new incident.",
extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})
return create_new_incident_inmem(component, alerts, tenant_id)
appendable_alerts = [alert for alert in batch_alerts if alert.fingerprint in component and not alert.id in existing_alert_ids]

logger.info(f"Appending {len(appendable_alerts)} alerts to incident {incident.id}",
extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})
is_component_merged = True
return update_existing_incident_inmem(incident, appendable_alerts)
else:
logger.info(f"Incident {incident.id} is not accepting updates. Aborting merge operation.",
extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})

if not is_component_merged:
if len(component) >= min_incident_size:
logger.info(f"Creating new incident with {len(component)} alerts",
extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})
return create_new_incident_inmem(component, batch_alerts, tenant_id)
else:
return None, False
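
For orientation, a hedged sketch of the merge gate above, reduced to pure set logic with stand-in data (no Incident objects or helper functions reproduced):

# Illustrative only: when process_graph_component treats an incident as a
# merge candidate for a connected component of the amended graph.
component = {"fp-a", "fp-b", "fp-c"}       # fingerprints in one graph component
incident_fingerprints = {"fp-a", "fp-b"}   # alerts already in an existing incident
batch_fingerprints = {"fp-b", "fp-c"}      # fingerprints arriving in this batch

assert incident_fingerprints.issubset(component)   # incident lies inside the component
assert incident_fingerprints & batch_fingerprints  # the batch touches the incident
# If is_incident_accepting_updates also passes, every batch alert whose
# fingerprint falls anywhere in the component is appended -- including "fp-c",
# which sits outside the incident itself. That is the "out-of-incident
# expansion" named in the commit title.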


def process_alert_batch(batch_alerts: List[Alert], batch_incidents: list[Incident], tenant_id: str, min_incident_size: int,
incident_validity_threshold: timedelta, pmi_values, fingerpint2idx, pmi_threshold, delete_nodes, knee_threshold) -> Tuple[str, bool]:

batch_fingerprints = set([alert.fingerprint for alert in batch_alerts])

amended_fingerprints = set(batch_fingerprints)
for incident in batch_incidents:
incident_fingerprints = set(alert.fingerprint for alert in incident.alerts)

amended_fingerprints = incident_fingerprints.union(batch_fingerprints)

logger.info("Building alert graph", extra={"tenant_id": tenant_id, "algorithm": NAME_GENERATOR_VERBOSE_NAME})
amended_graph = create_graph(tenant_id, list(amended_fingerprints), pmi_values,
fingerpint2idx, pmi_threshold, delete_nodes, knee_threshold)

logger.info("Analyzing alert graph", extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})
batch_incident_ids_for_processing = []
batch_new_incidents = []
batch_updated_incidents = []

for component in nx.connected_components(amended_graph):
incident, is_updated = process_graph_component(component, batch_incidents, batch_alerts, batch_fingerprints, tenant_id, min_incident_size, incident_validity_threshold)
if incident:
batch_incident_ids_for_processing.append(incident.id)
if is_updated:
batch_updated_incidents.append(incident)
else:
batch_new_incidents.append(incident)

return batch_incident_ids_for_processing, batch_new_incidents, batch_updated_incidents
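
A self-contained sketch of the component analysis this function delegates to networkx (toy graph only; create_graph and the PMI machinery are not reproduced):

# Illustrative only: nx.connected_components partitions the amended
# fingerprint graph into disjoint sets of fingerprints.
import networkx as nx

amended_graph = nx.Graph()
amended_graph.add_edges_from([("fp-a", "fp-b"), ("fp-b", "fp-c")])  # one component
amended_graph.add_edge("fp-x", "fp-y")                              # another component
amended_graph.add_node("fp-lonely")                                 # isolated node

for component in nx.connected_components(amended_graph):
    print(component)
# {'fp-a', 'fp-b', 'fp-c'}, {'fp-x', 'fp-y'}, {'fp-lonely'} (order may vary)
# Each set is handed to process_graph_component, which either extends a
# matching incident or, if the set reaches min_incident_size, becomes a new one.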



def process_graph_components(graph: nx.Graph, incidents: List[Incident], alerts: List[Alert], tenant_id: str, incident_similarity_threshold: float,
min_incident_size: int, incident_validity_threshold: timedelta) -> Tuple[List[str], List[Incident], List[Incident]]:
# def process_graph_components(graph: nx.Graph, incidents: List[Incident], alerts: List[Alert], tenant_id: str, incident_similarity_threshold: float,
# min_incident_size: int, incident_validity_threshold: timedelta) -> Tuple[List[str], List[Incident], List[Incident]]:

incident_ids_for_processing = []
new_incidents = []
updated_incident_ids = []
# incident_ids_for_processing = []
# new_incidents = []
# updated_incident_ids = []

for component in nx.connected_components(graph):
incident, is_updated = process_component(
component, incidents, alerts, tenant_id, incident_similarity_threshold, min_incident_size, incident_validity_threshold)
if incident:
incident_ids_for_processing.append(incident.id)
# for component in nx.connected_components(graph):
# incident, is_updated = process_component(
# component, incidents, alerts, tenant_id, incident_similarity_threshold, min_incident_size, incident_validity_threshold)
# if incident:
# incident_ids_for_processing.append(incident.id)

if is_updated:
updated_incident_ids.append(incident)
else:
new_incidents.append(incident)
# if is_updated:
# updated_incident_ids.append(incident)
# else:
# new_incidents.append(incident)

return incident_ids_for_processing, new_incidents, updated_incident_ids
# return incident_ids_for_processing, new_incidents, updated_incident_ids


async def generate_update_incident_summary(ctx, tenant_id: str, incident_id: str):
@@ -307,7 +375,7 @@ async def mine_incidents_and_create_objects(
Returns:
Dict[str, List[Incident]]: a dictionary containing the created incidents
"""
"""
# obtain tenant_config
if not general_temp_dir:
general_temp_dir = os.environ.get(
Expand Down Expand Up @@ -416,53 +484,101 @@ async def mine_incidents_and_create_objects(
if len(batch_alerts) == 0:
continue

batch_fingerprints = list(set([alert.fingerprint for alert in batch_alerts]))
# batch_fingerprints = list(set([alert.fingerprint for alert in batch_alerts]))

batch_incidents = get_last_incidents_inmem(incidents, datetime.fromtimestamp(batch_end_ts[batch_idx]),
datetime.fromtimestamp(batch_start_ts[batch_idx]) - incident_validity_threshold)

logger.info(
f"Found {len(batch_incidents)} incidents that accept updates by {datetime.fromtimestamp(batch_start_ts[batch_idx])}.",
extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})
for incident in batch_incidents:
incident_fingerprints = set(
alert.fingerprint for alert in incident.alerts)
if len(incident_fingerprints.intersection(set(batch_fingerprints))) == 0:
continue

incident_fingerprints = list(incident_fingerprints)
appendable_alerts = [
alert for alert in batch_alerts if alert.fingerprint in incident_fingerprints]
appendable_alerts.sort(key=lambda x: x.timestamp)

current_time = get_component_first_seen_time(incident_fingerprints, batch_alerts)
if is_incident_accepting_updates(incident, current_time, incident_validity_threshold):
# here
existing_alert_ids = set([alert.id for alert in incident.alerts])

batch_incident_ids_for_processing, batch_new_incidents, batch_updated_incidents = process_alert_batch(batch_alerts, batch_incidents, tenant_id, min_incident_size, incident_validity_threshold, pmi_values, fingerpint2idx, pmi_threshold, delete_nodes, knee_threshold)

# amended_fingerprints = set(batch_fingerprints)
# for incident in batch_incidents:
# incident_fingerprints = set(alert.fingerprint for alert in incident.alerts)

# amended_fingerprints = incident_fingerprints.union(amended_fingerprints)

# logger.info("Building alert graph", extra={"tenant_id": tenant_id, "algorithm": NAME_GENERATOR_VERBOSE_NAME})
# amended_graph = create_graph(tenant_id, list(amended_fingerprints), pmi_values,
# fingerpint2idx, pmi_threshold, delete_nodes, knee_threshold)

# logger.info("Analyzing alert graph", extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})
# batch_incident_ids_for_processing = []
# batch_new_incidents = []
# batch_updated_incidents = []

# for component in nx.connected_components(amended_graph):
# is_component_merged = False
# for incident in batch_incidents:
# incident_fingerprints = set(alert.fingerprint for alert in incident.alerts)
# if incident_fingerprints.issubset(component):
# if not incident_fingerprints.intersection(batch_fingerprints):
# continue
# if batch_idx == 15370:
# pass
# logger.info(f"Found possible extension for incident {incident.id}",
# extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})

# amendment_time = get_component_first_seen_time(component, batch_alerts)
# if is_incident_accepting_updates(incident, amendment_time, incident_validity_threshold):
# logger.info(f"Incident {incident.id} is accepting updates.",
# extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})

# existing_alert_ids = set([alert.id for alert in incident.alerts])
# appendable_alerts = [alert for alert in batch_alerts if alert.fingerprint in component and not alert.id in existing_alert_ids]

# logger.info(f"Appending {len(appendable_alerts)} alerts to incident {incident.id}",
# extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})
# is_component_merged = True
# batch_incident_ids_for_processing.append(incident.id)
# batch_updated_incidents.append(incident)
# else:
# logger.info(f"Incident {incident.id} is not accepting updates. Aborting merge operation.",
# extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})

# # else:

# # update_existing_incident_inmem(incident, appendable_alerts)

# if not is_component_merged:
# if len(component) >= min_incident_size:
# incident, is_updated = create_new_incident_inmem(component, batch_alerts, tenant_id)
# batch_incident_ids_for_processing.append(incident.id)
# batch_new_incidents.append(incident)

# # incident_fingerprints = list(incident_fingerprints)
# # appendable_alerts = [
# # alert for alert in batch_alerts if alert.fingerprint in incident_fingerprints]
# # appendable_alerts.sort(key=lambda x: x.timestamp)

# # current_time = get_component_first_seen_time(incident_fingerprints, batch_alerts)
# # if is_incident_accepting_updates(incident, current_time, incident_validity_threshold):
# # existing_alert_ids = set([alert.id for alert in incident.alerts])

appendable_alerts = [alert for alert in appendable_alerts if not alert.id in existing_alert_ids]
logger.info(
f"Incident {incident.id} is accepting updates. Appending {len(appendable_alerts)} alerts.",
extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})
# # appendable_alerts = [alert for alert in appendable_alerts if not alert.id in existing_alert_ids]
# # logger.info(
# # f"Incident {incident.id} is accepting updates. Appending {len(appendable_alerts)} alerts.",
# # extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})

update_existing_incident_inmem(incident, appendable_alerts)
# # update_existing_incident_inmem(incident, appendable_alerts)

logger.info("Building alert graph", extra={"tenant_id": tenant_id, "algorithm": NAME_GENERATOR_VERBOSE_NAME})

batch_graph = create_graph(
tenant_id,
batch_fingerprints,
pmi_values,
fingerpint2idx,
pmi_threshold,
delete_nodes,
knee_threshold)
# # batch_graph = create_graph(
# # tenant_id,
# # batch_fingerprints,
# # pmi_values,
# # fingerpint2idx,
# # pmi_threshold,
# # delete_nodes,
# # knee_threshold)

logger.info("Analyzing alert graph", extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME})

batch_incident_ids_for_processing, batch_new_incidents, batch_updated_incidents \
= process_graph_components(batch_graph, batch_incidents, batch_alerts, tenant_id, \
incident_similarity_threshold, min_incident_size, incident_validity_threshold)
# # batch_incident_ids_for_processing, batch_new_incidents, batch_updated_incidents \
# # = process_graph_components(batch_graph, batch_incidents, batch_alerts, tenant_id, \
# # incident_similarity_threshold, min_incident_size, incident_validity_threshold)

new_incident_ids.extend([incident.id for incident in batch_new_incidents])
incidents.extend(batch_new_incidents)
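
The two accumulation lines above are what let expansion work across batches: incidents created in one batch join the running list, so later batches can extend them. A compressed sketch of the loop shape, with stand-in helpers where noted:

# Illustrative only: get_batch_alerts and select_incidents_in_validity_window
# are hypothetical stand-ins for the batching and windowing code above.
incidents = []
new_incident_ids = []

for batch_idx in range(num_batches):
    batch_alerts = get_batch_alerts(batch_idx)
    if len(batch_alerts) == 0:
        continue
    batch_incidents = select_incidents_in_validity_window(incidents, batch_idx)
    ids, new_incidents, updated_incidents = process_alert_batch(
        batch_alerts, batch_incidents, tenant_id, min_incident_size,
        incident_validity_threshold, pmi_values, fingerpint2idx,
        pmi_threshold, delete_nodes, knee_threshold)
    new_incident_ids.extend(incident.id for incident in new_incidents)
    incidents.extend(new_incidents)  # visible to subsequent batches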
2 changes: 1 addition & 1 deletion keep/api/arq_worker.py
@@ -83,7 +83,7 @@ def get_arq_worker(queue_name: str) -> Worker:
"ARQ_KEEP_RESULT", cast=int, default=3600
) # duration to keep job results for

if config("ARQ_EXPIRES"):
if config("ARQ_EXPIRES", default=False):
logger.info(f"ARQ_EXPIRES is set to {config('ARQ_EXPIRES')}. Warning: this hyperparameter needs to be set to a high value (our default is 3600000ms) to handle longer-running AI tasks.")
else:
logger.info(f"ARQ_EXPIRES is not set. Defaulting to {3600*1000 if KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_AI else 3600}ms")
