🎉 Enable converting from long to wide format in grapher step (#3674)
Marigold authored Dec 13, 2024
1 parent b50f483 commit 2876953
Showing 15 changed files with 176 additions and 45 deletions.
111 changes: 84 additions & 27 deletions etl/grapher_helpers.py
@@ -134,7 +134,7 @@ def _yield_wide_table(
        table_to_yield = table_to_yield[[c for c in table_to_yield.columns if c not in dim_names]]

        # Filter NaN values from dimensions and return dictionary
        dim_dict: Dict[str, Any] = {n: v for n, v in zip(dim_names, dim_values) if pd.notnull(v)}
        dim_dict = _create_dim_dict(dim_names, dim_values)  # type: ignore

        # Now iterate over every column in the original dataset and export the
        # subset of data that we prepared above
@@ -171,34 +171,91 @@ def _yield_wide_table(
            tab.metadata.short_name = short_name
            tab.rename(columns={column: short_name}, inplace=True)

            # add info about dimensions to metadata
            if dim_dict:
                tab[short_name].metadata.additional_info = {
                    "dimensions": {
                        "originalShortName": column,
                        "originalName": tab[short_name].metadata.title,
                        "filters": [
                            {"name": dim_name, "value": sanitize_numpy(dim_value)}
                            for dim_name, dim_value in dim_dict.items()
                        ],
                    }
                }

            # Add dimensions to title (which will be used as variable name in grapher)
            if tab[short_name].metadata.title:
                # We use template as a title
                if _uses_jinja(tab[short_name].metadata.title):
                    title_with_dims = _expand_jinja_text(tab[short_name].metadata.title, dim_dict)
                # Otherwise use default
                else:
                    title_with_dims = _title_column_and_dimensions(tab[short_name].metadata.title, dim_dict)
            tab[short_name].metadata = _metadata_for_dimensions(tab[short_name].metadata, dim_dict, column)

                tab[short_name].metadata.title = title_with_dims
            yield tab

            # traverse metadata and expand Jinja
            tab[short_name].metadata = _expand_jinja(tab[short_name].metadata, dim_dict)

            yield tab
def _metadata_for_dimensions(meta: catalog.VariableMeta, dim_dict: Dict[str, Any], column: str) -> catalog.VariableMeta:
    """Add dimensions to metadata and expand Jinja in metadata fields."""
    # Add info about dimensions to metadata
    if dim_dict:
        meta.additional_info = {
            "dimensions": {
                "originalShortName": column,
                "originalName": meta.title,
                "filters": [
                    {"name": dim_name, "value": sanitize_numpy(dim_value)} for dim_name, dim_value in dim_dict.items()
                ],
            }
        }

    # Add dimensions to the title (which will be used as the variable name in grapher)
    if meta.title:
        # The title is a Jinja template; expand it with the dimension values
        if _uses_jinja(meta.title):
            title_with_dims = _expand_jinja_text(meta.title, dim_dict)
        # Otherwise, append the dimensions to the title
        else:
            title_with_dims = _title_column_and_dimensions(meta.title, dim_dict)

        meta.title = str(title_with_dims)

    # Traverse metadata and expand Jinja
    meta = _expand_jinja(meta, dim_dict)

    return meta


def _create_dim_dict(dim_names: List[str], dim_values: List[Any]) -> Dict[str, Any]:
    # Filter NaN values from dimensions and return a dictionary
    return {n: v for n, v in zip(dim_names, dim_values) if pd.notnull(v)}


def long_to_wide(long_tb: catalog.Table) -> catalog.Table:
    """Convert a long table to a wide table by unstacking dimensions. This function mimics the process that occurs
    when a long table is upserted to the database. With this function, you can explicitly perform this transformation
    in the grapher step and store a flattened dataset in the catalog."""

    dim_names = [k for k in long_tb.primary_key if k not in ("year", "country", "date")]

    # Unstack dimensions to a wide format
    wide_tb = cast(catalog.Table, long_tb.unstack(level=dim_names))  # type: ignore

    # Drop columns with all NaNs
    wide_tb = wide_tb.dropna(axis=1, how="all")

    # Get short names and metadata for all columns
    short_names = []
    metadatas = []
    for dims in wide_tb.columns:
        column = dims[0]

        # Filter NaN values from dimensions and return a dictionary
        dim_dict = _create_dim_dict(dim_names, dims[1:])

        # Create a short name from dimension values
        short_name = _underscore_column_and_dimensions(column, dim_dict)

        if short_name in short_names:
            duplicate_short_name_ix = short_names.index(short_name)
            duplicate_dim_dict = dict(zip(dim_names, wide_tb.columns[duplicate_short_name_ix][1:]))
            raise ValueError(
                f"Duplicate short name for column '{column}' with dim values:\n{duplicate_dim_dict}\n{dim_dict}"
            )

        short_names.append(short_name)

        # Create metadata for the column from its dimensions
        metadatas.append(_metadata_for_dimensions(long_tb[column].metadata.copy(), dim_dict, column))

    # Set column names to new short names and attach the proper metadata
    wide_tb.columns = short_names
    for col, meta in zip(wide_tb.columns, metadatas):
        wide_tb[col].metadata = meta

    return wide_tb


def _uses_jinja(text: Optional[str]):
@@ -570,7 +627,7 @@ def _adapt_table_for_grapher(table: catalog.Table, engine: Engine) -> catalog.Ta
    variable_titles_counts = variable_titles.value_counts()
    assert (
        variable_titles_counts.empty or variable_titles_counts.max() == 1
    ), f"Variable titles are not unique ({variable_titles_counts[variable_titles_counts > 1].index})."
    ), f"Variable titles are not unique:\n{variable_titles_counts[variable_titles_counts > 1].index}."

    # Remember original dimensions
    dim_names = [n for n in table.index.names if n and n not in ("year", "date", "entity_id", "country")]
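For orientation, here is a minimal sketch of the new `long_to_wide` helper in action, mirroring the unit test added at the bottom of this commit (the table values are invented; the API is `owid.catalog` as used throughout the diff):

```python
import pandas as pd
from owid.catalog import Table

from etl import grapher_helpers as gh

# A long table: one indicator column ("deaths") plus an "age" dimension in the index.
df = pd.DataFrame(
    {
        "country": ["France", "France", "France"],
        "year": [2019, 2019, 2019],
        "age": ["10-18", "19-25", "26-30"],
        "deaths": [1, 2, 3],
    }
)
tb = Table(df.set_index(["country", "year", "age"]))
tb.deaths.metadata.title = "Deaths"

# Unstack the "age" dimension: one column per dimension value, with
# short names and titles derived from the dimension values.
wide = gh.long_to_wide(tb)

print(list(wide.columns))
# ['deaths__age_10_18', 'deaths__age_19_25', 'deaths__age_26_30']
print(wide["deaths__age_10_18"].metadata.title)
# Deaths - Age: 10-18
```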
46 changes: 43 additions & 3 deletions etl/helpers.py
@@ -7,6 +7,7 @@
import re
import sys
import tempfile
import time
from collections.abc import Generator
from contextlib import contextmanager
from functools import cache
@@ -200,6 +201,7 @@ def create_dataset(
default_metadata: Optional[Union[SnapshotMeta, catalog.DatasetMeta]] = None,
underscore_table: bool = True,
camel_to_snake: bool = False,
long_to_wide: Optional[bool] = None,
formats: List[FileFormat] = DEFAULT_FORMATS,
check_variables_metadata: bool = False,
run_grapher_checks: bool = True,
Expand All @@ -221,6 +223,7 @@ def create_dataset(
:param default_metadata: The default metadata to use for the dataset, could be either SnapshotMeta or DatasetMeta.
:param underscore_table: Whether to underscore the table name before adding it to the dataset.
:param camel_to_snake: Whether to convert camel case to snake case for the table name.
:param long_to_wide: Convert data in long format (with dimensions) to wide format (flattened).
:param check_variables_metadata: Check that all variables in tables have metadata; raise a warning otherwise.
:param run_grapher_checks: Run grapher checks on the dataset, only applies to grapher channel.
:param yaml_params: Dictionary of parameters that can be used in the metadata yaml file.
@@ -262,6 +265,11 @@

ds = _set_metadata_from_dest_dir(ds, dest_dir)

    meta_path = get_metadata_path(str(dest_dir))

    # Raise an error if there's a variable in YAML that is not in the dataset
    extra_variables = "raise"

    # add tables to dataset
    used_short_names = set()
    for table in tables:
@@ -270,16 +278,48 @@
        if table.metadata.short_name in used_short_names:
            raise ValueError(f"Table short name `{table.metadata.short_name}` is already in use.")
        used_short_names.add(table.metadata.short_name)

        from etl import grapher_helpers as gh

        # long_to_wide defaults to True in the grapher channel
        if long_to_wide is None:
            long_to_wide = ds.metadata.channel == "grapher"

        # Expand long to wide
        if long_to_wide:
            if ds.metadata.channel != "grapher":
                log.warning("It is recommended to use long_to_wide=True only in the grapher channel")

            dim_names = set(table.index.names) - {"country", "code", "year", "date", None}
            if dim_names:
                # First pass to update metadata from YAML
                if meta_path.exists():
                    table.update_metadata_from_yaml(meta_path, table.m.short_name)  # type: ignore
                log.info("long_to_wide.start", shape=table.shape, short_name=table.m.short_name, dim_names=dim_names)
                t = time.time()
                table = gh.long_to_wide(table)
                log.info("long_to_wide.end", shape=table.shape, short_name=table.m.short_name, t=time.time() - t)

                # Ignore extra variables for the following pass of metadata
                extra_variables = "ignore"
            else:
                log.info("long_to_wide.skip", short_name=table.m.short_name)

        ds.add(table, formats=formats, repack=repack)

    meta_path = get_metadata_path(str(dest_dir))
    if meta_path.exists():
        ds.update_metadata(meta_path, if_origins_exist=if_origins_exist, yaml_params=yaml_params, errors=errors)
        ds.update_metadata(
            meta_path,
            if_origins_exist=if_origins_exist,
            yaml_params=yaml_params,
            errors=errors,
            extra_variables=extra_variables,
        )

    # another override YAML file with higher priority
    meta_override_path = get_metadata_path(str(dest_dir)).with_suffix(".override.yml")
    if meta_override_path.exists():
        ds.update_metadata(meta_override_path, if_origins_exist=if_origins_exist)
        ds.update_metadata(meta_override_path, if_origins_exist=if_origins_exist, extra_variables=extra_variables)

    # run grapher checks
    if ds.metadata.channel == "grapher" and run_grapher_checks:
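As the grapher steps further down illustrate, flattening is now opt-out rather than opt-in for the grapher channel. A sketch of both call patterns, where `dest_dir`, `tb`, and `ds_garden` stand in for a real step's variables:

```python
from etl.helpers import create_dataset

# Default: long_to_wide=None resolves to True in the grapher channel, so a
# table indexed by extra dimensions (beyond country/year/date) is flattened.
ds_grapher = create_dataset(dest_dir, tables=[tb], default_metadata=ds_garden.metadata)

# Opt out: keep the long format and let the database upsert flatten it later.
ds_grapher = create_dataset(
    dest_dir, tables=[tb], default_metadata=ds_garden.metadata, long_to_wide=False
)
ds_grapher.save()
```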
2 changes: 1 addition & 1 deletion etl/steps/data/garden/cancer/2024-08-30/gco_alcohol.py
@@ -22,7 +22,7 @@ def run(dest_dir: str) -> None:
    #
    tb = geo.harmonize_countries(df=tb, countries_file=paths.country_mapping_path)

    # To display on grapher we need to replace "<0.1" with "0.05" and set the decimal places to 1 so that it shows up as <0.1 on the chart
    # To display on grapher we need to replace "<0.1" with "0.05" and set the decimal places to 1 so that it shows up as <0.1 on the chart.
    tb["value"] = tb["value"].replace("<0.1", "0.05")

    tb = tb.format(["country", "year", "sex", "cancer", "indicator"])
@@ -176,11 +176,11 @@ def run(dest_dir: str) -> None:
    #
    # Load Statistical Review dataset and read its main table.
    ds_review = paths.load_dataset("statistical_review_of_world_energy")
    tb_review = ds_review["statistical_review_of_world_energy"]
    tb_review = ds_review.read("statistical_review_of_world_energy", reset_index=False)

    # Load Smil dataset and read its main table.
    ds_smil = paths.load_dataset("smil_2017")
    tb_smil = ds_smil["smil_2017"]
    tb_smil = ds_smil.read("smil_2017", reset_index=False)

    #
    # Process data.
3 changes: 3 additions & 0 deletions etl/steps/data/garden/health/2023-05-04/global_wellbeing.py
@@ -50,6 +50,9 @@ def run(dest_dir: str) -> None:
    # Reset index
    df = df.reset_index()

    # Fix typos
    df.dimension = df.dimension.str.replace("villAge", "village")

    #
    # Process data.
    #
@@ -45,9 +45,6 @@ def run(dest_dir: str) -> None:
    tb = geo.harmonize_countries(df=tb, countries_file=paths.country_mapping_path)
    tb = combining_sexes_for_all_age_groups(tb)
    tb = add_region_sum_aggregates(tb, ds_regions, ds_income_groups)
    tb["tsr"] = tb["tsr"].astype(
        "float16"
    )  # Ensure the column is of type float16 - was getting an error when it was float64
    tb = tb.set_index(["country", "year", "age_group", "sex", "cohort_type"], verify_integrity=True)

    #
@@ -41,8 +41,8 @@ tables:
          note: |-
            All deaths in a country may not have been registered with a [cause of death](#dod:underlying-cause-of-death).
      age_standardized_death_rate_per_100_000_standard_population:
        title: Age-standardized deaths from << cause.lower() >> in << sex.lower() >> per 100,000 people
        description_short: Reported deaths from << cause.lower() >> in << sex.lower() >> per 100,000 people.
        title: Age-standardized deaths from << cause.lower() >> in << sex.lower() >> in those aged << age_group.lower() >> per 100,000 people
        description_short: Reported deaths from << cause.lower() >> in << sex.lower() >> in those aged << age_group.lower() >> per 100,000 people.
        description_key: ["The International Classification of Diseases (Version 10) codes that define << cause.lower() >> are << icd10_codes >>."]
        unit: deaths per 100,000 people
        display:
@@ -40,8 +40,8 @@ tables:
          note: |-
            All deaths in a country may not have been registered with a [cause of death](#dod:underlying-cause-of-death).
      age_standardized_death_rate_per_100_000_standard_population:
        title: Age-standardized deaths from << cause.lower() >> in << sex.lower() >> per 100,000 people
        description_short: Reported deaths from << cause.lower() >> in << sex.lower() >> per 100,000 people.
        title: Age-standardized deaths from << cause.lower() >> in << sex.lower() >> in those aged << age_group.lower() >> per 100,000 people
        description_short: Reported deaths from << cause.lower() >> in << sex.lower() >> in those aged << age_group.lower() >> per 100,000 people.
        description_key: ["The International Classification of Diseases (Version 10) codes that define << cause.lower() >> are << icd10_codes >>."]
        unit: deaths per 100,000 people
        display:
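These YAML titles are Jinja templates, which is what the first branch of `_metadata_for_dimensions` above handles: `_uses_jinja` presumably detects the `<< >>` markers, and `_expand_jinja_text` fills them from the dimension values. A sketch with hypothetical dimension values; the exact rendered string depends on the template configuration:

```python
from etl import grapher_helpers as gh

title = "Age-standardized deaths from << cause.lower() >> in << sex.lower() >> per 100,000 people"
dim_dict = {"cause": "Malaria", "sex": "Males"}

assert gh._uses_jinja(title)
print(gh._expand_jinja_text(title, dim_dict))
# Expected along the lines of:
# Age-standardized deaths from malaria in males per 100,000 people
```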
2 changes: 1 addition & 1 deletion etl/steps/data/grapher/cancer/2024-09-06/gco_infections.py
@@ -25,7 +25,7 @@ def run(dest_dir: str) -> None:
    #
    # Create a new grapher dataset with the same metadata as the garden dataset.
    ds_grapher = create_dataset(
        dest_dir, tables=[tb], check_variables_metadata=True, default_metadata=ds_garden.metadata
        dest_dir, tables=[tb], check_variables_metadata=True, default_metadata=ds_garden.metadata, long_to_wide=False
    )

    # Save changes in the new grapher dataset.
2 changes: 1 addition & 1 deletion etl/steps/data/grapher/hmd/2024-11-19/hfd.py
@@ -37,7 +37,7 @@ def run(dest_dir: str) -> None:
    #
    # Create a new grapher dataset with the same metadata as the garden dataset.
    ds_grapher = create_dataset(
        dest_dir, tables=tables, check_variables_metadata=True, default_metadata=ds_garden.metadata
        dest_dir, tables=tables, check_variables_metadata=True, default_metadata=ds_garden.metadata, long_to_wide=False
    )

    # Save changes in the new grapher dataset.
@@ -20,7 +20,7 @@ def run(dest_dir: str) -> None:
    #
    # Create a new grapher dataset with the same metadata as the garden dataset.
    ds_grapher = create_dataset(
        dest_dir, tables=[tb], check_variables_metadata=True, default_metadata=ds_garden.metadata
        dest_dir, tables=[tb], check_variables_metadata=True, default_metadata=ds_garden.metadata, long_to_wide=False
    )

    # Save changes in the new grapher dataset.
7 changes: 6 additions & 1 deletion lib/catalog/owid/catalog/datasets.py
@@ -222,6 +222,7 @@ def update_metadata(
        if_source_exists: SOURCE_EXISTS_OPTIONS = "replace",
        if_origins_exist: SOURCE_EXISTS_OPTIONS = "replace",
        errors: Literal["ignore", "warn", "raise"] = "raise",
        extra_variables: Literal["raise", "ignore"] = "raise",
    ) -> None:
        """
        Load YAML file with metadata from given path and update metadata of dataset and its tables.
@@ -258,7 +259,11 @@
                    warnings.warn(str(e))
                continue
            table.update_metadata_from_yaml(
                metadata_path, table_name, if_origins_exist=if_origins_exist, yaml_params=yaml_params
                metadata_path,
                table_name,
                if_origins_exist=if_origins_exist,
                yaml_params=yaml_params,
                extra_variables=extra_variables,
            )
            table._save_metadata(join(self.path, table.metadata.checked_name + ".meta.json"))
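The new `extra_variables` knob exists because, once a table has been flattened, the YAML file still describes the original long-format variable names; the post-flattening metadata pass in `create_dataset` (see `etl/helpers.py` above) must tolerate entries that no longer match. A sketch of that call, with `ds` and `meta_path` as placeholders:

```python
# The YAML may still list pre-flattening variable names; don't raise on extras.
ds.update_metadata(
    meta_path,
    if_origins_exist="replace",
    extra_variables="ignore",
)
```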
6 changes: 6 additions & 0 deletions lib/catalog/owid/catalog/tables.py
@@ -304,6 +304,12 @@ def _save_metadata(self, filename: str) -> None:
        try:
            json.dump(metadata, ostream, indent=2, default=str, allow_nan=False)
        except ValueError as e:
            # try to find a problematic field
            for k, v in metadata["fields"].items():
                try:
                    json.dumps(v, default=str, allow_nan=False)
                except ValueError as e2:
                    raise ValueError(f"metadata field {k} contains NaNs:\n{v}") from e2
            raise ValueError(f"metadata contains NaNs:\n{metadata}") from e

    @classmethod
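The per-field retry above exists because `json.dump(..., allow_nan=False)` only reports that some float is out of range, not where. A standalone sketch of the failure mode and the narrowing loop; the metadata dict is invented for illustration:

```python
import json

# Hypothetical variable metadata with a NaN smuggled into one field.
metadata = {"fields": {"deaths": {"display": {"numDecimalPlaces": float("nan")}}}}

try:
    json.dumps(metadata, default=str, allow_nan=False)
except ValueError:
    # Replay the dump field by field to name the culprit.
    for k, v in metadata["fields"].items():
        try:
            json.dumps(v, default=str, allow_nan=False)
        except ValueError as e2:
            raise ValueError(f"metadata field {k} contains NaNs:\n{v}") from e2
```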
2 changes: 1 addition & 1 deletion lib/catalog/owid/catalog/yaml_metadata.py
@@ -40,7 +40,7 @@ def update_metadata_from_yaml(

    tb.metadata.short_name = table_name

    t_annot = annot["tables"][table_name]
    t_annot = annot.get("tables", {}).get(table_name, {})

    # validation
    if extra_variables == "raise":
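The switch to chained `.get` calls makes the lookup tolerant of YAML files that define no `tables` section at all, for instance a file holding only shared definitions. A minimal illustration:

```python
annot = {"definitions": {}}  # hypothetical YAML content without a "tables" section
table_name = "my_table"

# Before: annot["tables"][table_name] raised KeyError: 'tables'.
# After: missing sections fall back to an empty annotation dict.
t_annot = annot.get("tables", {}).get(table_name, {})
assert t_annot == {}
```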
23 changes: 23 additions & 0 deletions tests/test_grapher_helpers.py
@@ -245,3 +245,26 @@ def test_underscore_column_and_dimensions():
def test_title_column_and_dimensions():
    assert gh._title_column_and_dimensions("A", {"age": "1"}) == "A - Age: 1"
    assert gh._title_column_and_dimensions("A", {"age_group": "15-18"}) == "A - Age group: 15-18"


def test_long_to_wide():
    df = pd.DataFrame(
        {
            "year": [2019, 2019, 2019, 2019],
            "country": ["France", "France", "France", "France"],
            "age": ["10-18", "19-25", "26-30", np.nan],
            "deaths": [1, 2, 3, 4],
        }
    )
    table = Table(df.set_index(["country", "year", "age"]))
    table.deaths.metadata.unit = "people"
    table.deaths.metadata.title = "Deaths"

    wide = gh.long_to_wide(table)

    assert list(wide.columns) == ["deaths", "deaths__age_10_18", "deaths__age_19_25", "deaths__age_26_30"]

    assert wide["deaths"].m.title == "Deaths"
    assert wide["deaths__age_10_18"].m.title == "Deaths - Age: 10-18"
    assert wide["deaths__age_19_25"].m.title == "Deaths - Age: 19-25"
    assert wide["deaths__age_26_30"].m.title == "Deaths - Age: 26-30"
