Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sinan DAGS): create DAG to fetch dengue data from SINAN #201

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
4 changes: 3 additions & 1 deletion containers/airflow/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ RUN /usr/local/bin/python -m virtualenv /opt/envs/py311 --python="/opt/py311/bin
&& source /opt/envs/py311/bin/activate \
&& pip install "cython<3.0.0" \
&& pip install --no-build-isolation "pyyaml<6.0" \
&& pip install -r /opt/envs/pysus.txt
&& pip install \
psycopg2-binary \
-r /opt/envs/pysus.txt

WORKDIR ${AIRFLOW_HOME}

Expand Down
Empty file.
161 changes: 161 additions & 0 deletions containers/airflow/dags/brasil/sinan/dengue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import pendulum

from datetime import timedelta
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable


# Task-level defaults handed to the DAG through its `default_args` argument:
# ownership, failure e-mail notification and the retry policy.
default_args = dict(
    owner="epigraphhub",
    depends_on_past=False,
    start_date=pendulum.datetime(2023, 1, 1),
    email=["[email protected]"],
    email_on_failure=True,
    email_on_retry=False,
    retries=2,
    retry_delay=timedelta(minutes=1),
)

# DAG scheduled monthly; `catchup=False` means runs missed since `start_date`
# are not back-filled.
with DAG(
dag_id='SINAN_DENG',
tags=['SINAN', 'Brasil', 'Dengue'],
schedule='@monthly',
default_args=default_args,
catchup=False,
) as dag:

# Connection settings read from the Airflow Variable `egh_conn` and parsed as
# JSON; the task defined below reads its 'URI' entry.
CONN = Variable.get('egh_conn', deserialize_json=True)

# NOTE(review): the Dockerfile change in this PR installs PySUS, SQLAlchemy and
# psycopg2 into the /opt/envs/py311 virtualenv — confirm that the interpreter
# path given below can import them.
@task.external_python(
task_id='update_dengue',
python='/opt/py311/bin/python3.11'
)
def update_dengue(egh_conn: dict):
"""
Update the `brasil.sinan_dengue_m` table with SINAN dengue data.

The task runs in the separate Python interpreter named in the decorator,
which must provide PySUS, pandas and SQLAlchemy; every dependency is
therefore imported inside the function body.

Files flagged as final are loaded only for years that have no final rows
in the table yet, after deleting the preliminary rows of that year.
Preliminary files are deleted and reloaded on every run.

Parameters
----------
egh_conn : dict
    Mapping whose 'URI' entry is the SQLAlchemy URL of the target database.
"""
import os
import logging
import pandas as pd

from sqlalchemy import create_engine, text
from sqlalchemy.exc import ProgrammingError
from pysus.online_data import parquets_to_dataframe
from pysus.ftp.databases.sinan import SINAN

sinan = SINAN().load()
# Disease code used for every SINAN lookup in this task.
dis_code = "DENG"
# Target table, always addressed inside the `brasil` schema.
tablename = "sinan_dengue_m"
# Presumably every dengue file currently published — confirm against PySUS.
files = sinan.get_files(dis_code=dis_code)


def insert_parquets(parquet_dir: str, year: int, prelim: bool = False):
    """
    Append every parquet chunk found in `parquet_dir` to the
    `brasil.<tablename>` table, then remove the directory.

    Each chunk is read with fastparquet, its column names are lower-cased
    and two bookkeeping columns (`year` and `prelim`) are added before the
    rows are appended. A chunk file is deleted as soon as it has been
    inserted, so calling the function again after a failure only processes
    the chunks that are still on disk.

    Parameters
    ----------
    parquet_dir : str
        Directory holding the parquet chunks to insert.
    year : int
        Value stored in the `year` column of every inserted row.
    prelim : bool, optional
        Value stored in the `prelim` column. Defaults to False, which is
        what was previously hard-coded; pass True when loading preliminary
        files so they can later be told apart from final data.

    Raises
    ------
    sqlalchemy.exc.ProgrammingError
        Propagated from `DataFrame.to_sql`, for instance when a chunk has
        a column that does not exist in the table yet. The directory is
        left in place in that case.
    """
    # One engine for the whole directory instead of one per chunk; it is
    # disposed even when an insert fails so no pooled connection lingers.
    engine = create_engine(egh_conn['URI'])
    try:
        for chunk_name in os.listdir(parquet_dir):
            chunk_path = os.path.join(parquet_dir, chunk_name)
            df = pd.read_parquet(chunk_path, engine='fastparquet')
            df.columns = df.columns.str.lower()
            df['year'] = year
            df['prelim'] = prelim
            df.to_sql(
                name=tablename,
                con=engine,
                schema="brasil",
                if_exists='append',
                index=False,
            )
            # Release the chunk before reading the next one.
            del df
            os.remove(chunk_path)
            logging.debug(f"{chunk_path} inserted into db")
    finally:
        engine.dispose()

    # Remove the directory this call was given, not whatever the enclosing
    # scope happens to have bound to `parquets` at call time.
    os.rmdir(parquet_dir)


# Years available for each publication stage. Both keys always exist, so
# the loops over f_stage['final'] and f_stage['prelim'] further down get
# an empty list, not a KeyError, when a stage has no file at all.
f_stage = {'prelim': [], 'final': []}
for file in files:
    # `sinan.format` yields a pair; only its second item (the year) is used.
    _, year = sinan.format(file)
    # Preliminary files are recognised by 'PRELIM' in their path.
    stage = 'prelim' if 'PRELIM' in file.path else 'final'
    f_stage[stage].append(year)

# One engine serves every bookkeeping query of the final-year loop instead
# of a new engine (and connection pool) per statement.
engine = create_engine(egh_conn['URI'])

# The year is bound as a string so the driver sends the same quoted literal
# the previous f-string produced; only the table name is interpolated, as
# identifiers cannot be bound.
count_final_sql = text(
    f'SELECT COUNT(*) FROM brasil.{tablename}'
    ' WHERE year = :year AND prelim = False'
)
count_prelim_sql = text(
    f'SELECT COUNT(*) FROM brasil.{tablename}'
    ' WHERE year = :year AND prelim = True'
)
delete_prelim_sql = text(
    f'DELETE FROM brasil.{tablename}'
    ' WHERE year = :year AND prelim = True'
)

# `.get` keeps the loop a no-op when no final file was found.
for year in f_stage.get('final', []):
    params = {'year': str(year)}

    # Check if final is already in DB
    with engine.connect() as conn:
        final_rows = conn.execute(count_final_sql, params).scalar()

    logging.info(f"Final year {year}: {final_rows}")

    if not final_rows:
        # Check on prelims
        with engine.connect() as conn:
            prelim_rows = conn.execute(count_prelim_sql, params).scalar()

            if prelim_rows:
                # Update prelim to final: drop the preliminary rows of
                # this year before the final ones are loaded.
                conn.execute(delete_prelim_sql, params)
                # SQLAlchemy 2.x connections do not autocommit; without
                # this the DELETE is rolled back when the block exits.
                conn.commit()

        parquets = sinan.download(sinan.get_files(dis_code, year))

# Load the downloaded chunks. An insert that fails because the table lacks one
# of the incoming columns is handled further down by adding that column and
# retrying.
# NOTE(review): a ProgrammingError whose message does not start with the
# UndefinedColumn prefix is neither handled nor re-raised here — confirm that
# swallowing it is intended.
try:
insert_parquets(parquets.path, year)
except ProgrammingError as error:
if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
# Include new columns to table
# The column name is taken from the first double-quoted token of the error
# message text.
column_name = str(error).split('"')[1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that obtaining the missing column name from the error message is not a good approach, because if psycopg2 changes the wording in their error messages it will break our code. I think we should instead look at the list of column names of the parquet files and compare them with the columns in the current schema. From the difference in these lists, which can be efficiently obtained as list(set(cols1)-set(cols2)), we can then create the alter table query adding the new columns to the database table. With this approach, we don't even need to rely on an exception being raised. This determination of the missing columns can be done before the first insert.

# Add the missing column as TEXT and commit the schema change explicitly.
with create_engine(egh_conn['URI']).connect() as conn:
conn.execute(text(
f'ALTER TABLE brasil.{tablename}'
f' ADD COLUMN {column_name} TEXT'
))
conn.commit()
logging.warning(f"Column {column_name} added into {tablename}")
# Retry the load now that the column exists; only one missing column is
# handled per failure.
insert_parquets(parquets.path, year)

# NOTE(review): insert_parquets already ends with an os.rmdir of the download
# directory — confirm this second removal cannot hit a missing directory.
os.rmdir(parquets.path)

# One engine serves every DELETE of the preliminary loop instead of a new
# engine (and connection pool) per year.
engine = create_engine(egh_conn['URI'])

# The year is bound as a string so the driver sends the same quoted literal
# the previous f-string produced; only the table name is interpolated, as
# identifiers cannot be bound.
delete_prelim_sql = text(
    f'DELETE FROM brasil.{tablename}'
    ' WHERE year = :year AND prelim = True'
)

# `.get` keeps the loop a no-op when no preliminary file was found.
for year in f_stage.get('prelim', []):
    with engine.connect() as conn:
        # Update prelim: drop the rows of this year before reloading them.
        conn.execute(delete_prelim_sql, {'year': str(year)})
        # SQLAlchemy 2.x connections do not autocommit; without this the
        # DELETE is rolled back when the block exits.
        conn.commit()

    parquets = sinan.download(sinan.get_files(dis_code, year))

# NOTE(review): insert_parquets is called here without marking the rows as
# preliminary; as written it stores prelim = False, so the
# DELETE ... prelim = True above would not match rows loaded by this call —
# confirm.
try:
insert_parquets(parquets.path, year)
except ProgrammingError as error:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above

# Only failures reported as an undefined column are handled; the column name is
# taken from the first double-quoted token of the error message text.
if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
# Include new columns to table
column_name = str(error).split('"')[1]
# Add the missing column as TEXT and commit the schema change explicitly.
with create_engine(egh_conn['URI']).connect() as conn:
conn.execute(text(
f'ALTER TABLE brasil.{tablename}'
f' ADD COLUMN {column_name} TEXT'
))
conn.commit()
logging.warning(f"Column {column_name} added into {tablename}")
# Retry the load now that the column exists; only one missing column is
# handled per failure.
insert_parquets(parquets.path, year)

# NOTE(review): insert_parquets already ends with an os.rmdir of the download
# directory — confirm this second removal cannot hit a missing directory.
os.rmdir(parquets.path)

# Register the task in the DAG, handing it the connection settings.
update_dengue(CONN)
Empty file.
20 changes: 20 additions & 0 deletions containers/airflow/env.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
AIRFLOW_PROJ_DIR=${AIRFLOW_PROJ_DIR}
AIRFLOW_UID=${AIRFLOW_UID}
AIRFLOW_PORT=${AIRFLOW_PORT}
_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME}
_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD}

AIRFLOW__CORE__FERNET_KEY=${AIRFLOW__CORE__FERNET_KEY}

AIRFLOW__SMTP__SMTP_HOST=${AIRFLOW__SMTP__SMTP_HOST}
AIRFLOW__SMTP__SMTP_USER=${AIRFLOW__SMTP__SMTP_USER}
AIRFLOW__SMTP__SMTP_PASSWORD=${AIRFLOW__SMTP__SMTP_PASSWORD}
AIRFLOW__SMTP__SMTP_PORT=${AIRFLOW__SMTP__SMTP_PORT:-587}
AIRFLOW__SMTP__SMTP_MAIL_FROM=${AIRFLOW__SMTP__SMTP_MAIL_FROM}

POSTGRES_EPIGRAPH_DB=${POSTGRES_EPIGRAPH_DB}
POSTGRES_EPIGRAPH_HOST=${POSTGRES_EPIGRAPH_HOST}
POSTGRES_EPIGRAPH_PORT=${POSTGRES_EPIGRAPH_PORT}
POSTGRES_EPIGRAPH_USER=${POSTGRES_EPIGRAPH_USER}
POSTGRES_EPIGRAPH_PASSWORD=${POSTGRES_EPIGRAPH_PASSWORD}
AIRFLOW_VAR_EGH_CONN='{"URI":"postgresql://${POSTGRES_EPIGRAPH_USER}:${POSTGRES_EPIGRAPH_PASSWORD}@${POSTGRES_EPIGRAPH_HOST}:${POSTGRES_EPIGRAPH_PORT}/${POSTGRES_EPIGRAPH_DB}"}'
1 change: 1 addition & 0 deletions containers/airflow/envs/pysus.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pysus >= 0.10.2
SQLAlchemy >= 2.0.21
1 change: 1 addition & 0 deletions containers/compose-airflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ x-airflow-common:
AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW__CORE__FERNET_KEY}
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW_VAR_EGH_CONN: ${AIRFLOW_VAR_EGH_CONN}
volumes:
- ${AIRFLOW_PROJ_DIR}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR}/logs:/opt/airflow/logs
Expand Down