Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue851 parse province-level vaccination data #876

Merged
merged 4 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 30 additions & 32 deletions covsirphy/cleaning/vaccine_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from pathlib import Path
import numpy as np
import pandas as pd
import datetime
from datetime import datetime
from covsirphy.util.error import deprecate, SubsetNotFoundError
from covsirphy.util.term import Term
from covsirphy.cleaning.cbase import CleaningBase


Expand All @@ -22,6 +23,7 @@ class VaccineData(CleaningBase):
- Date: Observation date
- Country: country/region name
- ISO3: ISO 3166-1 alpha-3, like JPN
- Province: province/prefecture/state name
- Product: vaccine product names
- Vaccinations: cumulative number of vaccinations
- Vaccinated_once: cumulative number of people who received at least one vaccine dose
Expand All @@ -37,30 +39,29 @@ class VaccineData(CleaningBase):
- Date (pandas.TimeStamp): observation dates
- Country (pandas.Category): country (or province) names
- ISO3 (pandas.Category): ISO3 codes
- Province (pandas.Category): province/prefecture/state name
- Product (pandas.Category): vaccine product names
- Vaccinations (int): cumulative number of vaccinations
- Vaccinated_once (int): cumulative number of people who received at least one vaccine dose
- Vaccinated_full (int): cumulative number of people who received all doses prescribed by the protocol
"""
# Columns of self._raw and self._clean_df
RAW_COLS = [
CleaningBase.DATE, CleaningBase.COUNTRY, CleaningBase.ISO3, CleaningBase.PRODUCT,
CleaningBase.VAC, CleaningBase.V_ONCE, CleaningBase.V_FULL]
Term.DATE, Term.COUNTRY, Term.ISO3, Term.PROVINCE, Term.PRODUCT,
Term.VAC, Term.V_ONCE, Term.V_FULL]
# Columns of self.cleaned()
CLEANED_COLS = RAW_COLS[:]
# Columns of self.subset()
SUBSET_COLS = [CleaningBase.DATE, CleaningBase.VAC, CleaningBase.V_ONCE, CleaningBase.V_FULL]
SUBSET_COLS = [Term.DATE, Term.VAC, Term.V_ONCE, Term.V_FULL]

def __init__(self, filename=None, data=None, citation=None, **kwargs):
# Raw data
if data is not None and self.PROVINCE in data:
data_c = data.loc[data[self.PROVINCE] == self.UNKNOWN]
self._raw = self._parse_raw(filename, data_c, self.RAW_COLS)
else:
self._raw = self._parse_raw(filename, data, self.RAW_COLS)
self._raw = self._parse_raw(filename, data, self.RAW_COLS)
# Backward compatibility
if self._raw.empty:
self._raw = self._retrieve(filename, **kwargs)
if self.PROVINCE not in self._raw:
self._raw[self.PROVINCE] = self.UNKNOWN
# Data cleaning
self._cleaned_df = pd.DataFrame(columns=self.RAW_COLS) if self._raw.empty else self._cleaning()
# Citation
Expand Down Expand Up @@ -119,6 +120,7 @@ def _cleaning(self):
- Date (pandas.TimeStamp): observation dates
- Country (pandas.Category): country (or province) names
- ISO3 (pandas.Category): ISO3 codes
- Province: province/prefecture/state name
- Product (pandas.Category): vaccine product names
- Vaccinations (int): cumulative number of vaccinations
- Vaccinated_once (int): cumulative number of people who received at least one vaccine dose
Expand All @@ -127,36 +129,32 @@ def _cleaning(self):
df = self._raw.copy()
# Date
df[self.DATE] = pd.to_datetime(df[self.DATE])
for col in [self.COUNTRY, self.ISO3, self.PRODUCT]:
df[col] = df[col].astype("category")
# Fill in NA values
for col in [self.VAC, self.V_ONCE, self.V_FULL]:
df[col] = pd.to_numeric(df[col], errors="coerce")
df[col] = df.groupby(self.ISO3)[col].fillna(method="ffill").fillna(0).astype(np.int64)
today_date = datetime.datetime.today().replace(hour=00, minute=00, second=00, microsecond=00)
for country in df.Country.unique():
subset_df = df.loc[df[self.COUNTRY] == country]
# Add any missing dates up until today
if subset_df[self.DATE].max() < today_date:
new_dates = pd.date_range(
subset_df[self.DATE].max() + datetime.timedelta(days=1), today_date)
subset_df = subset_df.reset_index(drop=True)
keep_index = subset_df[self.VAC].idxmax() + 1
new_df = pd.DataFrame(index=new_dates, columns=subset_df.columns)
new_df.index.name = self.DATE
new_df = new_df.drop(self.DATE, axis=1).reset_index()
subset_df = pd.concat([subset_df, new_df], axis=0, ignore_index=True).ffill()
subset_df = subset_df.loc[keep_index:]
df = pd.concat([df, subset_df], axis=0, ignore_index=True)
df.sort_values(by=[self.COUNTRY, self.DATE], ignore_index=True, inplace=True)
country_df = df.loc[:, [self.COUNTRY, self.ISO3, self.PRODUCT]].drop_duplicates()
# Extend dates to today
today_date = datetime.today().replace(hour=00, minute=00, second=00, microsecond=00)
df = df.pivot_table(
values=[self.VAC, self.V_ONCE, self.V_FULL],
index=self.DATE, columns=[self.COUNTRY, self.PROVINCE], aggfunc="last")
df = df.reindex(pd.date_range(df.index[0], today_date, freq="D"))
df.index.name = self.DATE
df = df.ffill().fillna(0).astype(np.int64).stack().stack().reset_index()
df.sort_values(by=[self.COUNTRY, self.PROVINCE, self.DATE], ignore_index=True, inplace=True)
df = df.merge(country_df, on=self.COUNTRY)
# Set dtype for category data
for col in [self.COUNTRY, self.ISO3, self.PROVINCE, self.PRODUCT]:
df[col] = df[col].astype("category")
return df.loc[:, self.RAW_COLS]

def subset(self, country, product=None, start_date=None, end_date=None):
def subset(self, country, province=None, product=None, start_date=None, end_date=None):
lisphilar marked this conversation as resolved.
Show resolved Hide resolved
"""
Return subset of the country/province and start/end date.

Args:
country (str or None): country name or ISO3 code
country (str): country name or ISO3 code
province (str or None): province name
product (str or None): vaccine product name
start_date (str or None): start date, like 22Jan2020
end_date (str or None): end date, like 01Feb2020
Expand All @@ -166,15 +164,15 @@ def subset(self, country, product=None, start_date=None, end_date=None):
Index
reset index
Columns
- Date (pandas.TimeStamp): observation date
- Date (pandas.Timestamp): observation date
- Vaccinations (int): the number of vaccinations
- Vaccinated_once (int): cumulative number of people who received at least one vaccine dose
- Vaccinated_full (int): cumulative number of people who received all doses prescribed by the protocol
"""
df = self._cleaned_df.copy()
# Subset by country
country_alias = self.ensure_country_name(country)
df = df.loc[df[self.COUNTRY] == country_alias]
df = df.loc[(df[self.COUNTRY] == country_alias) & (df[self.PROVINCE] == (province or self.UNKNOWN))]
# Subset by product name
if product is not None:
df = df.loc[df[self.PRODUCT] == product]
Expand Down
43 changes: 20 additions & 23 deletions covsirphy/loading/dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(self, directory="input", update_interval=12, basename_dict=None, ve
# Verbosity
self._verbose = self._ensure_natural_int(verbose, name="verbose", include_zero=True)
# Column names to identify records
self._id_cols = [self.DATE, self.COUNTRY, self.PROVINCE]
self._id_cols = [self.COUNTRY, self.PROVINCE, self.DATE]
# Datasets retrieved from local files
self._local_df = pd.DataFrame()
self._local_citations = []
Expand Down Expand Up @@ -327,9 +327,7 @@ def _add_remote(self, current_df, remote_handler, filename, citation_dict):
remote_df[self.PROVINCE] = remote_df[self.PROVINCE].fillna(self.UNKNOWN)
remote_df = remote_df.set_index(self._id_cols)
# Update the current database
df.update(remote_df, overwrite=False)
df = pd.concat([df, remote_df], ignore_index=False, sort=True).reset_index()
df = df.drop_duplicates(subset=self._id_cols, keep="first").set_index(self._id_cols)
df = df.combine_first(remote_df).reset_index().set_index(self._id_cols)
# Update citations
cite_dict = {k: [*v, handler.CITATION] if k in remote_df else v for (k, v) in cite_dict.items()}
return (df, cite_dict, handler)
Expand All @@ -355,27 +353,36 @@ def _read_dep(self, basename=None, basename_owid=None, local_file=None, verbose=
"verbose argument was deprecated. Please use DataLoader(verbose).", DeprecationWarning)
self._verbose = self._ensure_natural_int(verbose, name="verbose")

def _auto_lock(self):
def _auto_lock(self, variables):
"""
Automatic database lock before using database.

Args:
variables (list[str] or None): variables to check citations

Returns:
tuple(pandas.DataFrame, dict[str, list[str]]):
- locked database
- dictionary of citation for each variable (column)
- citation list of the variables
"""
# Database lock
try:
self._ensure_lock_status(lock_expected=True)
except NotDBLockedError:
self.lock(*self._id_cols)
return (self._locked_df, self._locked_citation_dict)
# Citation list
if variables is None:
return (self._locked_df, [])
citation_dict = self._locked_citation_dict.copy()
citations = [c for (v, line) in citation_dict.items() for c in line if v in variables]
return (self._locked_df, sorted(set(citations), key=citations.index))

@property
def covid19dh_citation(self):
    """
    Return the list of primary sources of COVID-19 Data Hub.

    Returns:
        the cached primary-source citation data held in
        ``self._covid19dh_primary`` (presumably a list of source
        descriptions — confirm against the COVID-19 Data Hub handler)
    """
    # Ensure the internal database is locked before reading cached data;
    # variables=None asks _auto_lock() to skip per-variable citation lookup.
    self._auto_lock(variables=None)
    return self._covid19dh_primary

def jhu(self, **kwargs):
Expand All @@ -389,9 +396,7 @@ def jhu(self, **kwargs):
covsirphy.JHUData: dataset regarding the number of cases
"""
self._read_dep(**kwargs)
df, citation_dict = self._auto_lock()
variables = [*JHUData.REQUIRED_COLS, *JHUData.OPTINAL_COLS]
citations = [c for (v, line) in citation_dict.items() for c in line if v in variables]
df, citations = self._auto_lock(variables=[*JHUData.REQUIRED_COLS, *JHUData.OPTINAL_COLS])
jhu_data = JHUData(data=df, citation="\n".join(citations))
if self.update_interval is None:
return jhu_data
Expand All @@ -408,9 +413,7 @@ def population(self, **kwargs):
covsirphy.PopulationData: dataset regarding population values
"""
self._read_dep(**kwargs)
df, citation_dict = self._auto_lock()
variables = PopulationData.RAW_COLS[:]
citations = [c for (v, line) in citation_dict.items() for c in line if v in variables]
df, citations = self._auto_lock(variables=PopulationData.RAW_COLS)
return PopulationData(data=df, citation="\n".join(citations))

def oxcgrt(self, **kwargs):
Expand All @@ -424,9 +427,7 @@ def oxcgrt(self, **kwargs):
covsirphy.JHUData: dataset regarding OxCGRT data
"""
self._read_dep(**kwargs)
df, citation_dict = self._auto_lock()
variables = OxCGRTData.RAW_COLS[:]
citations = [c for (v, line) in citation_dict.items() for c in line if v in variables]
df, citations = self._auto_lock(variables=OxCGRTData.RAW_COLS)
return OxCGRTData(data=df, citation="\n".join(citations))

def japan(self, **kwargs):
Expand Down Expand Up @@ -476,9 +477,7 @@ def pcr(self, **kwargs):
covsirphy.PCRData: dataset regarding the number of tests and confirmed cases
"""
self._read_dep(**kwargs)
df, citation_dict = self._auto_lock()
variables = PCRData.RAW_COLS[:]
citations = [c for (v, line) in citation_dict.items() for c in line if v in variables]
df, citations = self._auto_lock(variables=PCRData.RAW_COLS)
pcr_data = PCRData(data=df, citation="\n".join(citations))
if self.update_interval is None:
return pcr_data
Expand All @@ -503,9 +502,7 @@ def vaccine(self, **kwargs):
covsirphy.VaccineData: dataset regarding vaccines
"""
self._read_dep(**kwargs)
df, citation_dict = self._auto_lock()
variables = VaccineData.RAW_COLS[:]
citations = [c for (v, line) in citation_dict.items() for c in line if v in variables]
df, citations = self._auto_lock(variables=VaccineData.RAW_COLS)
return VaccineData(data=df.dropna(), citation="\n".join(citations))

def pyramid(self, **kwargs):
Expand Down