feat(airflow): OWID, FOPH and Colombia dag. #133

Merged
merged 30 commits on Oct 14, 2022
Changes from 1 commit

Commits (30):
5eefb19
Begin of owid workflow
luabida Aug 24, 2022
3d25ae1
Add logger and clean data files do owid dag
luabida Aug 25, 2022
7eb5266
Colombia data DAG
luabida Aug 26, 2022
66d214e
Add foph DAG
luabida Sep 1, 2022
27a338f
Run black
luabida Sep 1, 2022
c757d4f
DEBUG log, some table sizes won't ever match
luabida Sep 1, 2022
eee97b6
Remove duplicated file
luabida Sep 1, 2022
7514ec8
DRAFT test epigraphhub_py import
luabida Sep 2, 2022
0a7be2d
foph dag
luabida Sep 2, 2022
2aaa60d
Remove old data
luabida Sep 5, 2022
dcf79fe
Finish colombia data
luabida Sep 6, 2022
e502605
Reduce comparation bugs
luabida Sep 6, 2022
22dfc71
Minor fixes
luabida Sep 6, 2022
bb45e1f
Add stdout logging to DAGs
luabida Sep 8, 2022
ec337b5
Start of Airflow Documentation
luabida Sep 8, 2022
08e8f98
Configuring the postgres connection with .config/epigraphhub.yaml file
luabida Sep 23, 2022
caa2e13
Add documentation for owid dag
luabida Sep 23, 2022
0a207f9
FOPH docstrings plus minor adjustments
luabida Sep 26, 2022
b1af26f
Import epigraphhub from pypi
luabida Oct 10, 2022
36f08c2
chore(airflow): add initial tests
luabida Sep 26, 2022
720478c
run black
luabida Sep 26, 2022
3ec0715
Add unitary test for dags, check existence
luabida Sep 26, 2022
edf8743
Add tests to verify if table exists
luabida Oct 11, 2022
9264a6e
Merge pull request #1 from luabida/airflow-tests
luabida Oct 11, 2022
81caf6e
Fix EOF
luabida Oct 11, 2022
4f4ee21
Remove database tests
luabida Oct 11, 2022
ae3e2f8
Fix permissions for legacy Cron scripts
luabida Oct 11, 2022
e595420
Removing legacy Data Collection Cron scripts and setting Email on Fai…
luabida Oct 13, 2022
589dcb4
Removing forecast code. Deprecated.
luabida Oct 13, 2022
806b381
Fix typos
luabida Oct 14, 2022
Changes shown below are from commit:

Run black
luabida committed Sep 1, 2022
commit 27a338f9bce879bc13d56cf92552780dd9816f3e
16 changes: 10 additions & 6 deletions data_collection/colombia/data_chunk.py
@@ -13,8 +13,7 @@
client = COLOMBIA_SOC


class DFChunkGenerator():

class DFChunkGenerator:
def chunked_fetch(start, chunk_size, maxrecords):

slice_date = datetime.date(datetime.today()) - timedelta(200)
@@ -44,14 +43,19 @@ def chunked_fetch(start, chunk_size, maxrecords):
if df_new.empty:
break

df_new.set_index(['id_de_caso'] , inplace = True)
df_new.set_index(["id_de_caso"], inplace=True)

df_new = df_new.convert_dtypes()

# change some strings to a standard
df_new.replace(to_replace ={ 'ubicacion': {'casa': 'Casa', 'CASA': 'Casa'},
'estado': {'leve': 'Leve', 'LEVE': 'Leve'},
'sexo': {'f': 'F', 'm': 'M'}}, inplace = True)
df_new.replace(
to_replace={
"ubicacion": {"casa": "Casa", "CASA": "Casa"},
"estado": {"leve": "Leve", "LEVE": "Leve"},
"sexo": {"f": "F", "m": "M"},
},
inplace=True,
)

# transform the datetime columns in the correct time
for c in df_new.columns:
15 changes: 6 additions & 9 deletions data_collection/config.py
@@ -13,20 +13,17 @@
DB_NAME = os.environ.get("POSTGRES_DB")
DB_NAME_PRIVATE = os.environ.get("POSTGRES_DB_PRIVATE")

DB_URI = (
f"postgresql://{DB_USER}:{DB_PASSWORD}"
f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
DB_URI = f"postgresql://{DB_USER}:{DB_PASSWORD}" f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
DB_URI_PRIVATE = (
f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME_PRIVATE}"
)

# Owid

OWID_CSV_URL = 'https://covid.ourworldindata.org/data/owid-covid-data.csv'
OWID_CSV_PATH = '/tmp/owid/releases'
OWID_FILENAME = OWID_CSV_URL.split('/')[-1]
OWID_HOST = '135.181.41.20'
OWID_CSV_URL = "https://covid.ourworldindata.org/data/owid-covid-data.csv"
OWID_CSV_PATH = "/tmp/owid/releases"
OWID_FILENAME = OWID_CSV_URL.split("/")[-1]
OWID_HOST = "135.181.41.20"

# Colombia

@@ -35,4 +32,4 @@
# foph

FOPH_URL = "https://www.covid19.admin.ch/api/data/context"
FOPH_CSV_PATH = '/tmp/foph/releases'
FOPH_CSV_PATH = "/tmp/foph/releases"
8 changes: 5 additions & 3 deletions data_collection/foph/download_foph_data.py
@@ -6,14 +6,16 @@

logger.add("/var/log/foph_fetch.log", retention="7 days")


def get_csv_relation(source=FOPH_URL):
context = requests.get(source).json()
tables = context['sources']['individual']['csv']['daily']
tables = context["sources"]["individual"]["csv"]["daily"]
for table, url in tables.items():
yield table, url


def download_csv(url):
os.makedirs(FOPH_CSV_PATH, exist_ok=True)
filename = url.split("/")[-1]
subprocess.run(['curl', '--silent', '-f', '-o', f'{FOPH_CSV_PATH}/{filename}', url])
logger.info(f'{filename} downloaded at {FOPH_CSV_PATH}')
subprocess.run(["curl", "--silent", "-f", "-o", f"{FOPH_CSV_PATH}/{filename}", url])
logger.info(f"{filename} downloaded at {FOPH_CSV_PATH}")
73 changes: 46 additions & 27 deletions data_collection/foph/foph_fetch.py
@@ -9,49 +9,68 @@

logger.add("/var/log/foph_fetch.log", retention="7 days")


def csv_size(filename):
raw_shape = subprocess.Popen(f'wc -l {os.path.join(FOPH_CSV_PATH, filename)}', shell=True, stdout=subprocess.PIPE).stdout
raw_shape = subprocess.Popen(
f"wc -l {os.path.join(FOPH_CSV_PATH, filename)}",
shell=True,
stdout=subprocess.PIPE,
).stdout
clean = str(raw_shape.read()).split("'")
shape = clean[1].split(' ')[0]
shape = clean[1].split(" ")[0]
return int(shape)


def table_size(table):
engine = create_engine(DB_URI)
try:
try:
with engine.connect().execution_options(autocommit=True) as conn:
curr = conn.execute(text(f"SELECT COUNT(*) FROM switzerland.foph_{table.lower()}_d"))
curr = conn.execute(
text(f"SELECT COUNT(*) FROM switzerland.foph_{table.lower()}_d")
)
for count in curr:
return int(count[0])
except Exception as e:
logger.error(f"Could not access {table} table\n{e}")
raise(e)
raise (e)


def load_into_db(table, filename):
new_df = pd.read_csv(f'{FOPH_CSV_PATH}/{filename}')
logger.info(f'Reading {filename}')
new_df = new_df.rename(columns = str.lower)
new_df.index.name = 'id_'
if not 'date' in new_df.columns:
new_df['date'] = pd.to_datetime(new_df.datum)
new_df = pd.read_csv(f"{FOPH_CSV_PATH}/{filename}")
logger.info(f"Reading {filename}")

new_df = new_df.rename(columns=str.lower)
new_df.index.name = "id_"
if not "date" in new_df.columns:
new_df["date"] = pd.to_datetime(new_df.datum)
else:
new_df['date'] = pd.to_datetime(new_df.date)
logger.info(f'Table {table} passed to DataFrame')
new_df["date"] = pd.to_datetime(new_df.date)
logger.info(f"Table {table} passed to DataFrame")

engine = create_engine(DB_URI)
with engine.connect() as conn:
upsert(con=conn, df=new_df, table_name=f'foph_{table.lower()}_d', schema='switzerland', if_row_exists='update',
chunksize=1000, add_new_columns=True, create_table=True)
logger.info(f'Table {table} updated')
upsert(
con=conn,
df=new_df,
table_name=f"foph_{table.lower()}_d",
schema="switzerland",
if_row_exists="update",
chunksize=1000,
add_new_columns=True,
create_table=True,
)
logger.info(f"Table {table} updated")

with engine.connect() as connection:
try:
connection.execute(f'CREATE INDEX IF NOT EXISTS region_idx ON switzerland.foph_{table.lower()} ("geoRegion");')
except Exception as e:
logger.info(f'Could not create region index: {e}')
try:
connection.execute(f'CREATE INDEX IF NOT EXISTS date_idx ON switzerland.foph_{table.lower()} (date);')
except Exception as e:
logger.info(f'Could not create date index: {e}')

try:
connection.execute(
f'CREATE INDEX IF NOT EXISTS region_idx ON switzerland.foph_{table.lower()} ("geoRegion");'
)
except Exception as e:
logger.info(f"Could not create region index: {e}")
try:
connection.execute(
f"CREATE INDEX IF NOT EXISTS date_idx ON switzerland.foph_{table.lower()} (date);"
)
except Exception as e:
logger.info(f"Could not create date index: {e}")
26 changes: 18 additions & 8 deletions data_collection/owid/compare_data.py
@@ -2,31 +2,41 @@
from sqlalchemy import create_engine, text
import subprocess
import shlex
import sys; CONFIG_PATH = ".."
import sys

CONFIG_PATH = ".."
sys.path.insert(0, CONFIG_PATH)
from config import OWID_HOST, OWID_CSV_PATH, OWID_FILENAME, DB_URI
from loguru import logger

logger.add("/var/log/owid_fetch.log", retention="7 days")


def database_size(remote=True):
if remote:
proc = subprocess.Popen(shlex.split(f'ssh -f epigraph@{OWID_HOST} -L 5432:localhost:5432 -NC'))
proc = subprocess.Popen(
shlex.split(f"ssh -f epigraph@{OWID_HOST} -L 5432:localhost:5432 -NC")
)
try:
engine = create_engine(DB_URI)
engine = create_engine(DB_URI)
with engine.connect().execution_options(autocommit=True) as conn:
curr = conn.execute(text("SELECT COUNT(*) FROM owid_covid"))
for count in curr:
return int(count[0])
except Exception as e:
logger.error(f"Could not access OWID table\n{e}")
raise(e)
raise (e)
finally:
if remote:
proc.kill()
proc.kill()


def csv_size():
raw_shape = subprocess.Popen(f'wc -l {os.path.join(OWID_CSV_PATH, OWID_FILENAME)}', shell=True, stdout=subprocess.PIPE).stdout
raw_shape = subprocess.Popen(
f"wc -l {os.path.join(OWID_CSV_PATH, OWID_FILENAME)}",
shell=True,
stdout=subprocess.PIPE,
).stdout
clean = str(raw_shape.read()).split("'")
shape = clean[1].split(' ')[0]
return int(shape) -1
shape = clean[1].split(" ")[0]
return int(shape) - 1
16 changes: 13 additions & 3 deletions data_collection/owid/download_data.py
@@ -5,12 +5,22 @@

logger.add("/var/log/owid_fetch.log", retention="7 days")


def download_csv():
os.makedirs(OWID_CSV_PATH, exist_ok=True)
subprocess.run(['curl', '--silent', '-f', '-o', f'{OWID_CSV_PATH}/{OWID_FILENAME}', f'{OWID_CSV_URL}'])
subprocess.run(
[
"curl",
"--silent",
"-f",
"-o",
f"{OWID_CSV_PATH}/{OWID_FILENAME}",
f"{OWID_CSV_URL}",
]
)
logger.warning("OWID csv downloaded.")


def remove_csv():
os.remove(f'{OWID_CSV_PATH}/{OWID_FILENAME}')
os.remove(f"{OWID_CSV_PATH}/{OWID_FILENAME}")
logger.warning("OWID csv removed.")

35 changes: 26 additions & 9 deletions data_collection/owid/load_into_db.py
@@ -8,29 +8,46 @@

logger.add("/var/log/owid_fetch.log", retention="7 days")


def parse_types(df):
df = df.convert_dtypes()
df['date'] = pd.to_datetime(df.date)
df["date"] = pd.to_datetime(df.date)
logger.warning("OWID data types parsed.")
return df


def load(remote=True):
if remote:
proc = subprocess.Popen(shlex.split(f'ssh -f epigraph@{OWID_HOST} -L 5432:localhost:5432 -NC'))
proc = subprocess.Popen(
shlex.split(f"ssh -f epigraph@{OWID_HOST} -L 5432:localhost:5432 -NC")
)
try:
data = pd.read_csv(os.path.join(OWID_CSV_PATH, OWID_FILENAME))
data = parse_types(data)
engine = create_engine(DB_URI)
data.to_sql('owid_covid', engine, index=False, if_exists='replace', method='multi', chunksize=10000)
logger.warning('OWID data inserted into database')
data.to_sql(
"owid_covid",
engine,
index=False,
if_exists="replace",
method="multi",
chunksize=10000,
)
logger.warning("OWID data inserted into database")
with engine.connect() as connection:
connection.execute('CREATE INDEX IF NOT EXISTS country_idx ON owid_covid (location);')
connection.execute('CREATE INDEX IF NOT EXISTS iso_idx ON owid_covid (iso_code);')
connection.execute('CREATE INDEX IF NOT EXISTS date_idx ON owid_covid (date);')
logger.warning('Database indices created on OWID table')
connection.execute(
"CREATE INDEX IF NOT EXISTS country_idx ON owid_covid (location);"
)
connection.execute(
"CREATE INDEX IF NOT EXISTS iso_idx ON owid_covid (iso_code);"
)
connection.execute(
"CREATE INDEX IF NOT EXISTS date_idx ON owid_covid (date);"
)
logger.warning("Database indices created on OWID table")
except Exception as e:
logger.error(f"Could not update OWID table\n{e}")
raise(e)
raise (e)
finally:
if remote:
proc.kill()
32 changes: 18 additions & 14 deletions docker/airflow/dags/colombia_dag.py
@@ -5,37 +5,41 @@

DATA_PATH = "/opt/EpiGraphHub/data_collection"
import sys

sys.path.insert(0, DATA_PATH)
from colombia import load_chunks_into_db
from config import COLOMBIA_SOC


default_args = {
'owner': 'epigraphhub',
'depends_on_past': False,
'start_date': pendulum.datetime(2022, 8, 26),
"owner": "epigraphhub",
"depends_on_past": False,
"start_date": pendulum.datetime(2022, 8, 26),
#'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=1),
}

@dag(schedule_interval='@daily',
default_args=default_args,
catchup=False,
template_searchpath=DATA_PATH
)

@dag(
schedule_interval="@daily",
default_args=default_args,
catchup=False,
template_searchpath=DATA_PATH,
)
def colombia():

start = EmptyOperator(
task_id='start',
task_id="start",
)

@task(task_id='load_into_db', retries=2)
@task(task_id="load_into_db", retries=2)
Contributor:

In this link: https://dev.socrata.com/foundry/www.datos.gov.co/gt2j-8ykr/ the last-updated date of this dataset is available. Unfortunately, I haven't found a way to get it directly from the API, but maybe we could add a web scraper to fetch it and use it to decide whether the data-update code should run. I can do it.
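A minimal sketch of how such a check might look, assuming the Socrata portal exposes dataset metadata at the usual /api/views/<dataset-id>.json endpoint with a rowsUpdatedAt Unix timestamp; both the endpoint and the field name are assumptions here and would need to be verified against this dataset (falling back to scraping the page if they do not hold):

from datetime import datetime, timezone

import requests


def last_updated_at(dataset_id: str = "gt2j-8ykr") -> datetime:
    # Assumed Socrata metadata endpoint; the URL and the "rowsUpdatedAt"
    # field are assumptions, not verified against www.datos.gov.co.
    url = f"https://www.datos.gov.co/api/views/{dataset_id}.json"
    meta = requests.get(url, timeout=30).json()
    return datetime.fromtimestamp(meta["rowsUpdatedAt"], tz=timezone.utc)


def should_update(last_successful_run: datetime) -> bool:
    # Only run the expensive chunked fetch when the upstream dataset
    # has changed since the previous successful run.
    return last_updated_at() > last_successful_run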

Contributor:

@luabida, a final comment: following issue #129, you could create a task that, after updating the table, adds (or updates) this table's update date in the meta table.
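A rough sketch of what such a task could record, assuming a meta table along the lines of meta.table_updates with (table_name, last_update) columns; the schema, table, and column names below are hypothetical and not defined in this PR:

from datetime import datetime

from sqlalchemy import create_engine, text


def register_update(table_name: str, db_uri: str) -> None:
    # Upsert the update timestamp for `table_name` into a hypothetical
    # meta table (see issue #129); all names here are placeholders and
    # assume a unique constraint on table_name.
    engine = create_engine(db_uri)
    with engine.begin() as conn:
        conn.execute(
            text(
                "INSERT INTO meta.table_updates (table_name, last_update) "
                "VALUES (:t, :d) "
                "ON CONFLICT (table_name) DO UPDATE SET last_update = :d"
            ),
            {"t": table_name, "d": datetime.utcnow()},
        )

Wired into the DAG, this could run as one more @task after load_chunks_in_db.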

Contributor (author):

Thank you so much for this first review. I'll be patching the changes in this PR so we can track the TODOs for Airflow.

def load_chunks_in_db(client=COLOMBIA_SOC):
load_chunks_into_db.gen_chunks_into_db(client)

start >> load_chunks_in_db()


dag = colombia()