Skip to content

Commit

Permalink
Merge branch 'main' into feat-grafana-map
Browse files Browse the repository at this point in the history
  • Loading branch information
talboren authored Jan 30, 2025
2 parents 63de0b2 + b8db4e4 commit dcabf32
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 47 deletions.
9 changes: 6 additions & 3 deletions keep/api/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes

from keep.api.core.config import config

Expand Down Expand Up @@ -55,8 +56,8 @@ def setup(app: FastAPI):

resource = Resource.create(
attributes={
"SERVICE.NAME": service_name,
"SERVICE_INSTANCE_ID": f"worker-{os.getpid()}",
ResourceAttributes.SERVICE_NAME: service_name,
ResourceAttributes.SERVICE_INSTANCE_ID: f"worker-{os.getpid()}",
}
)
provider = TracerProvider(resource=resource)
Expand Down Expand Up @@ -86,7 +87,9 @@ def setup(app: FastAPI):

if enable_cloud_trace_exporter:
logger.info("Cloud Trace exporter enabled.")
processor = BatchSpanProcessor(CloudTraceSpanExporter())
processor = BatchSpanProcessor(
CloudTraceSpanExporter(resource_regex="service.*")
)
provider.add_span_processor(processor)

trace.set_tracer_provider(provider)
Expand Down
172 changes: 129 additions & 43 deletions keep/providers/kibana_provider/kibana_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import datetime
import json
import uuid
from typing import Literal
from typing import Literal, Union
from urllib.parse import urlparse

import pydantic
import requests
from fastapi import HTTPException
from packaging.version import Version
from starlette.datastructures import FormData

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
Expand All @@ -37,14 +39,14 @@ class KibanaProviderAuthConfig:
"required": True,
"description": "Kibana Host",
"hint": "https://keep.kb.us-central1.gcp.cloud.es.io",
"validation": "any_http_url"
"validation": "any_http_url",
}
)
kibana_port: UrlPort = dataclasses.field(
metadata={
"required": False,
"description": "Kibana Port (defaults to 9243)",
"validation": "port"
"validation": "port",
},
default=9243,
)
Expand All @@ -64,6 +66,11 @@ class KibanaProvider(BaseProvider):
"id": "{{alert.id}}",
"fingerprint": "{{alert.id}}",
"url": "{{context.alertDetailsUrl}}",
"context.message": "{{context.message}}",
"context.hits": "{{context.hits}}",
"context.link": "{{context.link}}",
"context.query": "{{context.query}}",
"context.title": "{{context.title}}",
"context.cloud": "{{context.cloud}}",
"context.container": "{{context.container}}",
"context.group": "{{context.group}}",
Expand Down Expand Up @@ -153,12 +160,58 @@ def __init__(
super().__init__(context_manager, provider_id, config)

@staticmethod
def parse_event_raw_body(raw_body: bytes | dict) -> dict:
# tb: this is a f**king stupid hack because Kibana doesn't escape {{#toJson}} :(
if b'"payload": "{' in raw_body:
raw_body = raw_body.replace(b'"payload": "{', b'"payload": {')
raw_body = raw_body.replace(b'}",', b"},")
return json.loads(raw_body)
def parse_event_raw_body(raw_body: Union[bytes, dict, FormData]) -> dict:
"""
Parse the raw body from various input types into a dictionary.
Args:
raw_body: Can be bytes, dict, or FormData
Returns:
dict: Parsed event data
Raises:
ValueError: If the input type is not supported or parsing fails
"""
# Handle FormData
if hasattr(raw_body, "_list") and hasattr(
raw_body, "getlist"
): # Check if it's FormData
# Convert FormData to dict
form_dict = {}
for key, value in raw_body.items():
# Handle multiple values for the same key
existing_value = form_dict.get(key)
if existing_value is not None:
if isinstance(existing_value, list):
existing_value.append(value)
else:
form_dict[key] = [existing_value, value]
else:
form_dict[key] = value

# If there's a 'payload' field that's a string, try to parse it as JSON
if "payload" in form_dict and isinstance(form_dict["payload"], str):
try:
form_dict["payload"] = json.loads(form_dict["payload"])
except json.JSONDecodeError:
pass # Keep the original string if it's not valid JSON

return form_dict

# Handle bytes
if isinstance(raw_body, bytes):
# Handle the Kibana escape issue
if b'"payload": "{' in raw_body:
raw_body = raw_body.replace(b'"payload": "{', b'"payload": {')
raw_body = raw_body.replace(b'}",', b"},")
return json.loads(raw_body)

# Handle dict
if isinstance(raw_body, dict):
return raw_body

raise ValueError(f"Unsupported raw_body type: {type(raw_body)}")

def validate_scopes(self) -> dict[str, bool | str]:
"""
Expand Down Expand Up @@ -244,6 +297,17 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
keep_api_url (str): The URL of the Keep API
api_key (str): The API key of the Keep API
"""
# Check kibana version
kibana_version = (
self.request("GET", "api/status").get("version", {}).get("number")
)
rule_types = self.request("GET", "api/alerting/rule_types")

rule_types = {rule_type["id"]: rule_type for rule_type in rule_types}
# if not version, assume < 8 for backwards compatibility
if not kibana_version:
kibana_version = "7.0.0"

# First get all existing connectors and check if we're already installed:
connectors = self.request("GET", "api/actions/connectors")
connector_name = f"keep-{tenant_id}"
Expand All @@ -265,7 +329,10 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
# this means we already have a connector installed, so we just need to update it
config: dict = connector["config"]
config["url"] = keep_api_url
config["headers"] = {"X-API-KEY": api_key}
config["headers"] = {
"X-API-KEY": api_key,
"Content-Type": "application/json",
}
self.request(
"PUT",
f"api/actions/connector/{connector['id']}",
Expand All @@ -284,7 +351,10 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
"method": "post",
"url": keep_api_url,
"authType": None,
"headers": {"X-API-KEY": api_key},
"headers": {
"X-API-KEY": api_key,
"Content-Type": "application/json",
},
},
"secrets": {},
"connector_type_id": ".webhook",
Expand All @@ -305,6 +375,13 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
for alert_rule in alert_rules.get("data", []):
self.logger.info(f"Updating alert {alert_rule['id']}")
alert_actions = alert_rule.get("actions") or []

# kibana 8:
# pop any connector_type_id
if Version(kibana_version) > Version("8.0.0"):
for action in alert_actions:
action.pop("connector_type_id", None)

keep_action_exists = any(
iter(
[
Expand All @@ -318,18 +395,14 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
# This alert was already modified by us / manually added
self.logger.info(f"Alert {alert_rule['id']} already updated, skipping")
continue
for status in ["Alert", "Recovered", "No Data"]:

action_groups = rule_types.get(alert_rule["rule_type_id"], {}).get(
"action_groups", []
)
for action_group in action_groups:
alert_actions.append(
{
"group": (
"custom_threshold.fired"
if status == "Alert"
else (
"recovered"
if status == "Recovered"
else "custom_threshold.nodata"
)
),
"group": action_group.get("id"),
"id": connector_id,
"params": {"body": KibanaProvider.WEBHOOK_PAYLOAD},
"frequency": {
Expand All @@ -340,6 +413,7 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
"uuid": str(uuid.uuid4()),
}
)

try:
self.request(
"PUT",
Expand Down Expand Up @@ -443,10 +517,14 @@ def setup_webhook(

def validate_config(self):
if self.is_installed or self.is_provisioned:
host = self.config.authentication['kibana_host']
host = self.config.authentication["kibana_host"]
if not (host.startswith("http://") or host.startswith("https://")):
scheme = "http://" if ("localhost" in host or "127.0.0.1" in host) else "https://"
self.config.authentication['kibana_host'] = scheme + host
scheme = (
"http://"
if ("localhost" in host or "127.0.0.1" in host)
else "https://"
)
self.config.authentication["kibana_host"] = scheme + host

self.authentication_config = KibanaProviderAuthConfig(
**self.config.authentication
Expand Down Expand Up @@ -501,25 +579,22 @@ def _format_alert(
# If this is coming from Kibana Watcher
if "payload" in event:
return KibanaProvider.format_alert_from_watcher(event)
try:
labels = {
v.split("=", 1)[0]: v.split("=", 1)[1]
for v in event.get("ruleTags", "").split(",")
}
except Exception:
# Failed to extract labels from ruleTags
labels = {}

try:
labels.update(
{
v.split("=", 1)[0]: v.split("=", 1)[1]
for v in event.get("contextTags", "").split(",")
}
)
except Exception:
# Failed to enrich labels with contextTags
pass
labels = {}
tags = event.get("ruleTags", [])
for tag in tags:
# if the tag is a key=value pair
if "=" in tag:
key, value = tag.split("=", 1)
labels[key] = value

# same with contextTags
context_tags = event.get("contextTags", [])
for tag in context_tags:
# if the tag is a key=value pair
if "=" in tag:
key, value = tag.split("=", 1)
labels[key] = value

environment = labels.get("environment", "undefined")

Expand All @@ -530,8 +605,19 @@ def _format_alert(
event["severity"] = KibanaProvider.SEVERITIES_MAP.get(
event.get("severity"), AlertSeverity.INFO
)

if not event.get("url"):
event["url"] = event.get("ruleUrl")
# if still no url, popt it
if not event.get("url"):
event.pop("url", None)

return AlertDto(
environment=environment, labels=labels, source=["kibana"], **event
environment=environment,
labels=labels,
tags=tags,
source=["kibana"],
**event,
)


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "keep"
version = "0.35.5"
version = "0.35.6"
description = "Alerting. for developers, by developers."
authors = ["Keep Alerting LTD"]
packages = [{include = "keep"}]
Expand Down

0 comments on commit dcabf32

Please sign in to comment.