Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): memory consumption and performance improvements #2908

Merged
merged 18 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,5 @@ oauth2.cfg
scripts/keep_slack_bot.py
keepnew.db
providers_cache.json

tests/provision/*
12 changes: 5 additions & 7 deletions keep/api/alert_deduplicator/alert_deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
DeduplicationRuleRequestDto,
)
from keep.providers.providers_factory import ProvidersFactory
from keep.searchengine.searchengine import SearchEngine

DEFAULT_RULE_UUID = "00000000-0000-0000-0000-000000000000"

Expand All @@ -42,7 +41,6 @@ class AlertDeduplicator:
def __init__(self, tenant_id):
self.logger = logging.getLogger(__name__)
self.tenant_id = tenant_id
self.search_engine = SearchEngine(self.tenant_id)

def _apply_deduplication_rule(
self, alert: AlertDto, rule: DeduplicationRuleDto
Expand Down Expand Up @@ -264,7 +262,7 @@ def _get_default_full_deduplication_rule(
ingested=0,
dedup_ratio=0.0,
enabled=True,
is_provisioned=False
is_provisioned=False,
)

def get_deduplications(self) -> list[DeduplicationRuleDto]:
Expand Down Expand Up @@ -502,15 +500,15 @@ def update_deduplication_rule(
rule_dto = self.create_deduplication_rule(rule, updated_by)
self.logger.info("Default rule updated")
return rule_dto

rule_before_update = get_deduplication_rule_by_id(self.tenant_id, rule_id)

if not rule_before_update:
raise HTTPException(
status_code=404,
detail="Deduplication rule not found",
)

if rule_before_update.is_provisioned:
raise HTTPException(
status_code=409,
Expand Down Expand Up @@ -557,7 +555,7 @@ def delete_deduplication_rule(self, rule_id: str) -> bool:
status_code=404,
detail="Deduplication rule not found",
)

if deduplication_rule_to_be_deleted.is_provisioned:
raise HTTPException(
status_code=409,
Expand Down
8 changes: 6 additions & 2 deletions keep/api/core/demo_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ async def simulate_alerts_async(
logger.info(
"Sleeping for {} seconds before next iteration".format(sleep_interval)
)
await asyncio.sleep(sleep_interval)
# await asyncio.sleep(sleep_interval)
talboren marked this conversation as resolved.
Show resolved Hide resolved


def launch_demo_mode_thread(
Expand Down Expand Up @@ -623,11 +623,14 @@ async def simulate_alerts_worker(worker_id, keep_api_key, rps=1):
url, alert = await REQUESTS_QUEUE.get()

async with session.post(url, json=alert, headers=headers) as response:
response_time = time.time() - start
total_requests += 1
if not response.ok:
logger.error("Failed to send alert: {}".format(response.text))
else:
logger.info("Alert sent successfully")
logger.info(
f"Alert sent successfully in {response_time:.3f} seconds"
)

if rps:
delay = 1 / rps - (time.time() - start)
Expand All @@ -639,6 +642,7 @@ async def simulate_alerts_worker(worker_id, keep_api_key, rps=1):
worker_id,
total_requests / (time.time() - total_start),
)
logger.info("Total requests: %d", total_requests)


if __name__ == "__main__":
Expand Down
54 changes: 53 additions & 1 deletion keep/api/core/metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

from prometheus_client import Counter, Gauge, Summary
from prometheus_client import Counter, Gauge, Histogram, Summary

PROMETHEUS_MULTIPROC_DIR = os.environ.get("PROMETHEUS_MULTIPROC_DIR", "/tmp/prometheus")
os.makedirs(PROMETHEUS_MULTIPROC_DIR, exist_ok=True)
Expand Down Expand Up @@ -37,3 +37,55 @@
labelnames=["pid"],
multiprocess_mode="livesum",
)

### WORKFLOWS
METRIC_PREFIX = "keep_workflows_"

# Workflow execution metrics
workflow_executions_total = Counter(
f"{METRIC_PREFIX}executions_total",
"Total number of workflow executions",
labelnames=["tenant_id", "workflow_id", "trigger_type"],
)

workflow_execution_errors_total = Counter(
f"{METRIC_PREFIX}execution_errors_total",
"Total number of workflow execution errors",
labelnames=["tenant_id", "workflow_id", "error_type"],
)

workflow_execution_status = Counter(
f"{METRIC_PREFIX}execution_status_total",
"Total number of workflow executions by status",
labelnames=["tenant_id", "workflow_id", "status"],
)

# Workflow performance metrics
workflow_execution_duration = Histogram(
f"{METRIC_PREFIX}execution_duration_seconds",
"Time spent executing workflows",
labelnames=["tenant_id", "workflow_id"],
buckets=(1, 5, 10, 30, 60, 120, 300, 600), # 1s, 5s, 10s, 30s, 1m, 2m, 5m, 10m
)

workflow_execution_step_duration = Histogram(
f"{METRIC_PREFIX}execution_step_duration_seconds",
"Time spent executing individual workflow steps",
labelnames=["tenant_id", "workflow_id", "step_name"],
buckets=(0.1, 0.5, 1, 2, 5, 10, 30, 60),
)

# Workflow state metrics
workflows_running = Gauge(
f"{METRIC_PREFIX}running",
"Number of currently running workflows",
labelnames=["tenant_id"],
multiprocess_mode="livesum",
)

workflow_queue_size = Gauge(
f"{METRIC_PREFIX}queue_size",
"Number of workflows waiting to be executed",
labelnames=["tenant_id"],
multiprocess_mode="livesum",
)
15 changes: 14 additions & 1 deletion keep/api/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def process(self, msg, kwargs):


LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
KEEP_LOG_FILE = os.environ.get("KEEP_LOG_FILE")

LOG_FORMAT_OPEN_TELEMETRY = "open_telemetry"
LOG_FORMAT_DEVELOPMENT_TERMINAL = "dev_terminal"
Expand Down Expand Up @@ -234,7 +235,7 @@ def format(self, record):
},
"dev_terminal": {
"()": DevTerminalFormatter,
"format": "%(asctime)s - %(levelname)s - %(message)s",
"format": "%(asctime)s - %(thread)s %(threadName)s %(levelname)s - %(message)s",
},
},
"handlers": {
Expand Down Expand Up @@ -369,6 +370,18 @@ def _log(


def setup_logging():
# Add file handler if KEEP_LOG_FILE is set
if KEEP_LOG_FILE:
CONFIG["handlers"]["file"] = {
"level": "DEBUG",
"formatter": ("json"),
"class": "logging.FileHandler",
"filename": KEEP_LOG_FILE,
"mode": "a",
}
# Add file handler to root logger
CONFIG["loggers"][""]["handlers"].append("file")

logging.config.dictConfig(CONFIG)
uvicorn_error_logger = logging.getLogger("uvicorn.error")
uvicorn_error_logger.__class__ = CustomizedUvicornLogger
34 changes: 20 additions & 14 deletions keep/api/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,26 @@
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def get_protocol_from_endpoint(endpoint):
parsed_url = urlparse(endpoint)
if parsed_url.scheme == "http":
return HTTPOTLPSpanExporter
elif parsed_url.scheme == "grpc":
return GRPCOTLPSpanExporter
else:
raise ValueError(f"Unsupported protocol: {parsed_url.scheme}")
parsed_url = urlparse(endpoint)
if parsed_url.scheme == "http":
return HTTPOTLPSpanExporter
elif parsed_url.scheme == "grpc":
return GRPCOTLPSpanExporter
else:
raise ValueError(f"Unsupported protocol: {parsed_url.scheme}")


def setup(app: FastAPI):
logger = logging.getLogger(__name__)
# Configure the OpenTelemetry SDK
service_name = os.environ.get("OTEL_SERVICE_NAME", os.environ.get("SERVICE_NAME", "keep-api"))
otlp_collector_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", os.environ.get("OTLP_ENDPOINT", False))
service_name = os.environ.get(
"OTEL_SERVICE_NAME", os.environ.get("SERVICE_NAME", "keep-api")
)
otlp_collector_endpoint = os.environ.get(
"OTEL_EXPORTER_OTLP_ENDPOINT", os.environ.get("OTLP_ENDPOINT", False)
)
otlp_traces_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None)
otlp_logs_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", None)
otlp_metrics_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", None)
Expand All @@ -45,21 +51,21 @@ def setup(app: FastAPI):

resource = Resource.create({"service.name": service_name})
provider = TracerProvider(resource=resource)

if otlp_collector_endpoint:

logger.info(f"OTLP endpoint set to {otlp_collector_endpoint}")

if otlp_traces_endpoint:
logger.info(f"OTLP Traces endpoint set to {otlp_traces_endpoint}")
SpanExporter = get_protocol_from_endpoint(otlp_traces_endpoint)
processor = BatchSpanProcessor(
SpanExporter(endpoint=otlp_traces_endpoint)
)
processor = BatchSpanProcessor(SpanExporter(endpoint=otlp_traces_endpoint))
provider.add_span_processor(processor)

if metrics_enabled.lower() == "true" and otlp_metrics_endpoint:
logger.info(f"Metrics enabled. OTLP Metrics endpoint set to {otlp_metrics_endpoint}")
logger.info(
f"Metrics enabled. OTLP Metrics endpoint set to {otlp_metrics_endpoint}"
)
reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=otlp_metrics_endpoint)
)
Expand Down
Loading
Loading