Skip to content

Commit

Permalink
Dashboard As Code for Reconcile (#768)
Browse files Browse the repository at this point in the history
closes #708 

The existing dashboard has been broken into 2.
For features unsupported by LSQL, we are using overrides to implement
the required widgets.

Conditional formatting depends on
databrickslabs/lsql#299

Screenshots:

![REMORPH Reconciliation
Metrics](https://github.com/user-attachments/assets/af7f6341-cecf-42d9-96b4-931a0347ed85)
![REMORPH Aggregate Reconciliation
Metrics](https://github.com/user-attachments/assets/2c70b071-e373-4ab4-8923-7706ae60b6ce)

-- co-authored by @bishwajit-db and @sundarshankar89

---------

Co-authored-by: Bishwajit <[email protected]>
  • Loading branch information
sundarshankar89 and bishwajit-db authored Oct 2, 2024
1 parent 8405131 commit dfc50e1
Show file tree
Hide file tree
Showing 62 changed files with 1,689 additions and 183 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies = [
"databricks-sdk~=0.29.0",
"sqlglot==25.8.1",
"databricks-labs-blueprint[yaml]>=0.2.3",
"databricks-labs-lsql>=0.4.3",
"databricks-labs-lsql>=0.7.5",
"cryptography>=41.0.3",
]

Expand Down
177 changes: 117 additions & 60 deletions src/databricks/labs/remorph/deployment/dashboard.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,140 @@
import json
import logging
from datetime import timedelta
from importlib.abc import Traversable
from typing import Any
from pathlib import Path

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.installer import InstallState
from databricks.labs.lsql.dashboards import DashboardMetadata, Dashboards
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError
from databricks.sdk.errors import InvalidParameterValue
from databricks.sdk.errors import (
InvalidParameterValue,
NotFound,
DeadlineExceeded,
InternalError,
ResourceAlreadyExists,
)
from databricks.sdk.retries import retried
from databricks.sdk.service.dashboards import Dashboard
from databricks.sdk.service.dashboards import LifecycleState, Dashboard

from databricks.labs.remorph.config import ReconcileConfig, ReconcileMetadataConfig

logger = logging.getLogger(__name__)


class DashboardDeployment:
_UPLOAD_TIMEOUT = timedelta(seconds=30)

def __init__(self, ws: WorkspaceClient, installation: Installation, install_state: InstallState):
def __init__(
self,
ws: WorkspaceClient,
installation: Installation,
install_state: InstallState,
):
self._ws = ws
self._installation = installation
self._install_state = install_state

def deploy(self, name: str, dashboard_file: Traversable, parameters: dict[str, Any] | None = None):
logger.debug(f"Deploying dashboard {name} from {dashboard_file.name}")
dashboard_data = self._substitute_params(dashboard_file, parameters or {})
dashboard = self._update_or_create_dashboard(name, dashboard_data, dashboard_file)
logger.info(f"Dashboard deployed with dashboard_id {dashboard.dashboard_id}")
logger.info(f"Dashboard URL: {self._ws.config.host}/sql/dashboardsv3/{dashboard.dashboard_id}")
self._install_state.save()

@retried(on=[DatabricksError], timeout=_UPLOAD_TIMEOUT)
def _update_or_create_dashboard(self, name: str, dashboard_data, dashboard_file) -> Dashboard:
if name in self._install_state.dashboards:
def deploy(
self,
folder: Path,
config: ReconcileConfig,
):
"""
Create dashboards from Dashboard metadata files.
The given folder is expected to contain subfolders each containing metadata for individual dashboards.
:param folder: Path to the base folder.
:param config: Configuration for reconciliation.
"""
logger.info(f"Deploying dashboards from base folder {folder}")
parent_path = f"{self._installation.install_folder()}/dashboards"
try:
self._ws.workspace.mkdirs(parent_path)
except ResourceAlreadyExists:
logger.info(f"Dashboard parent path already exists: {parent_path}")

valid_dashboard_refs = set()
for dashboard_folder in folder.iterdir():
if not dashboard_folder.is_dir():
continue
valid_dashboard_refs.add(self._dashboard_reference(dashboard_folder))
dashboard = self._update_or_create_dashboard(dashboard_folder, parent_path, config.metadata_config)
logger.info(
f"Dashboard deployed with URL: {self._ws.config.host}/sql/dashboardsv3/{dashboard.dashboard_id}"
)
self._install_state.save()

self._remove_deprecated_dashboards(valid_dashboard_refs)

def _dashboard_reference(self, folder: Path) -> str:
return f"{folder.stem}".lower()

# InternalError and DeadlineExceeded are retried because of Lakeview internal issues
# These issues have been reported to and are resolved by the Lakeview team
# Keeping the retry for resilience
@retried(on=[InternalError, DeadlineExceeded], timeout=timedelta(minutes=3))
def _update_or_create_dashboard(
self,
folder: Path,
ws_parent_path: str,
config: ReconcileMetadataConfig,
) -> Dashboard:
logging.info(f"Reading dashboard folder {folder}")
metadata = DashboardMetadata.from_path(folder).replace_database(
catalog=config.catalog,
catalog_to_replace="remorph",
database=config.schema,
database_to_replace="reconcile",
)

metadata.display_name = self._name_with_prefix(metadata.display_name)
reference = self._dashboard_reference(folder)
dashboard_id = self._install_state.dashboards.get(reference)
if dashboard_id is not None:
try:
dashboard_id = self._install_state.dashboards[name]
logger.info(f"Updating dashboard with id={dashboard_id}")
updated_dashboard = self._ws.lakeview.update(
dashboard_id,
display_name=self._name_with_prefix(name),
serialized_dashboard=dashboard_data,
)
return updated_dashboard
except InvalidParameterValue:
del self._install_state.dashboards[name]
logger.warning(f"Dashboard {name} does not exist anymore for some reason.")
return self._update_or_create_dashboard(name, dashboard_data, dashboard_file)
logger.info(f"Creating new dashboard {name}")
new_dashboard = self._ws.lakeview.create(
display_name=self._name_with_prefix(name),
parent_path=self._install_state.install_folder(),
serialized_dashboard=dashboard_data,
dashboard_id = self._handle_existing_dashboard(dashboard_id, metadata.display_name)
except (NotFound, InvalidParameterValue):
logger.info(f"Recovering invalid dashboard: {metadata.display_name} ({dashboard_id})")
try:
dashboard_path = f"{ws_parent_path}/{metadata.display_name}.lvdash.json"
self._ws.workspace.delete(dashboard_path) # Cannot recreate dashboard if file still exists
logger.debug(
f"Deleted dangling dashboard {metadata.display_name} ({dashboard_id}): {dashboard_path}"
)
except NotFound:
pass
dashboard_id = None # Recreate the dashboard if it's reference is corrupted (manually)

dashboard = Dashboards(self._ws).create_dashboard(
metadata,
dashboard_id=dashboard_id,
parent_path=ws_parent_path,
warehouse_id=self._ws.config.warehouse_id,
publish=True,
)
assert new_dashboard.dashboard_id is not None
self._install_state.dashboards[name] = new_dashboard.dashboard_id
return new_dashboard

def _substitute_params(self, dashboard_file: Traversable, parameters: dict[str, Any]) -> str:
if not parameters:
return dashboard_file.read_text()

with dashboard_file.open() as f:
dashboard_data = json.load(f)

for dataset in dashboard_data.get("datasets", []):
for param in dataset.get("parameters", []):
if param["keyword"] in parameters:
param["defaultSelection"] = {
"values": {
"dataType": "STRING",
"values": [
{"value": parameters[param["keyword"]]},
],
},
}

return json.dumps(dashboard_data)
assert dashboard.dashboard_id is not None
self._install_state.dashboards[reference] = dashboard.dashboard_id
return dashboard

def _name_with_prefix(self, name: str) -> str:
prefix = self._installation.product()
return f"[{prefix.upper()}] {name}"

def _handle_existing_dashboard(self, dashboard_id: str, display_name: str) -> str | None:
dashboard = self._ws.lakeview.get(dashboard_id)
if dashboard.lifecycle_state is None:
raise NotFound(f"Dashboard life cycle state: {display_name} ({dashboard_id})")
if dashboard.lifecycle_state == LifecycleState.TRASHED:
logger.info(f"Recreating trashed dashboard: {display_name} ({dashboard_id})")
return None # Recreate the dashboard if it is trashed (manually)
return dashboard_id # Update the existing dashboard

def _remove_deprecated_dashboards(self, valid_dashboard_refs: set[str]):
for ref, dashboard_id in self._install_state.dashboards.items():
if ref not in valid_dashboard_refs:
try:
logger.info(f"Removing dashboard_id={dashboard_id}, as it is no longer needed.")
del self._install_state.dashboards[ref]
self._ws.lakeview.trash(dashboard_id)
except (InvalidParameterValue, NotFound):
logger.warning(f"Dashboard `{dashboard_id}` doesn't exist anymore for some reason.")
continue
9 changes: 4 additions & 5 deletions src/databricks/labs/remorph/deployment/installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import Prompts
from databricks.labs.blueprint.upgrades import Upgrades
from databricks.labs.blueprint.wheels import WheelsV2
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound
from databricks.sdk.errors.platform import InvalidParameterValue

from databricks.labs.remorph.config import RemorphConfigs
from databricks.labs.remorph.deployment.recon import ReconDeployment
from databricks.labs.blueprint.wheels import WheelsV2

from databricks.sdk.errors.platform import InvalidParameterValue
from databricks.labs.blueprint.upgrades import Upgrades


logger = logging.getLogger("databricks.labs.remorph.install")

Expand Down Expand Up @@ -48,6 +46,7 @@ def _upload_wheel(self):
def install(self, config: RemorphConfigs):
wheel_paths: list[str] = self._upload_wheel()
if config.reconcile:
logger.info("Installing Remorph reconcile Metadata components.")
self._recon_deployment.install(config.reconcile, wheel_paths)
self._apply_upgrades()

Expand Down
46 changes: 9 additions & 37 deletions src/databricks/labs/remorph/deployment/recon.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.installer import InstallState
from databricks.labs.blueprint.wheels import ProductInfo
from databricks.labs.blueprint.wheels import find_project_root
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import InvalidParameterValue, NotFound

Expand All @@ -17,7 +18,6 @@

_RECON_PREFIX = "Reconciliation"
RECON_JOB_NAME = f"{_RECON_PREFIX} Runner"
RECON_METRICS_DASHBOARD_NAME = f"{_RECON_PREFIX} Metrics"


class ReconDeployment:
Expand All @@ -41,6 +41,7 @@ def __init__(

def install(self, recon_config: ReconcileConfig | None, wheel_paths: list[str]):
if not recon_config:
logger.warning("Recon Config is empty.")
return
logger.info("Installing reconcile components.")
self._deploy_tables(recon_config)
Expand Down Expand Up @@ -87,50 +88,21 @@ def _deploy_tables(self, recon_config: ReconcileConfig):

def _deploy_dashboards(self, recon_config: ReconcileConfig):
logger.info("Deploying reconciliation dashboards.")
self._deploy_recon_metrics_dashboard(RECON_METRICS_DASHBOARD_NAME, recon_config)
for dashboard_name, dashboard_id in self._get_deprecated_dashboards():
try:
logger.info(f"Removing dashboard_id={dashboard_id}, as it is no longer needed.")
del self._install_state.dashboards[dashboard_name]
self._ws.lakeview.trash(dashboard_id)
except (InvalidParameterValue, NotFound):
logger.warning(f"Dashboard `{dashboard_name}` doesn't exist anymore for some reason.")
continue

def _deploy_recon_metrics_dashboard(self, name: str, recon_config: ReconcileConfig):
dashboard_params = {
"catalog": recon_config.metadata_config.catalog,
"schema": recon_config.metadata_config.schema,
}

reconcile_dashboard_path = "reconcile/dashboards/Remorph-Reconciliation.lvdash.json"
dashboard_file = files(databricks.labs.remorph.resources).joinpath(reconcile_dashboard_path)
logger.info(f"Creating Reconciliation Dashboard `{name}`")
self._dashboard_deployer.deploy(name, dashboard_file, parameters=dashboard_params)
dashboard_base_dir = find_project_root(__file__) / "src/databricks/labs/remorph/resources/reconcile/dashboards"
self._dashboard_deployer.deploy(dashboard_base_dir, recon_config)

def _get_dashboards(self) -> list[tuple[str, str]]:
return [
(dashboard_name, dashboard_id)
for dashboard_name, dashboard_id in self._install_state.dashboards.items()
if dashboard_name.startswith(_RECON_PREFIX)
]

def _get_deprecated_dashboards(self) -> list[tuple[str, str]]:
return [
(dashboard_name, dashboard_id)
for dashboard_name, dashboard_id in self._install_state.dashboards.items()
if dashboard_name.startswith(_RECON_PREFIX) and dashboard_name != RECON_METRICS_DASHBOARD_NAME
]
return list(self._install_state.dashboards.items())

def _remove_dashboards(self):
logger.info("Removing reconciliation dashboards.")
for dashboard_name, dashboard_id in self._get_dashboards():
for dashboard_ref, dashboard_id in self._get_dashboards():
try:
logger.info(f"Removing dashboard {dashboard_name} with dashboard_id={dashboard_id}.")
del self._install_state.dashboards[dashboard_name]
logger.info(f"Removing dashboard with id={dashboard_id}.")
del self._install_state.dashboards[dashboard_ref]
self._ws.lakeview.trash(dashboard_id)
except (InvalidParameterValue, NotFound):
logger.warning(f"Dashboard `{dashboard_name}` doesn't exist anymore for some reason.")
logger.warning(f"Dashboard with id={dashboard_id} doesn't exist anymore for some reason.")
continue

def _deploy_jobs(self, recon_config: ReconcileConfig, remorph_wheel_path: str):
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Aggregates Reconcile Table Metrics
### It provides the following information:

* Mismatch
* Missing in Source
* Missing in Target
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
columns:
- recon_id
- dd_recon_id
type: MULTI_SELECT
title: Recon Id
width: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
columns:
- executed_by
type: MULTI_SELECT
title: Executed by
width: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
columns:
- start_ts
title: Started At
type: DATE_RANGE_PICKER
width: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
columns:
- source_type
type: MULTI_SELECT
title: Source Type
width: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
columns:
- source_table
type: MULTI_SELECT
title: Source Table Name
width: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
columns:
- target_table
type: MULTI_SELECT
title: Target Table Name
width: 2
Loading

0 comments on commit dfc50e1

Please sign in to comment.