-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Visualize Dataset statistics in metadata panel #1472
Changes from 21 commits
f86ac6f
0370d56
e6e3898
6812664
ccafa98
a37459c
8c6965f
83bf822
9466ea6
cf90243
5a08448
bb5a342
9d66c92
3103894
bbbeb7d
72b8c74
36760b4
dd65977
86dd8ac
37f7059
3c231a0
c3ff3e1
5b1f7e4
f7a4dc1
d84f01f
3b55684
18d9974
beb1e5e
4f3e77f
b332dc2
8ddfffe
cf21083
4213cf7
57c139b
f917c22
7b88fc9
2d823da
381dfa4
6dd02ff
ac65d0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
{ | ||
"companies": { "rows": 77096, "columns": 5, "file_size": 1810602 }, | ||
"reviews": { "rows": 77096, "columns": 10, "file_size": 2937144 }, | ||
"shuttles": { "rows": 77096, "columns": 13, "file_size": 4195290 }, | ||
"ingestion.int_typed_companies": { | ||
"rows": 77096, | ||
"columns": 5, | ||
"file_size": 550616 | ||
}, | ||
"ingestion.int_typed_shuttles@pandas1": { "file_size": 1235685 }, | ||
"ingestion.int_typed_shuttles@pandas2": { | ||
"rows": 77096, | ||
"columns": 13, | ||
"file_size": 1235685 | ||
}, | ||
"ingestion.int_typed_reviews": { | ||
"rows": 55790, | ||
"columns": 11, | ||
"file_size": 1335600 | ||
}, | ||
"prm_shuttle_company_reviews": { | ||
"rows": 29768, | ||
"columns": 27, | ||
"file_size": 1020356 | ||
}, | ||
"prm_spine_table": { "rows": 29768, "columns": 3, "file_size": 655994 }, | ||
"feature_engineering.feat_weighting_metrics": { "file_size": 0 }, | ||
"feature_engineering.feat_scaling_metrics": { "file_size": 0 }, | ||
"feature_importance_output": { "rows": 15, "columns": 2, "file_size": 455 }, | ||
"model_input_table": { "rows": 29768, "columns": 12, "file_size": 787351 }, | ||
"train_evaluation.linear_regression.regressor": { "file_size": 843 }, | ||
"train_evaluation.random_forest.regressor": { "file_size": 175421422 }, | ||
"reporting.cancellation_policy_breakdown": { "file_size": 8744 }, | ||
"reporting.price_histogram": { "file_size": 216598 }, | ||
"reporting.feature_importance": { "file_size": 8553 }, | ||
"reporting.cancellation_policy_grid": { "file_size": 3116 }, | ||
"reporting.confusion_matrix": { "file_size": 14748 }, | ||
"train_evaluation.linear_regression.r2_score": { "file_size": 37 }, | ||
"train_evaluation.random_forest.r2_score": { "file_size": 36 }, | ||
"train_evaluation.linear_regression.experiment_params": { "file_size": 102 }, | ||
"train_evaluation.random_forest.experiment_params": { "file_size": 338 }, | ||
"params:ingestion.typing.reviews.columns_as_floats": { | ||
"rows": 1, | ||
"file_size": 88 | ||
}, | ||
"ingestion.prm_agg_companies": { "rows": 50098, "columns": 5 }, | ||
"params:feature_engineering.feature.derived": { "rows": 3, "file_size": 88 }, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this a parameter? What does rows: 3 mean? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think this needs further discussion on how to show parameter info. I added rows as the size of the list or dict, but I think it does not make sense when it is termed rows. Removing this for the POC. |
||
"params:feature_engineering.feature.static": { "rows": 9, "file_size": 184 }, | ||
"feature_engineering.feat_static_features": { "rows": 29768, "columns": 12 }, | ||
"feature_engineering.feat_derived_features": { "rows": 29768, "columns": 3 }, | ||
"params:split_options": { "rows": 3, "file_size": 232 }, | ||
"X_train": { "rows": 23814, "columns": 11 }, | ||
"y_train": { "rows": 23814, "file_size": 381024 }, | ||
"params:train_evaluation.model_options.linear_regression": { | ||
"rows": 3, | ||
"file_size": 232 | ||
}, | ||
"params:train_evaluation.model_options.random_forest": { | ||
"rows": 3, | ||
"file_size": 232 | ||
}, | ||
"X_test": { "rows": 5954, "columns": 11 }, | ||
"y_test": { "rows": 5954, "file_size": 95264 } | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,6 +62,7 @@ def __init__(self): | |
) | ||
self.runs = RunsRepository() | ||
self.tracking_datasets = TrackingDatasetsRepository() | ||
self.dataset_stats = {} | ||
|
||
def set_db_session(self, db_session_class: sessionmaker): | ||
"""Set db session on repositories that need it.""" | ||
|
@@ -91,6 +92,25 @@ def add_pipelines(self, pipelines: Dict[str, KedroPipeline]): | |
# Add the registered pipeline and its components to their repositories | ||
self.add_pipeline(registered_pipeline_id, pipeline) | ||
|
||
def add_dataset_stats(self, stats_dict: Dict):
    """Store the dataset statistics (e.g. rows, columns, file_size) on the manager.

    The stored dictionary is what `get_dataset_stats` later reads from when
    the stats for a data node are requested for the metadata panel.

    Args:
        stats_dict: A dictionary object loaded from stats.json file in the kedro project
    """
    # The dictionary is kept by reference; no copy is made.
    self.dataset_stats = stats_dict
|
||
def get_dataset_stats(self, data_node: Union[DataNode, TranscodedDataNode]) -> Dict:
    """Return the dataset statistics recorded for the given data node.

    The lookup is keyed on ``data_node.name``. A dictionary is always
    returned so callers never need to check for ``None``.

    Args:
        data_node: The data node for which we need the statistics

    Returns:
        The statistics stored for the node, or an empty dictionary when no
        statistics are recorded under the node's name.
    """
    return self.dataset_stats.get(data_node.name, {})
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should it be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should be an empty dict. This method is called in the router.py where I was checking for None and initializing the stats to empty dict. Instead this way, we always get a dict for a data node |
||
|
||
def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline): | ||
"""Iterate through all the nodes and datasets in a "registered" pipeline | ||
and add them to relevant repositories. Take care of extracting other relevant information | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -6,6 +6,8 @@ | |||||
# pylint: disable=missing-function-docstring, no-else-return | ||||||
|
||||||
import base64 | ||||||
import json as json_lib | ||||||
import logging | ||||||
from pathlib import Path | ||||||
from typing import Any, Dict, Optional, Tuple | ||||||
|
||||||
|
@@ -26,11 +28,13 @@ | |||||
plotly, | ||||||
tracking, | ||||||
) | ||||||
|
||||||
from kedro.io import DataCatalog | ||||||
from kedro.io.core import get_filepath_str | ||||||
from kedro.pipeline import Pipeline | ||||||
from semver import VersionInfo | ||||||
|
||||||
logger = logging.getLogger(__name__) | ||||||
KEDRO_VERSION = VersionInfo.parse(__version__) | ||||||
|
||||||
|
||||||
|
@@ -54,11 +58,34 @@ def _bootstrap(project_path: Path): | |||||
return | ||||||
|
||||||
|
||||||
def get_dataset_stats(project_path: Path) -> Dict:
    """Return the stats saved at stats.json

    The lookup is best-effort: any failure is logged as a warning and an
    empty dictionary is returned instead of raising.

    Args:
        project_path: the path where the Kedro project is located.

    Returns:
        The parsed content of ``stats.json``, or an empty dictionary when the
        file is missing, cannot be read or parsed, or does not hold a JSON object.
    """
    try:
        stats_file_path = project_path / "stats.json"

        # Open directly instead of checking `exists()` first, so the file
        # disappearing between the check and the open cannot raise.
        with open(stats_file_path, encoding="utf8") as stats_file:
            stats = json_lib.load(stats_file)

    except FileNotFoundError:
        # No stats file in the project: not an error, so nothing is logged.
        return {}

    except Exception as exc:  # pylint: disable=broad-exception-caught
        logger.warning(
            "Unable to get dataset stats from project path %s : %s", project_path, exc
        )
        return {}

    if not isinstance(stats, dict):
        # Callers treat the result as a mapping of dataset name -> stats;
        # any other JSON payload (list, number, string, null) is unusable.
        logger.warning(
            "Unable to get dataset stats from project path %s : %s",
            project_path,
            "stats.json does not contain a JSON object",
        )
        return {}

    return stats
|
||||||
|
||||||
def load_data( | ||||||
project_path: Path, | ||||||
env: Optional[str] = None, | ||||||
extra_params: Optional[Dict[str, Any]] = None, | ||||||
) -> Tuple[DataCatalog, Dict[str, Pipeline], BaseSessionStore]: | ||||||
) -> Tuple[DataCatalog, Dict[str, Pipeline], BaseSessionStore, Dict]: | ||||||
"""Load data from a Kedro project. | ||||||
Args: | ||||||
project_path: the path whether the Kedro project is located. | ||||||
|
@@ -91,7 +118,9 @@ def load_data( | |||||
# in case user doesn't have an active session down the line when it's first accessed. | ||||||
# Useful for users who have `get_current_session` in their `register_pipelines()`. | ||||||
pipelines_dict = dict(pipelines) | ||||||
return catalog, pipelines_dict, session_store | ||||||
stats_dict = dict(get_dataset_stats(project_path)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't |
||||||
|
||||||
return catalog, pipelines_dict, session_store, stats_dict | ||||||
elif KEDRO_VERSION.match(">=0.17.1"): | ||||||
from kedro.framework.session import KedroSession | ||||||
|
||||||
|
@@ -103,8 +132,9 @@ def load_data( | |||||
) as session: | ||||||
context = session.load_context() | ||||||
session_store = session._store | ||||||
stats_dict = dict(get_dataset_stats(project_path)) | ||||||
|
||||||
return context.catalog, context.pipelines, session_store | ||||||
return context.catalog, context.pipelines, session_store, stats_dict | ||||||
else: | ||||||
# Since Viz is only compatible with kedro>=0.17.0, this just matches 0.17.0 | ||||||
from kedro.framework.session import KedroSession | ||||||
|
@@ -120,8 +150,9 @@ def load_data( | |||||
) as session: | ||||||
context = session.load_context() | ||||||
session_store = session._store | ||||||
stats_dict = dict(get_dataset_stats(project_path)) | ||||||
|
||||||
return context.catalog, context.pipelines, session_store | ||||||
return context.catalog, context.pipelines, session_store, stats_dict | ||||||
|
||||||
|
||||||
# The dataset type is available as an attribute if and only if the import from kedro | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
# pylint: disable=broad-exception-caught | ||
# pylint: disable=protected-access | ||
"""`kedro_viz.integrations.kedro.hooks` defines hooks to add additional | ||
functionalities for a kedro run.""" | ||
|
||
import json as json_lib | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we keep this as There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In some files we have json from Kedro datasets. To distinguish the library import from the Kedro datasets one, I used this convention throughout. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we have json from a dataset doesn't it make more sense to call that |
||
import logging | ||
import sys | ||
from collections import defaultdict | ||
from typing import Any | ||
|
||
import pandas as pd | ||
ravi-kumar-pilla marked this conversation as resolved.
Show resolved
Hide resolved
|
||
from kedro.framework.hooks import hook_impl | ||
from kedro.io import DataCatalog | ||
|
||
from kedro_viz.integrations.kedro.utils import stats_order | ||
from kedro_viz.models.utils import get_file_size | ||
|
||
try: | ||
# kedro 0.18.11 onwards | ||
from kedro.io import MemoryDataset | ||
except ImportError: # pragma: no cover | ||
# older versions | ||
from kedro.io import MemoryDataSet as MemoryDataset | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class DatasetStatsHook:
    """Class to collect dataset statistics during a kedro run
    and save it to a JSON file. The class currently supports
    (pd.DataFrame, list, dict and pd.core.series.Series) dataset instances"""

    def __init__(self):
        # Maps dataset name -> {stat name -> value}. The inner dictionary is
        # created on first access, so only touch a key when there is a stat to store.
        self._stats = defaultdict(dict)

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog):
        """Hook to be invoked after a data catalog is created.
        Use this hook and get the file_size for the dataset if it has filepath.

        Every dataset is processed independently: a dataset whose file size
        cannot be determined is logged and skipped, and the remaining
        datasets are still processed.

        Args:
            catalog: The catalog that was created.
        """
        try:
            datasets = catalog._data_sets
        except Exception as exc:  # pragma: no cover
            # No dataset name exists yet at this point, so it is not logged.
            logger.warning("Unable to read the datasets from the catalog: %s", exc)
            return

        for dataset_name, dataset in datasets.items():
            if isinstance(dataset, MemoryDataset):
                continue

            try:
                # NOTE(review): `_filepath` is a private attribute and was
                # flagged as fragile in review; datasets without it end up
                # in the except branch below.
                file_path = dataset._filepath  # noqa: no-member
                self._stats[dataset_name]["file_size"] = get_file_size(file_path)

            except Exception as exc:  # pragma: no cover
                logger.warning(
                    "Unable to process file_size stat for the dataset %s : %s",
                    dataset_name,
                    exc,
                )

    @hook_impl
    def after_dataset_loaded(self, dataset_name: str, data: Any):
        """Hook to be invoked after a dataset is loaded from the catalog.
        Once the dataset is loaded, extract the required dataset statistics.
        The hook currently supports (pd.DataFrame, list, dict and pd.core.series.Series)
        dataset instances

        Data of any other type is ignored and no entry is created for it.

        Args:
            dataset_name: name of the dataset that was loaded from the catalog.
            data: the actual data that was loaded from the catalog.
        """
        try:
            if isinstance(data, pd.DataFrame):
                self._stats[dataset_name]["rows"] = int(data.shape[0])
                self._stats[dataset_name]["columns"] = int(data.shape[1])
            elif isinstance(data, (list, dict)):
                self._stats[dataset_name]["rows"] = int(len(data))
                # sys.getsizeof is shallow: it does not include the contained objects.
                self._stats[dataset_name]["file_size"] = sys.getsizeof(data)
            elif isinstance(data, pd.core.series.Series):
                self._stats[dataset_name]["rows"] = int(len(data))
                # int() guards against numpy integer types, which json cannot serialise.
                self._stats[dataset_name]["file_size"] = int(
                    data.memory_usage(deep=True)
                )

        except Exception as exc:  # pragma: no cover
            logger.warning(
                "Error creating the stats for the dataset %s : %s", dataset_name, exc
            )

    @hook_impl
    def after_pipeline_run(self):
        """Hook to be invoked after a pipeline runs.
        Once the pipeline run completes, write the dataset
        statistics to stats.json file

        The file is written relative to the current working directory.
        """
        try:
            # Per the review discussion, each dataset's stats are put in a
            # fixed key order for readability and for the frontend.
            sorted_stats_data = {
                dataset_name: stats_order(stats)
                for dataset_name, stats in self._stats.items()
            }
            # Serialise before opening the file: opening with "w" truncates
            # it, so a failure here must not wipe a previous stats.json.
            serialised_stats = json_lib.dumps(sorted_stats_data)

            with open("stats.json", "w", encoding="utf8") as file:
                file.write(serialised_stats)

        except Exception as exc:  # pragma: no cover
            logger.warning("Error writing the stats for the pipeline: %s", exc)
merelcht marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
dataset_stats_hook = DatasetStatsHook() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
"""`kedro_viz.integrations.kedro.utils` contains utility | ||
functions used in the `kedro_viz.integrations.kedro` package""" | ||
|
||
from typing import Dict | ||
|
||
|
||
def stats_order(stats: Dict):
    """Arrange the stats of a dataset in a fixed, predefined key order

    Only the keys "rows", "columns" and "file_size" are kept, in that order;
    keys missing from the input are skipped and any other key is dropped.

    Args:
        stats: A dictionary of statistics for a dataset

    Returns: A new dictionary holding the known stats in the predefined order
    """
    ordered_stats = {}
    for stat_name in ("rows", "columns", "file_size"):
        if stat_name in stats:
            ordered_stats[stat_name] = stats[stat_name]
    return ordered_stats
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this transcoding @pandas1 @pandas2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stripped the transcoding now