diff --git a/apps/wizard/pages/dashboard.py b/apps/wizard/pages/dashboard.py index 3bfbb4aa9ca..4550f2c5889 100644 --- a/apps/wizard/pages/dashboard.py +++ b/apps/wizard/pages/dashboard.py @@ -65,6 +65,10 @@ "snapshot/wb/income_groups.xlsx", # World Bank country shapes. "snapshot/countries/world_bank.zip", + # World Bank WDI. + "snapshot/worldbank_wdi/wdi.zip", + "meadow/worldbank_wdi/wdi", + "garden/worldbank_wdi/wdi", # Other steps we don't want to update (because the underlying data does not get updated). # TODO: We need a better way to achieve this, for example adding update_period_days to all steps and snapshots. # A simpler alternative would be to move these steps to a separate file in a meaningful place. diff --git a/dag/archive/main.yml b/dag/archive/main.yml index c1d337d6a3a..9523961ff58 100644 --- a/dag/archive/main.yml +++ b/dag/archive/main.yml @@ -116,6 +116,28 @@ steps: data://grapher/un/2022-11-29/undp_hdr: - data://garden/un/2022-11-29/undp_hdr + # + # EM-DAT Natural disasters (2023). + # + data://meadow/emdat/2023-09-20/natural_disasters: + - snapshot://emdat/2023-09-20/natural_disasters.xlsx + # The following dataset has a table for yearly data and another for decadal data. + data://garden/emdat/2023-09-20/natural_disasters: + - data://meadow/emdat/2023-09-20/natural_disasters + - data://garden/demography/2023-03-31/population + - data://garden/wb/2023-04-30/income_groups + - data://garden/regions/2023-01-01/regions + - data://garden/worldbank_wdi/2023-05-29/wdi + # The following dataset has all (yearly and decadal) variables together. + data://grapher/emdat/2023-09-20/natural_disasters: + - data://garden/emdat/2023-09-20/natural_disasters + # The following dataset has only global data, and entity corresponds to the type of disaster. + data://grapher/emdat/2023-09-20/natural_disasters_global_by_type: + - data://garden/emdat/2023-09-20/natural_disasters + # Natural disasters explorer. 
+ data://explorers/emdat/2023-09-20/natural_disasters: + - data://garden/emdat/2023-09-20/natural_disasters + # Include all active steps plus all archive steps. include: - dag/main.yml diff --git a/dag/main.yml b/dag/main.yml index c0801432278..e7099ae98c9 100644 --- a/dag/main.yml +++ b/dag/main.yml @@ -177,28 +177,6 @@ steps: data://grapher/un/2024-04-09/undp_hdr: - data://garden/un/2024-04-09/undp_hdr - # - # EM-DAT Natural disasters (2023). - # - data://meadow/emdat/2023-09-20/natural_disasters: - - snapshot://emdat/2023-09-20/natural_disasters.xlsx - # The following dataset has a table for yearly data and another for decadal data. - data://garden/emdat/2023-09-20/natural_disasters: - - data://meadow/emdat/2023-09-20/natural_disasters - - data://garden/demography/2023-03-31/population - - data://garden/wb/2023-04-30/income_groups - - data://garden/regions/2023-01-01/regions - - data://garden/worldbank_wdi/2023-05-29/wdi - # The following dataset has all (yearly and decadal) variables together. - data://grapher/emdat/2023-09-20/natural_disasters: - - data://garden/emdat/2023-09-20/natural_disasters - # The following dataset has only global data, and entity corresponds to the type of disaster. - data://grapher/emdat/2023-09-20/natural_disasters_global_by_type: - - data://garden/emdat/2023-09-20/natural_disasters - # Natural disasters explorer. - data://explorers/emdat/2023-09-20/natural_disasters: - - data://garden/emdat/2023-09-20/natural_disasters - # Country profiles - overview data://garden/country_profile/2022/overview: - backport://backport/owid/latest/dataset_5599_ihme__global_burden_of_disease__deaths_and_dalys__institute_for_health_metrics_and_evaluation__2022_04 @@ -720,6 +698,37 @@ steps: data://grapher/wb/2024-03-11/income_groups: - data://garden/wb/2024-03-11/income_groups + # + # EM-DAT Natural disasters. 
+ # + data://meadow/emdat/2024-04-11/natural_disasters: + - snapshot://emdat/2024-04-11/natural_disasters.xlsx + + # The following dataset has a table for yearly data and another for decadal data. + data://garden/emdat/2024-04-11/natural_disasters: + - data://meadow/emdat/2024-04-11/natural_disasters + - data://garden/demography/2023-03-31/population + - data://garden/wb/2024-03-11/income_groups + - data://garden/regions/2023-01-01/regions + - data://garden/worldbank_wdi/2023-05-29/wdi + + # The following dataset has all (yearly and decadal) variables together. + data://grapher/emdat/2024-04-11/natural_disasters: + - data://garden/emdat/2024-04-11/natural_disasters + + # The following dataset has only global data, and entity corresponds to the type of disaster. + data://grapher/emdat/2024-04-11/natural_disasters_global_by_type: + - data://garden/emdat/2024-04-11/natural_disasters + + # Natural disasters explorer. + data://explorers/emdat/latest/natural_disasters: + - data://garden/emdat/2024-04-11/natural_disasters + + ###################################################################################################################### + # Older versions that should be archived once they are not used by any other steps. + + ###################################################################################################################### + include: - dag/open_numbers.yml - dag/faostat.yml diff --git a/etl/steps/data/explorers/emdat/latest/natural_disasters.py b/etl/steps/data/explorers/emdat/latest/natural_disasters.py new file mode 100644 index 00000000000..03942128ecf --- /dev/null +++ b/etl/steps/data/explorers/emdat/latest/natural_disasters.py @@ -0,0 +1,115 @@ +"""Natural disasters explorer data step. + +Loads the latest EM-DAT natural_disasters data from garden and stores a table (as a csv file) for yearly data, and +another for decadal data. 
+ +NOTES: +* Some of the columns in the output files are not used by the explorer (but they appear in the "Sort by" dropdown menu), + consider removing them. For now, we'll ensure all of the old columns are present, to avoid any possible issues. +* Most charts in the explorer are generated from the data in the files, but 3 of them are directly linked to grapher + charts, namely: + "All disasters (by type) - Deaths - Decadal average - false" + "All disasters (by type) - Deaths - Decadal average - true" + "All disasters (by type) - Economic damages (% GDP) - Decadal average - false" + At some point it would be good to let the explorer take all the data from files. + +""" + +from owid.catalog import Table + +from etl.helpers import PathFinder, create_dataset + +paths = PathFinder(__file__) + +# Mapping of old to new disaster type names. +DISASTER_TYPE_RENAMING = { + "all_disasters": "all_disasters", + "drought": "drought", + "earthquake": "earthquake", + "extreme_temperature": "temperature", + "flood": "flood", + "fog": "fog", + "glacial_lake_outburst": "glacial_lake", + "landslide": "landslide", + "dry_mass_movement": "mass_movement", + "extreme_weather": "storm", + "volcanic_activity": "volcanic", + "wildfire": "wildfire", +} + + +def create_wide_tables(table: Table) -> Table: + """Convert input table from long to wide format, and adjust column names to adjust to the old names in the files + used by the explorer. + """ + # Adapt disaster type names to match those in the old explorer files. + table = table.reset_index() + table["type"] = table.astype({"type": str})["type"].replace(DISASTER_TYPE_RENAMING) + + # Create wide table. + table_wide = table.pivot(index=["country", "year"], columns="type", join_column_levels_with="_") + + # Rename columns to match the old names in explorer. 
+ table_wide = table_wide.rename( + columns={ + column: column.replace("per_100k_people", "rate_per_100k") + .replace("total_dead", "deaths") + .replace("total_damages_per_gdp", "total_damages_pct_gdp") + for column in table_wide.columns + }, + errors="raise", + ) + + # Remove unnecessary columns. + table_wide = table_wide[ + [ + column + for column in table_wide.columns + if not column.startswith( + ("gdp_", "n_events_", "population_", "insured_damages_per_gdp", "reconstruction_costs_per_gdp_") + ) + if column + not in [ + "affected_rate_per_100k_glacial_lake", + "homeless_rate_per_100k_glacial_lake", + "total_damages_pct_gdp_fog", + ] + ] + ] + + # Set an appropriate index and sort conveniently. + table_wide = table_wide.format() + + return table_wide + + +def run(dest_dir: str) -> None: + # + # Load inputs. + # + # Load the latest dataset from garden. + ds_garden = paths.load_dataset("natural_disasters") + + # Read tables with yearly and decadal data. + tb_yearly = ds_garden["natural_disasters_yearly"] + tb_decadal = ds_garden["natural_disasters_decadal"] + + # + # Process data. + # + # Create wide tables adapted to the old format in explorers. + tb_yearly_wide = create_wide_tables(table=tb_yearly) + tb_decadal_wide = create_wide_tables(table=tb_decadal) + + # + # Save outputs. + # + # Initialize a new grapher dataset and add dataset metadata. 
+ ds_grapher = create_dataset( + dest_dir, + tables=[tb_yearly_wide, tb_decadal_wide], + default_metadata=ds_garden.metadata, + check_variables_metadata=True, + formats=["csv"], + ) + ds_grapher.save() diff --git a/etl/steps/data/garden/emdat/2024-04-11/natural_disasters.countries.json b/etl/steps/data/garden/emdat/2024-04-11/natural_disasters.countries.json new file mode 100644 index 00000000000..1ac1e755edb --- /dev/null +++ b/etl/steps/data/garden/emdat/2024-04-11/natural_disasters.countries.json @@ -0,0 +1,228 @@ +{ + "Afghanistan": "Afghanistan", + "Albania": "Albania", + "Algeria": "Algeria", + "American Samoa": "American Samoa", + "Angola": "Angola", + "Anguilla": "Anguilla", + "Antigua and Barbuda": "Antigua and Barbuda", + "Argentina": "Argentina", + "Armenia": "Armenia", + "Australia": "Australia", + "Austria": "Austria", + "Azerbaijan": "Azerbaijan", + "Bahamas": "Bahamas", + "Bangladesh": "Bangladesh", + "Barbados": "Barbados", + "Belarus": "Belarus", + "Belgium": "Belgium", + "Belize": "Belize", + "Benin": "Benin", + "Bermuda": "Bermuda", + "Bhutan": "Bhutan", + "Bolivia (Plurinational State of)": "Bolivia", + "Bosnia and Herzegovina": "Bosnia and Herzegovina", + "Botswana": "Botswana", + "Brazil": "Brazil", + "British Virgin Islands": "British Virgin Islands", + "Brunei Darussalam": "Brunei", + "Bulgaria": "Bulgaria", + "Burkina Faso": "Burkina Faso", + "Burundi": "Burundi", + "Cabo Verde": "Cape Verde", + "Cambodia": "Cambodia", + "Cameroon": "Cameroon", + "Canada": "Canada", + "Cayman Islands": "Cayman Islands", + "Central African Republic": "Central African Republic", + "Chad": "Chad", + "Chile": "Chile", + "China": "China", + "Colombia": "Colombia", + "Comoros": "Comoros", + "Congo": "Congo", + "Cook Islands": "Cook Islands", + "Costa Rica": "Costa Rica", + "Croatia": "Croatia", + "Cuba": "Cuba", + "Cyprus": "Cyprus", + "Czechia": "Czechia", + "Czechoslovakia": "Czechoslovakia", + "C\u00f4te d\u2019Ivoire": "Cote d'Ivoire", + "Democratic People's 
Republic of Korea": "North Korea", + "Democratic Republic of the Congo": "Democratic Republic of Congo", + "Denmark": "Denmark", + "Djibouti": "Djibouti", + "Dominica": "Dominica", + "Dominican Republic": "Dominican Republic", + "Ecuador": "Ecuador", + "Egypt": "Egypt", + "El Salvador": "El Salvador", + "Eritrea": "Eritrea", + "Estonia": "Estonia", + "Eswatini": "Eswatini", + "Ethiopia": "Ethiopia", + "Fiji": "Fiji", + "Finland": "Finland", + "France": "France", + "French Guiana": "French Guiana", + "French Polynesia": "French Polynesia", + "Gabon": "Gabon", + "Gambia": "Gambia", + "Georgia": "Georgia", + "German Democratic Republic": "East Germany", + "Germany": "Germany", + "Ghana": "Ghana", + "Greece": "Greece", + "Grenada": "Grenada", + "Guadeloupe": "Guadeloupe", + "Guam": "Guam", + "Guatemala": "Guatemala", + "Guinea": "Guinea", + "Guinea-Bissau": "Guinea-Bissau", + "Guyana": "Guyana", + "Haiti": "Haiti", + "Honduras": "Honduras", + "Hungary": "Hungary", + "Iceland": "Iceland", + "India": "India", + "Indonesia": "Indonesia", + "Iran (Islamic Republic of)": "Iran", + "Iraq": "Iraq", + "Ireland": "Ireland", + "Isle of Man": "Isle of Man", + "Israel": "Israel", + "Italy": "Italy", + "Jamaica": "Jamaica", + "Japan": "Japan", + "Jordan": "Jordan", + "Kazakhstan": "Kazakhstan", + "Kenya": "Kenya", + "Kiribati": "Kiribati", + "Kuwait": "Kuwait", + "Kyrgyzstan": "Kyrgyzstan", + "Lao People's Democratic Republic": "Laos", + "Latvia": "Latvia", + "Lebanon": "Lebanon", + "Lesotho": "Lesotho", + "Liberia": "Liberia", + "Libya": "Libya", + "Lithuania": "Lithuania", + "Luxembourg": "Luxembourg", + "Madagascar": "Madagascar", + "Malawi": "Malawi", + "Malaysia": "Malaysia", + "Maldives": "Maldives", + "Mali": "Mali", + "Malta": "Malta", + "Marshall Islands": "Marshall Islands", + "Martinique": "Martinique", + "Mauritania": "Mauritania", + "Mauritius": "Mauritius", + "Mexico": "Mexico", + "Micronesia (Federated States of)": "Micronesia (country)", + "Mongolia": "Mongolia", + 
"Montenegro": "Montenegro", + "Montserrat": "Montserrat", + "Morocco": "Morocco", + "Mozambique": "Mozambique", + "Myanmar": "Myanmar", + "Namibia": "Namibia", + "Nepal": "Nepal", + "Netherlands Antilles": "Netherlands Antilles", + "New Caledonia": "New Caledonia", + "New Zealand": "New Zealand", + "Nicaragua": "Nicaragua", + "Niger": "Niger", + "Nigeria": "Nigeria", + "Niue": "Niue", + "North Macedonia": "North Macedonia", + "Northern Mariana Islands": "Northern Mariana Islands", + "Norway": "Norway", + "Oman": "Oman", + "Pakistan": "Pakistan", + "Palau": "Palau", + "Panama": "Panama", + "Papua New Guinea": "Papua New Guinea", + "Paraguay": "Paraguay", + "Peru": "Peru", + "Philippines": "Philippines", + "Poland": "Poland", + "Portugal": "Portugal", + "Puerto Rico": "Puerto Rico", + "Qatar": "Qatar", + "Republic of Korea": "South Korea", + "Republic of Moldova": "Moldova", + "Romania": "Romania", + "Russian Federation": "Russia", + "Rwanda": "Rwanda", + "R\u00e9union": "Reunion", + "Saint Barth\u00e9lemy": "Saint Barthelemy", + "Saint Helena": "Saint Helena", + "Saint Kitts and Nevis": "Saint Kitts and Nevis", + "Saint Lucia": "Saint Lucia", + "Saint Martin (French Part)": "Saint Martin (French part)", + "Saint Vincent and the Grenadines": "Saint Vincent and the Grenadines", + "Samoa": "Samoa", + "Sao Tome and Principe": "Sao Tome and Principe", + "Saudi Arabia": "Saudi Arabia", + "Senegal": "Senegal", + "Serbia": "Serbia", + "Seychelles": "Seychelles", + "Sierra Leone": "Sierra Leone", + "Sint Maarten (Dutch part)": "Sint Maarten (Dutch part)", + "Slovakia": "Slovakia", + "Slovenia": "Slovenia", + "Solomon Islands": "Solomon Islands", + "Somalia": "Somalia", + "South Africa": "South Africa", + "South Sudan": "South Sudan", + "Soviet Union": "USSR", + "Spain": "Spain", + "Sri Lanka": "Sri Lanka", + "State of Palestine": "Palestine", + "Sudan": "Sudan", + "Suriname": "Suriname", + "Sweden": "Sweden", + "Switzerland": "Switzerland", + "Syrian Arab Republic": "Syria", 
+ "Taiwan (Province of China)": "Taiwan", + "Tajikistan": "Tajikistan", + "Thailand": "Thailand", + "Timor-Leste": "East Timor", + "Togo": "Togo", + "Tokelau": "Tokelau", + "Tonga": "Tonga", + "Trinidad and Tobago": "Trinidad and Tobago", + "Tunisia": "Tunisia", + "Turkmenistan": "Turkmenistan", + "Turks and Caicos Islands": "Turks and Caicos Islands", + "Tuvalu": "Tuvalu", + "Uganda": "Uganda", + "Ukraine": "Ukraine", + "United Arab Emirates": "United Arab Emirates", + "United Kingdom of Great Britain and Northern Ireland": "United Kingdom", + "United Republic of Tanzania": "Tanzania", + "United States Virgin Islands": "United States Virgin Islands", + "United States of America": "United States", + "Uruguay": "Uruguay", + "Uzbekistan": "Uzbekistan", + "Vanuatu": "Vanuatu", + "Venezuela (Bolivarian Republic of)": "Venezuela", + "Viet Nam": "Vietnam", + "Wallis and Futuna Islands": "Wallis and Futuna", + "Yemen": "Yemen", + "Yemen Arab Republic": "Yemen Arab Republic", + "People's Democratic Republic of Yemen": "Yemen People's Republic", + "Yugoslavia": "Yugoslavia", + "Zambia": "Zambia", + "Zimbabwe": "Zimbabwe", + "Azores Islands": "Portugal", + "Canary Islands": "Spain", + "China, Hong Kong Special Administrative Region": "Hong Kong", + "China, Macao Special Administrative Region": "Macao", + "Germany Federal Republic": "West Germany", + "Netherlands (Kingdom of the)": "Netherlands", + "Serbia Montenegro": "Serbia and Montenegro", + "T\u00fcrkiye": "Turkey" +} diff --git a/etl/steps/data/garden/emdat/2024-04-11/natural_disasters.meta.yml b/etl/steps/data/garden/emdat/2024-04-11/natural_disasters.meta.yml new file mode 100644 index 00000000000..aa8e964b15c --- /dev/null +++ b/etl/steps/data/garden/emdat/2024-04-11/natural_disasters.meta.yml @@ -0,0 +1,154 @@ +definitions: + common: + presentation: + topic_tags: + - Natural Disasters + description_key: + - "EM-DAT defines the following variables: + + - Affected: People requiring immediate assistance during a period 
of emergency, i.e. requiring basic survival needs such as food, water, shelter, sanitation and immediate medical assistance. + + - Injured: People suffering from physical injuries, trauma or an illness requiring immediate medical assistance as a direct result of a disaster. + + - Homeless: Number of people whose house is destroyed or heavily damaged and therefore need shelter after an event. + + - Total affected: In EM-DAT, it is the sum of the injured, affected and left homeless after a disaster. + + - Estimated economic damage: The amount of damage to property, crops, and livestock. In EM-DAT estimated damage are given in US$ ('000). For each disaster, the registered figure corresponds to the damage value at the moment of the event, i.e. the figures are shown true to the year of the event. + + - Total deaths: In EM-DAT, it is the sum of deaths and missing." + - "EM-DAT defines the following types of disasters: + + - Drought: An extended period of unusually low precipitation that produces a shortage of water for people, animals and plants. Drought is different from most other hazards in that it develops slowly, sometimes even over years, and its onset is generally difficult to detect. Drought is not solely a physical phenomenon because its impacts can be exacerbated by human activities and water supply demands. Drought is therefore often defined both conceptually and operationally. Operational definitions of drought, meaning the degree of precipitation reduction that constitutes a drought, vary by locality, climate and environmental sector. + + - Earthquake: Sudden movement of a block of the Earth's crust along a geological fault and associated ground shaking. + + - Extreme temperature: A general term for temperature variations above (extreme heat) or below (extreme cold) normal conditions. 
+ + - Flood: A general term for the overflow of water from a stream channel onto normally dry land in the floodplain (riverine flooding), higher-than-normal levels along the coast and in lakes or reservoirs (coastal flooding) as well as ponding of water at or near the point where the rain fell (flash floods). + + - Fog: Water droplets that are suspended in the air near the Earth's surface. Fog is simply a cloud that is in contact with the ground. + + - Glacial lake outburst: A flood that occurs when water dammed by a glacier or moraine is suddenly released. Glacial lakes can be at the front of the glacier (marginal lake) or below the ice sheet (sub-glacial lake). + + - Landslide: Any kind of moderate to rapid soil movement incl. lahar, mudslide, debris flow. A landslide is the movement of soil or rock controlled by gravity and the speed of the movement usually ranges between slow and rapid, but not very slow. It can be superficial or deep, but the materials have to make up a mass that is a portion of the slope or the slope itself. The movement has to be downward and outward with a free face. + + - Mass movement: Any type of downslope movement of earth materials. + + - Extreme weather: Storm. + + - Volcanic activity: A type of volcanic event near an opening/vent in the Earth's surface including volcanic eruptions of lava, ash, hot vapour, gas, and pyroclastic material. + + - Wildfire: Any uncontrolled and non-prescribed combustion or burning of plants in a natural setting such as a forest, grassland, brush land or tundra, which consumes the natural fuels and spreads based on environmental conditions (e.g., wind, topography). Wildfires can be triggered by lightning or human actions." + +dataset: + title: Natural disasters + +tables: + natural_disasters_decadal: + variables: &variables-default + total_dead: + title: Total deaths + unit: 'people' + description_short: |- + Total number of deaths as a result of a natural disaster. 
In EM-DAT, it is the sum of deaths and missing. + injured: + title: Injured + unit: 'people' + description_short: |- + People suffering from physical injuries, trauma or an illness requiring immediate medical assistance as a direct result of a disaster. + affected: + title: Affected + unit: 'people' + description_short: |- + Number of people requiring immediate assistance during a period of emergency, i.e. requiring basic survival needs such as food, water, shelter, sanitation and immediate medical assistance. + homeless: + title: Homeless + unit: 'people' + description_short: |- + Number of people whose house is destroyed or heavily damaged and therefore need shelter after an event. + total_affected: + title: Total affected + unit: 'people' + description_short: |- + Total number of people affected by a natural disaster. In EM-DAT, it is the sum of the injured, affected and left homeless after a disaster. + reconstruction_costs: + title: Reconstruction costs + short_unit: '$' + unit: 'current US$' + description_short: |- + Reconstruction costs. + insured_damages: + title: Insured damages + short_unit: "$" + unit: "current US$" + description_short: |- + Insured losses are those which are covered by the insurance sector and paid directly to the owner of the damaged or destroyed property or crops and livestock or the primary insurance company (in case of reinsurance). + total_damages: + title: "Total economic damage from natural disasters" + short_unit: '$' + unit: 'current US$' + description_short: |- + The amount of damage to property, crops, and livestock. In EM-DAT estimated damage are given in US$. For each disaster, the registered figure corresponds to the damage value at the moment of the event, i.e. the figures are shown true to the year of the event. + n_events: + title: Number of reported natural disasters + unit: 'events' + description_short: |- + Number of reported natural disasters. 
+ population: + title: Population + unit: 'people' + total_dead_per_100k_people: + title: Total number of deaths per 100,000 people + unit: 'deaths per 100k people' + description_processing: &description-processing-100k |- + Disaster-related impacts from EM-DAT have been normalized by Our World in Data to provide data in terms of occurrences per 100,000 people. + injured_per_100k_people: + title: Number of injured persons per 100,000 people + unit: 'injured per 100k people' + description_processing: *description-processing-100k + affected_per_100k_people: + title: Number of affected persons per 100,000 people + unit: 'affected per 100k people' + description_processing: *description-processing-100k + homeless_per_100k_people: + title: Number of homeless persons per 100,000 people + unit: 'homeless per 100k people' + description_processing: *description-processing-100k + total_affected_per_100k_people: + title: Total number of affected persons per 100,000 people + unit: 'affected per 100k people' + description_processing: *description-processing-100k + n_events_per_100k_people: + title: Number of events per 100,000 people + unit: 'events per 100k people' + description_processing: *description-processing-100k + gdp: + title: "GDP" + unit: "current US$" + short_unit: "$" + description_processing: &description-processing-gdp |- + Our World in Data has calculated economic damage metrics adjusted for gross domestic product (GDP). + reconstruction_costs_per_gdp: + title: "Reconstruction costs from natural disasters as a share of GDP" + unit: "%" + short_unit: "%" + description_short: |- + Reconstruction costs from natural disasters as a share of GDP. + description_processing: *description-processing-gdp + insured_damages_per_gdp: + title: "Insured damages from natural disasters as a share of GDP" + unit: "%" + short_unit: "%" + description_short: |- + Insured damages from natural disasters as a share of GDP. 
+ description_processing: *description-processing-gdp + total_damages_per_gdp: + title: "Total economic damages from natural disasters as a share of GDP" + unit: "%" + short_unit: "%" + description_short: |- + Total economic damages from natural disasters as a share of GDP. + description_processing: *description-processing-gdp + natural_disasters_yearly: + variables: *variables-default diff --git a/etl/steps/data/garden/emdat/2024-04-11/natural_disasters.py b/etl/steps/data/garden/emdat/2024-04-11/natural_disasters.py new file mode 100644 index 00000000000..18da703821b --- /dev/null +++ b/etl/steps/data/garden/emdat/2024-04-11/natural_disasters.py @@ -0,0 +1,625 @@ +"""Process and harmonize EM-DAT natural disasters dataset. + +NOTES: +1. We don't have population for some historical regions (e.g. East Germany, or North Yemen). +2. Some issues in the data were detected (see below, we may report them to EM-DAT). Some of them could not be fixed. + Namely, some disasters affect, in one year, a number of people that is larger than the entire population. + For example, the number of people affected by one drought event in Botswana 1981 is 1037300 while population + was 982753. I suppose this could be due to inaccuracies in the estimates of affected people or in the population + (which may not include people living temporarily in the country or visitors). +3. There are some potential issues that can't be fixed: + * On the one hand, we may be underestimating the real impacts of events. The reason is that the original data does + not include zeros. Therefore we can't know if the impacts of a certain event were zero, or unknown. Our only option + is to treat missing data as zeros. + * On the other hand, we may overestimate the real impacts on a given country-year, because events may affect the same + people multiple times during the same year. This can't be fixed, but I suppose it's not common. 
+ * Additionally, it is understandable that some values are rough estimates, that some events are not recorded, and + that there may be duplicated events. + +""" + +import datetime + +import numpy as np +import owid.catalog.processing as pr +import pandas as pd +from owid.catalog import Dataset, Table, utils +from owid.datautils.dataframes import map_series +from shared import ( + HISTORIC_TO_CURRENT_REGION, + REGIONS, + add_region_aggregates, + correct_data_points, + get_last_day_of_month, +) + +from etl.data_helpers import geo +from etl.helpers import PathFinder, create_dataset + +# Get paths and naming conventions for current step. +paths = PathFinder(__file__) + +# List of expected disaster types in the raw data to consider, and how to rename them. +# We consider only natural disasters of subgroups Geophysical, Meteorological, Hydrological and Climatological. +# We therefore ignore Extra-terrestrial (of which there is just one meteorite impact event) and Biological subgroups. +# For completeness, add all existing types here, and rename them as np.nan if they should not be used. +# If new types are included on a data update, simply add them here. +EXPECTED_DISASTER_TYPES = { + "Animal incident": np.nan, + "Drought": "Drought", + "Earthquake": "Earthquake", + "Epidemic": np.nan, + "Extreme temperature": "Extreme temperature", + "Flood": "Flood", + "Fog": "Fog", + "Glacial lake outburst flood": "Glacial lake outburst flood", + "Impact": np.nan, + "Infestation": np.nan, + # "Landslide (dry)": "Landslide", + "Mass movement (dry)": "Dry mass movement", + "Mass movement (wet)": "Wet mass movement", + "Storm": "Extreme weather", + "Volcanic activity": "Volcanic activity", + "Wildfire": "Wildfire", +} + +# List of columns to select from raw data, and how to rename them. 
+COLUMNS = { + "country": "country", + "type": "type", + "total_dead": "total_dead", + "injured": "injured", + "affected": "affected", + "homeless": "homeless", + "total_affected": "total_affected", + "reconstruction_costs": "reconstruction_costs", + "insured_damages": "insured_damages", + "total_damages": "total_damages", + "start_year": "start_year", + "start_month": "start_month", + "start_day": "start_day", + "end_year": "end_year", + "end_month": "end_month", + "end_day": "end_day", +} + +# Columns of values related to natural disaster impacts. +IMPACT_COLUMNS = [ + "total_dead", + "injured", + "affected", + "homeless", + "total_affected", + "reconstruction_costs", + "insured_damages", + "total_damages", +] + +# Variables related to costs, measured in thousand current US$ (not adjusted for inflation or PPP). +COST_VARIABLES = ["reconstruction_costs", "insured_damages", "total_damages"] + +# Variables to calculate per 100,000 people. +VARIABLES_PER_100K_PEOPLE = [column for column in IMPACT_COLUMNS if column not in COST_VARIABLES] + ["n_events"] + +# New natural disaster type corresponding to the sum of all disasters. +ALL_DISASTERS_TYPE = "all_disasters" + +# List issues found in the data: +# Each element is a tuple with a dictionary that fully identifies the wrong row, +# and another dictionary that specifies the changes. +# Note: Countries here should appear as in the raw data (i.e. not harmonized). +DATA_CORRECTIONS = [] + + +def prepare_input_data(tb: Table) -> Table: + """Prepare input data, and fix some known issues.""" + # Select and rename columns. + tb = tb[list(COLUMNS)].rename(columns=COLUMNS, errors="raise") + + # Add a year column (assume the start of the event). + tb["year"] = tb["start_year"].copy() + + # Correct wrong data points (defined above in DATA_CORRECTIONS). + tb = correct_data_points(tb=tb, corrections=DATA_CORRECTIONS) + + # Remove spurious spaces in entities. 
+ tb["type"] = tb["type"].str.strip() + + # Sanity check + error = "List of expected disaster types has changed. Consider updating EXPECTED_DISASTER_TYPES." + assert set(tb["type"]) == set(EXPECTED_DISASTER_TYPES), error + + # Rename disaster types conveniently. + tb["type"] = map_series( + series=tb["type"], mapping=EXPECTED_DISASTER_TYPES, warn_on_missing_mappings=True, warn_on_unused_mappings=True + ) + + # Drop rows for disaster types that are not relevant. + tb = tb.dropna(subset="type").reset_index(drop=True) + + return tb + + +def sanity_checks_on_inputs(tb: Table) -> None: + """Run sanity checks on input data.""" + error = "All values should be positive." + assert (tb.select_dtypes("number").fillna(0) >= 0).all().all(), error + + error = "Column 'total_affected' should be the sum of columns 'injured', 'affected', and 'homeless'." + assert ( + tb["total_affected"].fillna(0) == tb[["injured", "affected", "homeless"]].sum(axis=1).fillna(0) + ).all(), error + + error = "Natural disasters are not expected to last more than 9 years." + assert (tb["end_year"] - tb["start_year"]).max() < 10, error + + error = "Some of the columns that can't have nan do have one or more nans." + assert tb[["country", "year", "type", "start_year", "end_year"]].notnull().all().all(), error + + for column in ["year", "start_year", "end_year"]: + error = f"Column '{column}' has a year prior to 1900 or posterior to current year." + assert 1900 < tb[column].max() <= datetime.datetime.now().year, error + + error = "Some rows have end_day specified, but not end_month." + assert tb[(tb["end_month"].isnull()) & (tb["end_day"].notnull())].empty, error + + +def fix_faulty_dtypes(tb: Table) -> Table: + """Fix an issue related to column dtypes. + + Dividing a UInt32 by float64 results in a faulty Float64 that does not handle nans properly (which may be a bug: + https://github.com/pandas-dev/pandas/issues/49818). + To avoid this, there are various options: + 1. 
Convert all UInt32 into standard int before dividing by a float. But, if there are nans, int dtype is not valid. + 2. Convert all floats into Float64 before dividing. + 3. Convert all Float64 into float, after dividing. + + We adopt option 3. + + """ + tb = tb.astype({column: float for column in tb[tb.columns[tb.dtypes == "Float64"]]}) + + return tb + + +def calculate_start_and_end_dates(tb: Table) -> Table: + """Calculate start and end dates of disasters. + + The original data had year, month and day of start and end, and some of those fields were missing. This function + deals with those missing fields and creates datetime columns for start and end of events. + + """ + tb = tb.copy() + + # When start month is not given, assume the beginning of the year. + tb["start_month"] = tb["start_month"].fillna(1) + # When start day is not given, assume the beginning of the month. + tb["start_day"] = tb["start_day"].fillna(1) + + # When end month is not given, assume the end of the year. + tb["end_month"] = tb["end_month"].fillna(12) + + # When end day is not given, assume the last day of the month. + last_day_of_month = pd.Series( + [get_last_day_of_month(year=row["end_year"], month=row["end_month"]) for i, row in tb.iterrows()] + ) + tb["end_day"] = tb["end_day"].fillna(last_day_of_month) + + # Create columns for start and end dates. + tb["start_date"] = ( + tb["start_year"].astype(str) + + "-" + + tb["start_month"].astype(str).str.zfill(2) + + "-" + + tb["start_day"].astype(str).str.zfill(2) + ) + tb["end_date"] = ( + tb["end_year"].astype(str) + + "-" + + tb["end_month"].astype(str).str.zfill(2) + + "-" + + tb["end_day"].astype(str).str.zfill(2) + ) + + # Convert dates into datetime objects. + # Note: This may fail if one of the dates is wrong, e.g. September 31 (if so, check error message for row index). + tb["start_date"] = pd.to_datetime(tb["start_date"]) + tb["end_date"] = pd.to_datetime(tb["end_date"]) + + error = "Events can't have an end_date prior to start_date." 
+ assert (tb["end_date"] >= tb["start_date"]).all(), error + + # Drop unnecessary columns. + tb = tb.drop(columns=["start_year", "start_month", "start_day", "end_year", "end_month", "end_day"]) + + return tb + + +def calculate_yearly_impacts(tb: Table) -> Table: + """Equally distribute the impact of disasters lasting longer than one year among the individual years, as separate + events. + + Many disasters last more than one year. Therefore, we need to spread their impact among the different years. + Otherwise, if we assign the impact of a disaster to, say, the first year, we may overestimate the impacts on a + particular country-year. + Hence, for events that started and ended in different years, we distribute their impact equally across the + time spanned by the disaster. + + """ + tb = tb.copy() + + # There are many rows that have no data on impacts of disasters. + # I suppose those are known disasters for which we don't know the impact. + # Given that we want to count overall impact, fill them with zeros (to count them as disasters that had no victims). + tb[IMPACT_COLUMNS] = tb[IMPACT_COLUMNS].fillna(0) + + # Select rows of disasters that last more than one year. + multi_year_rows_mask = tb["start_date"].dt.year != tb["end_date"].dt.year + multi_year_rows = tb[multi_year_rows_mask].reset_index(drop=True) + + # Go row by row, and create a new disaster event with the impact normalized by the fraction of days it happened + # in a specific year. + added_events = Table().copy_metadata(tb) + for _, row in multi_year_rows.iterrows(): + # Start table for new event. + new_event = Table(row).transpose().reset_index(drop=True).copy_metadata(tb) + # Years spanned by the disaster. + years = np.arange(row["start_date"].year, row["end_date"].year + 1).tolist() + # Calculate the total number of days spanned by the disaster (and add 1 day to include the day of the end date). 
+        days_total = (row["end_date"] + pd.DateOffset(1) - row["start_date"]).days
+
+        for year in years:
+            if year == years[0]:
+                # Get number of days.
+                days_affected_in_year = (pd.Timestamp(year=year + 1, month=1, day=1) - row["start_date"]).days
+                # Fraction of days affected this year.
+                days_fraction = days_affected_in_year / days_total
+                # Impacts this year.
+                impacts = pd.DataFrame(row[IMPACT_COLUMNS] * days_fraction).transpose().astype(int)
+                # Ensure "total_affected" is the sum of "injured", "affected" and "homeless".
+                # Note that the previous line may have introduced rounding errors.
+                impacts["total_affected"] = impacts["injured"] + impacts["affected"] + impacts["homeless"]
+                # Start a series that counts the impacts accumulated over the years.
+                cumulative_impacts = impacts
+                # Normalize data by the number of days affected in this year.
+                new_event.loc[:, IMPACT_COLUMNS] = impacts.values
+                # Correct year and dates.
+                new_event["year"] = year
+                new_event["end_date"] = pd.Timestamp(year=year, month=12, day=31)
+            elif years[0] < year < years[-1]:
+                # The entire year was affected by the disaster.
+                # Note: Ignore leap years.
+                days_fraction = 365 / days_total
+                # Impacts this year.
+                impacts = pd.DataFrame(row[IMPACT_COLUMNS] * days_fraction).transpose().astype(int)
+                # Ensure "total_affected" is the sum of "injured", "affected" and "homeless".
+                # Note that the previous line may have introduced rounding errors.
+                impacts["total_affected"] = impacts["injured"] + impacts["affected"] + impacts["homeless"]
+                # Add impacts to the cumulative impacts series.
+                cumulative_impacts += impacts  # type: ignore
+                # Normalize data by the number of days affected in this year.
+                new_event.loc[:, IMPACT_COLUMNS] = impacts.values
+                # Correct year and dates.
+                new_event["year"] = year
+                new_event["start_date"] = pd.Timestamp(year=year, month=1, day=1)
+                new_event["end_date"] = pd.Timestamp(year=year, month=12, day=31)
+            else:
+                # Assign all remaining impacts to the last year.
+ impacts = (pd.Series(row[IMPACT_COLUMNS]) - cumulative_impacts).astype(int) # type: ignore + new_event.loc[:, IMPACT_COLUMNS] = impacts.values + # Correct year and dates. + new_event["year"] = year + new_event["start_date"] = pd.Timestamp(year=year, month=1, day=1) + new_event["end_date"] = row["end_date"] + added_events = pr.concat([added_events, new_event], ignore_index=True).copy() + + # Remove multi-year rows from main dataframe, and add those rows after separating events year by year. + tb_yearly = pr.concat([tb[~(multi_year_rows_mask)], added_events], ignore_index=True) # type: ignore + + # Sort conveniently. + tb_yearly = tb_yearly.sort_values(["country", "year", "type"]).reset_index(drop=True) + + return tb_yearly + + +def get_total_count_of_yearly_impacts(tb: Table) -> Table: + """Get the total count of impacts in the year, ignoring the individual events. + + We are not interested in each individual event, but the number of events of each kind and their impacts. + This function will produce the total count of impacts per country, year and type of disaster. + + """ + # Get the total count of impacts per country, year and type of disaster. + counts = ( + tb.reset_index() + .groupby(["country", "year", "type"], observed=True) + .agg({"index": "count"}) + .reset_index() + .rename(columns={"index": "n_events"}) + ) + # Copy metadata from any other column into the new column of counts of events. + counts["n_events"] = counts["n_events"].copy_metadata(tb["total_dead"]) + # Ensure columns have the right type. + tb = tb.astype( + {column: int for column in tb.columns if column not in ["country", "year", "type", "start_date", "end_date"]} + ) + # Get the sum of impacts per country, year and type of disaster. + tb = tb.groupby(["country", "year", "type"], observed=True).sum(numeric_only=True, min_count=1).reset_index() + # Add the column of the number of events. 
+ tb = tb.merge(counts, on=["country", "year", "type"], how="left") + + return tb + + +def create_a_new_type_for_all_disasters_combined(tb: Table) -> Table: + """Add a new disaster type that has the impact of all other disasters combined.""" + all_disasters = ( + tb.groupby(["country", "year"], observed=True) + .sum(numeric_only=True, min_count=1) + .assign(**{"type": ALL_DISASTERS_TYPE}) + .reset_index() + ) + tb = ( + pr.concat([tb, all_disasters], ignore_index=True) + .sort_values(["country", "year", "type"]) + .reset_index(drop=True) + ) + + return tb + + +def create_additional_variables(tb: Table, ds_population: Dataset, tb_gdp: Table) -> Table: + """Create additional variables, namely damages per GDP, and impacts per 100,000 people.""" + # Add population to table. + tb = geo.add_population_to_table(tb=tb, ds_population=ds_population) + + # Combine natural disasters with GDP data. + tb = tb.merge(tb_gdp.rename(columns={"ny_gdp_mktp_cd": "gdp"}), on=["country", "year"], how="left") + # Prepare cost variables. + for variable in COST_VARIABLES: + # Convert costs (given in '000 US$, aka thousand current US$) into current US$. + tb[variable] *= 1000 + # Create variables of costs (in current US$) as a share of GDP (in current US$). + tb[f"{variable}_per_gdp"] = tb[variable] / tb["gdp"] * 100 + + # Add rates per 100,000 people. + for column in VARIABLES_PER_100K_PEOPLE: + tb[f"{column}_per_100k_people"] = tb[column] * 1e5 / tb["population"] + + # Fix issue with faulty dtypes (see more details in the function's documentation). + tb = fix_faulty_dtypes(tb=tb) + + return tb + + +def create_decade_data(tb: Table) -> Table: + """Create data of average impacts over periods of 10 years. + + For example (as explained in the footer of the natural disasters explorer), the value for 1900 of any column should + represent the average of that column between 1900 and 1909. 
+
+    """
+    tb_decadal = tb.copy()
+
+    # Ensure each country has data for all years (and fill empty rows with zeros).
+    # Otherwise, the average would only be performed across years for which we have data.
+    # For example, if we have data only for 1931 (and no other year in the 1930s) we want that data point to be averaged
+    # over all years in the decade (assuming they are all zero).
+    # Note that, for the current decade, since it's not complete, we want to average over the number of current years
+    # (not the entire decade).
+
+    # List all countries, years and types in the data.
+    countries = sorted(set(tb_decadal["country"]))
+    years = np.arange(tb_decadal["year"].min(), tb_decadal["year"].max() + 1).tolist()
+    types = sorted(set(tb_decadal["type"]))
+
+    # Create a new index covering all combinations of countries, years and types.
+    new_indexes = pd.MultiIndex.from_product([countries, years, types], names=["country", "year", "type"])
+
+    # Reindex data so that all countries and types have data for each year (filling with zeros when there's no data).
+    tb_decadal = tb_decadal.set_index(["country", "year", "type"]).reindex(new_indexes, fill_value=0).reset_index()
+
+    # For each year, calculate the corresponding decade (e.g. 1951 -> 1950, 1929 -> 1920).
+    tb_decadal["decade"] = (tb_decadal["year"] // 10) * 10
+
+    # Group by that country-decade-type and get the mean for each column.
+    tb_decadal = (
+        tb_decadal.drop(columns=["year"])
+        .groupby(["country", "decade", "type"], observed=True)
+        .mean(numeric_only=True)
+        .reset_index()
+        .rename(columns={"decade": "year"})
+    )
+
+    return tb_decadal
+
+
+def sanity_checks_on_outputs(tb: Table, is_decade: bool) -> None:
+    """Run sanity checks on output (yearly or decadal) data.
+
+    Parameters
+    ----------
+    tb : Table
+        Output (yearly or decadal) data.
+    is_decade : bool
+        True if tb is decadal data; False if it is yearly data.
+
+    """
+    # Common sanity checks for yearly and decadal data.
+ error = "All values should be positive." + assert (tb.select_dtypes("number").fillna(0) >= 0).all().all(), error + + error = ( + "List of expected disaster types has changed. " + "Consider updating EXPECTED_DISASTER_TYPES (or renaming ALL_DISASTERS_TYPE)." + ) + expected_disaster_types = [ALL_DISASTERS_TYPE] + [ + utils.underscore(EXPECTED_DISASTER_TYPES[disaster]) + for disaster in EXPECTED_DISASTER_TYPES + if not pd.isna(EXPECTED_DISASTER_TYPES[disaster]) + ] + assert set(tb["type"]) == set(expected_disaster_types), error + + columns_that_should_not_have_nans = [ + "country", + "year", + "type", + "total_dead", + "injured", + "affected", + "homeless", + "total_affected", + "reconstruction_costs", + "insured_damages", + "total_damages", + "n_events", + ] + error = "There are unexpected nans in data." + assert tb[columns_that_should_not_have_nans].notnull().all(axis=1).all(), error + + # Sanity checks only for yearly data. + if not is_decade: + all_countries = sorted(set(tb["country"]) - set(REGIONS) - set(HISTORIC_TO_CURRENT_REGION)) + + # Check that the aggregate of all countries and disasters leads to the same numbers we have for the world. + # This check would not pass when adding historical regions (since we know there are some overlaps between data + # from historical and successor countries). So check for a specific year. 
+ year_to_check = 2022 + all_disasters_for_world = tb[ + (tb["country"] == "World") & (tb["year"] == year_to_check) & (tb["type"] == ALL_DISASTERS_TYPE) + ].reset_index(drop=True) + all_disasters_check = ( + tb[(tb["country"].isin(all_countries)) & (tb["year"] == year_to_check) & (tb["type"] != ALL_DISASTERS_TYPE)] + .groupby("year") + .sum(numeric_only=True) + .reset_index() + ) + + cols_to_check = [ + "total_dead", + "injured", + "affected", + "homeless", + "total_affected", + "reconstruction_costs", + "insured_damages", + "total_damages", + ] + error = f"Aggregate for the World in {year_to_check} does not coincide with the sum of all countries." + assert all_disasters_for_world[cols_to_check].equals(all_disasters_check[cols_to_check]), error + + error = "Column 'total_affected' should be the sum of columns 'injured', 'affected', and 'homeless'." + assert ( + tb["total_affected"].fillna(0) >= tb[["injured", "affected", "homeless"]].sum(axis=1).fillna(0) + ).all(), error + + # Another sanity check would be that certain disasters (e.g. an earthquake) cannot last for longer than 1 day. + # However, for some disasters we don't have exact day, or even exact month, just the year. + + # List of columns whose value should not be larger than population. + columns_to_inspect = [ + "total_dead", + "total_dead_per_100k_people", + ] + error = "One disaster should not be able to cause the death of the entire population of a country in one year." + for column in columns_to_inspect: + informed_rows = tb[column].notnull() & tb["population"].notnull() + assert (tb[informed_rows][column] <= tb[informed_rows]["population"]).all(), error + + +def run(dest_dir: str) -> None: + # + # Load inputs. + # + # Load natural disasters dataset from meadow and read its main table. + ds_meadow = paths.load_dataset("natural_disasters") + tb_meadow = ds_meadow["natural_disasters"].reset_index() + + # Load WDI dataset, read its main table and select variable corresponding to GDP (in current US$). 
+    ds_wdi = paths.load_dataset("wdi")
+    tb_gdp = ds_wdi["wdi"][["ny_gdp_mktp_cd"]].reset_index()
+
+    ####################################################################################################################
+    # TODO: Remove this temporary solution once WDI has origins.
+    from etl.data_helpers.misc import add_origins_to_wdi
+
+    tb_gdp = add_origins_to_wdi(tb_wdi=tb_gdp)
+    ####################################################################################################################
+
+    # Load regions dataset.
+    ds_regions = paths.load_dataset("regions")
+
+    # Load income groups dataset.
+    ds_income_groups = paths.load_dataset("income_groups")
+
+    # Load population dataset.
+    ds_population = paths.load_dataset("population")
+
+    #
+    # Process data.
+    #
+    # Prepare input data (and fix some known issues).
+    tb = prepare_input_data(tb=tb_meadow)
+
+    # Sanity checks.
+    sanity_checks_on_inputs(tb=tb)
+
+    # Harmonize country names.
+    tb = geo.harmonize_countries(
+        df=tb, countries_file=paths.country_mapping_path, warn_on_missing_countries=True, warn_on_unused_countries=True
+    )
+
+    # Calculate start and end dates of disasters.
+    tb = calculate_start_and_end_dates(tb=tb)
+
+    # Distribute the impacts of disasters lasting longer than a year among separate yearly events.
+    tb = calculate_yearly_impacts(tb=tb)
+
+    # Get total count of impacts per year (regardless of the specific individual events during the year).
+    tb = get_total_count_of_yearly_impacts(tb=tb)
+
+    # Add a new category (or "type") corresponding to the total of all natural disasters.
+    tb = create_a_new_type_for_all_disasters_combined(tb=tb)
+
+    # Add region aggregates.
+    tb = add_region_aggregates(
+        data=tb,
+        index_columns=["country", "year", "type"],
+        regions_to_add=REGIONS,
+        ds_regions=ds_regions,
+        ds_income_groups=ds_income_groups,
+    )
+
+    # Add damages per GDP, and rates per 100,000 people.
+ tb = create_additional_variables(tb=tb, ds_population=ds_population, tb_gdp=tb_gdp) + + # Change disaster types to snake, lower case. + tb["type"] = tb["type"].replace({value: utils.underscore(value) for value in tb["type"].unique()}) + + # Create data aggregated (using a simple mean) in intervals of 10 years. + tb_decadal = create_decade_data(tb=tb) + + # Run sanity checks on output yearly data. + sanity_checks_on_outputs(tb=tb, is_decade=False) + + # Run sanity checks on output decadal data. + sanity_checks_on_outputs(tb=tb_decadal, is_decade=True) + + # Set an appropriate index to yearly data and sort conveniently. + tb = tb.format(keys=["country", "year", "type"], sort_columns=True) + + # Set an appropriate index to decadal data and sort conveniently. + tb_decadal = tb_decadal.format(keys=["country", "year", "type"], sort_columns=True) + + # Rename yearly and decadal tables. + tb.metadata.short_name = "natural_disasters_yearly" + tb_decadal.metadata.short_name = "natural_disasters_decadal" + + # + # Save outputs. + # + # Create new Garden dataset. + ds_garden = create_dataset( + dest_dir, tables=[tb, tb_decadal], default_metadata=ds_meadow.metadata, check_variables_metadata=True + ) + ds_garden.save() diff --git a/etl/steps/data/garden/emdat/2024-04-11/shared.py b/etl/steps/data/garden/emdat/2024-04-11/shared.py new file mode 100644 index 00000000000..ca0fa66efcc --- /dev/null +++ b/etl/steps/data/garden/emdat/2024-04-11/shared.py @@ -0,0 +1,378 @@ +import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union + +import numpy as np +import owid.catalog.processing as pr +import pandas as pd +from owid.catalog import Dataset, Table, Variable +from structlog import get_logger + +from etl.data_helpers import geo + +CURRENT_DIR = Path(__file__).parent + +log = get_logger() + +# Aggregate regions to add, following OWID definitions. +# Regions and income groups to create by aggregating contributions from member countries. 
+# In the following dictionary, if nothing is stated, the region is supposed to be a default continent/income group. +# Otherwise, the dictionary can have "regions_included", "regions_excluded", "countries_included", and +# "countries_excluded". The aggregates will be calculated on the resulting countries. +REGIONS = { + # Default continents. + "Africa": {}, + "Asia": {}, + "Europe": {}, + "European Union (27)": {}, + "North America": {}, + "Oceania": {}, + "South America": {}, + "World": {}, + # Income groups. + "Low-income countries": {}, + "Upper-middle-income countries": {}, + "Lower-middle-income countries": {}, + "High-income countries": {}, +} + +# When creating region aggregates, decide how to distribute historical regions. +# The following decisions are based on the current location of the countries that succeeded the region, and their income +# group. Continent and income group assigned corresponds to the continent and income group of the majority of the +# population in the member countries. +HISTORIC_TO_CURRENT_REGION: Dict[str, Dict[str, Union[str, List[str]]]] = { + "Czechoslovakia": { + "continent": "Europe", + "income_group": "High-income countries", + "regions_included": [ + # Europe - High-income countries. + "Czechia", + "Slovakia", + ], + }, + "East Germany": { + "continent": "Europe", + "income_group": "", + "regions_included": [ + # Europe - High-income countries. + "Germany", + ], + }, + "West Germany": { + "continent": "Europe", + "income_group": "", + "regions_included": [ + # Europe - High-income countries. + "Germany", + ], + }, + "Netherlands Antilles": { + "continent": "North America", + "income_group": "High-income countries", + "regions_included": [ + # North America - High-income countries. 
+ "Aruba", + "Curacao", + "Sint Maarten (Dutch part)", + "Bonaire Sint Eustatius and Saba", + ], + }, + "Serbia and Montenegro": { + "continent": "Europe", + "income_group": "Upper-middle-income countries", + "regions_included": [ + # Europe - Upper-middle-income countries. + "Serbia", + "Montenegro", + ], + }, + "North Yemen": { + "continent": "Asia", + "income_group": "Low-income countries", + "regions_included": [ + # Asia - Low-income countries. + "Yemen", + ], + }, + "South Yemen": { + "continent": "Asia", + "income_group": "Low-income countries", + "regions_included": [ + # Asia - Low-income countries. + "Yemen", + ], + }, + "USSR": { + "continent": "Europe", + "income_group": "Upper-middle-income countries", + "regions_included": [ + # Europe - High-income countries. + "Lithuania", + "Estonia", + "Latvia", + # Europe - Upper-middle-income countries. + "Moldova", + "Belarus", + "Russia", + # Europe - Lower-middle-income countries. + "Ukraine", + # Asia - Upper-middle-income countries. + "Georgia", + "Armenia", + "Azerbaijan", + "Turkmenistan", + "Kazakhstan", + # Asia - Lower-middle-income countries. + "Kyrgyzstan", + "Uzbekistan", + "Tajikistan", + ], + }, + "Yugoslavia": { + "continent": "Europe", + "income_group": "Upper-middle-income countries", + "regions_included": [ + # Europe - High-income countries. + "Croatia", + "Slovenia", + # Europe - Upper-middle-income countries. + "North Macedonia", + "Bosnia and Herzegovina", + "Serbia", + "Montenegro", + ], + }, +} + +# Historical countries whose population can be built by adding up the population of their successor countries. +# Those historical countries not listed here will have no population data. +BUILD_POPULATION_FOR_HISTORICAL_COUNTRIES = [ + # The following regions split into smaller ones, and can be estimated by the population of the successors. 
+ "Czechoslovakia", + "Netherlands Antilles", + "Serbia and Montenegro", + "USSR", + "Yugoslavia", + # The following countries cannot be replaced by the successor countries. + # 'East Germany', + # 'West Germany', + # 'North Yemen', + # 'South Yemen', +] + +# Historical countries for which we don't have population, and can't be built from successor countries. +EXPECTED_COUNTRIES_WITHOUT_POPULATION = list( + set(HISTORIC_TO_CURRENT_REGION) - set(BUILD_POPULATION_FOR_HISTORICAL_COUNTRIES) +) + +# Overlaps found between historical regions and successor countries, that we accept in the data. +# We accept them either because they happened close to the transition, or to avoid needing to introduce new +# countries for which we do not have data (like the Russian Empire). +ACCEPTED_OVERLAPS = { + 1911: {"Kazakhstan", "USSR"}, + 1991: {"Georgia", "USSR"}, +} + + +def detect_overlapping_regions( + tb, index_columns, region_and_members, country_col="country", year_col="year", ignore_zeros=True +): + """Detect years on which the data for two regions overlap, e.g. a historical region and one of its successors. + + Parameters + ---------- + tb : _type_ + Data (with a dummy index). + index_columns : _type_ + Names of index columns. + region_and_members : _type_ + Regions to check for overlaps. Each region must have a dictionary "regions_included", listing the subregions + contained. If the region is historical, "regions_included" would be the list of successor countries. + country_col : str, optional + Name of country column (usually "country"). + year_col : str, optional + Name of year column (usually "year"). + ignore_zeros : bool, optional + True to ignore overlaps of zeros. + + Returns + ------- + all_overlaps : dict + All overlaps found. + + """ + # Sum over all columns to get the total sum of each column for each country-year. 
+ tb_total = ( + tb.groupby([country_col, year_col]) + .agg({column: "sum" for column in tb.columns if column not in index_columns}) + .reset_index() + ) + # Create a list of values that will be ignored in overlaps (usually zero or nothing). + if ignore_zeros: + overlapping_values_to_ignore = [0] + else: + overlapping_values_to_ignore = [] + # List all variables in data (ignoring index columns). + variables = [column for column in tb.columns if column not in index_columns] + # List all country names found in data. + countries_in_data = tb[country_col].unique().tolist() + # List all regions found in data. + regions = [country for country in list(region_and_members) if country in countries_in_data] + # Initialize a dictionary that will store all overlaps found. + all_overlaps = {} + for region in regions: + # List members of current region. + members = [member for member in region_and_members[region]["regions_included"] if member in countries_in_data] + for member in members: + # Select data for current region. + region_values = ( + tb_total[tb_total[country_col] == region] + .replace(overlapping_values_to_ignore, np.nan) + .dropna(subset=variables, how="all") + ) + # Select data for current member. + member_values = ( + tb_total[tb_total[country_col] == member] + .replace(overlapping_values_to_ignore, np.nan) + .dropna(subset=variables, how="all") + ) + # Concatenate both selections of data, and select duplicated rows. + combined = pd.concat([region_values, member_values]) + overlaps = combined[combined.duplicated(subset=[year_col], keep=False)] # type: ignore + if len(overlaps) > 0: + # Add the overlap found to the dictionary of all overlaps. + all_overlaps.update({year: set(overlaps[country_col]) for year in overlaps[year_col].unique()}) + + # Sort overlaps conveniently. 
+ all_overlaps = {year: all_overlaps[year] for year in sorted(list(all_overlaps))} + + return all_overlaps + + +def add_region_aggregates( + data: Table, + regions_to_add: Dict[Any, Any], + index_columns: List[str], + ds_regions: Dataset, + ds_income_groups: Dataset, + country_column: str = "country", + aggregates: Optional[Dict[str, str]] = None, +) -> Table: + """Add region aggregates for all regions (which may include continents and income groups). + + Parameters + ---------- + data : Table + Data. + regions_to_add: list + Regions to add. + index_columns : list + Name of index columns. + ds_regions : Dataset + Regions dataset. + ds_income_groups : Dataset + Income groups dataset. + country_column : str + Name of country column. + year_column : str + Name of year column. + aggregates : dict or None + Dictionary of type of aggregation to use for each variable. If None, variables will be aggregated by summing. + + Returns + ------- + data : Table + Data after adding aggregate regions. + + """ + data = data.copy() + + all_overlaps = detect_overlapping_regions( + tb=data, region_and_members=HISTORIC_TO_CURRENT_REGION, index_columns=index_columns + ) + + # Check whether all accepted overlaps are found in the data, and that there are no new unknown overlaps. + error = "Either the list of accepted overlaps is not found in the data, or there are new unknown overlaps." + assert ACCEPTED_OVERLAPS == all_overlaps, error + + if aggregates is None: + # If aggregations are not specified, assume all variables are to be aggregated, by summing. + aggregates = {column: "sum" for column in data.columns if column not in index_columns} + + for region in regions_to_add: + # List of countries in region. + countries_in_region = geo.list_members_of_region( + region=region, ds_regions=ds_regions, ds_income_groups=ds_income_groups + ) + # Select rows of data for member countries. + data_region = data[data[country_column].isin(countries_in_region)] + + # Add region aggregates. 
+ region_df = ( + data_region.groupby([column for column in index_columns if column != country_column]) + .agg(aggregates) + .reset_index() + .assign(**{country_column: region}) + ) + data = pr.concat([data, region_df], ignore_index=True) + + return data + + +def get_last_day_of_month(year: int, month: int): + """Get the number of days in a specific month of a specific year. + + Parameters + ---------- + year : int + Year. + month : int + Month. + + Returns + ------- + last_day + Number of days in month. + + """ + if month == 12: + last_day = 31 + else: + last_day = (datetime.datetime.strptime(f"{year:04}-{month + 1:02}", "%Y-%m") + datetime.timedelta(days=-1)).day + + return last_day + + +def correct_data_points(tb: Table, corrections: List[Tuple[Dict[Any, Any], Dict[Any, Any]]]) -> Table: + """Make individual corrections to data points in a table. + + Parameters + ---------- + tb : Table + Data to be corrected. + corrections : List[Tuple[Dict[Any, Any], Dict[Any, Any]]] + Corrections. + + Returns + ------- + tb_corrected : Table + Corrected data. + + """ + tb_corrected = tb.copy() + + for correction in corrections: + wrong_row, corrected_row = correction + + # Select the row in the table where the wrong data point is. + # The 'fillna(False)' is added because otherwise rows that do not fulfil the selection will create ambiguity. + selection = tb_corrected.loc[(tb_corrected[list(wrong_row)] == Variable(wrong_row)).fillna(False).all(axis=1)] + # Sanity check. + error = "Either raw data has been corrected, or dictionary selecting wrong row is ambiguous." + assert len(selection) == 1, error + + # Replace wrong fields by the corrected ones. + # Note: Changes to categorical fields will not work. 
+ tb_corrected.loc[selection.index, list(corrected_row)] = list(corrected_row.values()) + + return tb_corrected diff --git a/etl/steps/data/grapher/emdat/2024-04-11/natural_disasters.py b/etl/steps/data/grapher/emdat/2024-04-11/natural_disasters.py new file mode 100644 index 00000000000..d8a9bc36964 --- /dev/null +++ b/etl/steps/data/grapher/emdat/2024-04-11/natural_disasters.py @@ -0,0 +1,64 @@ +from owid.catalog import Table + +from etl.helpers import PathFinder, create_dataset + +paths = PathFinder(__file__) + + +def create_wide_tables(table: Table, is_decade: bool) -> Table: + # Create wide tables. + table_wide = table.reset_index().pivot(index=["country", "year"], columns="type", join_column_levels_with="-") + + if is_decade: + variable_name_suffix = "_decadal" + variable_title_suffix = " (decadal)" + else: + variable_name_suffix = "_yearly" + variable_title_suffix = "" + + # Improve variable names and titles. + for column in table_wide.drop(columns=["country", "year"], errors="raise").columns: + table_wide[column].metadata.title += ( + " - " + column.split("-")[-1].capitalize().replace("_", " ") + variable_title_suffix + ) + table_wide = table_wide.rename( + columns={column: column.replace("-", "_") + variable_name_suffix}, errors="raise" + ) + + # Set an appropriate index and sort conveniently. + table_wide = table_wide.format() + + return table_wide + + +def run(dest_dir: str) -> None: + # + # Load inputs. + # + # Load garden tables and remove unnecessary columns. + ds_garden = paths.load_dataset("natural_disasters") + tb_yearly = ds_garden["natural_disasters_yearly"] + tb_decadal = ds_garden["natural_disasters_decadal"] + + # + # Process data. + # + # Remove unnecessary columns. + tb_yearly = tb_yearly.drop(columns=["population", "gdp"], errors="raise") + tb_decadal = tb_decadal.drop(columns=["population", "gdp"], errors="raise") + + # Create wide tables. 
+ tb_yearly_wide = create_wide_tables(table=tb_yearly, is_decade=False) + tb_decadal_wide = create_wide_tables(table=tb_decadal, is_decade=True) + + # + # Save outputs. + # + # Create new grapher dataset, add tables, and save dataset. + ds_grapher = create_dataset( + dest_dir, + tables=[tb_yearly_wide, tb_decadal_wide], + default_metadata=ds_garden.metadata, + check_variables_metadata=True, + ) + ds_grapher.save() diff --git a/etl/steps/data/grapher/emdat/2024-04-11/natural_disasters_global_by_type.py b/etl/steps/data/grapher/emdat/2024-04-11/natural_disasters_global_by_type.py new file mode 100644 index 00000000000..a216fcfee1e --- /dev/null +++ b/etl/steps/data/grapher/emdat/2024-04-11/natural_disasters_global_by_type.py @@ -0,0 +1,41 @@ +from etl.helpers import PathFinder, create_dataset + +paths = PathFinder(__file__) + +GRAPHER_DATASET_TITLE = "Global natural disasters by type" +GRAPHER_DATASET_SHORT_NAME = "natural_disasters_global_by_type" + + +def run(dest_dir: str) -> None: + # + # Load inputs. + # + # Load garden dataset and read table on yearly data. + ds_garden = paths.load_dataset("natural_disasters") + tb = ds_garden["natural_disasters_yearly"].reset_index() + + # + # Process data. + # + # Select data for the World and remove unnecessary columns. + tb_global = ( + tb[tb["country"] == "World"] + .drop(columns=["country", "population", "gdp"], errors="raise") + .reset_index(drop=True) + ) + # Assign human-readable names to disaster types. + tb_global["type"] = tb_global.astype({"type": str})["type"].replace( + {disaster_type: disaster_type.capitalize().replace("_", " ") for disaster_type in tb_global["type"].unique()} + ) + # Treat column for disaster type as the new entity (so they can be selected in grapher as if they were countries). + tb_global = tb_global.rename(columns={"type": "country"}, errors="raise") + + # Set an appropriate index. 
"""Load snapshot of EM-DAT natural disasters data and prepare a table with basic metadata."""
from etl.helpers import PathFinder, create_dataset

# Get paths and naming conventions for current step.
paths = PathFinder(__file__)

# Mapping from raw column names to the (renamed) columns kept in the meadow table.
COLUMNS = {
    "Country": "country",
    "Disaster Group": "group",
    "Disaster Subgroup": "subgroup",
    "Disaster Type": "type",
    "Disaster Subtype": "subtype",
    "Event Name": "event",
    "Region": "region",
    "Total Deaths": "total_dead",
    "No. Injured": "injured",
    "No. Affected": "affected",
    "No. Homeless": "homeless",
    "Total Affected": "total_affected",
    "Reconstruction Costs ('000 US$)": "reconstruction_costs",
    "Insured Damage ('000 US$)": "insured_damages",
    "Total Damage ('000 US$)": "total_damages",
    "Start Year": "start_year",
    "Start Month": "start_month",
    "Start Day": "start_day",
    "End Year": "end_year",
    "End Month": "end_month",
    "End Day": "end_day",
}


def run(dest_dir: str) -> None:
    #
    # Load and process inputs.
    #
    # Read the relevant sheet from the snapshot file.
    snapshot = paths.load_snapshot("natural_disasters.xlsx")
    tb = snapshot.read(sheet_name="EM-DAT Data")

    #
    # Process data.
    #
    # Keep only the columns of interest, renamed according to the mapping above.
    tb = tb[list(COLUMNS)].rename(columns=COLUMNS, errors="raise")

    # Sanity check: the snapshot should only contain natural (not technological) disasters.
    assert set(tb["group"]) == {"Natural"}, "Expected only 'Natural' in 'group' column."

    # Set an appropriate index and sort conveniently.
    # NOTE: There are multiple rows for certain country-years. This will be handled in the garden step.
    tb = tb.format(keys=["country", "start_year"], verify_integrity=False, sort_columns=True)

    #
    # Save outputs.
    #
    # Create a new meadow dataset.
    ds_meadow = create_dataset(dest_dir, tables=[tb], check_variables_metadata=True)
    ds_meadow.save()
# DVC metadata for the EM-DAT natural disasters snapshot (version 2024-04-11).
# NOTE(review): the md5/size in `outs` describe the downloaded xlsx file and must not be edited by hand.
meta:
  origin:
    producer: EM-DAT, CRED / UCLouvain
    title: Natural disasters
    description: |-
      EM-DAT contains data on the occurrence and impacts of mass disasters worldwide from 1900 to the present day. EM-DAT data includes all categories classified as "natural disasters" (distinguished from technological disasters, such as oil spills and industrial accidents). This includes those from drought, earthquakes, extreme temperatures, extreme weather, floods, fogs, glacial lake outbursts, landslide, dry mass movements, volcanic activity, and wildfires.
    citation_full: |-
      EM-DAT, CRED / UCLouvain, Brussels, Belgium - www.emdat.be
    url_main: https://emdat.be/
    date_accessed: 2024-04-11
    date_published: 2024-04-05
    license:
      name: UCLouvain 2023
      url: https://doc.emdat.be/docs/legal/terms-of-use/
outs:
  - md5: 95507e56ee11e9bfa45ef4eeaa948838
    size: 4655627
    path: natural_disasters.xlsx