# streamlit-dashboard.py
"""
A dashboard to visualize metrics and alerts.
Run locally with:
$ streamlit run streamlit-dashboard.py
"""
import pandas as pd
import streamlit as st
from dotenv import load_dotenv
from utils import get_enabled_dagster_jobs, plot_time_series
from anomstack.config import specs
from anomstack.jinja.render import render
from anomstack.sql.read import read_sql
# Load environment variables via python-dotenv (typically from a .env file).
load_dotenv()
# Configure the Streamlit page to use the wide layout.
# NOTE(review): kept ahead of every other `st.*` call in this script —
# confirm whether the installed Streamlit version requires that ordering.
st.set_page_config(layout="wide")
@st.cache_data(ttl=60)
def get_data(sql: str, db: str) -> pd.DataFrame:
    """
    Run a SQL query and return the result as a DataFrame.

    The call is delegated to `read_sql`; the function is wrapped in
    `st.cache_data` with `ttl=60`.

    Args:
        sql: The SQL text to execute.
        db: The database identifier, forwarded to `read_sql` as `db`.

    Returns:
        Whatever `read_sql` returns for the query (annotated as a
        DataFrame).
    """
    return read_sql(sql, db=db)
# Streamlit app
# Remove the underline from links (the page title below contains one).
custom_css = """<style>a {text-decoration: none;}</style>"""
st.markdown(custom_css, unsafe_allow_html=True)
st.title("[Anomstack](https://github.com/andrewm4894/anomstack) Metrics Visualization")

# A metric batch is shown only when its "<batch>_ingest" job is enabled.
enabled_jobs = get_enabled_dagster_jobs()
metric_batches = sorted(
    batch for batch in specs.keys() if f"{batch}_ingest" in enabled_jobs
)

# Create one tab per metric batch. `st.tabs` rejects an empty label list, so
# when nothing is enabled show a message instead of crashing the whole page.
if metric_batches:
    tabs = st.tabs(metric_batches)
else:
    st.warning("No enabled metric batches found.")
    tabs = []

for tab, batch_selection in zip(tabs, metric_batches):
    with tab:
        with st.expander("Filters and Settings", expanded=False):
            cols = st.columns([1, 6])
            with cols[0]:
                last_n = st.number_input(
                    "Last N:",
                    min_value=1,
                    value=5000,
                    help="Specify the number of recent records to fetch.",
                    key=f"last_n_{batch_selection}",
                )

        # Render the dashboard query for this batch and fetch its data.
        sql = render(
            "dashboard_sql",
            specs[batch_selection],
            params={"alert_max_n": last_n},
        )
        db = specs[batch_selection]["db"]
        df = get_data(sql, db)

        if df.empty:
            st.warning("No data found.")
            continue

        # Restrict to this batch once, rather than re-filtering per metric.
        batch_df = df[df["metric_batch"] == batch_selection]
        unique_metrics = sorted(batch_df["metric_name"].unique())
        metric_names = ["ALL", *unique_metrics]

        # The metric picker lives in the second column created inside the
        # expander above, next to the "Last N" input.
        with cols[1]:
            metric_selection = st.selectbox(
                f"Metric Name ({batch_selection}):",
                metric_names,
                key=f"metric_selection_{batch_selection}",
                help="Select a metric to visualize.",
            )

        # "ALL" plots every metric in the batch; otherwise only the chosen one.
        if metric_selection == "ALL":
            metrics_to_plot = unique_metrics
        else:
            metrics_to_plot = [metric_selection]

        for metric in metrics_to_plot:
            filtered_df = batch_df[
                batch_df["metric_name"] == metric
            ].sort_values(by="metric_timestamp")
            fig = plot_time_series(filtered_df, metric)
            st.plotly_chart(fig, use_container_width=True)

        # Display SQL query
        with st.expander(f"Show SQL Query ({batch_selection})"):
            st.code(sql, language="sql")