Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Visualize Dataset statistics in metadata panel #1472

Merged
merged 40 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f86ac6f
initial draft with hooks
ravi-kumar-pilla Jul 31, 2023
0370d56
modify test for formatFileSize
ravi-kumar-pilla Jul 31, 2023
e6e3898
add file size to the api response using fsspec
ravi-kumar-pilla Aug 1, 2023
6812664
update unit test for metadata panel
ravi-kumar-pilla Aug 2, 2023
ccafa98
remove print statements and update stats file
ravi-kumar-pilla Aug 2, 2023
a37459c
update get file size to not consider empty dir
ravi-kumar-pilla Aug 2, 2023
8c6965f
fixing linting and format errors
ravi-kumar-pilla Aug 2, 2023
83bf822
Merge branch 'main' of https://github.com/kedro-org/kedro-viz into fe…
ravi-kumar-pilla Aug 2, 2023
9466ea6
fix format and lint errors
ravi-kumar-pilla Aug 2, 2023
cf90243
fix pytest errors
ravi-kumar-pilla Aug 2, 2023
5a08448
add test cases and add fix for circle ci builds
ravi-kumar-pilla Aug 3, 2023
bb5a342
resolve PR comments
ravi-kumar-pilla Aug 4, 2023
9d66c92
fixing PR comments and add additional support for MemoryDataset
ravi-kumar-pilla Aug 4, 2023
3103894
update stats and modify file_size extraction
ravi-kumar-pilla Aug 4, 2023
bbbeb7d
fix lint and format errors
ravi-kumar-pilla Aug 4, 2023
72b8c74
fix lint errors
ravi-kumar-pilla Aug 4, 2023
36760b4
fix lint errors
ravi-kumar-pilla Aug 4, 2023
dd65977
fix lint errors
ravi-kumar-pilla Aug 4, 2023
86dd8ac
fix lint errors
ravi-kumar-pilla Aug 4, 2023
37f7059
fix lint errors
ravi-kumar-pilla Aug 4, 2023
3c231a0
Merge branch 'main' into feature/viz-size-datasets
tynandebold Aug 7, 2023
c3ff3e1
fix for PR comments
ravi-kumar-pilla Aug 8, 2023
5b1f7e4
add test coverage for transcoded data node
ravi-kumar-pilla Aug 8, 2023
f7a4dc1
address PR comments
ravi-kumar-pilla Aug 8, 2023
d84f01f
fix lint errors
ravi-kumar-pilla Aug 8, 2023
3b55684
modify test cases for hooks and utils
ravi-kumar-pilla Aug 9, 2023
18d9974
add matplotlib in requirements file for e2e tests
ravi-kumar-pilla Aug 9, 2023
beb1e5e
Merge branch 'main' into feature/viz-size-datasets
ravi-kumar-pilla Aug 9, 2023
4f3e77f
add design change for overflow
ravi-kumar-pilla Aug 10, 2023
b332dc2
Merge branch 'feature/viz-size-datasets' of https://github.com/kedro-…
ravi-kumar-pilla Aug 10, 2023
8ddfffe
add design change for overflow
ravi-kumar-pilla Aug 10, 2023
cf21083
remove matplotlib from requirements and fix metadata suggestions
ravi-kumar-pilla Aug 10, 2023
4213cf7
add release notes for visualizing dataset stats
ravi-kumar-pilla Aug 10, 2023
57c139b
add release notes for displaying dataset stats
ravi-kumar-pilla Aug 10, 2023
f917c22
hooks update based on Nok's comments
ravi-kumar-pilla Aug 11, 2023
7b88fc9
fix lint and format checks
ravi-kumar-pilla Aug 11, 2023
2d823da
modify stats based on Nok's comments
ravi-kumar-pilla Aug 11, 2023
381dfa4
fix lint and format
ravi-kumar-pilla Aug 11, 2023
6dd02ff
fixed failing unit test
ravi-kumar-pilla Aug 12, 2023
ac65d0d
update code based on Nok's suggestion
ravi-kumar-pilla Aug 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cypress/tests/ui/flowchart/flowchart.cy.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,28 @@ describe('Flowchart DAG', () => {
.should('exist')
.and('have.text', `Oops, there's nothing to see here`);
});

it('verifies that users can open and see the dataset statistics in the metadata panel for datasets. #TC-51', () => {
const dataNodeText = 'Companies';

// Assert before action
cy.get('[data-label="Dataset statistics:]').should('not.exist');

// Action
cy.get('.pipeline-node > .pipeline-node__text')
.contains(dataNodeText)
.click({ force: true });

// Assert after action
cy.get('[data-label="Dataset statistics:"]').should('exist');
cy.get('[data-test=profiler-value-rows]')
.invoke('text')
.should((rowsValue) => expect(parseInt(rowsValue)).to.be.eq(77096));
cy.get('[data-test=profiler-value-columns]')
.invoke('text')
.should((colsValue) => expect(parseInt(colsValue)).to.be.eq(5));
cy.get('[data-test=profiler-value-file_size]')
.invoke('text')
.should((fileSizeValue) => expect(fileSizeValue).to.be.eq('1.8MB'));
});
});
17 changes: 17 additions & 0 deletions demo-project/stats.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"companies": { "rows": 77096, "columns": 5 },
"reviews": { "rows": 77096, "columns": 10 },
"shuttles": { "rows": 77096, "columns": 13 },
"ingestion.int_typed_companies": { "rows": 77096, "columns": 5 },
"ingestion.int_typed_shuttles@pandas2": { "rows": 77096, "columns": 13 },
"ingestion.prm_agg_companies": { "rows": 50098, "columns": 5 },
"ingestion.int_typed_reviews": { "rows": 55790, "columns": 11 },
"prm_spine_table": { "rows": 29768, "columns": 3 },
"prm_shuttle_company_reviews": { "rows": 29768, "columns": 27 },
"feature_engineering.feat_static_features": { "rows": 29768, "columns": 12 },
"feature_engineering.feat_derived_features": { "rows": 29768, "columns": 3 },
"feature_importance_output": { "rows": 15, "columns": 2 },
"model_input_table": { "rows": 29768, "columns": 12 },
"X_train": { "rows": 23814, "columns": 11 },
"X_test": { "rows": 5954, "columns": 11 }
}
2 changes: 2 additions & 0 deletions package/kedro_viz/api/rest/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class DataNodeMetadataAPIResponse(BaseAPIResponse):
tracking_data: Optional[Dict]
run_command: Optional[str]
preview: Optional[Dict]
profiler: Optional[Dict]

class Config:
schema_extra = {
Expand All @@ -130,6 +131,7 @@ class TranscodedDataNodeMetadataAPIReponse(BaseAPIResponse):
original_type: str
transcoded_types: List[str]
run_command: Optional[str]
profiler: Optional[Dict]


class ParametersNodeMetadataAPIResponse(BaseAPIResponse):
Expand Down
6 changes: 4 additions & 2 deletions package/kedro_viz/api/rest/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ async def get_single_node_metadata(node_id: str):
return TaskNodeMetadata(node)

if isinstance(node, DataNode):
return DataNodeMetadata(node)
dataset_stats = data_access_manager.get_dataset_stats(node)
return DataNodeMetadata(node, dataset_stats if dataset_stats else {})
ravi-kumar-pilla marked this conversation as resolved.
Show resolved Hide resolved

if isinstance(node, TranscodedDataNode):
return TranscodedDataNodeMetadata(node)
dataset_stats = data_access_manager.get_dataset_stats(node)
return TranscodedDataNodeMetadata(node, dataset_stats if dataset_stats else {})
ravi-kumar-pilla marked this conversation as resolved.
Show resolved Hide resolved

return ParametersNodeMetadata(node)

Expand Down
27 changes: 27 additions & 0 deletions package/kedro_viz/data_access/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self):
)
self.runs = RunsRepository()
self.tracking_datasets = TrackingDatasetsRepository()
self.dataset_stats = {}

def set_db_session(self, db_session_class: sessionmaker):
"""Set db session on repositories that need it."""
Expand Down Expand Up @@ -91,6 +92,32 @@ def add_pipelines(self, pipelines: Dict[str, KedroPipeline]):
# Add the registered pipeline and its components to their repositories
self.add_pipeline(registered_pipeline_id, pipeline)

def add_dataset_stats(self, stats_dict: Union[Dict[str, int], None]):
"""Add dataset statistics (eg. rows, columns) as a dictionary.
This will help in showing the relevant stats in the metadata panel

Args:
stats_dict: A dictionary object loaded from stats.json file in the kedro project
"""
self.dataset_stats = stats_dict

def get_dataset_stats(
self, data_node: Union[DataNode, TranscodedDataNode]
) -> Union[Dict[str, int], None]:
"""Returns the dataset statistics for the data node if found else returns None

Args:
The data node for which we need the statistics
"""
if (
not data_node
or not self.dataset_stats
or data_node.name not in self.dataset_stats
):
return None

return self.dataset_stats[data_node.name]
ravi-kumar-pilla marked this conversation as resolved.
Show resolved Hide resolved

def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):
"""Iterate through all the nodes and datasets in a "registered" pipeline
and add them to relevant repositories. Take care of extracting other relevant information
Expand Down
40 changes: 36 additions & 4 deletions package/kedro_viz/integrations/kedro/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,27 @@ def _bootstrap(project_path: Path):
return


def get_dataset_stats(project_path: Path):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def get_dataset_stats(project_path: Path):
def get_dataset_stats(project_path: Path) -> dict:

"""Return the stats saved at stats.json"""
import json as json_lib
ravi-kumar-pilla marked this conversation as resolved.
Show resolved Hide resolved

stats_file_path = project_path / "stats.json"

if not stats_file_path.exists():
return None

with open(stats_file_path, encoding="utf8") as stats_file:
stats = json_lib.load(stats_file)
return stats


def load_data(
project_path: Path,
env: Optional[str] = None,
extra_params: Optional[Dict[str, Any]] = None,
) -> Tuple[DataCatalog, Dict[str, Pipeline], BaseSessionStore]:
) -> Tuple[
DataCatalog, Dict[str, Pipeline], BaseSessionStore, Optional[Dict[str, int]]
]:
"""Load data from a Kedro project.
Args:
project_path: the path whether the Kedro project is located.
Expand Down Expand Up @@ -91,7 +107,13 @@ def load_data(
# in case user doesn't have an active session down the line when it's first accessed.
# Useful for users who have `get_current_session` in their `register_pipelines()`.
pipelines_dict = dict(pipelines)
return catalog, pipelines_dict, session_store
stats_dict = (
dict(get_dataset_stats(project_path))
if get_dataset_stats(project_path) is not None
else {}
)
ravi-kumar-pilla marked this conversation as resolved.
Show resolved Hide resolved

return catalog, pipelines_dict, session_store, stats_dict
elif KEDRO_VERSION.match(">=0.17.1"):
from kedro.framework.session import KedroSession

Expand All @@ -103,8 +125,13 @@ def load_data(
) as session:
context = session.load_context()
session_store = session._store
stats_dict = (
dict(get_dataset_stats(project_path))
if get_dataset_stats(project_path) is not None
else {}
ravi-kumar-pilla marked this conversation as resolved.
Show resolved Hide resolved
)

return context.catalog, context.pipelines, session_store
return context.catalog, context.pipelines, session_store, stats_dict
else:
# Since Viz is only compatible with kedro>=0.17.0, this just matches 0.17.0
from kedro.framework.session import KedroSession
Expand All @@ -120,8 +147,13 @@ def load_data(
) as session:
context = session.load_context()
session_store = session._store
stats_dict = (
dict(get_dataset_stats(project_path))
if get_dataset_stats(project_path) is not None
else {}
ravi-kumar-pilla marked this conversation as resolved.
Show resolved Hide resolved
)

return context.catalog, context.pipelines, session_store
return context.catalog, context.pipelines, session_store, stats_dict


# The dataset type is available as an attribute if and only if the import from kedro
Expand Down
66 changes: 66 additions & 0 deletions package/kedro_viz/integrations/kedro/hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# pylint: disable=broad-exception-caught
"""`kedro_viz.integrations.kedro.hooks` defines hooks to add additional
functionalities for a kedro run."""

import logging
from collections import defaultdict
from typing import Any

from kedro.framework.hooks import hook_impl

logger = logging.getLogger(__name__)


class DatasetStatsHook:
"""Hook to collect dataset statistics during a kedro run
and save it to a JSON file"""

def __init__(self):
self._stats = defaultdict(dict)

@hook_impl
def after_dataset_loaded(self, dataset_name: str, data: Any):
"""Hook to be invoked after a dataset is loaded from the catalog.
Once the dataset is loaded, extract the required dataset statistics
ravi-kumar-pilla marked this conversation as resolved.
Show resolved Hide resolved

Args:
dataset_name: name of the dataset that was saved to the catalog.
data: the actual data that was saved to the catalog.
"""
try:
import pandas as pd # pylint: disable=import-outside-toplevel

if isinstance(data, pd.DataFrame):
self._stats[dataset_name] = {}
self._stats[dataset_name]["rows"] = int(data.shape[0])
self._stats[dataset_name]["columns"] = int(data.shape[1])

except ImportError as exc: # pragma: no cover
logger.warning("%s : %s", exc.__class__.__name__, exc.msg)

except Exception as exc: # pragma: no cover
logger.error(
"Error creating the stats for the dataset %s : %s", dataset_name, exc
merelcht marked this conversation as resolved.
Show resolved Hide resolved
)

@hook_impl
def after_pipeline_run(self):
"""Hook to be invoked after a pipeline runs.
Once the pipeline run completes, write the dataset
statistics to stats.json file

"""
try:
import json as json_lib # pylint: disable=import-outside-toplevel

with open("stats.json", "w", encoding="utf8") as file:
json_lib.dump(self._stats, file)

except ImportError as exc: # pragma: no cover
logger.warning("%s : %s", exc.__class__.__name__, exc.msg)

except Exception as exc: # pragma: no cover
logger.error("Error writing the stats for the pipeline: %s", exc)


dataset_stats_hook = DatasetStatsHook()
42 changes: 39 additions & 3 deletions package/kedro_viz/models/flowchart.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from kedro.pipeline.node import Node as KedroNode
from kedro.pipeline.pipeline import TRANSCODING_SEPARATOR, _strip_transcoding

from .utils import get_dataset_type
from .utils import get_dataset_type, get_file_size
noklam marked this conversation as resolved.
Show resolved Hide resolved

try:
# kedro 0.18.11 onwards
Expand Down Expand Up @@ -541,6 +541,7 @@ class DataNodeMetadata(GraphNodeMetadata):

# the underlying data node to which this metadata belongs
data_node: InitVar[DataNode]
dataset_stats: InitVar[Dict]

# the optional plot data if the underlying dataset has a plot.
# currently only applicable for PlotlyDataSet
Expand All @@ -557,12 +558,28 @@ class DataNodeMetadata(GraphNodeMetadata):

preview: Optional[Dict] = field(init=False, default=None)

profiler: Optional[Dict] = field(init=False, default=None)

# TODO: improve this scheme.
def __post_init__(self, data_node: DataNode):
def __post_init__(self, data_node: DataNode, dataset_stats: Dict):
self.type = data_node.dataset_type
dataset = cast(AbstractDataset, data_node.kedro_obj)
dataset_description = dataset._describe()
self.filepath = _parse_filepath(dataset_description)
self.profiler = dataset_stats
ravi-kumar-pilla marked this conversation as resolved.
Show resolved Hide resolved

# TODO: Can we use _describe method of kedro-dataset plugin to get the file size
# by adding a file_size key to the return dict
# self.profiler["file_size"] = (
# dataset_description["file_size"]
# if "file_size" in dataset_description
# else 0
# )

# TODO: This will help to read the file size if the file path is present.
# If not, this will return 0. Not sure if this works for remote files
# Use fsspec to get the file size
self.profiler["file_size"] = get_file_size(self.filepath)

# Run command is only available if a node is an output, i.e. not a free input
if not data_node.is_free_input:
Expand Down Expand Up @@ -615,10 +632,15 @@ class TranscodedDataNodeMetadata(GraphNodeMetadata):

transcoded_types: List[str] = field(init=False)

profiler: Optional[Dict] = field(init=False, default=None)

# the underlying data node to which this metadata belongs
transcoded_data_node: InitVar[TranscodedDataNode]
dataset_stats: InitVar[Dict]

def __post_init__(self, transcoded_data_node: TranscodedDataNode):
def __post_init__(
self, transcoded_data_node: TranscodedDataNode, dataset_stats: Dict
):
original_version = transcoded_data_node.original_version

self.original_type = get_dataset_type(original_version)
Expand All @@ -629,6 +651,20 @@ def __post_init__(self, transcoded_data_node: TranscodedDataNode):

dataset_description = original_version._describe()
self.filepath = _parse_filepath(dataset_description)
self.profiler = dataset_stats

# TODO: Can we use _describe method of kedro-dataset plugin to get the file size
# by adding a file_size key to the return dict
# self.profiler["file_size"] = (
# dataset_description["file_size"]
# if "file_size" in dataset_description
# else 0
# )

# TODO: This will help to read the file size if the file path is present.
# If not, this will return 0. Not sure if this works for remote files
# Use fsspec to get the file size
self.profiler["file_size"] = get_file_size(self.filepath)

if not transcoded_data_node.is_free_input:
self.run_command = (
Expand Down
Loading