Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ele 3634 add snapshot to the report #1703

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions elementary/monitor/api/groups/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema
Expand All @@ -30,6 +31,7 @@
NormalizedExposureSchema,
NormalizedTestSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
]


Expand Down
2 changes: 1 addition & 1 deletion elementary/monitor/api/lineage/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from elementary.utils.pydantic_shim import BaseModel, validator

NodeUniqueIdType = str
NodeType = Literal["seed", "model", "source", "exposure"]
NodeType = Literal["snapshot", "seed", "model", "source", "exposure"]
NodeSubType = Literal["table", "view"]


Expand Down
32 changes: 30 additions & 2 deletions elementary/monitor/api/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
TotalsModelRunsSchema,
)
Expand All @@ -26,6 +27,7 @@
from elementary.monitor.fetchers.models.schema import (
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand All @@ -36,6 +38,7 @@
class ModelsAPI(APIClient):
_ARTIFACT_TYPE_DIR_MAP = {
SeedSchema: "seeds",
SnapshotSchema: "snapshots",
SourceSchema: "sources",
ModelSchema: "models",
ExposureSchema: "exposures",
Expand Down Expand Up @@ -133,6 +136,18 @@ def get_seeds(self) -> Dict[str, NormalizedSeedSchema]:
seeds[seed_unique_id] = normalized_seed
return seeds

def get_snapshots(self) -> Dict[str, NormalizedSnapshotSchema]:
snapshot_results = self.models_fetcher.get_snapshots()
snapshots = dict()
if snapshot_results:
for snapshot_result in snapshot_results:
logger.info(f"3 ----- snapshot_result: {snapshot_result}")
normalized_snapshot = self._normalize_dbt_artifact_dict(snapshot_result)
snapshot_unique_id = cast(str, normalized_snapshot.unique_id)
logger.info(f"4 ----- snapshot_unique_id: {snapshot_unique_id}")
snapshots[snapshot_unique_id] = normalized_snapshot
return snapshots

def get_models(
self, exclude_elementary_models: bool = False
) -> Dict[str, NormalizedModelSchema]:
Expand Down Expand Up @@ -239,6 +254,12 @@ def _normalize_dbt_artifact_dict(
) -> NormalizedSeedSchema:
...

@overload
def _normalize_dbt_artifact_dict(
self, artifact: SnapshotSchema
) -> NormalizedSnapshotSchema:
...

@overload
def _normalize_dbt_artifact_dict(
self, artifact: ModelSchema
Expand All @@ -258,15 +279,20 @@ def _normalize_dbt_artifact_dict(
...

def _normalize_dbt_artifact_dict(
self, artifact: Union[SeedSchema, ModelSchema, ExposureSchema, SourceSchema]
self,
artifact: Union[
SeedSchema, SnapshotSchema, ModelSchema, ExposureSchema, SourceSchema
],
) -> Union[
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedModelSchema,
NormalizedExposureSchema,
NormalizedSourceSchema,
]:
schema_to_normalized_schema_map = {
SeedSchema: NormalizedSeedSchema,
SnapshotSchema: NormalizedSnapshotSchema,
ExposureSchema: NormalizedExposureSchema,
ModelSchema: NormalizedModelSchema,
SourceSchema: NormalizedSourceSchema,
Expand Down Expand Up @@ -308,7 +334,9 @@ def _normalize_artifact_path(cls, artifact: ArtifactSchemaType, fqn: str) -> str
@classmethod
def _fqn(
cls,
artifact: Union[ModelSchema, ExposureSchema, SourceSchema, SeedSchema],
artifact: Union[
ModelSchema, ExposureSchema, SourceSchema, SeedSchema, SnapshotSchema
],
) -> str:
if isinstance(artifact, ExposureSchema):
path = (artifact.meta or {}).get("path")
Expand Down
6 changes: 6 additions & 0 deletions elementary/monitor/api/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ExposureSchema,
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.pydantic_shim import BaseModel, Field, validator
Expand Down Expand Up @@ -41,6 +42,11 @@ class NormalizedSeedSchema(NormalizedArtifactSchema, SeedSchema):
artifact_type: str = Field("seed", const=True) # type: ignore # noqa


# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedSnapshotSchema(NormalizedArtifactSchema, SnapshotSchema):
artifact_type: str = Field("snapshot", const=True) # type: ignore # noqa


# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedModelSchema(NormalizedArtifactSchema, ModelSchema):
artifact_type: str = Field("model", const=True) # type: ignore # noqa
Expand Down
19 changes: 16 additions & 3 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.api.report.schema import ReportDataEnvSchema, ReportDataSchema
Expand Down Expand Up @@ -47,11 +48,19 @@ def _get_groups(
sources: Iterable[NormalizedSourceSchema],
exposures: Iterable[NormalizedExposureSchema],
seeds: Iterable[NormalizedSeedSchema],
snapshots: Iterable[NormalizedSnapshotSchema],
singular_tests: Iterable[NormalizedTestSchema],
) -> GroupsSchema:
groups_api = GroupsAPI(self.dbt_runner)
return groups_api.get_groups(
artifacts=[*models, *sources, *exposures, *seeds, *singular_tests]
artifacts=[
*models,
*sources,
*exposures,
*seeds,
*snapshots,
*singular_tests,
]
)

def get_report_data(
Expand Down Expand Up @@ -86,6 +95,8 @@ def get_report_data(
lineage_node_ids: List[str] = []
seeds = models_api.get_seeds()
lineage_node_ids.extend(seeds.keys())
snapshots = models_api.get_snapshots()
lineage_node_ids.extend(snapshots.keys())
models = models_api.get_models(exclude_elementary_models)
lineage_node_ids.extend(models.keys())
sources = models_api.get_sources()
Expand All @@ -99,6 +110,7 @@ def get_report_data(
sources.values(),
exposures.values(),
seeds.values(),
snapshots.values(),
singular_tests,
)

Expand Down Expand Up @@ -147,7 +159,7 @@ def get_report_data(

serializable_groups = groups.dict()
serializable_models = self._serialize_models(
models, sources, exposures, seeds
models, sources, exposures, seeds, snapshots
)
serializable_model_runs = self._serialize_models_runs(models_runs.runs)
serializable_model_runs_totals = models_runs.dict(include={"totals"})[
Expand Down Expand Up @@ -209,8 +221,9 @@ def _serialize_models(
sources: Dict[str, NormalizedSourceSchema],
exposures: Dict[str, NormalizedExposureSchema],
seeds: Dict[str, NormalizedSeedSchema],
snapshots: Dict[str, NormalizedSnapshotSchema],
) -> Dict[str, dict]:
nodes = dict(**models, **sources, **exposures, **seeds)
nodes = dict(**models, **sources, **exposures, **seeds, **snapshots)
serializable_nodes = dict()
for key in nodes.keys():
serializable_nodes[key] = dict(nodes[key])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
'seed' as type
from {{ ref('elementary', 'dbt_seeds') }}
union all
select
unique_id,
null as depends_on_nodes,
null as materialization,
'snapshot' as type
from {{ ref('elementary', 'dbt_snapshots') }}
union all
select
unique_id,
depends_on_nodes,
Expand Down
24 changes: 24 additions & 0 deletions elementary/monitor/dbt_project/macros/get_snapshots.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{% macro get_snapshots() %}
{% set get_snapshots_query %}
with dbt_artifacts_snapshots as (
select
name,
unique_id,
database_name,
schema_name,
case when alias is not null then alias
else name end as table_name,
owner as owners,
tags,
package_name,
description,
original_path as full_path
from {{ ref('elementary', 'dbt_snapshots') }}
)

select * from dbt_artifacts_snapshots
{% endset %}

{% set snapshots_agate = run_query(get_snapshots_query) %}
{% do return(elementary.agate_to_dicts(snapshots_agate)) %}
{% endmacro %}
11 changes: 11 additions & 0 deletions elementary/monitor/fetchers/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ModelSchema,
ModelTestCoverage,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand Down Expand Up @@ -43,6 +44,16 @@ def get_seeds(self) -> List[SeedSchema]:
seeds = [SeedSchema(**seed) for seed in seeds]
return seeds

def get_snapshots(self) -> List[SnapshotSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_snapshots"
)
snapshots = (
json.loads(run_operation_response[0]) if run_operation_response else []
)
snapshots = [SnapshotSchema(**snapshot) for snapshot in snapshots]
return snapshots

def get_models(self, exclude_elementary_models: bool = False) -> List[ModelSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_models",
Expand Down
6 changes: 6 additions & 0 deletions elementary/monitor/fetchers/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ def load_meta(cls, meta):
ArtifactSchemaType = TypeVar("ArtifactSchemaType", bound=ArtifactSchema)


class SnapshotSchema(ArtifactSchema):
database_name: Optional[str] = None
schema_name: str
table_name: str


class SeedSchema(ArtifactSchema):
database_name: Optional[str] = None
schema_name: str
Expand Down
Loading