diff --git a/docs/deployment/configuration.mdx b/docs/deployment/configuration.mdx
index eaa04d439..ddf404912 100644
--- a/docs/deployment/configuration.mdx
+++ b/docs/deployment/configuration.mdx
@@ -25,6 +25,7 @@ General configuration variables control the core behavior of the Keep server. Th
 | **KEEP_API_URL** | Specifies the Keep API URL | No | Constructed from HOST and PORT | Valid URL |
 | **KEEP_STORE_RAW_ALERTS** | Enables storing of raw alerts | No | "false" | "true" or "false" |
 | **TENANT_CONFIGURATION_RELOAD_TIME** | Time in minutes to reload tenant configurations | No | 5 | Positive integer |
+| **KEEP_LIVE_DEMO_MODE** | Simulates incoming alerts and other activity (live demo mode) | No | "false" | "true" or "false" |
 
 ### Logging and Environment
diff --git a/keep/api/config.py b/keep/api/config.py
index c420860d0..474206812 100644
--- a/keep/api/config.py
+++ b/keep/api/config.py
@@ -13,13 +13,14 @@
 keep.api.logging.setup_logging()
 logger = logging.getLogger(__name__)
 
+LIVE_DEMO_MODE = os.environ.get("KEEP_LIVE_DEMO_MODE", "false").lower() == "true"
+
 
 def on_starting(server=None):
     """This function is called by the gunicorn server when it starts"""
     logger.info("Keep server starting")
 
     migrate_db()
-    launch_uptime_reporting()
 
     # Create single tenant if it doesn't exist
     if AUTH_TYPE in [
@@ -54,4 +55,14 @@ def on_starting(server=None):
         public_url = ngrok_connection.public_url
         logger.info(f"ngrok tunnel: {public_url}")
         os.environ["KEEP_API_URL"] = public_url
+
     logger.info("Keep server started")
+
+    launch_uptime_reporting()
+
+    if LIVE_DEMO_MODE:
+        logger.info("Launching Keep in demo mode.")
+        from keep.api.core.demo_mode_runner import launch_demo_mode
+        launch_demo_mode()
+    else:
+        logger.info("Not launching Keep in demo mode.")
diff --git a/keep/api/core/demo_mode_runner.py b/keep/api/core/demo_mode_runner.py
new file mode 100644
index 000000000..095c02f6c
--- /dev/null
+++ b/keep/api/core/demo_mode_runner.py
@@ -0,0 +1,485 @@
+import datetime
+import logging
+import os
+import random
+import threading
+import time
+from datetime import timezone
+
+import requests
+from dateutil import parser
+from requests.models import PreparedRequest
+
+from keep.api.core.db import get_session_sync
+from keep.api.core.dependencies import SINGLE_TENANT_UUID
+from keep.api.logging import CONFIG
+from keep.api.models.db.topology import TopologyServiceInDto
+from keep.api.tasks.process_topology_task import process_topology
+from keep.api.utils.tenant_utils import get_or_create_api_key
+from keep.providers.providers_factory import ProvidersFactory
+
+logging.config.dictConfig(CONFIG)
+
+logger = logging.getLogger(__name__)
+
+correlation_rules_to_create = [
+    {
+        "sqlQuery": {"sql": "((name like :name_1))", "params": {"name_1": "%mq%"}},
+        "groupDescription": "This rule groups all alerts related to MQ.",
+        "ruleName": "Message Queue Buckle Up",
+        "celQuery": '(name.contains("mq"))',
+        "timeframeInSeconds": 86400,
+        "timeUnit": "hours",
+        "groupingCriteria": [],
+        "requireApprove": False,
+        "resolveOn": "never",
+    },
+    {
+        "sqlQuery": {
+            "sql": "((name like :name_1) or (name = :name_2) or (name like :name_3))",
+            "params": {
+                "name_1": "%network_latency_high%",
+                "name_2": "high_cpu_usage",
+                "name_3": "%database_connection_failure%",
+            },
+        },
+        "groupDescription": "This rule groups alerts from multiple sources.",
+        "ruleName": "Application issue caused by DB load",
+        "celQuery": '(name.contains("network_latency_high")) || (name == "high_cpu_usage") || (name.contains("database_connection_failure"))',
+        "timeframeInSeconds": 86400,
+        "timeUnit": "hours",
+        "groupingCriteria": [],
+        "requireApprove": False,
+        "resolveOn": "never",
+    },
+]
+
+services_to_create = [
+    TopologyServiceInDto(
+        source_provider_id="Prod-Datadog",
+        repository="keephq/keep",
+        tags=[],
+        service="api",
+        display_name="API Service",
+        environment="prod",
+        description="The main API service",
+        team="keep",
+        email="support@keephq.dev",
+        slack="https://slack.keephq.dev",
+        ip_address="10.0.0.1",
+        mac_address="",
+        category="Python",
+        manufacturer="",
+        dependencies={
+            "db": "SQL",
+            "queue": "AMQP",
+        },
+        application_ids=[],
+        updated_at="2024-11-18T09:23:46",
+    ),
+    TopologyServiceInDto(
+        source_provider_id="Prod-Datadog",
+        repository="keephq/keep",
+        tags=[],
+        service="ui",
+        display_name="Platform",
+        environment="prod",
+        description="The user interface (aka Platform)",
+        team="keep",
+        email="support@keephq.dev",
+        slack="https://slack.keephq.dev",
+        ip_address="10.0.0.2",
+        mac_address="",
+        category="nextjs",
+        manufacturer="",
+        dependencies={
+            "api": "HTTP/S",
+        },
+        application_ids=[],
+        updated_at="2024-11-18T09:29:25",
+    ),
+    TopologyServiceInDto(
+        source_provider_id="Prod-Datadog",
+        repository="keephq/keep",
+        tags=[],
+        service="db",
+        display_name="DB",
+        environment="prod",
+        description="Production Database",
+        team="keep",
+        email="support@keephq.dev",
+        slack="https://slack.keephq.dev",
+        ip_address="10.0.0.3",
+        mac_address="",
+        category="postgres",
+        manufacturer="",
+        dependencies={},
+        application_ids=[],
+        updated_at="2024-11-18T09:30:44",
+    ),
+    TopologyServiceInDto(
+        source_provider_id="Prod-Datadog",
+        repository="keephq/keep",
+        tags=[],
+        service="queue",
+        display_name="Kafka",
+        environment="prod",
+        description="Production Queue",
+        team="keep",
+        email="support@keephq.dev",
+        slack="https://slack.keephq.dev",
+        ip_address="10.0.0.4",
+        mac_address="",
+        category="Kafka",
+        manufacturer="",
+        dependencies={
+            "processor": "AMQP",
+        },
+        application_ids=[],
+        updated_at="2024-11-18T09:31:31",
+    ),
+    TopologyServiceInDto(
+        source_provider_id="Prod-Datadog",
+        repository="keephq/keep",
+        tags=[],
+        service="processor",
+        display_name="Processor",
+        environment="prod",
+        description="Processing Service",
+        team="keep",
+        email="support@keephq.dev",
+        slack="https://slack.keephq.dev",
+        ip_address="10.0.0.5",
+        mac_address="",
+        category="go",
+        manufacturer="",
+        dependencies={
+            "storage": "HTTP/S",
+        },
+        application_ids=[],
+        updated_at="2024-11-18T10:02:20",
+    ),
+    TopologyServiceInDto(
+        source_provider_id="Prod-Datadog",
+        repository="keephq/keep",
+        tags=[],
+        service="backoffice",
+        display_name="Backoffice",
+        environment="prod",
+        description="Backoffice UI to control configuration",
+        team="keep",
+        email="support@keephq.dev",
+        slack="https://slack.keephq.dev",
+        ip_address="172.1.1.0",
+        mac_address="",
+        category="nextjs",
+        manufacturer="",
+        dependencies={
+            "api": "HTTP/S",
+        },
+        application_ids=[],
+        updated_at="2024-11-18T10:11:31",
+    ),
+    TopologyServiceInDto(
+        source_provider_id="Prod-Datadog",
+        repository="keephq/keep",
+        tags=[],
+        service="storage",
+        display_name="Storage",
+        environment="prod",
+        description="Storage Service",
+        team="keep",
+        email="support@keephq.dev",
+        slack="https://slack.keephq.dev",
+        ip_address="10.0.0.8",
+        mac_address="",
+        category="python",
+        manufacturer="",
+        dependencies={},
+        application_ids=[],
+        updated_at="2024-11-18T10:13:56",
+    ),
+]
+
+application_to_create = {
+    "name": "Main App",
+    "description": "It is the most critical business process ever imaginable.",
+    "services": [
+        {"name": "API Service", "service": "api"},
+        {"name": "DB", "service": "db"},
+        {"name": "Kafka", "service": "queue"},
+        {"name": "Processor", "service": "processor"},
+        {"name": "Storage", "service": "storage"},
+    ],
+}
+
+
+def get_or_create_topology(keep_api_key, keep_api_url):
+    services_existing = requests.get(
+        f"{keep_api_url}/topology",
+        headers={"x-api-key": keep_api_key},
+    )
+    services_existing.raise_for_status()
+    services_existing = services_existing.json()
+
+    # Creating services
+    if len(services_existing) == 0:
+        process_topology(
+            SINGLE_TENANT_UUID, services_to_create, "Prod-Datadog", "datadog"
+        )
+
+    # Create application
+    applications_existing = requests.get(
+        f"{keep_api_url}/topology/applications",
+        headers={"x-api-key": keep_api_key},
+    )
+    applications_existing.raise_for_status()
+    applications_existing = applications_existing.json()
+
+    if len(applications_existing) == 0:
+        # Pull services again to get their ids
+        services_existing = requests.get(
+            f"{keep_api_url}/topology",
+            headers={"x-api-key": keep_api_key},
+        )
+        services_existing.raise_for_status()
+        services_existing = services_existing.json()
+
+        # Update application_to_create with existing services ids
+        for service in application_to_create["services"]:
+            for existing_service in services_existing:
+                if service["name"] == existing_service["display_name"]:
+                    service["id"] = existing_service["id"]
+
+        response = requests.post(
+            f"{keep_api_url}/topology/applications",
+            headers={"x-api-key": keep_api_key},
+            json=application_to_create,
+        )
+        response.raise_for_status()
+
+
+def get_or_create_correlation_rules(keep_api_key, keep_api_url):
+    correlation_rules_existing = requests.get(
+        f"{keep_api_url}/rules",
+        headers={"x-api-key": keep_api_key},
+    )
+    correlation_rules_existing.raise_for_status()
+    correlation_rules_existing = correlation_rules_existing.json()
+
+    if len(correlation_rules_existing) == 0:
+        for correlation_rule in correlation_rules_to_create:
+            response = requests.post(
+                f"{keep_api_url}/rules",
+                headers={"x-api-key": keep_api_key},
+                json=correlation_rule,
+            )
+            response.raise_for_status()
+
+
+def remove_old_incidents(keep_api_key, keep_api_url):
+    consider_old_timedelta = datetime.timedelta(minutes=30)
+    incidents_existing = requests.get(
+        f"{keep_api_url}/incidents",
+        headers={"x-api-key": keep_api_key},
+    )
+    incidents_existing.raise_for_status()
+    incidents_existing = incidents_existing.json()["items"]
+
+    for incident in incidents_existing:
+        if parser.parse(incident["creation_time"]).replace(tzinfo=timezone.utc) < (
+            datetime.datetime.now() - consider_old_timedelta
+        ).astimezone(timezone.utc):
+            incident_id = incident["id"]
+            response = requests.delete(
+                f"{keep_api_url}/incidents/{incident_id}",
+                headers={"x-api-key": keep_api_key},
+            )
+            response.raise_for_status()
+
+
+def get_existing_installed_providers(keep_api_key, keep_api_url):
+    response = requests.get(
+        f"{keep_api_url}/providers",
+        headers={"x-api-key": keep_api_key},
+    )
+    response.raise_for_status()
+    return response.json()["installed_providers"]
+
+
+def simulate_alerts(
+    keep_api_url=None,
+    keep_api_key=None,
+    sleep_interval=5,
+    demo_correlation_rules=False,
+    demo_topology=False,
+):
+    logger.info("Simulating alerts...")
+
+    GENERATE_DEDUPLICATIONS = True
+
+    providers = [
+        "prometheus",
+        "grafana",
+        "cloudwatch",
+        "datadog",
+    ]
+
+    providers_to_randomize_fingerprint_for = [
+        "cloudwatch",
+        "datadog",
+    ]
+
+    provider_classes = {
+        provider: ProvidersFactory.get_provider_class(provider)
+        for provider in providers
+    }
+
+    while True:
+        try:
+            logger.info(f"Demo thread: Checking if server is up at {keep_api_url}...")
+            response = requests.get(keep_api_url)
+            response.raise_for_status()
+            break
+        except requests.exceptions.RequestException:
+            logger.info("Demo thread: API is not up yet. Waiting...")
+            time.sleep(5)
+
+    existing_installed_providers = get_existing_installed_providers(keep_api_key, keep_api_url)
+    logger.info(f"Existing installed providers: {existing_installed_providers}")
+    existing_providers_to_their_ids = {}
+    for existing_provider in existing_installed_providers:
+        if existing_provider["type"] in providers:
+            existing_providers_to_their_ids[existing_provider["type"]] = existing_provider["id"]
+    logger.info(
+        f"Mapping of existing providers to their ids: {existing_providers_to_their_ids}"
+    )
+
+    if demo_correlation_rules:
+        logger.info("Creating correlation rules...")
+        get_or_create_correlation_rules(keep_api_key, keep_api_url)
+        logger.info("Correlation rules created.")
+    if demo_topology:
+        logger.info("Creating topology...")
+        get_or_create_topology(keep_api_key, keep_api_url)
+        logger.info("Topology created.")
+
+    while True:
+        try:
+            logger.info("Looping to send alerts...")
+
+            logger.info("Removing old incidents...")
+            remove_old_incidents(keep_api_key, keep_api_url)
+            logger.info("Old incidents removed.")
+
+            send_alert_url_params = {}
+
+            # choose provider
+            provider_type = random.choice(providers)
+            send_alert_url = "{}/alerts/event/{}".format(keep_api_url, provider_type)
+
+            if provider_type in existing_providers_to_their_ids:
+                send_alert_url_params["provider_id"] = existing_providers_to_their_ids[provider_type]
+            logger.info(
+                f"Provider type: {provider_type}, send_alert_url_params are now: {send_alert_url_params}"
+            )
+
+            provider = provider_classes[provider_type]
+            alert = provider.simulate_alert()
+
+            if provider_type in providers_to_randomize_fingerprint_for:
+                send_alert_url_params["fingerprint"] = "".join(
+                    random.choices("abcdefghijklmnopqrstuvwxyz0123456789", k=10)
+                )
+
+            # Determine number of times to send the same alert
+            num_iterations = 1
+            if GENERATE_DEDUPLICATIONS:
+                num_iterations = random.randint(1, 3)
+
+            for _ in range(num_iterations):
+                logger.info("Sending alert: {}".format(alert))
+                try:
+                    env = random.choice(["production", "staging", "development"])
+                    if "provider_id" not in send_alert_url_params:
+                        send_alert_url_params["provider_id"] = f"{provider_type}-{env}"
+                    prepared_request = PreparedRequest()
+                    prepared_request.prepare_url(send_alert_url, send_alert_url_params)
+                    logger.info(
+                        f"Sending alert to {prepared_request.url} with url params {send_alert_url_params}"
+                    )
+                    response = requests.post(
+                        prepared_request.url,
+                        headers={"x-api-key": keep_api_key},
+                        json=alert,
+                    )
+                    response.raise_for_status()  # Raise an HTTPError for bad responses
+                except requests.exceptions.RequestException as e:
+                    logger.error("Failed to send alert: {}".format(e))
+                    time.sleep(sleep_interval)
+                    continue
+
+                if not response.ok:
+                    logger.error("Failed to send alert: {}".format(response.text))
+                else:
+                    logger.info("Alert sent successfully")
+        except Exception as e:
+            logger.exception(
+                "Error in simulate_alerts", extra={"exception_str": str(e)}
+            )
+
+        logger.info(
+            "Sleeping for {} seconds before next iteration".format(sleep_interval)
+        )
+        time.sleep(sleep_interval)
+
+
+def launch_demo_mode(use_thread: bool = True):
+    """
+    Run the demo simulation in the background.
+    """
+    logger.info("Demo mode launched.")
+
+    keep_api_url = os.environ.get(
+        "KEEP_API_URL", "http://localhost:" + str(os.environ.get("PORT", 8080))
+    )
+    keep_api_key = os.environ.get("KEEP_READ_ONLY_BYPASS_KEY")
+    keep_sleep_interval = int(os.environ.get("KEEP_SLEEP_INTERVAL", 5))
+    if keep_api_key is None:
+        with get_session_sync() as session:
+            keep_api_key = get_or_create_api_key(
+                session=session,
+                tenant_id=SINGLE_TENANT_UUID,
+                created_by="system",
+                unique_api_key_id="simulate_alerts",
+                system_description="Simulate Alerts API key",
+            )
+    if use_thread:
+        thread = threading.Thread(
+            target=simulate_alerts,
+            kwargs={
+                "keep_api_key": keep_api_key,
+                "keep_api_url": keep_api_url,
+                "sleep_interval": keep_sleep_interval,
+                "demo_correlation_rules": True,
+                "demo_topology": True,
+            },
+        )
+        thread.daemon = True
+        thread.start()
+    else:
+        simulate_alerts(
+            keep_api_key=keep_api_key,
+            keep_api_url=keep_api_url,
+            sleep_interval=keep_sleep_interval,
+            demo_correlation_rules=True,
+            demo_topology=True,
+        )
+    logger.info("Demo mode initialized.")
+
+
+if __name__ == "__main__":
+    launch_demo_mode(use_thread=False)
diff --git a/keep/providers/cloudwatch_provider/alerts_mock.py b/keep/providers/cloudwatch_provider/alerts_mock.py
index 5fe2ec41e..1fc2377fe 100644
--- a/keep/providers/cloudwatch_provider/alerts_mock.py
+++ b/keep/providers/cloudwatch_provider/alerts_mock.py
@@ -11,7 +11,7 @@
             }
         },
         "parameters": {
-            "Message.AlarmName": ["HighCPUUsage-1", "HighCPUUsage-2", "HighCPUUsage-3"],
+            "Message.AlarmName": ["HighCPUUsage", "HighCPUUsageOnAPod", "PodRecycled"],
         },
     },
 }
diff --git a/keep/providers/datadog_provider/alerts_mock.py b/keep/providers/datadog_provider/alerts_mock.py
index 21205eab0..0ff032a3b 100644
--- a/keep/providers/datadog_provider/alerts_mock.py
+++ b/keep/providers/datadog_provider/alerts_mock.py
@@ -11,8 +11,8 @@
         },
         "parameters": {
             "tags": [
-                "environment:production,team:backend,monitor",
-                "environment:staging,team:backend,monitor",
+                "environment:production,team:backend,monitor,service:api",
+                "environment:staging,team:backend,monitor,service:api",
             ],
             "priority": ["P2", "P3", "P4"],
         },
@@ -29,8 +29,8 @@
         },
         "parameters": {
             "tags": [
-                "environment:production,team:analytics,monitor",
-                "environment:staging,team:database,monitor",
+                "environment:production,team:analytics,monitor,service:api",
+                "environment:staging,team:database,monitor,service:api",
             ],
             "priority": ["P1", "P3", "P4"],
         },
diff --git a/keep/providers/grafana_provider/alerts_mock.py b/keep/providers/grafana_provider/alerts_mock.py
index bcb940077..4d9d0357a 100644
--- a/keep/providers/grafana_provider/alerts_mock.py
+++ b/keep/providers/grafana_provider/alerts_mock.py
@@ -1,6 +1,7 @@
 ALERTS = {
     "database_connection_failure": {
         "severity": "critical",
+        "service": "api",
         "title": "Database Connection Failure",
         "alerts": [
             {
@@ -48,6 +49,7 @@
         ],
     },
     "high_memory_usage": {
+        "service": "api",
         "payload": {
             "condition": "B",
             "data": [
@@ -92,6 +94,7 @@
         },
     },
     "network_latency_high": {
+        "service": "db",
         "payload": {
             "condition": "C",
             "data": [
diff --git a/keep/providers/prometheus_provider/alerts_mock.py b/keep/providers/prometheus_provider/alerts_mock.py
index 1287f1a68..d29197074 100644
--- a/keep/providers/prometheus_provider/alerts_mock.py
+++ b/keep/providers/prometheus_provider/alerts_mock.py
@@ -11,7 +11,7 @@
         },
         "parameters": {
             "labels.host": ["host1", "host2", "host3"],
-            "labels.service": ["calendar-producer-java-otel-api-dd", "kafka"],
+            "labels.service": ["calendar-producer-java-otel-api-dd", "kafka", "api", "queue", "db"],
             "labels.instance": ["instance1", "instance2", "instance3"],
         },
     },
@@ -20,11 +20,12 @@
             "summary": "Message queue is over 33% capacity",
             "labels": {
                 "severity": "warning",
+                "customer_id": "acme"
             },
         },
         "parameters": {
             "labels.queue": ["queue1", "queue2", "queue3"],
-            "labels.service": ["calendar-producer-java-otel-api-dd", "kafka"],
+            "labels.service": ["calendar-producer-java-otel-api-dd", "kafka", "queue"],
             "labels.mq_manager": ["mq_manager1", "mq_manager2", "mq_manager3"],
         },
     },
@@ -37,20 +38,20 @@
         },
         "parameters": {
             "labels.host": ["host1", "host2", "host3"],
-            "labels.service": ["calendar-producer-java-otel-api-dd", "kafka"],
+            "labels.service": ["calendar-producer-java-otel-api-dd", "kafka", "api", "queue", "db"],
             "labels.instance": ["instance1", "instance2", "instance3"],
         },
     },
     "network_latency_high": {
         "payload": {
-            "summary": "Network latency is higher than normal",
+            "summary": "Network latency is higher than normal for customer_id:acme",
             "labels": {
                 "severity": "info",
             },
         },
         "parameters": {
             "labels.host": ["host1", "host2", "host3"],
-            "labels.service": ["calendar-producer-java-otel-api-dd", "kafka"],
+            "labels.service": ["calendar-producer-java-otel-api-dd", "kafka", "api", "queue", "db"],
             "labels.instance": ["instance1", "instance2", "instance3"],
         },
     },
diff --git a/scripts/simulate_alerts.py b/scripts/simulate_alerts.py
index fa1d0cc35..56f273efb 100644
--- a/scripts/simulate_alerts.py
+++ b/scripts/simulate_alerts.py
@@ -1,13 +1,8 @@
 import logging
 import os
-import random
-import time
 
-import requests
+from keep.api.core.demo_mode_runner import simulate_alerts
 
-from keep.providers.providers_factory import ProvidersFactory
-
-# configure logging
 logging.basicConfig(
     level=logging.DEBUG,
     format="%(asctime)s %(levelname)s %(name)s %(message)s",
@@ -18,51 +13,17 @@
 
 
 def main():
-    GENERATE_DEDUPLICATIONS = True
-    SLEEP_INTERVAL = float(os.environ.get("SLEEP_INTERVAL", 0.2))  # Configurable sleep interval from env variable
+    SLEEP_INTERVAL = float(
+        os.environ.get("SLEEP_INTERVAL", 0.2)
+    )  # Configurable sleep interval from env variable
     keep_api_key = os.environ.get("KEEP_API_KEY")
     keep_api_url = os.environ.get("KEEP_API_URL") or "http://localhost:8080"
-    if keep_api_key is None or keep_api_url is None:
-        raise Exception("KEEP_API_KEY and KEEP_API_URL must be set")
-
-    providers = ["prometheus", "grafana"]
-    provider_classes = {
-        provider: ProvidersFactory.get_provider_class(provider)
-        for provider in providers
-    }
-    while True:
-        # choose provider
-        provider_type = random.choice(providers)
-        send_alert_url = "{}/alerts/event/{}".format(keep_api_url, provider_type)
-        provider = provider_classes[provider_type]
-        alert = provider.simulate_alert()
-
-        # Determine number of times to send the same alert
-        num_iterations = 1
-        if GENERATE_DEDUPLICATIONS:
-            num_iterations = random.randint(1, 3)
-
-        for _ in range(num_iterations):
-            logger.info("Sending alert: {}".format(alert))
-            try:
-                env = random.choice(["production", "staging", "development"])
-                response = requests.post(
-                    send_alert_url + f"?provider_id={provider_type}-{env}",
-                    headers={"x-api-key": keep_api_key},
-                    json=alert,
-                )
-                response.raise_for_status()  # Raise an HTTPError for bad responses
-            except requests.exceptions.RequestException as e:
-                logger.error("Failed to send alert: {}".format(e))
-                time.sleep(SLEEP_INTERVAL)
-                continue
-
-            if response.status_code != 202:
-                logger.error("Failed to send alert: {}".format(response.text))
-            else:
-                logger.info("Alert sent successfully")
-
-        time.sleep(SLEEP_INTERVAL)  # Wait for the configured interval before sending the next alert
+    simulate_alerts(
+        keep_api_key=keep_api_key,
+        keep_api_url=keep_api_url,
+        sleep_interval=SLEEP_INTERVAL,
+        demo_correlation_rules=False,
+    )
 
 
 if __name__ == "__main__":
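For anyone wanting to exercise the new simulator without enabling `KEEP_LIVE_DEMO_MODE` on the server, a minimal sketch along the lines of the updated `scripts/simulate_alerts.py` is shown below. It assumes a reachable Keep API at `KEEP_API_URL` and an API key exported as `KEEP_API_KEY`; both are assumptions about the local environment, not part of this change.

```python
# Minimal sketch: drive the same simulator that KEEP_LIVE_DEMO_MODE launches,
# but from a standalone process (assumes a running Keep API and a valid API key).
import logging
import os

from keep.api.core.demo_mode_runner import simulate_alerts

logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    simulate_alerts(
        keep_api_url=os.environ.get("KEEP_API_URL", "http://localhost:8080"),
        keep_api_key=os.environ["KEEP_API_KEY"],  # assumed to be set by the operator
        sleep_interval=float(os.environ.get("SLEEP_INTERVAL", 5)),
        demo_correlation_rules=False,  # skip creating the demo correlation rules
        demo_topology=False,  # skip creating the demo topology
    )
```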