Commit

wip: wpp
lucasrodes committed May 28, 2024
1 parent e571186 commit f710ecd
Showing 13 changed files with 1,693 additions and 68 deletions.
12 changes: 6 additions & 6 deletions dag/demography.yml
@@ -23,12 +23,12 @@ steps:
    - data://garden/demography/2023-03-31/population

  # WPP (2024)
  data://meadow/un/2024-07-11/un_wpp:
    - snapshot://un/2024-07-11/un_wpp.zip
  data://garden/un/2024-07-11/un_wpp:
    - data://meadow/un/2024-07-11/un_wpp
  data://grapher/un/2024-07-11/un_wpp:
    - data://garden/un/2024-07-11/un_wpp
  data-private://meadow/un/2024-07-11/un_wpp:
    - snapshot-private://un/2024-07-11/un_wpp_population.csv
  data-private://garden/un/2024-07-11/un_wpp:
    - data-private://meadow/un/2024-07-11/un_wpp
  data-private://grapher/un/2024-07-11/un_wpp:
    - data-private://garden/un/2024-07-11/un_wpp
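  # Editor's note: the -private chain mirrors the snapshot -> meadow -> garden ->
  # grapher pattern used by the public steps elsewhere in this file.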

  # Population (Fariss et al.)
  data://meadow/demography/2023-12-20/population_fariss:
173 changes: 173 additions & 0 deletions etl/steps/data/garden/un/2024-07-11/un_wpp/__init__.py
@@ -0,0 +1,173 @@
import json
from pathlib import Path
from typing import Any, List

import owid.catalog.processing as pr
import pandas as pd

# from deaths import process as process_deaths
# from demographics import process as process_demographics
# from dep_ratio import process as process_depratio
# from fertility import process as process_fertility
from owid.catalog import Table
from population import process as process_population

from etl.helpers import PathFinder, create_dataset

# Get paths and naming conventions for current step.
paths = PathFinder(__file__)

YEAR_SPLIT = 2024
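# (Rows before YEAR_SPLIT are historical estimates; YEAR_SPLIT onwards are
# projections. merge_dfs below uses this to relabel the variant column.)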


def run(dest_dir: str) -> None:
#
# Load inputs.
#
# Load meadow dataset.
ds_meadow = paths.load_dataset("un_wpp")
tb_population = ds_meadow["population"].reset_index()

#
# Process data.
#
tb_population = process_population(tb_population)

# Build tables list for dataset
tables = [
tb_population,
]

#
# Save outputs.
#
# Create a new garden dataset with the same metadata as the meadow dataset.
ds_garden = create_dataset(
dest_dir, tables=tables, check_variables_metadata=True, default_metadata=ds_meadow.metadata
)

# Save changes in the new garden dataset.
ds_garden.save()


#################################################################################
#################################################################################
# Old code below. Left in case it's needed for reference.
#################################################################################
#################################################################################
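# METRIC_CATEGORIES drove the previous run: the long-format table was split into
# one sub-table per category (see run_old at the bottom of this file).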
METRIC_CATEGORIES = {
"migration": [
"net_migration",
"net_migration_rate",
],
"fertility": [
"fertility_rate",
"births",
"birth_rate",
],
"population": [
"population",
"population_density",
"population_change",
"population_broad",
],
"mortality": [
"deaths",
"death_rate",
"life_expectancy",
"child_mortality_rate",
"infant_mortality_rate",
],
"demographic": [
"median_age",
"growth_natural_rate",
"growth_rate",
"sex_ratio",
],
}


def merge_dfs(dfs: List[Table]) -> Table:
"""Merge all datasets"""
df = pr.concat(dfs, ignore_index=True)
# Fix variant name
df.loc[df.year < YEAR_SPLIT, "variant"] = "estimates"
# Index
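    # verify_integrity raises if two inputs carry the same
    # (location, year, metric, sex, age, variant) combination.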
df = df.set_index(["location", "year", "metric", "sex", "age", "variant"], verify_integrity=True)
df = df.dropna(subset=["value"])
# df = df.sort_index()
return df


def load_country_mapping() -> Any:
with open(Path(__file__).parent / "un_wpp.countries.json") as f:
return json.load(f)


def get_wide_df(df: pd.DataFrame) -> pd.DataFrame:
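    # Illustrative example (hypothetical values): long rows
    #   (Spain, 2020, population, all, all, estimates, 47_000_000)
    #   (Spain, 2020, deaths,     all, all, estimates, 490_000)
    # pivot into a single wide row with `population` and `deaths` columns.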
df_wide = df.reset_index()
df_wide = df_wide.pivot(
index=["location", "year", "sex", "age", "variant"],
columns="metric",
values="value",
)
return df_wide


# def run_old(dest_dir: str) -> None:
# ds = paths.load_dataset("un_wpp")
# # country rename
# paths.log.info("Loading country standardised names...")
# country_std = load_country_mapping()
# # process
# paths.log.info("Processing population variables...")
# df_population_granular, df_population = process_population(ds["population"], country_std)
# paths.log.info("Processing fertility variables...")
# df_fertility = process_fertility(ds["fertility"], country_std)
# paths.log.info("Processing demographics variables...")
# df_demographics = process_demographics(ds["demographics"], country_std)
# paths.log.info("Processing dependency_ratio variables...")
# df_depratio = process_depratio(ds["dependency_ratio"], country_std)
# paths.log.info("Processing deaths variables...")
# df_deaths = process_deaths(ds["deaths"], country_std)
# # merge main df
# paths.log.info("Merging tables...")
# df = merge_dfs([df_population, df_fertility, df_demographics, df_depratio, df_deaths])
# # create tables
# table_long = df.update_metadata(
# short_name="un_wpp",
# description=(
# "Main UN WPP dataset by OWID. It comes in 'long' format, i.e. column"
# " 'metric' gives the metric name and column 'value' its corresponding"
# " value."
# ),
# )
# # generate sub-datasets
# tables = []
# for category, metrics in METRIC_CATEGORIES.items():
# paths.log.info(f"Generating table for category {category}...")
# tables.append(
# df.query(f"metric in {metrics}")
# .copy()
# .update_metadata(
# short_name=category,
# description=f"UN WPP dataset by OWID. Contains only metrics corresponding to sub-group {category}.",
# )
# )
# # add dataset with single-year age group population
# cols_index = ["location", "year", "metric", "sex", "age", "variant"]
# df_population_granular = df_population_granular.set_index(cols_index, verify_integrity=True)
# tables.append(
# df_population_granular.update_metadata(
# short_name="population_granular",
# description=(
# "UN WPP dataset by OWID. Contains only metrics corresponding to population for all dimensions (age and"
# " sex groups)."
# ),
# )
# )
# tables.append(table_long)

# # create dataset
# ds_garden = create_dataset(dest_dir, tables, default_metadata=ds.metadata)
# ds_garden.save()
99 changes: 99 additions & 0 deletions etl/steps/data/garden/un/2024-07-11/un_wpp/deaths.py
@@ -0,0 +1,99 @@
"""Deaths table"""
from typing import Any, Dict

import pandas as pd
from owid.catalog import Table

from .dtypes import optimize_dtypes

# Initial settings
COLUMNS_ID = {
"location": "location",
"time": "year",
"variant": "variant",
"sex": "sex",
}
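# The source table has one column per single year of age (`_0` ... `_99`, plus
# `_100plus`); each melts into a `deaths` metric row with the matching age label.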
COLUMNS_METRICS: Dict[str, Dict[str, Any]] = {
**{
f"_{i}": {
"name": "deaths",
"age": f"{i}",
}
for i in range(100)
},
**{
"_100plus": {
"name": "deaths",
"age": "100+",
}
},
}
MAPPING_SEX = {
"Both": "all",
"Female": "female",
"Male": "male",
}
COLUMNS_ORDER = ["location", "year", "metric", "sex", "age", "variant", "value"]


def process(df: Table, country_std: Dict[str, str]) -> Table:
df = df.reset_index()
    # Melt per-age columns into long format (metric/value)
df = df.melt(COLUMNS_ID.keys(), COLUMNS_METRICS.keys(), "metric", "value")
    # Rename identifier columns; standardise sex, age, variant and location;
    # convert values from thousands to absolute counts
df = df.rename(columns=COLUMNS_ID)
df = df.assign(
sex=df.sex.map(MAPPING_SEX),
age=df.metric.map({k: v["age"] for k, v in COLUMNS_METRICS.items()}),
variant=df.variant.apply(lambda x: x.lower()),
location=df.location.map(country_std),
metric="deaths",
value=(df.value * 1000).astype(int),
)
df = optimize_dtypes(df, simple=True)
# Add/Build age groups
df = add_age_groups(df)
# Dtypes
df = optimize_dtypes(df)
# Column order
df = df[COLUMNS_ORDER]
# Drop unmapped regions
df = df.dropna(subset=["location"])
return df


def add_age_groups(df: Table) -> Table:
# <1
df_0 = df[df.age == "0"].copy()
# 1-4
df_1_4 = df[df.age.isin(["1", "2", "3", "4"])].drop(columns="age").copy()
df_1_4 = (
df_1_4.groupby(
["location", "year", "metric", "sex", "variant"],
as_index=False,
observed=True,
)
.sum()
.assign(age="1-4")
)
# Basic 5-year age groups
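    # (each single age maps to its bucket, e.g. "37" -> "35-39", "0" -> "0-4")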
age_map = {str(i): f"{i - i%5}-{i + 4 - i%5}" for i in range(0, 100)}
df_5 = df.assign(age=df.age.map(age_map)).copy()
df_5 = df_5.groupby(
["location", "year", "metric", "sex", "age", "variant"],
as_index=False,
observed=True,
).sum()
# Basic 10-year age groups
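    # (e.g. "37" -> "30-39")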
age_map = {str(i): f"{i - i%10}-{i + 9 - i%10}" for i in range(0, 100)}
df_10 = df.assign(age=df.age.map(age_map)).copy()
df_10 = df_10.groupby(
["location", "year", "metric", "sex", "age", "variant"],
as_index=False,
observed=True,
).sum()
# 100+ age group
df_100 = df[df.age == "100+"].copy()
# Merge all age groups
df = pd.concat([df_0, df_1_4, df_5, df_10, df_100], ignore_index=True) # type: ignore
return df
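

# Illustrative sanity check (hypothetical values, not part of the commit): feeding
# add_age_groups rows with ages "0", "1" and "37" and values 100, 50 and 20 for a
# single location/year/sex/variant yields "0" -> 100 (the <1 group), "1-4" -> 50,
# "0-4" -> 150 and "35-39" -> 20, since age 0 also lands in the 0-4 bucket.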
