-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathanomaly_detection.py
58 lines (43 loc) · 1.95 KB
/
anomaly_detection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Databricks notebook source
# Default location of the anomaly-detection wheel. The "whl_override" widget is
# pre-filled with this URL and lets a caller point the notebook at another build.
DEFAULT_WHL_URL = (
    "https://ml-team-public-read.s3.us-west-2.amazonaws.com/wheels/data-monitoring/"
    "a4050ef7-b183-47a1-a145-e614628e3146/"
    "databricks_anomaly_detection-0.0.18-py3-none-any.whl"
)
dbutils.widgets.text("whl_override", DEFAULT_WHL_URL)
# A blank (or whitespace-only) widget value falls back to the default URL.
_whl_override = dbutils.widgets.get("whl_override").strip()
WHL_URL = _whl_override if _whl_override else DEFAULT_WHL_URL
# COMMAND ----------
# Install the wheel through the IPython "pip" line magic. The URL is wrapped in
# double quotes inside the magic's argument string.
get_ipython().run_line_magic(
    "pip",
    f'install "{WHL_URL}"',
)
# COMMAND ----------
# Restart the Python process after the pip install above so the newly installed
# library is picked up by the cells that follow.
# NOTE(review): presumably names defined before this point (e.g. WHL_URL) do not
# survive the restart — confirm before relying on them in later cells.
dbutils.library.restartPython()
# COMMAND ----------
import json
from databricks.data_monitoring.anomalydetection.detection import run_anomaly_detection
from databricks.data_monitoring.anomalydetection.metric_config import (
FreshnessConfig, CompletenessConfig
)
# Register the notebook's input widgets together with their default values.
for _widget_name, _widget_default in (
    ("catalog_name", "my_catalog"),
    ("schema_name", "my_schema"),
    ("metric_configs", "[]"),
    ("logging_table_name", ""),
):
    dbutils.widgets.text(_widget_name, _widget_default)

# Read back the plain-string widget values; metric_configs is JSON and is
# fetched and parsed on its own further down.
CATALOG_NAME, SCHEMA_NAME, LOGGING_TABLE_NAME = (
    dbutils.widgets.get(_name)
    for _name in ("catalog_name", "schema_name", "logging_table_name")
)
# Convert the metric_configs widget value into config objects.
#
# The widget must hold a JSON list of objects. Each object carries a
# "metric_type" key naming the config class to build; the remaining keys are
# handed to that class's from_dict().
#
# Raises:
#     json.JSONDecodeError: the widget value is not valid JSON.
#     ValueError: the JSON is not a list of objects, or an entry names an
#         unsupported (or missing) metric_type.
_METRIC_CONFIG_CLASSES = {
    "FreshnessConfig": FreshnessConfig,
    "CompletenessConfig": CompletenessConfig,
}

dict_list = json.loads(dbutils.widgets.get("metric_configs"))
# Fail early with a clear message: iterating a JSON object would yield its keys
# (strings) and iterating null/numbers would raise an unrelated TypeError.
if not isinstance(dict_list, list):
    raise ValueError(
        "metric_configs must be a JSON list of objects, "
        f"got {type(dict_list).__name__}"
    )

decoded_configs = []
for config in dict_list:
    if not isinstance(config, dict):
        raise ValueError(
            "Each metric_configs entry must be a JSON object, "
            f"got {type(config).__name__}"
        )
    # pop() removes the discriminator so from_dict() only receives the
    # config's own fields.
    metric_type = config.pop("metric_type", None)
    # Only string discriminators are looked up; anything else (including a
    # missing key) is reported as unsupported, as before.
    config_cls = (
        _METRIC_CONFIG_CLASSES.get(metric_type)
        if isinstance(metric_type, str)
        else None
    )
    if config_cls is None:
        raise ValueError(f"Unsupported metric_type: {metric_type}")
    decoded_configs.append(config_cls.from_dict(config))
# COMMAND ----------
# Run anomaly detection with the widget-supplied catalog, schema, and decoded
# metric configs, keeping the returned value for display.
# A blank or whitespace-only logging_table_name widget is passed as None (no
# explicit table name). Surrounding whitespace is trimmed, matching how the
# whl_override widget value is handled.
current_run_logging_table = run_anomaly_detection(
    catalog_name=CATALOG_NAME,
    schema_name=SCHEMA_NAME,
    metric_configs=decoded_configs,
    logging_table_name=LOGGING_TABLE_NAME.strip() or None,
)
# COMMAND ----------
# Render the value returned by run_anomaly_detection() — the current run's
# logging table for all enabled checks — in the notebook output.
display(current_run_logging_table)