BREAKING CHANGE(refactor SINAN data collection): Move SINAN data collection to ETL format (#194)

* chore(script): Remove unnecessary code
* Extract data from SINAN - Pysus
* Linter
* Change way of extracting data by year or aggravate
* Implementing new changes from PySUS
* Minor fix and tests
* Add unique constraints on foph_d tables
* Move commit to proper branch
* Move foph refactor to proper branch
* Linter
* Docstrings
* Aggravate to disease

Co-authored-by: esloch <[email protected]>
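The commit splits the SINAN collection into separate extract, load and view steps. A minimal sketch of how the three new modules in this diff might be chained is shown below; the import paths are hypothetical (file names are not shown in this view), and only the function names download, upload and table come from the code in the diff.

# Hypothetical module paths -- only the function names appear in the diff below.
from sinan_extract import download   # extract: fetch yearly parquet chunks via PySUS
from sinan_load import upload        # load: upsert every chunk found under /tmp/pysus
from sinan_viz import table          # view: read one disease/year table back

download("Dengue")            # assumes "Dengue" is a key of SINAN.agravos
upload()                      # upserts all downloaded chunks into the brasil schema
df = table("Dengue", 2019)    # e.g. brasil.deng19, if SINAN.agravos maps "Dengue" to "DENG"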
Showing 7 changed files with 518 additions and 63 deletions.
@@ -0,0 +1,29 @@
from pathlib import PosixPath

from loguru import logger
from pysus.online_data import SINAN

from epigraphhub.data._config import SINAN_LOG_PATH

logger.add(SINAN_LOG_PATH, retention="7 days")

diseases = SINAN.agravos


def download(disease: str):
    """
    Download all parquets available for a disease,
    according to `SINAN.agravos`. Files are downloaded to
    `/tmp/pysus`; a directory with the disease code will be created.
    Attrs:
        disease (str): The disease to be downloaded.
    """

    SINAN.download_all_years_in_chunks(disease)

    logger.info(f"All years for {disease} downloaded at /tmp/pysus")
@@ -0,0 +1,61 @@
import os
from pathlib import Path

from loguru import logger
from pangres import upsert
from pysus.online_data import parquets_to_dataframe as to_df

from epigraphhub.connection import get_engine
from epigraphhub.data._config import SINAN_LOG_PATH
from epigraphhub.settings import env

logger.add(SINAN_LOG_PATH, retention="7 days")

engine = get_engine(credential_name=env.db.default_credential)


def upload():
    """
    Connects to the EGH SQL server and loads all the chunks for all
    diseases found at `/tmp/pysus` into the database. This method
    cleans up the chunks left behind.
    """
    diseases_dir = Path('/tmp/pysus').glob('*')
    di_years_dir = [x for x in diseases_dir if x.is_dir()]

    for dir in di_years_dir:

        parquets_dir = Path(dir).glob('*.parquet')
        parquets = [x for x in parquets_dir if x.is_dir()]

        for parquet in parquets:
            if 'parquet' in Path(parquet).suffix and any(os.listdir(parquet)):

                # Read the chunk directory into a DataFrame and delete it afterwards
                df = to_df(str(parquet), clean_after_read=True)
                df.columns = df.columns.str.lower()
                df.index.name = "index"

                # Derive the table name from the directory name,
                # e.g. a chunk dir named "DENGBR19.parquet" yields table brasil.deng19
                table_i = str(parquet).split("/")[-1].split(".parquet")[0]
                st, yr = table_i[:-4].lower(), table_i[-2:]
                table = "".join([st, yr])
                schema = "brasil"

                with engine.connect() as conn:
                    try:
                        upsert(
                            con=conn,
                            df=df,
                            table_name=table,
                            schema=schema,
                            if_row_exists="update",
                            chunksize=1000,
                            add_new_columns=True,
                            create_table=True,
                        )

                        logger.info(f"Table {table} updated")

                    except Exception as e:
                        logger.error(f"Not able to upsert {table} \n{e}")
@@ -0,0 +1,59 @@
import pandas as pd
from loguru import logger
from pysus.online_data import SINAN
from pysus.online_data import parquets_to_dataframe as to_df

from epigraphhub.connection import get_engine
from epigraphhub.data._config import SINAN_LOG_PATH
from epigraphhub.settings import env

logger.add(SINAN_LOG_PATH, retention="7 days")

engine = get_engine(credential_name=env.db.default_credential)
aggrs = SINAN.agravos


def parquet(ppath: str, clean_after_read=False) -> pd.DataFrame:
    """
    Convert the parquet files into a pandas DataFrame.
    Parameters
    ----------
    ppath (str) : Path of the parquet dir.
    clean_after_read (bool) : If set to True, deletes the data after
                              returning the DataFrame.
    Returns
    -------
    df (DataFrame) : A pandas DataFrame.
    """

    df = to_df(str(ppath), clean_after_read)
    logger.info("Parquet files converted to DataFrame")
    df.columns = df.columns.str.lower()

    return df


def table(disease: str, year: int) -> pd.DataFrame:
    """
    Connect to the EGH SQL server and retrieve the data by disease and year.
    Parameters
    ----------
    disease (str) : The name of the disease according to `SINAN.agravos`.
    year (int) : Year of the wanted data.
    Returns
    -------
    df (DataFrame) : The data requested in a pandas DataFrame.
    """

    year = str(year)[-2:].zfill(2)
    disease = SINAN.check_case(disease)
    dis_code = aggrs[disease].lower()
    tablename = f"{dis_code}{year}"

    with engine.connect() as conn:
        df = pd.read_sql(f"SELECT * FROM brasil.{tablename}", conn)

    return df
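A hedged sketch of the read-back helpers; the parquet path and the disease/code mapping are illustrative assumptions.

# Convert a downloaded chunk directly, without touching the database.
# clean_after_read=False (the default) keeps the files on disk; path is illustrative.
df_raw = parquet("/tmp/pysus/CHIK/CHIKBR21.parquet")

# Read the same data back from the database, assuming it was uploaded and that
# SINAN.agravos maps "Chikungunya" to "CHIK" (so the query hits brasil.chik21).
df_db = table("Chikungunya", 2021)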