From ac9cc2c85692289e67ff630e370d7b6132a7c52a Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Wed, 16 Oct 2024 12:24:44 +0400 Subject: [PATCH] feat: User can set exact provider log level using environment variable (#2169) Co-authored-by: Tal Borenstein --- keep/contextmanager/contextmanager.py | 23 ++- keep/providers/base/base_provider.py | 8 +- tests/test_workflow_execution.py | 234 +++++++++++++++++++++++++- 3 files changed, 261 insertions(+), 4 deletions(-) diff --git a/keep/contextmanager/contextmanager.py b/keep/contextmanager/contextmanager.py index c46502f58..b044962c4 100644 --- a/keep/contextmanager/contextmanager.py +++ b/keep/contextmanager/contextmanager.py @@ -15,6 +15,7 @@ def __init__(self, tenant_id, workflow_id=None, workflow_execution_id=None): self.logger, self, tenant_id, workflow_id, workflow_execution_id ) self.workflow_id = workflow_id + self.workflow_execution_id = workflow_execution_id self.tenant_id = tenant_id self.steps_context = {} self.steps_context_size = 0 @@ -53,6 +54,7 @@ def __init__(self, tenant_id, workflow_id=None, workflow_execution_id=None): self.dependencies = set() self.workflow_execution_id = None self._api_key = None + self.__loggers = {} @property def api_key(self): @@ -73,9 +75,26 @@ def api_key(self): def set_execution_context(self, workflow_execution_id): self.workflow_execution_id = workflow_execution_id self.logger_adapter.workflow_execution_id = workflow_execution_id + for logger in self.__loggers.values(): + logger.workflow_execution_id = workflow_execution_id + + def get_logger(self, name=None): + if not name: + return self.logger_adapter - def get_logger(self): - return self.logger_adapter + if name in self.__loggers: + return self.__loggers[name] + + logger = logging.getLogger(name) + logger_adapter = WorkflowLoggerAdapter( + logger, + self, + self.tenant_id, + self.workflow_id, + self.workflow_execution_id, + ) + self.__loggers[name] = logger_adapter + return logger_adapter def set_event_context(self, event): self.event_context = event diff --git a/keep/providers/base/base_provider.py b/keep/providers/base/base_provider.py index 758c848de..0233b5837 100644 --- a/keep/providers/base/base_provider.py +++ b/keep/providers/base/base_provider.py @@ -66,7 +66,13 @@ def __init__( self.webhook_markdown = webhook_markdown self.provider_description = provider_description self.context_manager = context_manager - self.logger = context_manager.get_logger() + self.logger = context_manager.get_logger(self.provider_id) + self.logger.setLevel( + os.environ.get( + "KEEP_{}_PROVIDER_LOG_LEVEL".format(self.provider_id.upper()), + os.environ.get("LOG_LEVEL", "INFO"), + ) + ) self.validate_config() self.logger.debug( "Base provider initalized", extra={"provider": self.__class__.__name__} diff --git a/tests/test_workflow_execution.py b/tests/test_workflow_execution.py index 2faabce1b..8fb29a8f8 100644 --- a/tests/test_workflow_execution.py +++ b/tests/test_workflow_execution.py @@ -1,14 +1,19 @@ import asyncio +import logging import time +from collections import defaultdict from datetime import datetime, timedelta +from functools import partial +from unittest.mock import patch import pytest import pytz from keep.api.core.db import get_last_workflow_execution_by_workflow_id from keep.api.core.dependencies import SINGLE_TENANT_UUID +from keep.api.logging import WorkflowLoggerAdapter from keep.api.models.alert import AlertDto, AlertStatus, IncidentDto -from keep.api.models.db.workflow import Workflow +from keep.api.models.db.workflow import Workflow, WorkflowExecutionLog from keep.workflowmanager.workflowmanager import WorkflowManager from tests.fixtures.client import client, test_app # noqa @@ -44,6 +49,33 @@ """ +workflow_definition_with_two_providers = """workflow: +id: susu-and-sons +description: Just to test the logs of 2 providers +triggers: +- type: alert + filters: + - key: name + value: "server-is-hamburger" +steps: +- name: keep_step + provider: + type: keep + with: + filters: + - key: status + value: open +actions: +- name: console_action + provider: + type: console + with: + message: | + "Tier 1 Alert: {{ alert.name }} - {{ alert.description }} + Alert details: {{ alert }}" +""" + + @pytest.fixture(scope="module") def workflow_manager(): """ @@ -77,6 +109,25 @@ def setup_workflow(db_session): db_session.commit() +@pytest.fixture +def setup_workflow_with_two_providers(db_session): + """ + Fixture to set up a workflow in the database before each test. + It creates a Workflow object with the predefined workflow definition and adds it to the database. + """ + workflow = Workflow( + id="susu-and-sons", + name="susu-and-sons", + tenant_id=SINGLE_TENANT_UUID, + description="some stuff for unit testing", + created_by="tal@keephq.dev", + interval=0, + workflow_raw=workflow_definition_with_two_providers, + ) + db_session.add(workflow) + db_session.commit() + + @pytest.mark.parametrize( "test_app, test_case, alert_statuses, expected_tier, db_session", [ @@ -794,3 +845,184 @@ def wait_workflow_execution(workflow_id): assert workflow_execution_deleted.results["mock-action"] == [ '"deleted incident: incident"\n' ] + + + +logs_counter = {} + +def count_logs(instance, original_method): + log_levels = logging.getLevelNamesMapping() + def wrapper(*args, **kwargs): + level_name = original_method.__name__.upper() + max_level = instance.getEffectiveLevel() + current_level = log_levels[level_name] + if current_level >= max_level: + logs_counter.setdefault(instance.workflow_execution_id, defaultdict(int)) + logs_counter[instance.workflow_execution_id]["all"] += 1 + logs_counter[instance.workflow_execution_id][level_name] += 1 + + return original_method(*args, **kwargs) + + return wrapper + +def fake_workflow_adapter(logger, context_manager, tenant_id, workflow_id, workflow_execution_id): + adapter = WorkflowLoggerAdapter(logger, context_manager, tenant_id, workflow_id, workflow_execution_id) + + adapter.info = count_logs(adapter, adapter.info) + adapter.debug = count_logs(adapter, adapter.debug) + adapter.warning = count_logs(adapter, adapter.warning) + adapter.error = count_logs(adapter, adapter.error) + adapter.critical = count_logs(adapter, adapter.critical) + return adapter + + +@pytest.mark.parametrize( + "test_app, test_case, alert_statuses, expected_tier, db_session", + [ + ({"AUTH_TYPE": "NOAUTH"}, "No action", [[0, "firing"]], None, None), + ], + indirect=["test_app", "db_session"], +) +def test_workflow_execution_logs( + db_session, + test_app, + create_alert, + setup_workflow_with_two_providers, + workflow_manager, + test_case, + alert_statuses, + expected_tier, +): + with patch('keep.contextmanager.contextmanager.WorkflowLoggerAdapter', + side_effect=fake_workflow_adapter),\ + patch('keep.api.logging.RUNNING_IN_CLOUD_RUN', value=True): + base_time = datetime.now(tz=pytz.utc) + + # Create alerts with specified statuses and timestamps + alert_statuses.reverse() + for time_diff, status in alert_statuses: + alert_status = ( + AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED + ) + create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff)) + + time.sleep(1) + # Create the current alert + current_alert = AlertDto( + id="grafana-1", + source=["grafana"], + name="server-is-hamburger", + status=AlertStatus.FIRING, + severity="critical", + fingerprint="fp1", + ) + + # Insert the current alert into the workflow manager + workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert]) + + # Wait for the workflow execution to complete + workflow_execution = None + count = 0 + status = None + while workflow_execution is None and count < 30 and status != "success": + workflow_execution = get_last_workflow_execution_by_workflow_id( + SINGLE_TENANT_UUID, "susu-and-sons" + ) + if workflow_execution is not None: + status = workflow_execution.status + time.sleep(1) + count += 1 + + # Check if the workflow execution was successful + assert workflow_execution is not None + assert workflow_execution.status == "success" + + logs = ( + db_session.query(WorkflowExecutionLog) + .filter(WorkflowExecutionLog.workflow_execution_id == workflow_execution.id) + .all() + ) + + assert len(logs) == logs_counter[workflow_execution.id]["all"] + + +@pytest.mark.parametrize( + "test_app, test_case, alert_statuses, expected_tier, db_session", + [ + ({"AUTH_TYPE": "NOAUTH"}, "No action", [[0, "firing"]], None, None), + ], + indirect=["test_app", "db_session"], +) +def test_workflow_execution_logs_log_level_debug_console_provider( + db_session, + test_app, + create_alert, + setup_workflow_with_two_providers, + workflow_manager, + test_case, + alert_statuses, + expected_tier, + monkeypatch, +): + + logs_counts = {} + logs_level_counts = {} + for level in ["INFO", "DEBUG"]: + monkeypatch.setenv("KEEP_CONSOLE_PROVIDER_LOG_LEVEL", level) + with patch('keep.contextmanager.contextmanager.WorkflowLoggerAdapter', + side_effect=fake_workflow_adapter), \ + patch('keep.api.logging.RUNNING_IN_CLOUD_RUN', value=True): + base_time = datetime.now(tz=pytz.utc) + + # Create alerts with specified statuses and timestamps + alert_statuses.reverse() + for time_diff, status in alert_statuses: + alert_status = ( + AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED + ) + create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff)) + + time.sleep(1) + # Create the current alert + current_alert = AlertDto( + id="grafana-1-{}".format(level), + source=["grafana"], + name="server-is-hamburger", + status=AlertStatus.FIRING, + severity="critical", + fingerprint="fp1-{}".format(level), + ) + + # Insert the current alert into the workflow manager + workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert]) + + # Wait for the workflow execution to complete + workflow_execution = None + count = 0 + status = None + time.sleep(1) + while workflow_execution is None and count < 30 and status != "success": + workflow_execution = get_last_workflow_execution_by_workflow_id( + SINGLE_TENANT_UUID, "susu-and-sons" + ) + if workflow_execution is not None: + status = workflow_execution.status + time.sleep(1) + count += 1 + + # Check if the workflow execution was successful + assert workflow_execution is not None + assert workflow_execution.status == "success" + + logs_counts[workflow_execution.id] = logs_counter[workflow_execution.id]["all"] + logs_level_counts[level] = logs_counter[workflow_execution.id]["all"] + + for workflow_execution_id in logs_counts: + logs = ( + db_session.query(WorkflowExecutionLog) + .filter(WorkflowExecutionLog.workflow_execution_id == workflow_execution_id) + .all() + ) + assert logs_counts[workflow_execution_id] == len(logs) + + assert logs_level_counts["DEBUG"] > logs_level_counts["INFO"]