From ad4cc3ebadc74c98ffd19dd48987d334cb6f312c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= <82233055+luabida@users.noreply.github.com> Date: Fri, 28 Oct 2022 10:44:12 -0300 Subject: [PATCH] BREAKING CHANGE(refactor): Refactor of Data Collection modules to match ETL worflow (#193) * chore(refactor): Refactor of Data Collection modules to match ETL workflow * Refactor OWID data collection * Move config to a private module * Colombia collection * Refactor foph import module --- docs/tutorials/data/foph.ipynb | 11 +- .../article/download_data.ipynb | 11 +- .../forecast_switzerland/forecast_swiss.py | 2 +- .../forecast_switzerland/verify_train.ipynb | 4 +- .../analysis/forecast_models/lstm_models.py | 2 +- .../{data_collection/config.py => _config.py} | 0 .../{data_collection => colombia}/__init__.py | 0 .../compare_data.py => colombia/extract.py} | 12 +- .../data_chunk.py => colombia/loading.py} | 69 ++++++++-- .../colombia/load_chunks_into_db.py | 76 ----------- .../data/data_collection/foph/compare_data.py | 79 ----------- .../data/data_collection/foph/load_into_db.py | 62 --------- .../data/data_collection/owid/__init__.py | 0 .../data_collection/owid/download_data.py | 56 -------- .../colombia => foph}/__init__.py | 0 .../foph/download_data.py => foph/extract.py} | 34 +++-- epigraphhub/data/foph/loading.py | 123 ++++++++++++++++++ .../update_index.py => foph/transform.py} | 0 epigraphhub/data/{foph.py => foph/viz.py} | 0 .../foph => owid}/__init__.py | 0 .../owid/compare_data.py => owid/extract.py} | 69 +++++++--- .../owid/load_into_db.py => owid/loading.py} | 39 +++--- .../update_index.py => owid/transform.py} | 4 +- 23 files changed, 304 insertions(+), 349 deletions(-) rename epigraphhub/data/{data_collection/config.py => _config.py} (100%) rename epigraphhub/data/{data_collection => colombia}/__init__.py (100%) rename epigraphhub/data/{data_collection/colombia/compare_data.py => colombia/extract.py} (88%) rename epigraphhub/data/{data_collection/colombia/data_chunk.py => colombia/loading.py} (53%) delete mode 100644 epigraphhub/data/data_collection/colombia/load_chunks_into_db.py delete mode 100644 epigraphhub/data/data_collection/foph/compare_data.py delete mode 100644 epigraphhub/data/data_collection/foph/load_into_db.py delete mode 100644 epigraphhub/data/data_collection/owid/__init__.py delete mode 100644 epigraphhub/data/data_collection/owid/download_data.py rename epigraphhub/data/{data_collection/colombia => foph}/__init__.py (100%) rename epigraphhub/data/{data_collection/foph/download_data.py => foph/extract.py} (75%) create mode 100644 epigraphhub/data/foph/loading.py rename epigraphhub/data/{data_collection/foph/update_index.py => foph/transform.py} (100%) rename epigraphhub/data/{foph.py => foph/viz.py} (100%) rename epigraphhub/data/{data_collection/foph => owid}/__init__.py (100%) rename epigraphhub/data/{data_collection/owid/compare_data.py => owid/extract.py} (55%) rename epigraphhub/data/{data_collection/owid/load_into_db.py => owid/loading.py} (92%) rename epigraphhub/data/{data_collection/owid/update_index.py => owid/transform.py} (95%) diff --git a/docs/tutorials/data/foph.ipynb b/docs/tutorials/data/foph.ipynb index 2dfade12..54b8c3e1 100644 --- a/docs/tutorials/data/foph.ipynb +++ b/docs/tutorials/data/foph.ipynb @@ -250,7 +250,7 @@ } ], "source": [ - "from epigraphhub.data.foph import get_cluster_data\n", + "from epigraphhub.data.foph.viz import get_cluster_data\n", "\n", "dict_cols = {\n", " \"foph_cases_d\": [\"datum\", \"georegion\", \"entries\"],\n", @@ -315,7 +315,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "Python 3.10.6 64-bit", "language": "python", "name": "python3" }, @@ -329,7 +329,12 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.12" + "version": "3.10.6" + }, + "vscode": { + "interpreter": { + "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1" + } } }, "nbformat": 4, diff --git a/docs/tutorials/forecast_switzerland/article/download_data.ipynb b/docs/tutorials/forecast_switzerland/article/download_data.ipynb index 6bc4409d..d580a490 100644 --- a/docs/tutorials/forecast_switzerland/article/download_data.ipynb +++ b/docs/tutorials/forecast_switzerland/article/download_data.ipynb @@ -17,7 +17,7 @@ "source": [ "import os \n", "os.chdir('../../')\n", - "from epigraphhub.data.foph import get_cluster_data, get_georegion_data" + "from epigraphhub.data.foph.viz import get_cluster_data, get_georegion_data" ] }, { @@ -86,7 +86,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "Python 3.10.6 64-bit", "language": "python", "name": "python3" }, @@ -100,7 +100,12 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.12" + "version": "3.10.6" + }, + "vscode": { + "interpreter": { + "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1" + } } }, "nbformat": 4, diff --git a/docs/tutorials/forecast_switzerland/forecast_swiss.py b/docs/tutorials/forecast_switzerland/forecast_swiss.py index 4ce6ccf2..9a25b916 100644 --- a/docs/tutorials/forecast_switzerland/forecast_swiss.py +++ b/docs/tutorials/forecast_switzerland/forecast_swiss.py @@ -13,7 +13,7 @@ from epigraphhub.analysis.clustering import compute_clusters from epigraphhub.analysis.forecast_models.ngboost_models import NGBModel -from epigraphhub.data.foph import get_cluster_data, get_data_by_location +from epigraphhub.data.foph.viz import get_cluster_data, get_data_by_location params_model = { "Base": default_tree_learner, diff --git a/docs/tutorials/forecast_switzerland/verify_train.ipynb b/docs/tutorials/forecast_switzerland/verify_train.ipynb index 3787b6b7..e9b3e548 100644 --- a/docs/tutorials/forecast_switzerland/verify_train.ipynb +++ b/docs/tutorials/forecast_switzerland/verify_train.ipynb @@ -21,7 +21,7 @@ "from joblib import dump, load\n", "import matplotlib.pyplot as plt\n", "from epigraphhub.analysis.preprocessing import build_lagged_features\n", - "from epigraphhub.data.foph import get_cluster_data\n", + "from epigraphhub.data.foph.viz import get_cluster_data\n", "from epigraphhub.analysis.forecast_models.ngboost_models import get_targets \n", "from forecast_swiss import get_cluster_by_canton" ] @@ -155,7 +155,7 @@ "outputs": [ { "data": { - "image/png": "\n", + "image/png": "", "text/plain": [ "
" ] diff --git a/epigraphhub/analysis/forecast_models/lstm_models.py b/epigraphhub/analysis/forecast_models/lstm_models.py index 3d976264..67918bd4 100644 --- a/epigraphhub/analysis/forecast_models/lstm_models.py +++ b/epigraphhub/analysis/forecast_models/lstm_models.py @@ -32,7 +32,7 @@ from epigraphhub.analysis.clustering import compute_clusters from epigraphhub.analysis.preprocessing import lstm_split_data as split_data from epigraphhub.analysis.preprocessing import normalize_data -from epigraphhub.data.foph import get_cluster_data +from epigraphhub.data.foph.viz import get_cluster_data def build_model(hidden, features, predict_n, look_back=10, batch_size=1): diff --git a/epigraphhub/data/data_collection/config.py b/epigraphhub/data/_config.py similarity index 100% rename from epigraphhub/data/data_collection/config.py rename to epigraphhub/data/_config.py diff --git a/epigraphhub/data/data_collection/__init__.py b/epigraphhub/data/colombia/__init__.py similarity index 100% rename from epigraphhub/data/data_collection/__init__.py rename to epigraphhub/data/colombia/__init__.py diff --git a/epigraphhub/data/data_collection/colombia/compare_data.py b/epigraphhub/data/colombia/extract.py similarity index 88% rename from epigraphhub/data/data_collection/colombia/compare_data.py rename to epigraphhub/data/colombia/extract.py index 23b814f0..fcacaac1 100644 --- a/epigraphhub/data/data_collection/colombia/compare_data.py +++ b/epigraphhub/data/colombia/extract.py @@ -18,14 +18,20 @@ from loguru import logger from epigraphhub.connection import get_engine -from epigraphhub.data.data_collection.config import COLOMBIA_CLIENT, COLOMBIA_LOG_PATH +from epigraphhub.data._config import COLOMBIA_CLIENT, COLOMBIA_LOG_PATH from epigraphhub.settings import env logger.add(COLOMBIA_LOG_PATH, retention="7 days") client = COLOMBIA_CLIENT -def table_last_update() -> datetime: +def compare() -> bool: + db_last_update = _table_last_update() + data_last_update = _web_last_update() + return db_last_update == data_last_update + + +def _table_last_update() -> datetime: """ This method will connect to the SQL Database and query the maximum date found in Colombia table. @@ -51,7 +57,7 @@ def table_last_update() -> datetime: raise (e) -def web_last_update() -> datetime: +def _web_last_update() -> datetime: """ This method will request the maximum date found in Colombia data through Socrata API and returns it as a datetime object for further evaluation. diff --git a/epigraphhub/data/data_collection/colombia/data_chunk.py b/epigraphhub/data/colombia/loading.py similarity index 53% rename from epigraphhub/data/data_collection/colombia/data_chunk.py rename to epigraphhub/data/colombia/loading.py index fb4ec344..0e378e76 100644 --- a/epigraphhub/data/data_collection/colombia/data_chunk.py +++ b/epigraphhub/data/colombia/loading.py @@ -3,28 +3,79 @@ @author: eduardoaraujo Last change on 2022/09/22 -This module is responsible for slicing data found in Colombia Governmental -COVID database through Socrata API and parse rows to the same pattern before -inserting into the SQL Database. Receives the data and uses a date range to -create chunk slices and yield them with the updated values. +This module is responsible retrieve generated chunks of data containing COVID +information collect via Socrata API from Colombia Governmental's data +collection. Connect to SQL Database and load chunks in order to update +positive_cases_covid_d table. Methods ------- -chunked_fetch(): - Returns a DataFrame with the chunk of Colombia data with parsed values - in order to load them into SQL DB. +gen_chunks_into_db(): + Generate chunks of data to insert into SQL Database using pangres. """ from datetime import datetime, timedelta import pandas as pd +from loguru import logger +from pangres import upsert -from epigraphhub.data.data_collection.config import COLOMBIA_CLIENT +from epigraphhub.connection import get_engine +from epigraphhub.data._config import COLOMBIA_CLIENT, COLOMBIA_LOG_PATH +from epigraphhub.settings import env +logger.add(COLOMBIA_LOG_PATH, retention="7 days") client = COLOMBIA_CLIENT -def chunked_fetch(maxrecords, start=0, chunk_size=10000): +def upload(): + """ + This method will receive chunks generated by chunked_fetch and load them + into the SQL Database. Pangres receives the records found in the Colombia data + through Socrata API, uses the generator to load chunks with size of 10000 into + SQL DB using upsert method. + @note Colombia sometimes has a post update in the data, so rows update + in this case is required to retrieve the rows updated. + """ + slice_date = datetime.date(datetime.today()) - timedelta(200) + slice_date = slice_date.strftime("%Y-%m-%d") + + # count the number of records that will be fetched + records = client.get_all( + "gt2j-8ykr", + select="COUNT(*)", + where=f'fecha_reporte_web > "{slice_date}"', + ) + + for i in records: + record_count = i + break + + del records + + maxrecords = int(record_count["COUNT"]) + + engine = get_engine(env.db.default_credential) + + for df_new in _chunked_fetch(maxrecords): + + # save the data + with engine.connect() as conn: + upsert( + con=conn, + df=df_new, + table_name="positive_cases_covid_d", + schema="colombia", + if_row_exists="update", + chunksize=1000, + add_new_columns=True, + create_table=False, + ) + + logger.info("Table positive_cases_covid_d updated.") + + +def _chunked_fetch(maxrecords, start=0, chunk_size=10000): """ Connects to Colombia database through Socrata API and generates slices of data in chunks in order to insert them into diff --git a/epigraphhub/data/data_collection/colombia/load_chunks_into_db.py b/epigraphhub/data/data_collection/colombia/load_chunks_into_db.py deleted file mode 100644 index 11e023f9..00000000 --- a/epigraphhub/data/data_collection/colombia/load_chunks_into_db.py +++ /dev/null @@ -1,76 +0,0 @@ -""" -Created on Mon Jan 31 08:53:59 2022 -@author: eduardoaraujo - -Last change on 2022/09/22 -This module is responsible retrieve generated chunks of data containing COVID -information collect via Socrata API from Colombia Governmental's data -collection. Connect to SQL Database and load chunks in order to update -positive_cases_covid_d table. - -Methods -------- - -gen_chunks_into_db(): - Generate chunks of data to insert into SQL Database using pangres. -""" -from datetime import datetime, timedelta - -from loguru import logger -from pangres import upsert - -from epigraphhub.connection import get_engine -from epigraphhub.data.data_collection.colombia.data_chunk import chunked_fetch -from epigraphhub.data.data_collection.config import COLOMBIA_CLIENT, COLOMBIA_LOG_PATH -from epigraphhub.settings import env - -logger.add(COLOMBIA_LOG_PATH, retention="7 days") -client = COLOMBIA_CLIENT - - -def gen_chunks_into_db(): - """ - This method will receive chunks generated by chunked_fetch and load them - into the SQL Database. Pangres receives the records found in the Colombia data - through Socrata API, uses the generator to load chunks with size of 10000 into - SQL DB using upsert method. - @note Colombia sometimes has a post update in the data, so rows update - in this case is required to retrieve the rows updated. - @see epigraphhub.data.data_collection.colombia.data_chunk - """ - slice_date = datetime.date(datetime.today()) - timedelta(200) - slice_date = slice_date.strftime("%Y-%m-%d") - - # count the number of records that will be fetched - records = client.get_all( - "gt2j-8ykr", - select="COUNT(*)", - where=f'fecha_reporte_web > "{slice_date}"', - ) - - for i in records: - record_count = i - break - - del records - - maxrecords = int(record_count["COUNT"]) - - engine = get_engine(env.db.default_credential) - - for df_new in chunked_fetch(maxrecords): - - # save the data - with engine.connect() as conn: - upsert( - con=conn, - df=df_new, - table_name="positive_cases_covid_d", - schema="colombia", - if_row_exists="update", - chunksize=1000, - add_new_columns=True, - create_table=False, - ) - - logger.info("Table positive_cases_covid_d updated.") diff --git a/epigraphhub/data/data_collection/foph/compare_data.py b/epigraphhub/data/data_collection/foph/compare_data.py deleted file mode 100644 index a3e16ff1..00000000 --- a/epigraphhub/data/data_collection/foph/compare_data.py +++ /dev/null @@ -1,79 +0,0 @@ -""" -Last change on 2022/09/22 -Comparing Federal Office of Public Health (FOPH) COVID data consists in -a step before pushing it to the database. Is responsible for retrieving -the last date in both CSV and SQL table. - -Methods -------- - -csv_last_update(filename): - Returns the max date in a CSV file. - -table_last_update(table): - Connects to the SQL database and returns its max date. -""" -from datetime import datetime - -import pandas as pd -from loguru import logger - -from epigraphhub.connection import get_engine -from epigraphhub.data.data_collection.config import FOPH_CSV_PATH, FOPH_LOG_PATH -from epigraphhub.settings import env - -logger.add(FOPH_LOG_PATH, retention="7 days") - - -def csv_last_update(filename) -> datetime: - """ - Method responsible for retrieving the maximum date in a CSV file. - - Args: - filename (str) : The CSV filename. - - Returns: - last_update (datetime) : Datetime with the max date in the CSV. - - Raises: - Exception (Exception) : Empty dataframe from CSV. - """ - df = pd.read_csv(f"{FOPH_CSV_PATH}/{filename}") - if "date" not in df: - last_update = df.datum.max() - else: - last_update = df.date.max() - if df.empty: - raise Exception("Empty file.") - return datetime.strptime(str(last_update), "%Y-%m-%d") - - -def table_last_update(table) -> datetime: - """ - Method responsible for connecting and retrieving the maximum date - of a table in the SQL Database. - @see epigraphhub.connection : Where the connection is configured. - - Args: - table (str) : Table name as in the CSV file. Later - tansformed into SQL DB table format. - - Returns: - last_update (datetime) : Datetime with the max date in the table. - - Raises: - Exception (Exception) : Connection with the database could not be - stablished. - """ - engine = get_engine(env.db.default_credential) - try: - df = pd.read_sql(f"select * from switzerland.foph_{table.lower()}_d;", engine) - if "date" not in df: - df = df.datum.dropna() - last_update = df.max() - df = df.date.dropna() - last_update = df.max() - return datetime.strptime(str(last_update), "%Y-%m-%d %H:%M:%S") - except Exception as e: - logger.error(f"Could not access {table} table\n{e}") - raise (e) diff --git a/epigraphhub/data/data_collection/foph/load_into_db.py b/epigraphhub/data/data_collection/foph/load_into_db.py deleted file mode 100644 index 1777b08d..00000000 --- a/epigraphhub/data/data_collection/foph/load_into_db.py +++ /dev/null @@ -1,62 +0,0 @@ -""" -Last change on 2022/09/22 -This module will retrieve the data from a CSV file, create a connection -with the SQL Database and update it with the new information. Pangres -will generate chunks with total length of 1000 and insert them into the -corresponding table as specified by the downloaded CSV file. -@see epigraphhub.data.data_collection.foph.download -@see epigraphhub.connection - -Methods -------- - -load(table, filename): - Connects to SQL DB and update a table with CSV information. -""" -import pandas as pd -from loguru import logger -from pangres import upsert - -from epigraphhub.connection import get_engine -from epigraphhub.data.data_collection.config import FOPH_CSV_PATH, FOPH_LOG_PATH -from epigraphhub.settings import env - -logger.add(FOPH_LOG_PATH, retention="7 days") - - -def load(table, filename): - """ - A generator responsible connecting and loading data - retrieved from a CSV file into its respective table in - the SQL Database, as defined in the connection configuration. - @see epigraphhub.connection - - Args: - table (str) : Raw table name. Later parsed to SQL format. - filename (str) : File name as defined in the CSV URL. - @see .download as above. - """ - new_df = pd.read_csv(f"{FOPH_CSV_PATH}/{filename}") - logger.info(f"Reading {filename}") - - new_df = new_df.rename(columns=str.lower) - new_df.index.name = "id_" - if "date" not in new_df.columns: - new_df["date"] = pd.to_datetime(new_df.datum) - else: - new_df["date"] = pd.to_datetime(new_df.date) - logger.info(f"Table {table} passed to DataFrame") - - engine = get_engine(env.db.default_credential) - with engine.connect() as conn: - upsert( - con=conn, - df=new_df, - table_name=f"foph_{table.lower()}_d", - schema="switzerland", - if_row_exists="update", - chunksize=1000, - add_new_columns=True, - create_table=True, - ) - logger.info(f"Table foph_{table.lower()}_d updated") diff --git a/epigraphhub/data/data_collection/owid/__init__.py b/epigraphhub/data/data_collection/owid/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/epigraphhub/data/data_collection/owid/download_data.py b/epigraphhub/data/data_collection/owid/download_data.py deleted file mode 100644 index 493c06e3..00000000 --- a/epigraphhub/data/data_collection/owid/download_data.py +++ /dev/null @@ -1,56 +0,0 @@ -""" -Last change on 2022/09/22 -This module is used for fetching and downloading COVID -data from Our World in Data. The data of interest consists in a -CSV table containing COVID information around the globe. - - -Methods -------- - -download_csv(): - Runs curl from the OWID database and stores the CSV file in the tmp dir. - -remove_csv(): - Removes the CSV file recursively. -""" -import os -import subprocess - -from loguru import logger - -from epigraphhub.data.data_collection.config import ( - OWID_CSV_PATH, - OWID_CSV_URL, - OWID_FILENAME, - OWID_LOG_PATH, -) - -logger.add(OWID_LOG_PATH, retention="7 days") - - -def download_csv(): - """ - This method is responsible for download the CSV file from the - OWID database. The file contains world information about COVID. - """ - os.makedirs(OWID_CSV_PATH, exist_ok=True) - subprocess.run( - [ - "curl", - "--silent", - "-f", - "-o", - f"{OWID_CSV_PATH}/{OWID_FILENAME}", - f"{OWID_CSV_URL}", - ] - ) - logger.info("OWID csv downloaded.") - - -def remove_csv(): - """ - This method deletes the OWID CSV file recursively. - """ - os.remove(f"{OWID_CSV_PATH}/{OWID_FILENAME}") - logger.info("OWID csv removed.") diff --git a/epigraphhub/data/data_collection/colombia/__init__.py b/epigraphhub/data/foph/__init__.py similarity index 100% rename from epigraphhub/data/data_collection/colombia/__init__.py rename to epigraphhub/data/foph/__init__.py diff --git a/epigraphhub/data/data_collection/foph/download_data.py b/epigraphhub/data/foph/extract.py similarity index 75% rename from epigraphhub/data/data_collection/foph/download_data.py rename to epigraphhub/data/foph/extract.py index 97eee0b8..fa1ef893 100644 --- a/epigraphhub/data/data_collection/foph/download_data.py +++ b/epigraphhub/data/foph/extract.py @@ -1,5 +1,5 @@ """ -Last change on 2022/09/22 +Last change on 2022/10/24 This module is used for fetching and downloading COVID data from Federal Office of Public Health. The data of interest consists in the following CSV tables: @@ -35,20 +35,17 @@ """ import os import subprocess +from pathlib import Path import requests from loguru import logger -from epigraphhub.data.data_collection.config import ( - FOPH_CSV_PATH, - FOPH_LOG_PATH, - FOPH_URL, -) +from epigraphhub.data._config import FOPH_CSV_PATH, FOPH_LOG_PATH, FOPH_URL logger.add(FOPH_LOG_PATH, retention="7 days") -def get_csv_relation(source=FOPH_URL): +def fetch(source=FOPH_URL): """ A generator responsible for accessing FOPH and retrieve the CSV relation, such as its Table name and URL as a tuple. @@ -66,7 +63,7 @@ def get_csv_relation(source=FOPH_URL): yield table, url -def download_csv(url): +def download(url): """ This methods runs curl in a URL that corresponds to a CSV file and stores it as specified in the URL. @@ -89,9 +86,22 @@ def download_csv(url): logger.info(f"{filename} downloaded at {FOPH_CSV_PATH}.") -def remove_csvs(): +def remove(filename: str = None, entire_dir: bool = False): """ - Removes recursively the FOPH CSV's folder. + Removes recursively the FOPH CSV's folder or filename. """ - subprocess.run(["rm", "-rf", FOPH_CSV_PATH]) - logger.info(f"{FOPH_CSV_PATH} removed.") + if entire_dir: + subprocess.run(["rm", "-rf", FOPH_CSV_PATH]) + logger.info(f"{FOPH_CSV_PATH} removed.") + + elif filename: + file_path = Path(FOPH_CSV_PATH) / filename + if file_path.exists(): + file_path.unlink() + logger.info(f"{file_path} removed.") + else: + raise Exception(f"{file_path} not found.") + + else: + logger.error(f"Set `entire_dir=True` to remove CSV dir") + raise Exception("Nothing was selected to remove") diff --git a/epigraphhub/data/foph/loading.py b/epigraphhub/data/foph/loading.py new file mode 100644 index 00000000..8194c270 --- /dev/null +++ b/epigraphhub/data/foph/loading.py @@ -0,0 +1,123 @@ +""" +Last change on 2022/10/24 +This module will retrieve the data from a CSV file, create a connection +with the SQL Database and update it with the new information. Pangres +will generate chunks with total length of 1000 and insert them into the +corresponding table as specified by the downloaded CSV file. +@see epigraphhub.connection + +Methods +------- + +load(table, filename): + Connects to SQL DB and update a table with CSV information. +""" +from datetime import datetime + +import pandas as pd +from loguru import logger +from pangres import upsert + +from epigraphhub.connection import get_engine +from epigraphhub.data._config import FOPH_CSV_PATH, FOPH_LOG_PATH +from epigraphhub.settings import env + +logger.add(FOPH_LOG_PATH, retention="7 days") + + +def upload(table, filename): + """ + A generator responsible connecting and loading data + retrieved from a CSV file into its respective table in + the SQL Database, as defined in the connection configuration. + @see epigraphhub.connection + + Args: + table (str) : Raw table name. Later parsed to SQL format. + filename (str) : File name as defined in the CSV URL. + @see .download as above. + """ + new_df = pd.read_csv(f"{FOPH_CSV_PATH}/{filename}") + logger.info(f"Reading {filename}") + + new_df = new_df.rename(columns=str.lower) + new_df.index.name = "id_" + if "date" not in new_df.columns: + new_df["date"] = pd.to_datetime(new_df.datum) + else: + new_df["date"] = pd.to_datetime(new_df.date) + logger.info(f"Table {table} passed to DataFrame") + + engine = get_engine(env.db.default_credential) + with engine.connect() as conn: + upsert( + con=conn, + df=new_df, + table_name=f"foph_{table.lower()}_d", + schema="switzerland", + if_row_exists="update", + chunksize=1000, + add_new_columns=True, + create_table=True, + ) + logger.info(f"Table foph_{table.lower()}_d updated") + + +def compare(filename, table) -> bool: + csv_date = _csv_last_update(filename) + table_date = _table_last_update(table) + return csv_date == table_date + + +def _csv_last_update(filename) -> datetime: + """ + Method responsible for retrieving the maximum date in a CSV file. + + Args: + filename (str) : The CSV filename. + + Returns: + last_update (datetime) : Datetime with the max date in the CSV. + + Raises: + Exception (Exception) : Empty dataframe from CSV. + """ + df = pd.read_csv(f"{FOPH_CSV_PATH}/{filename}") + if "date" not in df: + last_update = df.datum.max() + else: + last_update = df.date.max() + if df.empty: + raise Exception("Empty file.") + return datetime.strptime(str(last_update), "%Y-%m-%d") + + +def _table_last_update(table) -> datetime: + """ + Method responsible for connecting and retrieving the maximum date + of a table in the SQL Database. + @see epigraphhub.connection : Where the connection is configured. + + Args: + table (str) : Table name as in the CSV file. Later + tansformed into SQL DB table format. + + Returns: + last_update (datetime) : Datetime with the max date in the table. + + Raises: + Exception (Exception) : Connection with the database could not be + stablished. + """ + engine = get_engine(env.db.default_credential) + try: + df = pd.read_sql(f"select * from switzerland.foph_{table.lower()}_d;", engine) + if "date" not in df: + df = df.datum.dropna() + last_update = df.max() + df = df.date.dropna() + last_update = df.max() + return datetime.strptime(str(last_update), "%Y-%m-%d %H:%M:%S") + except Exception as e: + logger.error(f"Could not access {table} table\n{e}") + raise (e) diff --git a/epigraphhub/data/data_collection/foph/update_index.py b/epigraphhub/data/foph/transform.py similarity index 100% rename from epigraphhub/data/data_collection/foph/update_index.py rename to epigraphhub/data/foph/transform.py diff --git a/epigraphhub/data/foph.py b/epigraphhub/data/foph/viz.py similarity index 100% rename from epigraphhub/data/foph.py rename to epigraphhub/data/foph/viz.py diff --git a/epigraphhub/data/data_collection/foph/__init__.py b/epigraphhub/data/owid/__init__.py similarity index 100% rename from epigraphhub/data/data_collection/foph/__init__.py rename to epigraphhub/data/owid/__init__.py diff --git a/epigraphhub/data/data_collection/owid/compare_data.py b/epigraphhub/data/owid/extract.py similarity index 55% rename from epigraphhub/data/data_collection/owid/compare_data.py rename to epigraphhub/data/owid/extract.py index 128aabed..6297ef3a 100644 --- a/epigraphhub/data/data_collection/owid/compare_data.py +++ b/epigraphhub/data/owid/extract.py @@ -1,19 +1,18 @@ """ -Last change on 2022/09/22 -Comparing Our World in Data (OWID) COVID data consists in -a step before pushing it to the database. Is responsible for retrieving -the table size in both CSV and in the SQL Database. -@see epigraphhub.connection +Last change on 2022/10/24 +This module is used for fetching and downloading COVID +data from Our World in Data. The data of interest consists in a +CSV table containing COVID information around the globe. + Methods ------- -database_size(remote): - If remote, creates the ssh connection with the SQL server. Then returns - the total count of the OWID table. +download_csv(): + Runs curl from the OWID database and stores the CSV file in the tmp dir. -csv_size(): - Looks for the OWID CSV file and returns its total rows count. +remove_csv(): + Removes the CSV file recursively. """ import os import shlex as sx @@ -22,8 +21,9 @@ from loguru import logger from epigraphhub.connection import get_engine -from epigraphhub.data.data_collection.config import ( +from epigraphhub.data._config import ( OWID_CSV_PATH, + OWID_CSV_URL, OWID_FILENAME, OWID_HOST, OWID_LOG_PATH, @@ -31,9 +31,43 @@ from epigraphhub.settings import env logger.add(OWID_LOG_PATH, retention="7 days") +engine = get_engine(env.db.default_credential) + + +def download() -> None: + """ + This method is responsible for download the CSV file from the + OWID database. The file contains world information about COVID. + """ + os.makedirs(OWID_CSV_PATH, exist_ok=True) + subprocess.run( + [ + "curl", + "--silent", + "-f", + "-o", + f"{OWID_CSV_PATH}/{OWID_FILENAME}", + f"{OWID_CSV_URL}", + ] + ) + logger.info("OWID csv downloaded.") + + +def compare() -> bool: + table_size = _get_database_size(remote=False) + csv_size = _get_csv_size() + return table_size == csv_size -def database_size(remote=True): +def remove() -> None: + """ + This method deletes the OWID CSV file recursively. + """ + os.remove(f"{OWID_CSV_PATH}/{OWID_FILENAME}") + logger.info("OWID csv removed.") + + +def _get_database_size(remote=True) -> int: """ Method responsible for connecting in the SQL database and return the total count of the OWID table. @@ -64,18 +98,13 @@ def database_size(remote=True): proc.kill() -def csv_size(): +def _get_csv_size() -> int: """ Method responsible for connecting in the SQL database and return - the total count of the OWID table. - - Args: - remote (bool) : If the SQL container is not locally configured, - creates a ssh tunnel with the Database. + the total count of the OWID CSV file. Raises: - Exception (Exception) : Connection with the Database could not be - stablished. + Exception (Exception) : CSV file not found. """ try: raw_shape = subprocess.Popen( diff --git a/epigraphhub/data/data_collection/owid/load_into_db.py b/epigraphhub/data/owid/loading.py similarity index 92% rename from epigraphhub/data/data_collection/owid/load_into_db.py rename to epigraphhub/data/owid/loading.py index e75a1656..1193943a 100644 --- a/epigraphhub/data/data_collection/owid/load_into_db.py +++ b/epigraphhub/data/owid/loading.py @@ -24,7 +24,7 @@ from loguru import logger from epigraphhub.connection import get_engine -from epigraphhub.data.data_collection.config import ( +from epigraphhub.data._config import ( OWID_CSV_PATH, OWID_FILENAME, OWID_HOST, @@ -37,24 +37,7 @@ engine_public = get_engine(env.db.default_credential) -def parse_types(df): - """ - Method responsible for receive the OWID DataFrame and return - a DataFrame with the date parsed to datetime objects. - - Args: - df (DataFrame) : OWID DataFrame. - - Returns: - df (DataFrame) : OWID DataFrame with date column parsed to datetime. - """ - df = df.convert_dtypes() - df["date"] = pd.to_datetime(df.date) - logger.info("OWID data types parsed.") - return df - - -def load(remote=True): +def upload(remote=True): """ A generator responsible connecting and loading data retrieved from the OWID CSV file into owid_covid table in @@ -70,7 +53,7 @@ def load(remote=True): ) try: data = pd.read_csv(os.path.join(OWID_CSV_PATH, OWID_FILENAME)) - data = parse_types(data) + data = _parse_date(data) engine = engine_public data.to_sql( "owid_covid", @@ -87,3 +70,19 @@ def load(remote=True): finally: if remote: proc.kill() + + +def _parse_date(df): + """ + Method responsible for receive the OWID DataFrame and return + a DataFrame with the date parsed to datetime objects. + + Args: + df (DataFrame) : OWID DataFrame. + + Returns: + df (DataFrame) : OWID DataFrame with date column parsed to datetime. + """ + df = df.convert_dtypes() + df["date"] = pd.to_datetime(df.date) + return df diff --git a/epigraphhub/data/data_collection/owid/update_index.py b/epigraphhub/data/owid/transform.py similarity index 95% rename from epigraphhub/data/data_collection/owid/update_index.py rename to epigraphhub/data/owid/transform.py index 2bff3c70..32763d75 100644 --- a/epigraphhub/data/data_collection/owid/update_index.py +++ b/epigraphhub/data/owid/transform.py @@ -1,5 +1,5 @@ """ -Last change on 2022/09/22 +Last change on 2022/10/24 This module is responsible for including missing index in the OWID SQL table. Connects to the SQL server as defined in the connection. @see epigraphhub.connection @@ -16,7 +16,7 @@ from loguru import logger from epigraphhub.connection import get_engine -from epigraphhub.data.data_collection.config import OWID_HOST, OWID_LOG_PATH +from epigraphhub.data._config import OWID_HOST, OWID_LOG_PATH from epigraphhub.settings import env logger.add(OWID_LOG_PATH, retention="7 days")