From c0cee3763f368d11b1d41ea0dba8eb41ed7aefd5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Mon, 9 Oct 2023 11:17:41 -0300
Subject: [PATCH] chore(sinan DAGS): create DAG to fetch dengue data from
 SINAN

---
 containers/airflow/Dockerfile                  |   8 +-
 .../airflow/dags/brasil/sinan/chikungunya.py   |   0
 containers/airflow/dags/brasil/sinan/dengue.py | 113 ++++++++++++++++++
 containers/airflow/dags/brasil/sinan/zika.py   |   0
 containers/airflow/env.tpl                     |  20 ++++
 5 files changed, 140 insertions(+), 1 deletion(-)
 create mode 100644 containers/airflow/dags/brasil/sinan/chikungunya.py
 create mode 100644 containers/airflow/dags/brasil/sinan/dengue.py
 create mode 100644 containers/airflow/dags/brasil/sinan/zika.py
 create mode 100644 containers/airflow/env.tpl

diff --git a/containers/airflow/Dockerfile b/containers/airflow/Dockerfile
index 96068ff4..a69072a1 100644
--- a/containers/airflow/Dockerfile
+++ b/containers/airflow/Dockerfile
@@ -94,7 +94,13 @@ RUN /usr/local/bin/python -m virtualenv /opt/envs/py311 --python="/opt/py311/bin
     && source /opt/envs/py311/bin/activate \
     && pip install "cython<3.0.0" \
     && pip install --no-build-isolation "pyyaml<6.0" \
-    && pip install -r /opt/envs/pysus.txt
+    && pip install \
+      psycopg2-binary \
+      "apache-airflow>=2.7.1" \
+      apache-airflow-providers-celery \
+      redis \
+      "dill>=0.3.7" \
+      -r /opt/envs/pysus.txt
 
 WORKDIR ${AIRFLOW_HOME}
 
diff --git a/containers/airflow/dags/brasil/sinan/chikungunya.py b/containers/airflow/dags/brasil/sinan/chikungunya.py
new file mode 100644
index 00000000..e69de29b
diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
new file mode 100644
index 00000000..573176f5
--- /dev/null
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -0,0 +1,113 @@
+import pendulum
+
+from datetime import timedelta
+from airflow import DAG
+from airflow.decorators import task
+
+
+default_args = {
+    "owner": "epigraphhub",
+    "depends_on_past": False,
+    "start_date": pendulum.datetime(2023, 1, 1),
+    "email": ["epigraphhub@thegraphnetwork.org"],
+    "email_on_failure": True,
+    "email_on_retry": False,
+    "retries": 2,
+    "retry_delay": timedelta(minutes=1),
+}
+
+with DAG(
+    dag_id='SINAN_DENG',
+    tags=['SINAN', 'Brasil', 'Dengue'],
+    schedule='@monthly',
+    default_args=default_args,
+    catchup=False,
+) as dag:
+    from airflow.models import Variable
+
+    CONN = Variable.get('egh_conn', deserialize_json=True)
+
+    @task.external_python(
+        task_id='first',
+        python='/opt/py311/bin/python3.11',
+        expect_airflow=True
+    )
+    def update_dengue(egh_conn: dict):
+        from sqlalchemy import create_engine, text
+        from pysus.online_data import parquets_to_dataframe
+        from pysus.ftp.databases.sinan import SINAN
+
+        sinan = SINAN().load()
+        dis_code = "DENG"
+        tablename = "sinan_dengue_m"
+        schema = "brasil"
+        files = sinan.get_files(dis_code=dis_code)
+
+        engine = create_engine(egh_conn['URI'])
+
+        # Group the available years by release stage (prelim or final)
+        f_stage = {}
+        for file in files:
+            code, year = sinan.format(file)
+            stage = 'prelim' if 'PRELIM' in file.path else 'final'
+
+            if stage not in f_stage:
+                f_stage[stage] = [year]
+            else:
+                f_stage[stage].append(year)
+
+        for year in f_stage.get('final', []):
+            # Check if the final data is already in the DB
+            with engine.connect() as conn:
+                cur = conn.execute(text(
+                    f'SELECT COUNT(*) FROM {schema}.{tablename}'
+                    f' WHERE year = {year} AND prelim = False'
+                ))
+                count = cur.fetchone()[0]
+
+            if not count:
+                # Check on prelims
+                with engine.begin() as conn:
+                    cur = conn.execute(text(
+                        f'SELECT COUNT(*) FROM {schema}.{tablename}'
+                        f' WHERE year = {year} AND prelim = True'
+                    ))
+                    count = cur.fetchone()[0]
+
+                    if count:
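+                        # Update prelim to final: a final release
+                        # supersedes the preliminary rows for this year,
+                        # so drop them before the final data is appended.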
+                        conn.execute(text(
+                            f'DELETE FROM {schema}.{tablename}'
+                            f' WHERE year = {year} AND prelim = True'
+                        ))
+
+                file = sinan.download(sinan.get_files(dis_code, year))
+
+                df = parquets_to_dataframe(file.path)
+                df['year'] = year
+                df['prelim'] = False
+                df.to_sql(
+                    name=tablename,
+                    con=engine,
+                    schema=schema,
+                    if_exists='append',
+                    index=False
+                )
+
+        for year in f_stage.get('prelim', []):
+            with engine.begin() as conn:
+                # Preliminary data changes between releases; replace it
+                conn.execute(text(
+                    f'DELETE FROM {schema}.{tablename}'
+                    f' WHERE year = {year} AND prelim = True'
+                ))
+
+            file = sinan.download(sinan.get_files(dis_code, year))
+
+            df = parquets_to_dataframe(file.path)
+            df['year'] = year
+            df['prelim'] = True
+            df.to_sql(
+                name=tablename,
+                con=engine,
+                schema=schema,
+                if_exists='append',
+                index=False
+            )
+
+    update_dengue(CONN)
diff --git a/containers/airflow/dags/brasil/sinan/zika.py b/containers/airflow/dags/brasil/sinan/zika.py
new file mode 100644
index 00000000..e69de29b
diff --git a/containers/airflow/env.tpl b/containers/airflow/env.tpl
new file mode 100644
index 00000000..9b4705a8
--- /dev/null
+++ b/containers/airflow/env.tpl
@@ -0,0 +1,20 @@
+AIRFLOW_PROJ_DIR=${AIRFLOW_PROJ_DIR}
+AIRFLOW_UID=${AIRFLOW_UID}
+AIRFLOW_PORT=${AIRFLOW_PORT}
+_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME}
+_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD}
+
+AIRFLOW__CORE__FERNET_KEY=${AIRFLOW__CORE__FERNET_KEY}
+
+AIRFLOW__SMTP__SMTP_HOST=${AIRFLOW__SMTP__SMTP_HOST}
+AIRFLOW__SMTP__SMTP_USER=${AIRFLOW__SMTP__SMTP_USER}
+AIRFLOW__SMTP__SMTP_PASSWORD=${AIRFLOW__SMTP__SMTP_PASSWORD}
+AIRFLOW__SMTP__SMTP_PORT=${AIRFLOW__SMTP__SMTP_PORT:-587}
+AIRFLOW__SMTP__SMTP_MAIL_FROM=${AIRFLOW__SMTP__SMTP_MAIL_FROM}
+
+POSTGRES_EPIGRAPH_DB=${POSTGRES_EPIGRAPH_DB}
+POSTGRES_EPIGRAPH_HOST=${POSTGRES_EPIGRAPH_HOST}
+POSTGRES_EPIGRAPH_PORT=${POSTGRES_EPIGRAPH_PORT}
+POSTGRES_EPIGRAPH_USER=${POSTGRES_EPIGRAPH_USER}
+POSTGRES_EPIGRAPH_PASSWORD=${POSTGRES_EPIGRAPH_PASSWORD}
+AIRFLOW_VAR_EGH_CONN='{"URI":"postgresql://${POSTGRES_EPIGRAPH_USER}:${POSTGRES_EPIGRAPH_PASSWORD}@${POSTGRES_EPIGRAPH_HOST}:${POSTGRES_EPIGRAPH_PORT}/${POSTGRES_EPIGRAPH_DB}"}'