Commit 1d50c70

Set catalog on `SchemaDeployer` to overwrite the default `hive_metastore` (#296)

### Linked issues

Resolves #294
Needs #280 (tech debt to tackle later)
Progresses #278
Requires #287 for the CI to pass
JCZuurmond authored Sep 26, 2024
1 parent 605498c commit 1d50c70
Showing 7 changed files with 117 additions and 60 deletions.
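
The gist of the change, as a hedged usage sketch (the `my_catalog` name, warehouse id, and `my_project` module below are illustrative, not taken from this commit): `SchemaDeployer` now accepts a keyword-only `catalog` argument, defaulting to `hive_metastore`, and qualifies every schema, table, and view it creates with it.

```python
from databricks.sdk import WorkspaceClient

from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.labs.lsql.deployment import SchemaDeployer

from my_project import views  # illustrative: any module that packages the deployed .sql files

ws = WorkspaceClient()
sql_backend = StatementExecutionBackend(ws, "my-warehouse-id")  # illustrative warehouse id
deployer = SchemaDeployer(sql_backend, "inventory", views, catalog="my_catalog")
deployer.deploy_schema()  # issues: CREATE SCHEMA IF NOT EXISTS my_catalog.inventory
```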
1 change: 1 addition & 0 deletions pyproject.toml
```diff
@@ -46,6 +46,7 @@ dependencies = [
     "coverage[toml]>=6.5",
     "pytest",
     "pylint",
+    "databricks-labs-pytester>=0.2.1",
     "pytest-xdist",
     "pytest-cov>=4.0.0,<5.0.0",
     "pytest-mock>=3.0.0,<4.0.0",
```
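
Context for the new dev dependency, with a minimal sketch (the test name is hypothetical): `databricks-labs-pytester` ships pytest fixtures for Databricks integration tests, such as the `make_catalog` fixture used by the reworked integration test below. The package registers itself as a pytest plugin, so the fixtures should be available without an explicit import:

```python
# Hypothetical test; `make_catalog` is provided by databricks-labs-pytester
# and is expected to create a Unity Catalog catalog and clean it up on teardown.
def test_catalog_fixture(make_catalog):
    catalog = make_catalog()
    assert catalog.name
```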
4 changes: 4 additions & 0 deletions src/databricks/labs/lsql/backends.py
```diff
@@ -13,10 +13,12 @@
     BadRequest,
     DatabricksError,
     DataLoss,
+    InternalError,
     NotFound,
     PermissionDenied,
     Unknown,
 )
+from databricks.sdk.retries import retried
 from databricks.sdk.service.compute import Language
 
 from databricks.labs.lsql.core import Row, StatementExecutionExt
@@ -202,6 +204,8 @@ def __init__(self, ws: WorkspaceClient, warehouse_id, *, max_records_per_batch:
         self._debug_truncate_bytes = debug_truncate_bytes if isinstance(debug_truncate_bytes, int) else 96
         super().__init__(max_records_per_batch)
 
+    # InternalError is retried on for resilience on sporadic Databricks issues.
+    @retried(on=[InternalError], timeout=datetime.timedelta(minutes=1))
     def execute(self, sql: str, *, catalog: str | None = None, schema: str | None = None) -> None:
         logger.debug(f"[api][execute] {self._only_n_bytes(sql, self._debug_truncate_bytes)}")
         self._sql.execute(sql, catalog=catalog, schema=schema)
```
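
The decorator above comes straight from the Databricks SDK; a minimal sketch of the pattern, assuming only that `retried` re-invokes the wrapped callable (with a delay between attempts) while it raises one of the listed exception types, and gives up once the timeout elapses:

```python
import datetime

from databricks.sdk.errors import InternalError
from databricks.sdk.retries import retried


@retried(on=[InternalError], timeout=datetime.timedelta(minutes=1))
def sporadically_failing_call() -> None:
    """Illustrative stand-in for the decorated execute() above."""
    # Raising InternalError inside the body makes `retried` call the function
    # again until it succeeds or one minute has passed; any other exception
    # type propagates immediately.
```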
2 changes: 1 addition & 1 deletion src/databricks/labs/lsql/core.py
```diff
@@ -140,7 +140,7 @@ class StatementExecutionExt:
     megabytes or gigabytes of data serialized in Apache Arrow format, and low result fetching latency, should use
     the stateful Databricks SQL Connector for Python."""
 
-    def __init__(  # pylint: disable=too-many-arguments
+    def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments
         self,
         ws: WorkspaceClient,
         disposition: Disposition | None = None,
```
51 changes: 29 additions & 22 deletions src/databricks/labs/lsql/deployment.py
```diff
@@ -1,46 +1,53 @@
+import datetime as dt
 import logging
 import pkgutil
 from typing import Any
 
+from databricks.sdk.errors import InternalError
+from databricks.sdk.retries import retried
+
 from databricks.labs.lsql.backends import Dataclass, SqlBackend
 
 logger = logging.getLogger(__name__)
 
 
 class SchemaDeployer:
-    def __init__(self, sql_backend: SqlBackend, inventory_schema: str, mod: Any):
+    """Deploy schema, tables, and views for a given inventory schema."""
+
+    def __init__(
+        self,
+        sql_backend: SqlBackend,
+        schema: str,
+        mod: Any,
+        *,
+        catalog: str = "hive_metastore",
+    ) -> None:
         self._sql_backend = sql_backend
-        self._inventory_schema = inventory_schema
+        self._schema = schema
         self._module = mod
+        self._catalog = catalog
 
-    def deploy_schema(self):
-        logger.info(f"Ensuring {self._inventory_schema} database exists")
-        self._sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS hive_metastore.{self._inventory_schema}")
+    # InternalError are retried for resilience on sporadic Databricks issues
+    @retried(on=[InternalError], timeout=dt.timedelta(minutes=1))
+    def deploy_schema(self) -> None:
+        schema_full_name = f"{self._catalog}.{self._schema}"
+        logger.info(f"Ensuring {schema_full_name} database exists")
+        self._sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_full_name}")
 
-    def delete_schema(self):
-        logger.info(f"deleting {self._inventory_schema} database")
-        self._sql_backend.execute(f"DROP SCHEMA IF EXISTS hive_metastore.{self._inventory_schema} CASCADE")
+    def delete_schema(self) -> None:
+        schema_full_name = f"{self._catalog}.{self._schema}"
+        logger.info(f"Deleting {schema_full_name} database")
+        self._sql_backend.execute(f"DROP SCHEMA IF EXISTS {schema_full_name} CASCADE")
 
-    def deploy_table(self, name: str, klass: Dataclass):
-        logger.info(f"Ensuring {self._inventory_schema}.{name} table exists")
-        self._sql_backend.create_table(f"hive_metastore.{self._inventory_schema}.{name}", klass)
+    def deploy_table(self, name: str, klass: Dataclass) -> None:
+        table_full_name = f"{self._catalog}.{self._schema}.{name}"
+        logger.info(f"Ensuring {table_full_name} table exists")
+        self._sql_backend.create_table(table_full_name, klass)
 
-    def deploy_view(self, name: str, relative_filename: str):
+    def deploy_view(self, name: str, relative_filename: str) -> None:
         query = self._load(relative_filename)
-        logger.info(f"Ensuring {self._inventory_schema}.{name} view matches {relative_filename} contents")
-        ddl = f"CREATE OR REPLACE VIEW hive_metastore.{self._inventory_schema}.{name} AS {query}"
+        view_full_name = f"{self._catalog}.{self._schema}.{name}"
+        logger.info(f"Ensuring {view_full_name} view matches {relative_filename} contents")
+        ddl = f"CREATE OR REPLACE VIEW {view_full_name} AS {query}"
         self._sql_backend.execute(ddl)
 
     def _load(self, relative_filename: str) -> str:
         data = pkgutil.get_data(self._module.__name__, relative_filename)
         assert data is not None
         sql = data.decode("utf-8")
-        sql = sql.replace("$inventory", f"hive_metastore.{self._inventory_schema}")
+        sql = sql.replace("$inventory", f"{self._catalog}.{self._schema}")
         return sql
```
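
For clarity on `_load`'s placeholder handling: packaged SQL files refer to the inventory schema as `$inventory`, which is now rewritten to the configured catalog and schema instead of a hard-coded `hive_metastore` prefix. A small sketch (the file content and `my_catalog` are illustrative):

```python
# What _load effectively does to a packaged query, given
# catalog="my_catalog" and schema="inventory":
sql = "SELECT id, name FROM $inventory.something"
rendered = sql.replace("$inventory", "my_catalog.inventory")
assert rendered == "SELECT id, name FROM my_catalog.inventory.something"
```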
15 changes: 15 additions & 0 deletions tests/integration/test_backends.py
```diff
@@ -3,8 +3,11 @@
 from databricks.labs.blueprint.installation import Installation
 from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2
 
+from databricks.labs.lsql import Row
 from databricks.labs.lsql.backends import SqlBackend, StatementExecutionBackend
 
+from . import views
+
 INCORRECT_SCHEMA = """
 from databricks.labs.lsql.backends import RuntimeBackend
 from databricks.sdk.errors import NotFound
@@ -148,6 +151,18 @@ def test_statement_execution_backend_overrides(ws, env_or_skip):
     assert len(rows) == 10
 
 
+def test_statement_execution_backend_overwrites_table(ws, env_or_skip, make_random) -> None:
+    sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))
+    catalog = env_or_skip("TEST_CATALOG")
+    schema = env_or_skip("TEST_SCHEMA")
+
+    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("abc", True)], views.Foo, "append")
+    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("xyz", True)], views.Foo, "overwrite")
+
+    rows = list(sql_backend.fetch(f"SELECT * FROM {catalog}.{schema}.foo"))
+    assert rows == [Row(first="xyz", second=True)]
+
+
 def test_runtime_backend_use_statements(ws):
     product_info = ProductInfo.for_testing(SqlBackend)
     installation = Installation.assume_user_home(ws, product_info.product_name())
```
33 changes: 10 additions & 23 deletions tests/integration/test_deployment.py
```diff
@@ -1,37 +1,24 @@
 import pytest
 
 from databricks.labs.lsql import Row
-from databricks.labs.lsql.backends import StatementExecutionBackend
 from databricks.labs.lsql.deployment import SchemaDeployer
 
 from . import views
 
 
-@pytest.mark.xfail
-def test_deploys_database(ws, env_or_skip, make_random):
-    # TODO: create per-project/per-scope catalog
-    schema = "default"
-    sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))
+@pytest.mark.xfail(reason="Identity used in CI misses privileges to create UC resources")
+def test_deploys_schema(ws, sql_backend, make_random, make_catalog) -> None:
+    """Test deploying a full, minimal inventory schema with a single schema, table and view."""
+    catalog = make_catalog(name=f"lsql_test_{make_random()}")
+    schema_name = "lsql_test"
+    table_full_name = f"{catalog.name}.{schema_name}.foo"
 
-    deployer = SchemaDeployer(sql_backend, schema, views)
+    deployer = SchemaDeployer(sql_backend, schema_name, views, catalog=catalog.name)
     deployer.deploy_schema()
     deployer.deploy_table("foo", views.Foo)
     deployer.deploy_view("some", "some.sql")
 
-    sql_backend.save_table(f"{schema}.foo", [views.Foo("abc", True)], views.Foo)
-    rows = list(sql_backend.fetch(f"SELECT * FROM {schema}.some"))
+    sql_backend.save_table(table_full_name, [views.Foo("abc", True)], views.Foo)
 
-    assert rows == [Row(name="abc", id=1)]
-
-
-def test_overwrite(ws, env_or_skip, make_random):
-    schema = "default"
-    sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))
-    catalog = env_or_skip("TEST_CATALOG")
-    schema = env_or_skip("TEST_SCHEMA")
-
-    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("abc", True)], views.Foo, "append")
-    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("xyz", True)], views.Foo, "overwrite")
-    rows = list(sql_backend.fetch(f"SELECT * FROM {catalog}.{schema}.foo"))
-
-    assert rows == [Row(first="xyz", second=True)]
+    rows = list(sql_backend.fetch(f"SELECT * FROM {table_full_name}"))
+    assert rows == [Row(first="abc", second=1)]
```
71 changes: 57 additions & 14 deletions tests/unit/test_deployment.py
```diff
@@ -1,24 +1,46 @@
+import logging
 from dataclasses import dataclass
 
+import pytest
+
 from databricks.labs.lsql.backends import MockBackend
 from databricks.labs.lsql.deployment import SchemaDeployer
 
 from . import views
 
 
-def test_deploys_view():
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deploys_schema(caplog, inventory_catalog: str) -> None:
     mock_backend = MockBackend()
     deployment = SchemaDeployer(
         sql_backend=mock_backend,
-        inventory_schema="inventory",
+        schema="inventory",
         mod=views,
+        catalog=inventory_catalog,
     )
 
-    deployment.deploy_view("some", "some.sql")
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.deploy_schema()
 
-    assert mock_backend.queries == [
-        "CREATE OR REPLACE VIEW hive_metastore.inventory.some AS SELECT\n id,\n name\nFROM hive_metastore.inventory.something"
-    ]
+    assert mock_backend.queries == [f"CREATE SCHEMA IF NOT EXISTS {inventory_catalog}.inventory"]
+    assert f"Ensuring {inventory_catalog}.inventory database exists" in caplog.messages
 
 
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deletes_schema(caplog, inventory_catalog: str) -> None:
+    mock_backend = MockBackend()
+    deployment = SchemaDeployer(
+        sql_backend=mock_backend,
+        schema="inventory",
+        mod=views,
+        catalog=inventory_catalog,
+    )
+
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.delete_schema()
+
+    assert mock_backend.queries == [f"DROP SCHEMA IF EXISTS {inventory_catalog}.inventory CASCADE"]
+    assert f"Deleting {inventory_catalog}.inventory database" in caplog.messages
+
+
 @dataclass
@@ -27,19 +49,40 @@ class Foo:
     second: bool
 
 
-def test_deploys_dataclass():
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deploys_dataclass(caplog, inventory_catalog: str) -> None:
     mock_backend = MockBackend()
     deployment = SchemaDeployer(
         sql_backend=mock_backend,
-        inventory_schema="inventory",
+        schema="inventory",
         mod=views,
+        catalog=inventory_catalog,
     )
-    deployment.deploy_schema()
-    deployment.deploy_table("foo", Foo)
-    deployment.delete_schema()
+
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.deploy_table("foo", Foo)
+
+    assert mock_backend.queries == [
+        f"CREATE TABLE IF NOT EXISTS {inventory_catalog}.inventory.foo (first STRING NOT NULL, second BOOLEAN NOT NULL) USING DELTA",
+    ]
+    assert f"Ensuring {inventory_catalog}.inventory.foo table exists" in caplog.messages
+
+
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deploys_view(caplog, inventory_catalog: str) -> None:
+    mock_backend = MockBackend()
+    deployment = SchemaDeployer(
+        sql_backend=mock_backend,
+        schema="inventory",
+        mod=views,
+        catalog=inventory_catalog,
+    )
+
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.deploy_view("some", "some.sql")
 
     assert mock_backend.queries == [
-        "CREATE SCHEMA IF NOT EXISTS hive_metastore.inventory",
-        "CREATE TABLE IF NOT EXISTS hive_metastore.inventory.foo (first STRING NOT NULL, second BOOLEAN NOT NULL) USING DELTA",
-        "DROP SCHEMA IF EXISTS hive_metastore.inventory CASCADE",
+        f"CREATE OR REPLACE VIEW {inventory_catalog}.inventory.some AS SELECT\n id,\n name\n"
+        f"FROM {inventory_catalog}.inventory.something"
     ]
+    assert f"Ensuring {inventory_catalog}.inventory.some view matches some.sql contents" in caplog.messages
```

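The unit tests hinge on lsql's `MockBackend`, which records each statement instead of executing it; a minimal sketch of that assertion pattern:

```python
from databricks.labs.lsql.backends import MockBackend

mock_backend = MockBackend()
mock_backend.execute("SELECT 1")
# MockBackend keeps the statements it received, so tests can assert on them.
assert mock_backend.queries == ["SELECT 1"]
```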