Commit ad4cc3e

BREAKING CHANGE(refactor): Refactor of Data Collection modules to match ETL workflow (#193)

* chore(refactor): Refactor of Data Collection modules to match ETL workflow

* Refactor OWID data collection

* Move config to a private module

* Colombia collection

* Refactor foph import module
luabida authored Oct 28, 2022
1 parent 0fb6b50 commit ad4cc3e
Showing 23 changed files with 304 additions and 349 deletions.
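For downstream code, the breaking changes are the import paths: the FOPH visualization helpers moved from `epigraphhub.data.foph` to `epigraphhub.data.foph.viz`, and the collection config moved from `epigraphhub.data.data_collection.config` to the private module `epigraphhub.data._config`. A minimal migration sketch, using only names that appear in this diff:

```python
# Before this commit:
# from epigraphhub.data.foph import get_cluster_data, get_georegion_data
# from epigraphhub.data.data_collection.config import COLOMBIA_CLIENT

# After this commit:
from epigraphhub.data.foph.viz import get_cluster_data, get_georegion_data

# _config is private by convention (leading underscore); external code should
# prefer a public entry point where one exists.
from epigraphhub.data._config import COLOMBIA_CLIENT
```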
11 changes: 8 additions & 3 deletions docs/tutorials/data/foph.ipynb
@@ -250,7 +250,7 @@
}
],
"source": [
"from epigraphhub.data.foph import get_cluster_data\n",
"from epigraphhub.data.foph.viz import get_cluster_data\n",
"\n",
"dict_cols = {\n",
" \"foph_cases_d\": [\"datum\", \"georegion\", \"entries\"],\n",
@@ -315,7 +315,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "Python 3.10.6 64-bit",
"language": "python",
"name": "python3"
},
@@ -329,7 +329,12 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
"version": "3.10.6"
},
"vscode": {
"interpreter": {
"hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1"
}
}
},
"nbformat": 4,
11 changes: 8 additions & 3 deletions docs/tutorials/forecast_switzerland/article/download_data.ipynb
@@ -17,7 +17,7 @@
"source": [
"import os \n",
"os.chdir('../../')\n",
"from epigraphhub.data.foph import get_cluster_data, get_georegion_data"
"from epigraphhub.data.foph.viz import get_cluster_data, get_georegion_data"
]
},
{
@@ -86,7 +86,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "Python 3.10.6 64-bit",
"language": "python",
"name": "python3"
},
@@ -100,7 +100,12 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
"version": "3.10.6"
},
"vscode": {
"interpreter": {
"hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1"
}
}
},
"nbformat": 4,
2 changes: 1 addition & 1 deletion docs/tutorials/forecast_switzerland/forecast_swiss.py
@@ -13,7 +13,7 @@

from epigraphhub.analysis.clustering import compute_clusters
from epigraphhub.analysis.forecast_models.ngboost_models import NGBModel
-from epigraphhub.data.foph import get_cluster_data, get_data_by_location
+from epigraphhub.data.foph.viz import get_cluster_data, get_data_by_location

params_model = {
"Base": default_tree_learner,
4 changes: 2 additions & 2 deletions docs/tutorials/forecast_switzerland/verify_train.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion epigraphhub/analysis/forecast_models/lstm_models.py
@@ -32,7 +32,7 @@
from epigraphhub.analysis.clustering import compute_clusters
from epigraphhub.analysis.preprocessing import lstm_split_data as split_data
from epigraphhub.analysis.preprocessing import normalize_data
-from epigraphhub.data.foph import get_cluster_data
+from epigraphhub.data.foph.viz import get_cluster_data


def build_model(hidden, features, predict_n, look_back=10, batch_size=1):
File renamed without changes.
File renamed without changes.
@@ -18,14 +18,20 @@
from loguru import logger

from epigraphhub.connection import get_engine
-from epigraphhub.data.data_collection.config import COLOMBIA_CLIENT, COLOMBIA_LOG_PATH
+from epigraphhub.data._config import COLOMBIA_CLIENT, COLOMBIA_LOG_PATH
from epigraphhub.settings import env

logger.add(COLOMBIA_LOG_PATH, retention="7 days")
client = COLOMBIA_CLIENT


-def table_last_update() -> datetime:
+def compare() -> bool:
+    db_last_update = _table_last_update()
+    data_last_update = _web_last_update()
+    return db_last_update == data_last_update
+
+
+def _table_last_update() -> datetime:
"""
This method will connect to the SQL Database and query the maximum date found in
Colombia table.
@@ -51,7 +57,7 @@ def table_last_update() -> datetime:
raise (e)


-def web_last_update() -> datetime:
+def _web_last_update() -> datetime:
    """
    This method requests the maximum date found in the Colombia data through
    the Socrata API and returns it as a datetime object for further evaluation.
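Together, `compare()` above and `upload()` (defined in the next file below) form the extract/load entry points of the new ETL layout: `compare()` reports whether the SQL table already matches the upstream Socrata data. A minimal usage sketch; the single import path is hypothetical, since this view does not show which modules the two files belong to:

```python
# Hypothetical import path: the diff defines compare() and upload(), but the
# file names of these modules are not shown in this view.
from epigraphhub.data.colombia import compare, upload


def update_colombia_table() -> None:
    """Refresh colombia.positive_cases_covid_d only when upstream is newer."""
    if compare():
        # The max date in the SQL table already equals Socrata's last update.
        print("positive_cases_covid_d is up to date; skipping fetch.")
        return
    # Otherwise fetch the recent rows in chunks and upsert them.
    upload()


if __name__ == "__main__":
    update_colombia_table()
```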
@@ -3,28 +3,79 @@
@author: eduardoaraujo
Last change on 2022/09/22
-This module is responsible for slicing data found in Colombia Governmental
-COVID database through Socrata API and parse rows to the same pattern before
-inserting into the SQL Database. Receives the data and uses a date range to
-create chunk slices and yield them with the updated values.
+This module is responsible for retrieving generated chunks of data containing
+COVID information collected via the Socrata API from the Colombian government's
+data collection. It connects to the SQL Database and loads the chunks in order
+to update the positive_cases_covid_d table.
Methods
-------
-chunked_fetch():
-    Returns a DataFrame with the chunk of Colombia data with parsed values
-    in order to load them into SQL DB.
+upload():
+    Generates chunks of data and upserts them into the SQL Database using
+    pangres.
"""
from datetime import datetime, timedelta

import pandas as pd
from loguru import logger
from pangres import upsert

-from epigraphhub.data.data_collection.config import COLOMBIA_CLIENT
+from epigraphhub.connection import get_engine
+from epigraphhub.data._config import COLOMBIA_CLIENT, COLOMBIA_LOG_PATH
+from epigraphhub.settings import env

+logger.add(COLOMBIA_LOG_PATH, retention="7 days")
client = COLOMBIA_CLIENT


-def chunked_fetch(maxrecords, start=0, chunk_size=10000):
+def upload():
+    """
+    This method receives the chunks generated by _chunked_fetch and loads
+    them into the SQL Database. The records found in the Colombia data
+    through the Socrata API are consumed from the generator in chunks of
+    10000 rows and written with pangres' upsert.
+    @note Colombia sometimes updates rows after publication, so
+    if_row_exists="update" is required to retrieve the updated rows.
+    """
+    slice_date = datetime.date(datetime.today()) - timedelta(200)
+    slice_date = slice_date.strftime("%Y-%m-%d")
+
+    # count the number of records that will be fetched
+    records = client.get_all(
+        "gt2j-8ykr",
+        select="COUNT(*)",
+        where=f'fecha_reporte_web > "{slice_date}"',
+    )
+
+    for i in records:
+        record_count = i
+        break
+
+    del records
+
+    maxrecords = int(record_count["COUNT"])
+
+    engine = get_engine(env.db.default_credential)
+
+    for df_new in _chunked_fetch(maxrecords):
+
+        # save the data
+        with engine.connect() as conn:
+            upsert(
+                con=conn,
+                df=df_new,
+                table_name="positive_cases_covid_d",
+                schema="colombia",
+                if_row_exists="update",
+                chunksize=1000,
+                add_new_columns=True,
+                create_table=False,
+            )
+
+    logger.info("Table positive_cases_covid_d updated.")
+
+
+def _chunked_fetch(maxrecords, start=0, chunk_size=10000):
"""
Connects to Colombia database through Socrata API and generates
slices of data in chunks in order to insert them into
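The body of `_chunked_fetch` is collapsed in this diff. A sketch of what the generator might look like, inferred from its signature and docstring; the offset/limit pagination, the reuse of the module-level Socrata `client`, and the `id_` index name are assumptions, not the actual implementation:

```python
import pandas as pd


def _chunked_fetch(maxrecords, start=0, chunk_size=10000):
    """Yield Colombia COVID rows from Socrata as DataFrames of up to chunk_size records."""
    # `client` is the module-level COLOMBIA_CLIENT shown above.
    while start < maxrecords:
        # The real implementation likely applies the same fecha_reporte_web
        # filter that upload() uses for its COUNT(*) query.
        results = client.get("gt2j-8ykr", offset=start, limit=chunk_size)
        df = pd.DataFrame.from_records(results)
        # pangres upsert requires a named, unique index (name is hypothetical).
        df.index.name = "id_"
        yield df
        start += chunk_size
```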
76 changes: 0 additions & 76 deletions epigraphhub/data/data_collection/colombia/load_chunks_into_db.py

This file was deleted.

79 changes: 0 additions & 79 deletions epigraphhub/data/data_collection/foph/compare_data.py

This file was deleted.
