Skip to content

Commit

Permalink
chore(sinan DAGS): create DAG to fetch dengue data from SINAN
Browse files Browse the repository at this point in the history
  • Loading branch information
luabida committed Oct 9, 2023
1 parent 722af34 commit c0cee37
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 1 deletion.
8 changes: 7 additions & 1 deletion containers/airflow/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ RUN /usr/local/bin/python -m virtualenv /opt/envs/py311 --python="/opt/py311/bin
&& source /opt/envs/py311/bin/activate \
&& pip install "cython<3.0.0" \
&& pip install --no-build-isolation "pyyaml<6.0" \
&& pip install -r /opt/envs/pysus.txt
&& pip install \
psycopg2-binary \
"apache-airflow>=2.7.1" \
apache-airflow-providers-celery \
redis \
"dill>=0.3.7" \
-r /opt/envs/pysus.txt

WORKDIR ${AIRFLOW_HOME}

Expand Down
Empty file.
113 changes: 113 additions & 0 deletions containers/airflow/dags/brasil/sinan/dengue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import pendulum

from datetime import timedelta
from airflow import DAG
from airflow.decorators import task


# Task-level defaults shared by every task of the DAG that receives this
# mapping through its ``default_args`` argument.
default_args = dict(
    owner="epigraphhub",
    # Each run is independent of the outcome of the previous one.
    depends_on_past=False,
    start_date=pendulum.datetime(2023, 1, 1),
    # Notify on final failure only, not on every retry attempt.
    email=["[email protected]"],
    email_on_failure=True,
    email_on_retry=False,
    # Retry twice, waiting one minute between attempts.
    retries=2,
    retry_delay=timedelta(minutes=1),
)

with DAG(
    dag_id='SINAN_DENG',
    tags=['SINAN', 'Brasil', 'Dengue'],
    schedule='@monthly',
    default_args=default_args,
    catchup=False,
) as dag:
    from airflow.models import Variable

    # JSON Airflow Variable; its "URI" key is handed to SQLAlchemy inside the
    # task. It is resolved here, i.e. every time the DAG file is parsed.
    CONN = Variable.get('egh_conn', deserialize_json=True)

    @task.external_python(
        task_id='first',
        # NOTE(review): the task runs under this interpreter, so pysus,
        # pandas and sqlalchemy must be importable from it -- confirm this
        # path matches the environment where those packages are installed.
        python='/opt/py311/bin/python3.11',
        expect_airflow=True
    )
    def update_dengue(egh_conn: dict):
        """Synchronise ``brasil.sinan_dengue_m`` with the SINAN dengue files.

        The available DENG files are grouped by stage: a file whose path
        contains ``PRELIM`` is preliminary, any other file is final.

        * Final years that have no final rows in the table yet are
          downloaded and inserted with ``prelim = False``; preliminary rows
          of the same year are removed in the same transaction.
        * Preliminary years are always refreshed: their preliminary rows are
          deleted and re-inserted with ``prelim = True``.

        The function body runs in a separate interpreter, so every name it
        uses is imported or defined inside it.

        Args:
            egh_conn: mapping whose ``"URI"`` key holds the database URL.
        """
        from collections import defaultdict

        from sqlalchemy import create_engine, text
        from pysus.online_data import parquets_to_dataframe
        from pysus.ftp.databases.sinan import SINAN

        sinan = SINAN().load()
        dis_code = "DENG"
        schema = "brasil"
        tablename = "sinan_dengue_m"
        files = sinan.get_files(dis_code=dis_code)

        # One engine (and connection pool) for the whole task instead of a
        # new one per statement.
        engine = create_engine(egh_conn['URI'])

        # Identifiers cannot be bound parameters; both are constants defined
        # above. The values coming from the file listing are bound.
        count_rows = text(
            f'SELECT COUNT(*) FROM {schema}.{tablename}'
            ' WHERE year = :year AND prelim = :prelim'
        )
        delete_prelim = text(
            f'DELETE FROM {schema}.{tablename}'
            ' WHERE year = :year AND prelim = True'
        )

        def has_final_rows(year) -> bool:
            """Tell whether final rows for `year` are already stored."""
            with engine.connect() as conn:
                # scalar() yields the count itself; the row returned by
                # fetchone() is a non-empty tuple and therefore always truthy.
                found = conn.execute(
                    count_rows, {'year': year, 'prelim': False}
                ).scalar()
            return bool(found)

        def load_year(year, prelim: bool) -> None:
            """Download `year` and swap it in for its preliminary rows."""
            # NOTE(review): assumes download() returns one object exposing
            # `.path`, as the original code did -- confirm with pysus.
            file = sinan.download(sinan.get_files(dis_code, year))

            df = parquets_to_dataframe(file.path)
            df['year'] = year
            df['prelim'] = prelim

            # Delete and insert in a single transaction: a failed insert
            # rolls the delete back, and the connection is always released.
            with engine.begin() as conn:
                conn.execute(delete_prelim, {'year': year})
                df.to_sql(
                    name=tablename,
                    con=conn,
                    schema=schema,
                    if_exists='append',
                    index=False
                )

        # stage -> years, without duplicates and in listing order. A stage
        # that has no files simply maps to an empty list.
        f_stage = defaultdict(list)
        for file in files:
            _, year = sinan.format(file)
            stage = 'prelim' if 'PRELIM' in file.path else 'final'
            if year not in f_stage[stage]:
                f_stage[stage].append(year)

        try:
            for year in f_stage['final']:
                if has_final_rows(year):
                    continue
                # Promotes the year to final; stale preliminary rows (if
                # any) are dropped by load_year.
                load_year(year, prelim=False)

            for year in f_stage['prelim']:
                # Preliminary data may change between releases: refresh it.
                load_year(year, prelim=True)
        finally:
            engine.dispose()

    update_dengue(CONN)
Empty file.
20 changes: 20 additions & 0 deletions containers/airflow/env.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Environment template for the Airflow containers. Every value is taken from
# the variable of the same name in the environment that renders this file.

# --- Airflow deployment and web UI credentials ---
AIRFLOW_PROJ_DIR=${AIRFLOW_PROJ_DIR}
AIRFLOW_UID=${AIRFLOW_UID}
AIRFLOW_PORT=${AIRFLOW_PORT}
_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME}
_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD}

# --- Airflow core ---
AIRFLOW__CORE__FERNET_KEY=${AIRFLOW__CORE__FERNET_KEY}

# --- SMTP settings (the port falls back to 587 when unset or empty) ---
AIRFLOW__SMTP__SMTP_HOST=${AIRFLOW__SMTP__SMTP_HOST}
AIRFLOW__SMTP__SMTP_USER=${AIRFLOW__SMTP__SMTP_USER}
AIRFLOW__SMTP__SMTP_PASSWORD=${AIRFLOW__SMTP__SMTP_PASSWORD}
AIRFLOW__SMTP__SMTP_PORT=${AIRFLOW__SMTP__SMTP_PORT:-587}
AIRFLOW__SMTP__SMTP_MAIL_FROM=${AIRFLOW__SMTP__SMTP_MAIL_FROM}

# --- EpiGraphHub PostgreSQL connection ---
POSTGRES_EPIGRAPH_DB=${POSTGRES_EPIGRAPH_DB}
POSTGRES_EPIGRAPH_HOST=${POSTGRES_EPIGRAPH_HOST}
POSTGRES_EPIGRAPH_PORT=${POSTGRES_EPIGRAPH_PORT}
POSTGRES_EPIGRAPH_USER=${POSTGRES_EPIGRAPH_USER}
POSTGRES_EPIGRAPH_PASSWORD=${POSTGRES_EPIGRAPH_PASSWORD}
# JSON document with a single "URI" key assembled from the values above.
# NOTE(review): presumably the source of the "egh_conn" Airflow Variable that
# the SINAN dengue DAG reads as JSON -- confirm. Special characters in the
# user or password are not URL-escaped here.
AIRFLOW_VAR_EGH_CONN='{"URI":"postgresql://${POSTGRES_EPIGRAPH_USER}:${POSTGRES_EPIGRAPH_PASSWORD}@${POSTGRES_EPIGRAPH_HOST}:${POSTGRES_EPIGRAPH_PORT}/${POSTGRES_EPIGRAPH_DB}"}'

0 comments on commit c0cee37

Please sign in to comment.