forked from thunderhead-labs/hyperliquid-stats
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmetrics.py
71 lines (54 loc) · 2.29 KB
/
metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import inspect
import os
import time
from functools import wraps
from prom_utils import (
create_metric,
export_metrics,
start_prometheus_server, create_prometheus_labels,
)
"""
Important notes:
Prometheus metrics are mainly used to constantly monitor the health of the system.
Due to the nature of our usage with chainflip, where many metrics are one-time events,
or don't decrease or time out by default, we need to implement some custom logic to handle this.
When integrating new metrics, please consider the following:
- If the metric is a one-time event, please make sure to reset it so the alert won't fire forever.
- If the metric is constantly increasing, please make sure to reset it so the alert won't fire forever.
"""
# Port the Prometheus endpoint is served on. Environment variables are always
# strings, so the value is normalised to int here; previously PORT was an int
# when defaulted but a str when the variable was set.
# NOTE(review): presumably start_prometheus_server expects an integer port --
# confirm against prom_utils.
PORT = int(os.getenv("PORT", "9000"))
start_prometheus_server(PORT)

# Module-level metric handles shared by the update helpers in this file.
# The three API metrics carry a single "endpoint" label.
is_online = create_metric("is_hyperliquid_stats_online", "gauge")
api_latency = create_metric(
    "hyperliquid_stats_api_latency", "gauge", labels=["endpoint"]
)
api_failures = create_metric(
    "hyperliquid_stats_api_failures", "gauge", labels=["endpoint"]
)
api_successes = create_metric(
    "hyperliquid_stats_api_successes", "gauge", labels=["endpoint"]
)
# Metric update methods
def update_is_online(value: bool = True):
    """Export the online flag to the ``is_online`` metric.

    The flag is converted with ``int`` before being exported, so ``True`` is
    published as 1 and ``False`` as 0.
    """
    online_flag = int(value)
    export_metrics(is_online, metric_value=online_flag)
def update_api_latency(endpoint: str, latency: float):
    """Export ``latency`` to the ``api_latency`` metric labelled by ``endpoint``."""
    export_metrics(
        api_latency,
        metric_value=latency,
        labels=create_prometheus_labels(endpoint=endpoint),
    )
def increment_api_failures(endpoint: str):
    """Add one to the ``api_failures`` series labelled with ``endpoint``."""
    failure_series = api_failures.labels(endpoint)
    failure_series.inc()
def increment_api_successes(endpoint: str):
    """Add one to the ``api_successes`` series labelled with ``endpoint``."""
    success_series = api_successes.labels(endpoint)
    success_series.inc()
# Helper decorators
def measure_api_latency(endpoint: str):
    """Build a decorator that records call metrics for an async API handler.

    The wrapper awaits the decorated callable. When the call completes, the
    success counter for ``endpoint`` is incremented and the elapsed time is
    exported as the endpoint's latency. When the call raises an ``Exception``,
    the failure counter is incremented, the error is printed, and ``None`` is
    returned instead of propagating the exception.

    Args:
        endpoint: Label value attached to every metric the wrapper updates.

    Returns:
        A decorator producing an ``async`` wrapper around the given function.
    """

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Monotonic clock, so the measured duration is not skewed by
            # wall-clock adjustments.
            start_time = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
                # If the awaited call itself yields a coroutine, resolve it too.
                if inspect.iscoroutine(result):
                    result = await result
            except Exception as e:
                increment_api_failures(endpoint)
                print(f"Failed to resolve api {e}")
                return None
            latency = time.perf_counter() - start_time
            # Count the success only after the call has completed; counting it
            # up front recorded every failed call as a success as well.
            increment_api_successes(endpoint)
            update_api_latency(endpoint, latency)
            return result

        return wrapper

    return decorator