Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Better filtering of data & metadata in chart-diff #3667

Merged
merged 4 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions apps/wizard/app_pages/chart_diff/chart_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
from structlog import get_logger

from apps.wizard.utils import get_staging_creation_time
from apps.wizard.utils.io import get_all_changed_catalog_paths
from etl import grapher_model as gm
from etl.config import OWID_ENV
from etl.db import read_sql
from etl.git_helpers import get_changed_files

log = get_logger()

Expand Down Expand Up @@ -200,7 +202,9 @@ def change_types(self) -> list[str]:
self._change_types.append("data")
if self.modified_checksum["metadataChecksum"].any():
self._change_types.append("metadata")
if self.target_chart and not self.configs_are_equal():
# NOTE: configs might differ and edited_in_staging is False if the chart had just
# data / metadata changes
if self.edited_in_staging and self.target_chart and not self.configs_are_equal():
self._change_types.append("config")

# TODO: Should uncomment this maybe?
Expand Down Expand Up @@ -353,7 +357,7 @@ def set_conflict_to_resolved(self, session: Session) -> None:
def configs_are_equal(self) -> bool:
"""Compare two chart configs, ignoring version, id and isPublished."""
assert self.target_chart is not None, "Target chart is None!"
return configs_are_equal(self.source_chart.config, self.target_chart.config)
return configs_are_equal(self.source_chart.config, self.target_chart.config, verbose=False)

@property
def details(self):
Expand Down Expand Up @@ -581,6 +585,19 @@ def _modified_data_metadata_on_staging(
params["chart_ids"] = tuple(chart_ids)
source_df = read_sql(query_source, source_session, params=params)

# no charts, return empty dataframe
if source_df.empty:
return pd.DataFrame(columns=["chartId", "dataEdited", "metadataEdited"]).set_index("chartId")

# Get all changed files and their catalog paths, including downstream dependencies.
files_changed = get_changed_files()
catalog_paths = get_all_changed_catalog_paths(files_changed)

# Exclude variables that haven't been changed by updating the files. This is to prevent showing
# spurious changes from lagging behind master.
dataset_paths = source_df.catalogPath.str.split("/").str[:4].str.join("/")
source_df = source_df[dataset_paths.isin(catalog_paths)]

# no charts, return empty dataframe
if source_df.empty:
return pd.DataFrame(columns=["chartId", "dataEdited", "metadataEdited"]).set_index("chartId")
Expand Down
65 changes: 55 additions & 10 deletions apps/wizard/utils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from apps.wizard.utils.cached import get_datasets_from_version_tracker
from etl.git_helpers import get_changed_files
from etl.grapher_io import get_all_datasets
from etl.paths import BASE_DIR, STEP_DIR
from etl.paths import BASE_DIR, SNAPSHOTS_DIR, STEP_DIR
from etl.steps import filter_to_subgraph, load_dag

# Initialize logger.
log = get_logger()
Expand Down Expand Up @@ -61,9 +62,8 @@ def get_steps_df(archived: bool = True):
########################################################################################################################


def get_changed_grapher_steps(files_changed: Dict[str, Dict[str, str]]) -> List[str]:
"""Get list of new grapher steps with their corresponding old steps."""
grapher_steps = []
def get_changed_steps(files_changed: Dict[str, Dict[str, str]]) -> List[str]:
changed_steps = []
for file_path, file_status in files_changed.items():
# File status can be: D (deleted), A (added), M (modified).
# NOTE: In principle, we could select only "A" files. But it is possible that the user adds a new grapher step, and then commits changes to it, in which case (I think) the status would be "M".
Expand All @@ -73,15 +73,26 @@ def get_changed_grapher_steps(files_changed: Dict[str, Dict[str, str]]) -> List[
# Skip deleted files.
continue

# Identify grapher data steps, and ignore the rest.
if file_path.startswith(STEP_DIR.relative_to(BASE_DIR).as_posix()) and file_path.endswith(".py"):
parts = Path(file_path).with_suffix("").as_posix().split("/")
if len(parts) == 4 and parts[-4] == "grapher":
grapher_steps.append(file_path)
# Identify potential recipes for data steps
if file_path.startswith(
(STEP_DIR.relative_to(BASE_DIR).as_posix(), SNAPSHOTS_DIR.relative_to(BASE_DIR).as_posix())
):
changed_steps.append(file_path)
else:
continue

return grapher_steps
return changed_steps


def get_changed_grapher_steps(files_changed: Dict[str, Dict[str, str]]) -> List[str]:
    """Return the changed step files that are Python scripts of grapher steps.

    The candidate paths come from `get_changed_steps(files_changed)`. A path is kept when it ends
    in ".py" and, after dropping that suffix, its fourth-from-last path component is "grapher".
    """

    def _is_grapher_script(path: str) -> bool:
        # Only Python files can be grapher step scripts.
        if not path.endswith(".py"):
            return False
        components = Path(path).with_suffix("").as_posix().split("/")
        # Check the component four positions from the end of the path.
        return len(components) >= 4 and components[-4] == "grapher"

    return [path for path in get_changed_steps(files_changed) if _is_grapher_script(path)]


def get_new_grapher_datasets_and_their_previous_versions(session: Session) -> Dict[int, Optional[int]]:
Expand Down Expand Up @@ -137,3 +148,37 @@ def get_new_grapher_datasets_and_their_previous_versions(session: Session) -> Di
new_datasets[ds_id] = previous_dataset

return new_datasets


def get_all_changed_catalog_paths(files_changed: Dict[str, Dict[str, str]]) -> List[str]:
"""Get all changed steps and their downstream dependencies."""
dataset_catalog_paths = []

# Get catalog paths of all datasets with changed files.
for step_path in get_changed_steps(files_changed):
Marigold marked this conversation as resolved.
Show resolved Hide resolved
abs_step_path = BASE_DIR / Path(step_path)
try:
# TODO: use StepPath from https://github.com/owid/etl/pull/3165 to refactor this
if step_path.startswith("snapshots/"):
ds_path = abs_step_path.relative_to(SNAPSHOTS_DIR).with_suffix("").with_suffix("").as_posix()
else:
ds_path = abs_step_path.relative_to(STEP_DIR / "data").with_suffix("").with_suffix("").as_posix()
dataset_catalog_paths.append(ds_path)
except ValueError:
continue

# NOTE:
# This is OK, as it filters down the DAG a little bit. But using VersionTracker.steps_df would be much more precise. You could do:
# steps_df[steps_df["step"].isin([...])]["all_active_usages"]
# And that would give you only the steps that are affected by the changed files. That would be ultimately what we need. But I
# understand that loading steps_df is very slow.

# Add all downstream dependencies of those datasets.
DAG = load_dag()
dag_steps = list(filter_to_subgraph(DAG, dataset_catalog_paths, downstream=True).keys())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is OK, as it filters down the DAG a little bit. But using VersionTracker.steps_df would be much more precise. You could do:

steps_df[steps_df["step"].isin([...])]["all_active_usages"]

And that would give you only the steps that are affected by the changed files. That would be ultimately what we need. But I understand that loading steps_df is very slow.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I profiled steps_df, but couldn't find any low-hanging fruit that would significantly speed it up. It just does a lot of things, which takes time. We'd have to refactor it a lot to make it both fast enough for such a simple use case as this and flexible enough for the ETL dashboard. Anyway, I copied your comment into the code so it doesn't get lost.


# From data://... extract catalogPath
# TODO: use StepPath from https://github.com/owid/etl/pull/3165 to refactor this
catalog_paths = [step.split("://")[1] for step in dag_steps if step.startswith("data://")]

return catalog_paths
Loading