From c1d1e82bca98aa0dc6ec77992c3f1f0b2aa2274e Mon Sep 17 00:00:00 2001 From: Marigold Date: Mon, 2 Dec 2024 09:45:50 +0100 Subject: [PATCH] wip --- .../wizard/app_pages/chart_diff/chart_diff.py | 17 ++++++- apps/wizard/utils/io.py | 46 ++++++++++++++++--- 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/apps/wizard/app_pages/chart_diff/chart_diff.py b/apps/wizard/app_pages/chart_diff/chart_diff.py index f24a3edbb6c..a2cf6ff8f17 100644 --- a/apps/wizard/app_pages/chart_diff/chart_diff.py +++ b/apps/wizard/app_pages/chart_diff/chart_diff.py @@ -10,9 +10,11 @@ from structlog import get_logger from apps.wizard.utils import get_staging_creation_time +from apps.wizard.utils.io import get_all_changed_catalog_paths from etl import grapher_model as gm from etl.config import OWID_ENV from etl.db import read_sql +from etl.git_helpers import get_changed_files log = get_logger() @@ -200,7 +202,9 @@ def change_types(self) -> list[str]: self._change_types.append("data") if self.modified_checksum["metadataChecksum"].any(): self._change_types.append("metadata") - if self.target_chart and not self.configs_are_equal(): + # NOTE: configs might differ and edited_in_staging is False if the chart had just + # data / metadata changes + if self.edited_in_staging and self.target_chart and not self.configs_are_equal(): self._change_types.append("config") # TODO: Should uncomment this maybe? @@ -353,7 +357,7 @@ def set_conflict_to_resolved(self, session: Session) -> None: def configs_are_equal(self) -> bool: """Compare two chart configs, ignoring version, id and isPublished.""" assert self.target_chart is not None, "Target chart is None!" - return configs_are_equal(self.source_chart.config, self.target_chart.config) + return configs_are_equal(self.source_chart.config, self.target_chart.config, verbose=False) @property def details(self): @@ -585,6 +589,15 @@ def _modified_data_metadata_on_staging( if source_df.empty: return pd.DataFrame(columns=["chartId", "dataEdited", "metadataEdited"]).set_index("chartId") + # Get all changed files and their catalog paths, including downstream dependencies. + files_changed = get_changed_files() + catalog_paths = get_all_changed_catalog_paths(files_changed) + + # Exclude variables that haven't been changed by updating the files. This is to prevent showing + # spurious changes from lagging behind master. + dataset_paths = source_df.catalogPath.str.split("/").str[:4].str.join("/") + source_df = source_df[dataset_paths.isin(catalog_paths)] + # read those variables from target where = """ v.catalogPath in %(catalog_paths)s diff --git a/apps/wizard/utils/io.py b/apps/wizard/utils/io.py index c131fe02b30..ea26bcf80fd 100644 --- a/apps/wizard/utils/io.py +++ b/apps/wizard/utils/io.py @@ -15,6 +15,7 @@ from etl.git_helpers import get_changed_files from etl.grapher_io import get_all_datasets from etl.paths import BASE_DIR, STEP_DIR +from etl.steps import filter_to_subgraph, load_dag # Initialize logger. log = get_logger() @@ -61,9 +62,8 @@ def get_steps_df(archived: bool = True): ######################################################################################################################## -def get_changed_grapher_steps(files_changed: Dict[str, Dict[str, str]]) -> List[str]: - """Get list of new grapher steps with their corresponding old steps.""" - grapher_steps = [] +def get_changed_steps(files_changed: Dict[str, Dict[str, str]]) -> List[str]: + changed_steps = [] for file_path, file_status in files_changed.items(): # File status can be: D (deleted), A (added), M (modified). # NOTE: In principle, we could select only "A" files. But it is possible that the user adds a new grapher step, and then commits changes to it, in which case (I think) the status would be "M". @@ -75,13 +75,21 @@ def get_changed_grapher_steps(files_changed: Dict[str, Dict[str, str]]) -> List[ # Identify grapher data steps, and ignore the rest. if file_path.startswith(STEP_DIR.relative_to(BASE_DIR).as_posix()) and file_path.endswith(".py"): - parts = Path(file_path).with_suffix("").as_posix().split("/") - if len(parts) == 4 and parts[-4] == "grapher": - grapher_steps.append(file_path) + changed_steps.append(file_path) else: continue - return grapher_steps + return changed_steps + + +def get_changed_grapher_steps(files_changed: Dict[str, Dict[str, str]]) -> List[str]: + """Get list of new grapher steps with their corresponding old steps.""" + steps = [] + for step_path in get_changed_steps(files_changed): + parts = Path(step_path).with_suffix("").as_posix().split("/") + if len(parts) >= 4 and parts[-4] == "grapher": + steps.append(step_path) + return steps def get_new_grapher_datasets_and_their_previous_versions(session: Session) -> Dict[int, Optional[int]]: @@ -137,3 +145,27 @@ def get_new_grapher_datasets_and_their_previous_versions(session: Session) -> Di new_datasets[ds_id] = previous_dataset return new_datasets + + +def get_all_changed_catalog_paths(files_changed: Dict[str, Dict[str, str]]) -> List[str]: + """Get all changed steps and their downstream dependencies.""" + dataset_catalog_paths = [] + + # Get catalog paths of all datasets with changed files. + for step_path in get_changed_steps(files_changed): + abs_step_path = BASE_DIR / Path(step_path) + try: + # TODO: use StepPath from https://github.com/owid/etl/pull/3165 to refactor this + dataset_catalog_paths.append(abs_step_path.relative_to(STEP_DIR / "data").with_suffix("").as_posix()) + except ValueError: + continue + + # Add all downstream dependencies of those datasets. + DAG = load_dag() + dag_steps = list(filter_to_subgraph(DAG, dataset_catalog_paths, downstream=True).keys()) + + # From data://... extract catalogPath + # TODO: use StepPath from https://github.com/owid/etl/pull/3165 to refactor this + catalog_paths = [step.split("://")[1] for step in dag_steps if step.startswith("data://")] + + return catalog_paths