From c0cee3763f368d11b1d41ea0dba8eb41ed7aefd5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Mon, 9 Oct 2023 11:17:41 -0300
Subject: [PATCH 01/12] chore(sinan DAGS): create DAG to fetch dengue data from
 SINAN

---
 containers/airflow/Dockerfile                 |   8 +-
 .../airflow/dags/brasil/sinan/chikungunya.py  |   0
 .../airflow/dags/brasil/sinan/dengue.py       | 113 ++++++++++++++++++
 containers/airflow/dags/brasil/sinan/zika.py  |   0
 containers/airflow/env.tpl                    |  20 ++++
 5 files changed, 140 insertions(+), 1 deletion(-)
 create mode 100644 containers/airflow/dags/brasil/sinan/chikungunya.py
 create mode 100644 containers/airflow/dags/brasil/sinan/dengue.py
 create mode 100644 containers/airflow/dags/brasil/sinan/zika.py
 create mode 100644 containers/airflow/env.tpl

diff --git a/containers/airflow/Dockerfile b/containers/airflow/Dockerfile
index 96068ff4..a69072a1 100644
--- a/containers/airflow/Dockerfile
+++ b/containers/airflow/Dockerfile
@@ -94,7 +94,13 @@ RUN /usr/local/bin/python -m virtualenv /opt/envs/py311 --python="/opt/py311/bin
     && source /opt/envs/py311/bin/activate \
     && pip install "cython<3.0.0" \
     && pip install --no-build-isolation "pyyaml<6.0" \
-    && pip install -r /opt/envs/pysus.txt
+    && pip install \
+       psycopg2-binary \
+       "apache-airflow>=2.7.1" \
+       apache-airflow-providers-celery \
+       redis \
+       "dill>=0.3.7" \
+       -r /opt/envs/pysus.txt
 
 WORKDIR ${AIRFLOW_HOME}
 
diff --git a/containers/airflow/dags/brasil/sinan/chikungunya.py b/containers/airflow/dags/brasil/sinan/chikungunya.py
new file mode 100644
index 00000000..e69de29b
diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
new file mode 100644
index 00000000..573176f5
--- /dev/null
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -0,0 +1,113 @@
+import pendulum
+
+from datetime import timedelta
+from airflow import DAG
+from airflow.decorators import task
+
+
+default_args = {
+    "owner": "epigraphhub",
+    "depends_on_past": False,
+    "start_date": pendulum.datetime(2023, 1, 1),
+    "email": ["epigraphhub@thegraphnetwork.org"],
+    "email_on_failure": True,
+    "email_on_retry": False,
+    "retries": 2,
+    "retry_delay": timedelta(minutes=1),
+}
+
+with DAG(
+    dag_id='SINAN_DENG',
+    tags=['SINAN', 'Brasil', 'Dengue'],
+    schedule='@monthly',
+    default_args=default_args,
+    catchup=False,
+) as dag:
+    from airflow.models import Variable
+
+    CONN = Variable.get('egh_conn', deserialize_json=True)
+
+    @task.external_python(
+        task_id='first',
+        python='/opt/py311/bin/python3.11',
+        expect_airflow=True
+    )
+    def update_dengue(egh_conn: dict):
+        from pysus.online_data import parquets_to_dataframe
+        from pysus.ftp.databases.sinan import SINAN
+
+        sinan = SINAN().load()
+        dis_code = "DENG"
+        tablename = "sinan_dengue_m"
+        files = sinan.get_files(dis_code=disease)
+
+        f_stage = {}
+        for file in files:
+            code, year = sinan.format(file)
+            stage = 'prelim' if 'PRELIM' in file.path else 'final'
+
+            if not stage in f_stage:
+                f_stage[stage] = [year]
+            else:
+                f_stage[stage].append(year)
+
+        for year in f_stage['final']:
+            # Check if final is already in DB
+            with create_engine(egh_conn['URI']).connect() as conn:
+                cur = conn.execute(
+                    f'SELECT COUNT(*) FROM brasil.{tablename}'
+                    f' WHERE year = {year} AND prelim = False'
+                )
+                count = cur.fetchone()
+
+            if not count:
+                # Check on prelims
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    cur = conn.execute(
+                        f'SELECT COUNT(*) FROM brasil.{tablename}'
+                        f' WHERE year = {year} AND prelim = True'
+                    )
+                    count = cur.fetchone()
+
+                    if count:
+                        # Update prelim to final
+                        cur = conn.execute(
+                            f'DELETE FROM brasil.{tablename}'
+                            f' WHERE year = {year} AND prelim = True'
+                        )
+
+            file = sinan.download(sinan.get_files(dis_code, year))
+
+            df = parquets_to_dataframe(file.path)
+            df['year'] = year
+            df['prelim'] = False
+            df.to_sql(
+                name=tablename,
+                con=engine.connect(),
+                schema=schema,
+                if_exists='append',
+                index=False
+            )
+
+        for year in f_stage['prelim']:
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Update prelim
+                cur = conn.execute(
+                    f'DELETE FROM brasil.{tablename}'
+                    f' WHERE year = {year} AND prelim = True'
+                )
+
+            file = sinan.download(sinan.get_files(dis_code, year))
+
+            df = parquets_to_dataframe(file.path)
+            df['year'] = year
+            df['prelim'] = True
+            df.to_sql(
+                name=tablename,
+                con=engine.connect(),
+                schema=schema,
+                if_exists='append',
+                index=False
+            )
+
+    update_dengue(CONN)
diff --git a/containers/airflow/dags/brasil/sinan/zika.py b/containers/airflow/dags/brasil/sinan/zika.py
new file mode 100644
index 00000000..e69de29b
diff --git a/containers/airflow/env.tpl b/containers/airflow/env.tpl
new file mode 100644
index 00000000..9b4705a8
--- /dev/null
+++ b/containers/airflow/env.tpl
@@ -0,0 +1,20 @@
+AIRFLOW_PROJ_DIR=${AIRFLOW_PROJ_DIR}
+AIRFLOW_UID=${AIRFLOW_UID}
+AIRFLOW_PORT=${AIRFLOW_PORT}
+_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME}
+_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD}
+
+AIRFLOW__CORE__FERNET_KEY=${AIRFLOW__CORE__FERNET_KEY}
+
+AIRFLOW__SMTP__SMTP_HOST=${AIRFLOW__SMTP__SMTP_HOST}
+AIRFLOW__SMTP__SMTP_USER=${AIRFLOW__SMTP__SMTP_USER}
+AIRFLOW__SMTP__SMTP_PASSWORD=${AIRFLOW__SMTP__SMTP_PASSWORD}
+AIRFLOW__SMTP__SMTP_PORT=${AIRFLOW__SMTP__SMTP_PORT:-587}
+AIRFLOW__SMTP__SMTP_MAIL_FROM=${AIRFLOW__SMTP__SMTP_MAIL_FROM}
+
+POSTGRES_EPIGRAPH_DB=${POSTGRES_EPIGRAPH_DB}
+POSTGRES_EPIGRAPH_HOST=${POSTGRES_EPIGRAPH_HOST}
+POSTGRES_EPIGRAPH_PORT=${POSTGRES_EPIGRAPH_PORT}
+POSTGRES_EPIGRAPH_USER=${POSTGRES_EPIGRAPH_USER}
+POSTGRES_EPIGRAPH_PASSWORD=${POSTGRES_EPIGRAPH_PASSWORD}
+AIRFLOW_VAR_EGH_CONN='{"URI":"postgresql://${POSTGRES_EPIGRAPH_USER}:${POSTGRES_EPIGRAPH_PASSWORD}@${POSTGRES_EPIGRAPH_HOST}:${POSTGRES_EPIGRAPH_PORT}/${POSTGRES_EPIGRAPH_DB}"}'

From e940b7adf6897b99736a799691e2c53bf492ef65 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Mon, 9 Oct 2023 13:38:14 -0300
Subject: [PATCH 02/12] Include EGH_CONN var to image

---
 containers/airflow/Dockerfile                  |  4 ----
 containers/airflow/dags/brasil/sinan/dengue.py | 10 +++++++---
 containers/airflow/envs/pysus.txt              |  1 +
 containers/compose-airflow.yaml                |  1 +
 4 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/containers/airflow/Dockerfile b/containers/airflow/Dockerfile
index a69072a1..34cfc489 100644
--- a/containers/airflow/Dockerfile
+++ b/containers/airflow/Dockerfile
@@ -96,10 +96,6 @@ RUN /usr/local/bin/python -m virtualenv /opt/envs/py311 --python="/opt/py311/bin
     && pip install --no-build-isolation "pyyaml<6.0" \
     && pip install \
        psycopg2-binary \
-       "apache-airflow>=2.7.1" \
-       apache-airflow-providers-celery \
-       redis \
-       "dill>=0.3.7" \
        -r /opt/envs/pysus.txt
 
 WORKDIR ${AIRFLOW_HOME}
diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
index 573176f5..c020c388 100644
--- a/containers/airflow/dags/brasil/sinan/dengue.py
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -29,17 +29,19 @@
 
     @task.external_python(
         task_id='first',
-        python='/opt/py311/bin/python3.11',
-        expect_airflow=True
+        python='/opt/py311/bin/python3.11'
     )
     def update_dengue(egh_conn: dict):
+        import logging
+
+        from sqlalchemy import create_engine
         from pysus.online_data import parquets_to_dataframe
         from pysus.ftp.databases.sinan import SINAN
 
         sinan = SINAN().load()
         dis_code = "DENG"
         tablename = "sinan_dengue_m"
-        files = sinan.get_files(dis_code=disease)
+        files = sinan.get_files(dis_code=dis_code)
 
         f_stage = {}
         for file in files:
@@ -60,6 +62,8 @@ def update_dengue(egh_conn: dict):
                 )
                 count = cur.fetchone()
 
+            logging.info(f"Final year {year}: {count}")
+
             if not count:
                 # Check on prelims
                 with create_engine(egh_conn['URI']).connect() as conn:
diff --git a/containers/airflow/envs/pysus.txt b/containers/airflow/envs/pysus.txt
index 79e71e0b..3508e372 100644
--- a/containers/airflow/envs/pysus.txt
+++ b/containers/airflow/envs/pysus.txt
@@ -1 +1,2 @@
 pysus >= 0.10.2
+SQLAlchemy >= 2.0.21
diff --git a/containers/compose-airflow.yaml b/containers/compose-airflow.yaml
index 7f7fbd5d..9fc3f9fd 100644
--- a/containers/compose-airflow.yaml
+++ b/containers/compose-airflow.yaml
@@ -16,6 +16,7 @@ x-airflow-common:
     AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW__CORE__FERNET_KEY}
     AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
     AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
+    AIRFLOW_VAR_EGH_CONN: ${AIRFLOW_VAR_EGH_CONN}
   volumes:
     - ${AIRFLOW_PROJ_DIR}/dags:/opt/airflow/dags
     - ${AIRFLOW_PROJ_DIR}/logs:/opt/airflow/logs

From 97fb3675368aade43b6df1e8968434d3a6d4dcc0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Mon, 9 Oct 2023 14:03:21 -0300
Subject: [PATCH 03/12] fix sql statements

---
 .../airflow/dags/brasil/sinan/dengue.py       | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
index c020c388..dc345716 100644
--- a/containers/airflow/dags/brasil/sinan/dengue.py
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -34,7 +34,7 @@
     def update_dengue(egh_conn: dict):
         import logging
 
-        from sqlalchemy import create_engine
+        from sqlalchemy import create_engine, text
         from pysus.online_data import parquets_to_dataframe
         from pysus.ftp.databases.sinan import SINAN
 
@@ -56,29 +56,29 @@ def update_dengue(egh_conn: dict):
         for year in f_stage['final']:
             # Check if final is already in DB
             with create_engine(egh_conn['URI']).connect() as conn:
-                cur = conn.execute(
+                cur = conn.execute(text(
                     f'SELECT COUNT(*) FROM brasil.{tablename}'
-                    f' WHERE year = {year} AND prelim = False'
-                )
-                count = cur.fetchone()
+                    f" WHERE year = '{year}' AND prelim = False"
+                ))
+                count = cur.fetchone()[0]
 
             logging.info(f"Final year {year}: {count}")
 
             if not count:
                 # Check on prelims
                 with create_engine(egh_conn['URI']).connect() as conn:
-                    cur = conn.execute(
+                    cur = conn.execute(text(
                         f'SELECT COUNT(*) FROM brasil.{tablename}'
-                        f' WHERE year = {year} AND prelim = True'
-                    )
-                    count = cur.fetchone()
+                        f" WHERE year = '{year}' AND prelim = True"
+                    ))
+                    count = cur.fetchone()[0]
 
                     if count:
                         # Update prelim to final
-                        cur = conn.execute(
+                        cur = conn.execute(text(
                             f'DELETE FROM brasil.{tablename}'
-                            f' WHERE year = {year} AND prelim = True'
-                        )
+                            f" WHERE year = '{year}' AND prelim = True"
+                        ))
 
             file = sinan.download(sinan.get_files(dis_code, year))
 
@@ -96,10 +96,10 @@ def update_dengue(egh_conn: dict):
         for year in f_stage['prelim']:
             with create_engine(egh_conn['URI']).connect() as conn:
                 # Update prelim
-                cur = conn.execute(
+                cur = conn.execute(text(
                     f'DELETE FROM brasil.{tablename}'
-                    f' WHERE year = {year} AND prelim = True'
-                )
+                    f" WHERE year = '{year}' AND prelim = True"
+                ))
 
             file = sinan.download(sinan.get_files(dis_code, year))
 
@@ -108,7 +108,7 @@ def update_dengue(egh_conn: dict):
             df['prelim'] = True
             df.to_sql(
                 name=tablename,
-                con=engine.connect(),
+                con=create_engine(egh_conn['URI']),
                 schema=schema,
                 if_exists='append',
                 index=False

From 73f2fec0327148c97a9c2a926201f23eb63f85bb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Tue, 10 Oct 2023 10:07:45 -0300
Subject: [PATCH 04/12] finish SINAN_DENG DAG

---
 containers/airflow/dags/brasil/sinan/dengue.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
index dc345716..802f7d06 100644
--- a/containers/airflow/dags/brasil/sinan/dengue.py
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -83,12 +83,13 @@ def update_dengue(egh_conn: dict):
             file = sinan.download(sinan.get_files(dis_code, year))
 
             df = parquets_to_dataframe(file.path)
+            df.columns = df.columns.str.lower()
             df['year'] = year
             df['prelim'] = False
             df.to_sql(
                 name=tablename,
-                con=engine.connect(),
-                schema=schema,
+                con=create_engine(egh_conn['URI']),
+                schema="brasil",
                 if_exists='append',
                 index=False
             )
@@ -104,12 +105,13 @@ def update_dengue(egh_conn: dict):
             file = sinan.download(sinan.get_files(dis_code, year))
 
             df = parquets_to_dataframe(file.path)
+            df.columns = df.columns.str.lower()
             df['year'] = year
             df['prelim'] = True
             df.to_sql(
                 name=tablename,
                 con=create_engine(egh_conn['URI']),
-                schema=schema,
+                schema="brasil",
                 if_exists='append',
                 index=False
             )

From 08e498b4b7d7097111673a38f426e5a96e7288aa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Tue, 10 Oct 2023 11:13:26 -0300
Subject: [PATCH 05/12] Use parquet chunks, preventing RAM from filling up

---
 .../airflow/dags/brasil/sinan/dengue.py       | 72 ++++++++++++------
 1 file changed, 45 insertions(+), 27 deletions(-)

diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
index 802f7d06..74fe67d3 100644
--- a/containers/airflow/dags/brasil/sinan/dengue.py
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -3,6 +3,7 @@
 from datetime import timedelta
 from airflow import DAG
 from airflow.decorators import task
+from airflow.models import Variable
 
 
 default_args = {
@@ -23,16 +24,21 @@
     default_args=default_args,
     catchup=False,
 ) as dag:
-    from airflow.models import Variable
 
     CONN = Variable.get('egh_conn', deserialize_json=True)
 
     @task.external_python(
-        task_id='first',
+        task_id='update_dengue',
         python='/opt/py311/bin/python3.11'
     )
     def update_dengue(egh_conn: dict):
+        """
+        This task will run in an isolated python environment, containing PySUS
+        package. The task will fetch for all
+        """
+        import os
         import logging
+        import pandas as pd
 
         from sqlalchemy import create_engine, text
         from pysus.online_data import parquets_to_dataframe
@@ -80,12 +86,42 @@ def update_dengue(egh_conn: dict):
                     f" WHERE year = '{year}' AND prelim = True"
                 ))
 
-            file = sinan.download(sinan.get_files(dis_code, year))
+            parquets = sinan.download(sinan.get_files(dis_code, year))
+
+            for parquet in os.listdir(parquets.path):
+                file = os.path.join(parquets.path, parquet)
+                df = pd.read_parquet(str(file), engine='fastparquet')
+                df.columns = df.columns.str.lower()
+                df['year'] = year
+                df['prelim'] = False
+                df.to_sql(
+                    name=tablename,
+                    con=create_engine(egh_conn['URI']),
+                    schema="brasil",
+                    if_exists='append',
+                    index=False
+                )
+                del df
+                os.remove(file)
+                logging.debug(f"{file} inserted into db")
+            os.rmdir(parquets.path)
 
-            df = parquets_to_dataframe(file.path)
+        for year in f_stage['prelim']:
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Update prelim
+                cur = conn.execute(text(
+                    f'DELETE FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = True"
+                ))
+
+            parquets = sinan.download(sinan.get_files(dis_code, year))
+
+            for parquet in os.listdir(parquets.path):
+                file = os.path.join(parquets.path, parquet)
+                df = pd.read_parquet(str(file), engine='fastparquet')
                 df.columns = df.columns.str.lower()
                 df['year'] = year
-            df['prelim'] = True
+                df['prelim'] = True
                 df.to_sql(
                     name=tablename,
                     con=create_engine(egh_conn['URI']),
@@ -93,27 +129,9 @@ def update_dengue(egh_conn: dict):
                     if_exists='append',
                     index=False
                 )
-
-        for year in f_stage['prelim']:
-            with create_engine(egh_conn['URI']).connect() as conn:
-                # Update prelim
-                cur = conn.execute(text(
-                    f'DELETE FROM brasil.{tablename}'
-                    f" WHERE year = '{year}' AND prelim = True"
-                ))
-
-            file = sinan.download(sinan.get_files(dis_code, year))
-
-            df = parquets_to_dataframe(file.path)
-            df.columns = df.columns.str.lower()
-            df['year'] = year
-            df['prelim'] = True
-            df.to_sql(
-                name=tablename,
-                con=create_engine(egh_conn['URI']),
-                schema="brasil",
-                if_exists='append',
-                index=False
-            )
+                del df
+                os.remove(file)
+                logging.debug(f"{file} inserted into db")
+            os.rmdir(parquets.path)
 
     update_dengue(CONN)

From caebfb6a2052139f61d1daaf332f26b500dac05a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Wed, 11 Oct 2023 10:04:56 -0300
Subject: [PATCH 06/12] Handle UndefinedColumn error & add column

---
 .../airflow/dags/brasil/sinan/dengue.py       | 88 ++++++++++++------
 1 file changed, 56 insertions(+), 32 deletions(-)

diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
index 74fe67d3..31d5f87c 100644
--- a/containers/airflow/dags/brasil/sinan/dengue.py
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -41,6 +41,7 @@ def update_dengue(egh_conn: dict):
         import pandas as pd
 
         from sqlalchemy import create_engine, text
+        from sqlalchemy.exc import ProgrammingError
         from pysus.online_data import parquets_to_dataframe
         from pysus.ftp.databases.sinan import SINAN
 
@@ -49,6 +50,31 @@ def update_dengue(egh_conn: dict):
         dis_code = "DENG"
         tablename = "sinan_dengue_m"
         files = sinan.get_files(dis_code=dis_code)
 
+
+        def insert_parquets(parquet_dir: str, year: int):
+            """
+            Insert parquet dir into database using its chunks. Delete the chunk
+            and the directory after insertion.
+            """
+            for parquet in os.listdir(parquet_dir):
+                file = os.path.join(parquet_dir, parquet)
+                df = pd.read_parquet(str(file), engine='fastparquet')
+                df.columns = df.columns.str.lower()
+                df['year'] = year
+                df['prelim'] = False
+                df.to_sql(
+                    name=tablename,
+                    con=create_engine(egh_conn['URI']),
+                    schema="brasil",
+                    if_exists='append',
+                    index=False
+                )
+                del df
+                os.remove(file)
+                logging.debug(f"{file} inserted into db")
+            os.rmdir(parquets.path)
+
+
         f_stage = {}
         for file in files:
@@ -88,22 +114,21 @@ def update_dengue(egh_conn: dict):
 
             parquets = sinan.download(sinan.get_files(dis_code, year))
 
-            for parquet in os.listdir(parquets.path):
-                file = os.path.join(parquets.path, parquet)
-                df = pd.read_parquet(str(file), engine='fastparquet')
-                df.columns = df.columns.str.lower()
-                df['year'] = year
-                df['prelim'] = False
-                df.to_sql(
-                    name=tablename,
-                    con=create_engine(egh_conn['URI']),
-                    schema="brasil",
-                    if_exists='append',
-                    index=False
-                )
-                del df
-                os.remove(file)
-                logging.debug(f"{file} inserted into db")
+            try:
+                insert_parquets(parquets.path, year)
+            except ProgrammingError as error:
+                if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
+                    # Include new columns to table
+                    column_name = str(error).split('"')[1]
+                    with create_engine(egh_conn['URI']).connect() as conn:
+                        conn.execute(text(
+                            f'ALTER TABLE brasil.{tablename}'
+                            f' ADD COLUMN {column_name} TEXT'
+                        ))
+                        conn.commit()
+                    logging.warning(f"Column {column_name} added into {tablename}")
+                    insert_parquets(parquets.path, year)
+
             os.rmdir(parquets.path)
 
         for year in f_stage['prelim']:
@@ -111,22 +136,21 @@ def update_dengue(egh_conn: dict):
 
             parquets = sinan.download(sinan.get_files(dis_code, year))
 
-            for parquet in os.listdir(parquets.path):
-                file = os.path.join(parquets.path, parquet)
-                df = pd.read_parquet(str(file), engine='fastparquet')
-                df.columns = df.columns.str.lower()
-                df['year'] = year
-                df['prelim'] = True
-                df.to_sql(
-                    name=tablename,
-                    con=create_engine(egh_conn['URI']),
-                    schema="brasil",
-                    if_exists='append',
-                    index=False
-                )
-                del df
-                os.remove(file)
-                logging.debug(f"{file} inserted into db")
+            try:
+                insert_parquets(parquets.path, year)
+            except ProgrammingError as error:
+                if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
+                    # Include new columns to table
+                    column_name = str(error).split('"')[1]
+                    with create_engine(egh_conn['URI']).connect() as conn:
+                        conn.execute(text(
+                            f'ALTER TABLE brasil.{tablename}'
+                            f' ADD COLUMN {column_name} TEXT'
+                        ))
+                        conn.commit()
+                    logging.warning(f"Column {column_name} added into {tablename}")
+                    insert_parquets(parquets.path, year)
+
             os.rmdir(parquets.path)
 
     update_dengue(CONN)

From f743f5c2ff40395a4db4bd4d448bcdd67480bc3b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Wed, 11 Oct 2023 10:24:06 -0300
Subject: [PATCH 07/12] Use recursion to handle to_sql

---
 .../airflow/dags/brasil/sinan/dengue.py       | 79 ++++++++----------
 1 file changed, 35 insertions(+), 44 deletions(-)

diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
index 31d5f87c..8496e293 100644
--- a/containers/airflow/dags/brasil/sinan/dengue.py
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -34,7 +34,8 @@
     def update_dengue(egh_conn: dict):
         """
         This task will run in an isolated python environment, containing PySUS
-        package. The task will fetch for all
+        package. The task will fetch all Dengue years from DATASUS and insert
+        them into the EGH database
         """
         import os
         import logging
@@ -51,11 +52,37 @@ def update_dengue(egh_conn: dict):
         tablename = "sinan_dengue_m"
         files = sinan.get_files(dis_code=dis_code)
 
 
+        def recursive_to_sql(df: pd.DataFrame, engine):
+            """
+            Try inserting a dataframe into db, handle error recursively if
+            a column is missing
+            """
+            try:
+                df.to_sql(
+                    name=tablename,
+                    con=engine,
+                    schema="brasil",
+                    if_exists='append',
+                    index=False
+                )
+            except ProgrammingError as error:
+                if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
+                    column_name = str(error).split('"')[1]
+                    with create_engine(egh_conn['URI']).connect() as conn:
+                        conn.execute(text(
+                            f'ALTER TABLE brasil.{tablename}'
+                            f' ADD COLUMN {column_name} TEXT'
+                        ))
+                        conn.commit()
+                    logging.warning(f"Column {column_name} added into {tablename}")
+                    recursive_to_sql(df, engine)
+
+
         def insert_parquets(parquet_dir: str, year: int):
             """
             Insert parquet dir into database using its chunks. Delete the chunk
-            and the directory after insertion.
+            and the directory after insertion
             """
             for parquet in os.listdir(parquet_dir):
                 file = os.path.join(parquet_dir, parquet)
@@ -62,16 +89,12 @@ def insert_parquets(parquet_dir: str, year: int):
                 df.columns = df.columns.str.lower()
                 df['year'] = year
                 df['prelim'] = False
-                df.to_sql(
-                    name=tablename,
-                    con=create_engine(egh_conn['URI']),
-                    schema="brasil",
-                    if_exists='append',
-                    index=False
-                )
+
+                recursive_to_sql(df, create_engine(egh_conn['URI']))
+                logging.debug(f"{file} inserted into db")
+
                 del df
                 os.remove(file)
-                logging.debug(f"{file} inserted into db")
 
             os.rmdir(parquets.path)
 
@@ -111,23 +134,7 @@ def update_dengue(egh_conn: dict):
                         ))
 
             parquets = sinan.download(sinan.get_files(dis_code, year))
-
-            try:
-                insert_parquets(parquets.path, year)
-            except ProgrammingError as error:
-                if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
-                    # Include new columns to table
-                    column_name = str(error).split('"')[1]
-                    with create_engine(egh_conn['URI']).connect() as conn:
-                        conn.execute(text(
-                            f'ALTER TABLE brasil.{tablename}'
-                            f' ADD COLUMN {column_name} TEXT'
-                        ))
-                        conn.commit()
-                    logging.warning(f"Column {column_name} added into {tablename}")
-                    insert_parquets(parquets.path, year)
-
-            os.rmdir(parquets.path)
+            insert_parquets(parquets.path, year)
 
         for year in f_stage['prelim']:
@@ -136,22 +143,6 @@ def update_dengue(egh_conn: dict):
                 ))
 
             parquets = sinan.download(sinan.get_files(dis_code, year))
-
-            try:
-                insert_parquets(parquets.path, year)
-            except ProgrammingError as error:
-                if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
-                    # Include new columns to table
-                    column_name = str(error).split('"')[1]
-                    with create_engine(egh_conn['URI']).connect() as conn:
-                        conn.execute(text(
-                            f'ALTER TABLE brasil.{tablename}'
-                            f' ADD COLUMN {column_name} TEXT'
-                        ))
-                        conn.commit()
-                    logging.warning(f"Column {column_name} added into {tablename}")
-                    insert_parquets(parquets.path, year)
-
-            os.rmdir(parquets.path)
+            insert_parquets(parquets.path, year)
 
     update_dengue(CONN)

From 8ffb9044455c34d00644fa543eb91d9cf394806e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Wed, 11 Oct 2023 11:44:52 -0300
Subject: [PATCH 08/12] Add columns to table before inserting the dataframe

---
 .../airflow/dags/brasil/sinan/dengue.py       | 56 ++++++++---------
 1 file changed, 31 insertions(+), 25 deletions(-)
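Note: the gist of this patch, as a standalone sketch — diff the incoming
DataFrame's columns against the live table's columns and run ALTER TABLE up
front, instead of reacting to UndefinedColumn errors. This is only an
illustration of the approach: the engine URI and the column name below are
hypothetical, not part of the patch.

    import pandas as pd
    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql://user:pass@host:5432/db")  # hypothetical URI
    df = pd.DataFrame({"nu_idade_n": ["4034"]})  # hypothetical new SINAN column

    # Read the live table's columns from the DBAPI cursor description
    with engine.connect() as conn:
        res = conn.execute(text("SELECT * FROM brasil.sinan_dengue_m LIMIT 1"))
        table_cols = set(i[0] for i in res.cursor.description)

    # Add whatever the table is missing before calling to_sql
    for col in set(df.columns) - table_cols:
        with engine.connect() as conn:
            conn.execute(text(f"ALTER TABLE brasil.sinan_dengue_m ADD COLUMN {col} TEXT"))
            conn.commit()

    df.to_sql("sinan_dengue_m", engine, schema="brasil", if_exists="append", index=False)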
diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
index 8496e293..876a70cb 100644
--- a/containers/airflow/dags/brasil/sinan/dengue.py
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -52,31 +52,38 @@ def update_dengue(egh_conn: dict):
         files = sinan.get_files(dis_code=dis_code)
 
 
-        def recursive_to_sql(df: pd.DataFrame, engine):
+        def to_sql_include_cols(df: pd.DataFrame, engine):
             """
-            Try inserting a dataframe into db, handle error recursively if
-            a column is missing
+            Insert dataframe into db, include missing columns if needed
             """
-            try:
-                df.to_sql(
-                    name=tablename,
-                    con=engine,
-                    schema="brasil",
-                    if_exists='append',
-                    index=False
-                )
-            except ProgrammingError as error:
-                if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
-                    column_name = str(error).split('"')[1]
-                    with create_engine(egh_conn['URI']).connect() as conn:
-                        conn.execute(text(
-                            f'ALTER TABLE brasil.{tablename}'
-                            f' ADD COLUMN {column_name} TEXT'
-                        ))
-                        conn.commit()
-                    logging.warning(f"Column {column_name} added into {tablename}")
-                    recursive_to_sql(df, engine)
-
+            df.columns = df.columns.str.lower()
+
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Get columns
+                res = conn.execute(text(f'SELECT * FROM brasil.{tablename} LIMIT 1'))
+                sql_columns = set(i[0] for i in res.cursor.description)
+
+            df_columns = set(df.columns)
+            columns_to_add = df_columns.difference(sql_columns)
+
+            if columns_to_add:
+                sql_statements = [f"ALTER TABLE {tablename}"]
+                for column in columns_to_add:
+                    sql_statements.append(f"ADD COLUMN {column} TEXT,")  # object
+
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    sql = ' '.join(sql_statements)
+                    logging.warning(f"EXECUTING: {sql}")
+                    conn.execute(text(sql))
+                    conn.commit()
+
+            df.to_sql(
+                name=tablename,
+                con=engine,
+                schema="brasil",
+                if_exists='append',
+                index=False
+            )
 
         def insert_parquets(parquet_dir: str, year: int):
             """
@@ -86,11 +93,10 @@ def insert_parquets(parquet_dir: str, year: int):
             for parquet in os.listdir(parquet_dir):
                 file = os.path.join(parquet_dir, parquet)
                 df = pd.read_parquet(str(file), engine='fastparquet')
-                df.columns = df.columns.str.lower()
                 df['year'] = year
                 df['prelim'] = False
 
-                recursive_to_sql(df, create_engine(egh_conn['URI']))
+                to_sql_include_cols(df, create_engine(egh_conn['URI']))
                 logging.debug(f"{file} inserted into db")
 
                 del df

From 7d276b512e2258abc99fc2e85597d67f938601a5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Wed, 11 Oct 2023 14:45:21 -0300
Subject: [PATCH 09/12] Parse all columns to TEXT before inserting into the db

---
 .../airflow/dags/brasil/sinan/dengue.py       | 36 ++++++++++++-----
 1 file changed, 27 insertions(+), 9 deletions(-)

diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
index 876a70cb..71feb8c7 100644
--- a/containers/airflow/dags/brasil/sinan/dengue.py
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -42,8 +42,6 @@ def update_dengue(egh_conn: dict):
         import pandas as pd
 
         from sqlalchemy import create_engine, text
-        from sqlalchemy.exc import ProgrammingError
-        from pysus.online_data import parquets_to_dataframe
         from pysus.ftp.databases.sinan import SINAN
 
         sinan = SINAN().load()
@@ -52,7 +50,7 @@ def update_dengue(egh_conn: dict):
         files = sinan.get_files(dis_code=dis_code)
 
 
-        def to_sql_include_cols(df: pd.DataFrame, engine):
+        def to_sql_include_cols(df: pd.DataFrame, prelim: bool, engine):
             """
             Insert dataframe into db, include missing columns if needed
             """
@@ -77,6 +75,28 @@ def to_sql_include_cols(df: pd.DataFrame, engine):
                     conn.execute(text(sql))
                     conn.commit()
 
+            for col, dtype in df.dtypes.items():
+                if col in ['dt_notific', 'dt_sin_pri']:
+                    try:
+                        df[col] = pd.to_datetime(df[col]).dt.strftime('%d%m%Y').astype('object')
+                        dtype = 'object'
+                        logging.warning(
+                            f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
+                        )
+                    except ValueError as error:
+                        logging.error(f'Could not format date column correctly: {error}')
+                        df[col] = df[col].astype('object')
+                        dtype = 'object'
+
+                if str(dtype) != 'object':
+                    df[col] = df[col].astype('object')
+                    logging.warning(
+                        f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
+                    )
+
+            df['year'] = year
+            df['prelim'] = prelim
+
             df.to_sql(
                 name=tablename,
                 con=engine,
@@ -85,7 +105,7 @@ def to_sql_include_cols(df: pd.DataFrame, engine):
                 index=False
             )
 
-        def insert_parquets(parquet_dir: str, year: int):
+        def insert_parquets(parquet_dir: str, year: int, prelim: bool):
             """
             Insert parquet dir into database using its chunks. Delete the chunk
             and the directory after insertion
@@ -93,10 +113,8 @@ def insert_parquets(parquet_dir: str, year: int):
             for parquet in os.listdir(parquet_dir):
                 file = os.path.join(parquet_dir, parquet)
                 df = pd.read_parquet(str(file), engine='fastparquet')
-                df['year'] = year
-                df['prelim'] = False
 
-                to_sql_include_cols(df, create_engine(egh_conn['URI']))
+                to_sql_include_cols(df, prelim, create_engine(egh_conn['URI']))
                 logging.debug(f"{file} inserted into db")
 
                 del df
@@ -142,7 +160,7 @@ def update_dengue(egh_conn: dict):
                         ))
 
             parquets = sinan.download(sinan.get_files(dis_code, year))
-            insert_parquets(parquets.path, year)
+            insert_parquets(parquets.path, year, False)
 
         for year in f_stage['prelim']:
@@ -153,6 +171,6 @@ def update_dengue(egh_conn: dict):
                 ))
 
             parquets = sinan.download(sinan.get_files(dis_code, year))
-            insert_parquets(parquets.path, year)
+            insert_parquets(parquets.path, year, True)
 
     update_dengue(CONN)

From d7435c2eddc33356126f9216d65f39cfc20d08a7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Wed, 11 Oct 2023 15:02:18 -0300
Subject: [PATCH 10/12] minor fixes

---
 containers/airflow/dags/brasil/sinan/dengue.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
index 71feb8c7..ad380c6e 100644
--- a/containers/airflow/dags/brasil/sinan/dengue.py
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -65,10 +65,12 @@ def to_sql_include_cols(df: pd.DataFrame, prelim: bool, engine):
             columns_to_add = df_columns.difference(sql_columns)
 
             if columns_to_add:
-                sql_statements = [f"ALTER TABLE {tablename}"]
+                sql_statements = [f"ALTER TABLE brasil.{tablename}"]
                 for column in columns_to_add:
                     sql_statements.append(f"ADD COLUMN {column} TEXT,")  # object
 
+                sql_statements[-1] = sql_statements[-1].replace(',', ';')
+
                 with create_engine(egh_conn['URI']).connect() as conn:
                     sql = ' '.join(sql_statements)
                     logging.warning(f"EXECUTING: {sql}")
@@ -81,7 +83,7 @@ def to_sql_include_cols(df: pd.DataFrame, prelim: bool, engine):
                         df[col] = pd.to_datetime(df[col]).dt.strftime('%d%m%Y').astype('object')
                         dtype = 'object'
                         logging.warning(
-                            f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
+                            f"Column '{col}' of type 'DATE' has been parsed to 'TEXT'"
                         )
                     except ValueError as error:
                         logging.error(f'Could not format date column correctly: {error}')
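A quick illustration of the statement the fixed code above now assembles —
schema-qualified, comma-separated, and semicolon-terminated. The column names
below are hypothetical; the logic is lifted directly from the patch:

    columns_to_add = ["nu_idade_n", "cs_gestant"]  # hypothetical new columns
    sql_statements = ["ALTER TABLE brasil.sinan_dengue_m"]
    for column in columns_to_add:
        sql_statements.append(f"ADD COLUMN {column} TEXT,")
    # Turn the trailing comma of the last clause into a terminator
    sql_statements[-1] = sql_statements[-1].replace(',', ';')
    print(' '.join(sql_statements))
    # ALTER TABLE brasil.sinan_dengue_m ADD COLUMN nu_idade_n TEXT, ADD COLUMN cs_gestant TEXT;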
From afc99a1383b794d43129c109966e9e2f91c899d3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Fri, 13 Oct 2023 09:57:25 -0300
Subject: [PATCH 11/12] Include SINAN_ZIKA DAG

---
 .../airflow/dags/brasil/sinan/dengue.py       |   2 +-
 containers/airflow/dags/brasil/sinan/zika.py  | 180 ++++++++++++++++++
 2 files changed, 181 insertions(+), 1 deletion(-)

diff --git a/containers/airflow/dags/brasil/sinan/dengue.py b/containers/airflow/dags/brasil/sinan/dengue.py
index ad380c6e..97232e51 100644
--- a/containers/airflow/dags/brasil/sinan/dengue.py
+++ b/containers/airflow/dags/brasil/sinan/dengue.py
@@ -117,7 +117,7 @@ def insert_parquets(parquet_dir: str, year: int, prelim: bool):
                 df = pd.read_parquet(str(file), engine='fastparquet')
 
                 to_sql_include_cols(df, prelim, create_engine(egh_conn['URI']))
-                logging.debug(f"{file} inserted into db")
+                logging.warning(f"{len(df)} rows inserted into {tablename}")
 
                 del df
                 os.remove(file)
diff --git a/containers/airflow/dags/brasil/sinan/zika.py b/containers/airflow/dags/brasil/sinan/zika.py
index e69de29b..da6ab7c7 100644
--- a/containers/airflow/dags/brasil/sinan/zika.py
+++ b/containers/airflow/dags/brasil/sinan/zika.py
@@ -0,0 +1,180 @@
+import pendulum
+
+from datetime import timedelta
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Variable
+
+
+default_args = {
+    "owner": "epigraphhub",
+    "depends_on_past": False,
+    "start_date": pendulum.datetime(2023, 1, 2),
+    "email": ["epigraphhub@thegraphnetwork.org"],
+    "email_on_failure": True,
+    "email_on_retry": False,
+    "retries": 2,
+    "retry_delay": timedelta(minutes=1),
+}
+
+with DAG(
+    dag_id='SINAN_ZIKA',
+    tags=['SINAN', 'Brasil', 'Zika'],
+    schedule='0 0 2 * *',
+    default_args=default_args,
+    catchup=False,
+) as dag:
+
+    CONN = Variable.get('egh_conn', deserialize_json=True)
+
+    @task.external_python(
+        task_id='update_zika',
+        python='/opt/py311/bin/python3.11'
+    )
+    def update_zika(egh_conn: dict):
+        """
+        This task will run in an isolated python environment, containing PySUS
+        package. The task will fetch all Zika years from DATASUS and insert
+        them into the EGH database
+        """
+        import os
+        import logging
+        import pandas as pd
+
+        from sqlalchemy import create_engine, text
+        from pysus.ftp.databases.sinan import SINAN
+
+        sinan = SINAN().load()
+        dis_code = "ZIKA"
+        tablename = "sinan_zika_m"
+        files = sinan.get_files(dis_code=dis_code)
+
+        def to_sql_include_cols(df: pd.DataFrame, prelim: bool, engine):
+            """
+            Insert dataframe into db, include missing columns if needed
+            """
+            df.columns = df.columns.str.lower()
+
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Get columns
+                res = conn.execute(
+                    text(f'SELECT * FROM brasil.{tablename} LIMIT 1'))
+                sql_columns = set(i[0] for i in res.cursor.description)
+
+            df_columns = set(df.columns)
+            columns_to_add = df_columns.difference(sql_columns)
+
+            if columns_to_add:
+                sql_statements = [f"ALTER TABLE brasil.{tablename}"]
+                for column in columns_to_add:
+                    sql_statements.append(
+                        f"ADD COLUMN {column} TEXT,")  # object
+
+                sql_statements[-1] = sql_statements[-1].replace(',', ';')
+
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    sql = ' '.join(sql_statements)
+                    logging.warning(f"EXECUTING: {sql}")
+                    conn.execute(text(sql))
+                    conn.commit()
+
+            for col, dtype in df.dtypes.items():
+                if col in ['dt_notific', 'dt_sin_pri']:
+                    try:
+                        df[col] = pd.to_datetime(df[col]).dt.strftime(
+                            '%d%m%Y').astype('object')
+                        dtype = 'object'
+                        logging.warning(
+                            f"Column '{col}' of type 'DATE' has been parsed to 'TEXT'"
+                        )
+                    except ValueError as error:
+                        logging.error(
+                            f'Could not format date column correctly: {error}')
+                        df[col] = df[col].astype('object')
+                        dtype = 'object'
+
+                if str(dtype) != 'object':
+                    df[col] = df[col].astype('object')
+                    logging.warning(
+                        f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
+                    )
+
+            df['year'] = year
+            df['prelim'] = prelim
+
+            df.to_sql(
+                name=tablename,
+                con=engine,
+                schema="brasil",
+                if_exists='append',
+                index=False
+            )
+
+        def insert_parquets(parquet_dir: str, year: int, prelim: bool):
+            """
+            Insert parquet dir into database using its chunks. Delete the chunk
+            and the directory after insertion
+            """
+            for parquet in os.listdir(parquet_dir):
+                file = os.path.join(parquet_dir, parquet)
+                df = pd.read_parquet(str(file), engine='fastparquet')
+
+                to_sql_include_cols(df, prelim, create_engine(egh_conn['URI']))
+                logging.warning(f"{len(df)} rows inserted into {tablename}")
+
+                del df
+                os.remove(file)
+            os.rmdir(parquets.path)
+
+        f_stage = {}
+        for file in files:
+            code, year = sinan.format(file)
+            stage = 'prelim' if 'PRELIM' in file.path else 'final'
+
+            if not stage in f_stage:
+                f_stage[stage] = [year]
+            else:
+                f_stage[stage].append(year)
+
+        for year in f_stage['final']:
+            # Check if final is already in DB
+            with create_engine(egh_conn['URI']).connect() as conn:
+                cur = conn.execute(text(
+                    f'SELECT COUNT(*) FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = False"
+                ))
+                count = cur.fetchone()[0]
+
+            logging.info(f"Final year {year}: {count}")
+
+            if not count:
+                # Check on prelims
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    cur = conn.execute(text(
+                        f'SELECT COUNT(*) FROM brasil.{tablename}'
+                        f" WHERE year = '{year}' AND prelim = True"
+                    ))
+                    count = cur.fetchone()[0]
+
+                    if count:
+                        # Update prelim to final
+                        cur = conn.execute(text(
+                            f'DELETE FROM brasil.{tablename}'
+                            f" WHERE year = '{year}' AND prelim = True"
+                        ))
+
+            parquets = sinan.download(sinan.get_files(dis_code, year))
+            insert_parquets(parquets.path, year, False)
+
+        for year in f_stage['prelim']:
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Update prelim
+                cur = conn.execute(text(
+                    f'DELETE FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = True"
+                ))
+
+            parquets = sinan.download(sinan.get_files(dis_code, year))
+            insert_parquets(parquets.path, year, True)
+
+    update_zika(CONN)

From c76c431efbac50092787a94a1a93f3acd81ef92d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Fri, 13 Oct 2023 10:26:47 -0300
Subject: [PATCH 12/12] Include SINAN_CHIK DAG

---
 containers/airflow/Dockerfile                 |   9 +-
 .../airflow/dags/brasil/sinan/chikungunya.py  | 180 ++++++++++++++++++
 containers/compose-airflow.yaml               |   6 +---
 3 files changed, 183 insertions(+), 12 deletions(-)

diff --git a/containers/airflow/Dockerfile b/containers/airflow/Dockerfile
index 34cfc489..db1658cc 100644
--- a/containers/airflow/Dockerfile
+++ b/containers/airflow/Dockerfile
@@ -73,13 +73,8 @@ COPY --chown=airflow containers/airflow/envs/* /opt/envs/
 
 USER airflow
 
-ARG POSTGRES_EPIGRAPH_HOST
-ARG POSTGRES_EPIGRAPH_PORT
-ARG POSTGRES_EPIGRAPH_USER
-ARG POSTGRES_EPIGRAPH_PASSWORD
-ARG POSTGRES_EPIGRAPH_DB
-ENV DB_USER "${POSTGRES_EPIGRAPH_USER}:${POSTGRES_EPIGRAPH_PASSWORD}"
-ENV DB_URI "${DB_USER}@${POSTGRES_EPIGRAPH_HOST}:${POSTGRES_EPIGRAPH_PORT}/${POSTGRES_EPIGRAPH_DB}"
+ARG DB_URI
+ENV DB_URI "${DB_URI}"
 
 RUN /usr/local/bin/python -m virtualenv /opt/envs/py310 --python="/opt/py310/bin/python3.10" \
     && sed -i "s/include-system-site-packages = false/include-system-site-packages = true/" /opt/envs/py310/pyvenv.cfg \
diff --git a/containers/airflow/dags/brasil/sinan/chikungunya.py b/containers/airflow/dags/brasil/sinan/chikungunya.py
index e69de29b..f4e037ce 100644
--- a/containers/airflow/dags/brasil/sinan/chikungunya.py
+++ b/containers/airflow/dags/brasil/sinan/chikungunya.py
@@ -0,0 +1,180 @@
+import pendulum
+
+from datetime import timedelta
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Variable
+
+
+default_args = {
+    "owner": "epigraphhub",
+    "depends_on_past": False,
"start_date": pendulum.datetime(2023, 1, 3), + "email": ["epigraphhub@thegraphnetwork.org"], + "email_on_failure": True, + "email_on_retry": False, + "retries": 2, + "retry_delay": timedelta(minutes=1), +} + +with DAG( + dag_id='SINAN_CHIK', + tags=['SINAN', 'Brasil', 'Chikungunya'], + schedule='0 0 3 * *', + default_args=default_args, + catchup=False, +) as dag: + + CONN = Variable.get('egh_conn', deserialize_json=True) + + @task.external_python( + task_id='update_chik', + python='/opt/py311/bin/python3.11' + ) + def update_chik(egh_conn: dict): + """ + This task will run in an isolated python environment, containing PySUS + package. The task will fetch for all Chikungunya years from DATASUS and + insert them into EGH database + """ + import os + import logging + import pandas as pd + + from sqlalchemy import create_engine, text + from pysus.ftp.databases.sinan import SINAN + + sinan = SINAN().load() + dis_code = "CHIK" + tablename = "sinan_chikungunya_m" + files = sinan.get_files(dis_code=dis_code) + + def to_sql_include_cols(df: pd.DataFrame, prelim: bool, engine): + """ + Insert dataframe into db, include missing columns if needed + """ + df.columns = df.columns.str.lower() + + with create_engine(egh_conn['URI']).connect() as conn: + # Get columns + res = conn.execute( + text(f'SELECT * FROM brasil.{tablename} LIMIT 1')) + sql_columns = set(i[0] for i in res.cursor.description) + + df_columns = set(df.columns) + columns_to_add = df_columns.difference(sql_columns) + + if columns_to_add: + sql_statements = [f"ALTER TABLE brasil.{tablename}"] + for column in columns_to_add: + sql_statements.append( + f"ADD COLUMN {column} TEXT,") # object + + sql_statements[-1] = sql_statements[-1].replace(',', ';') + + with create_engine(egh_conn['URI']).connect() as conn: + sql = ' '.join(sql_statements) + logging.warning(f"EXECUTING: {sql}") + conn.execute(text(sql)) + conn.commit() + + for col, dtype in df.dtypes.items(): + if col in ['dt_notific', 'dt_sin_pri']: + try: + df[col] = pd.to_datetime(df[col]).dt.strftime( + '%d%m%Y').astype('object') + dtype = 'object' + logging.warning( + f"Column '{col}' of type 'DATE' has been parsed to 'TEXT'" + ) + except ValueError as error: + logging.error( + f'Could not format date column correctly: {error}') + df[col] = df[col].astype('object') + dtype = 'object' + + if str(dtype) != 'object': + df[col] = df[col].astype('object') + logging.warning( + f"Column '{col}' of type '{dtype}' has been parsed to 'object'" + ) + + df['year'] = year + df['prelim'] = prelim + + df.to_sql( + name=tablename, + con=engine, + schema="brasil", + if_exists='append', + index=False + ) + + def insert_parquets(parquet_dir: str, year: int, prelim: bool): + """ + Insert parquet dir into database using its chunks. 
+            and the directory after insertion
+            """
+            for parquet in os.listdir(parquet_dir):
+                file = os.path.join(parquet_dir, parquet)
+                df = pd.read_parquet(str(file), engine='fastparquet')
+
+                to_sql_include_cols(df, prelim, create_engine(egh_conn['URI']))
+                logging.warning(f"{len(df)} rows inserted into {tablename}")
+
+                del df
+                os.remove(file)
+            os.rmdir(parquets.path)
+
+        f_stage = {}
+        for file in files:
+            code, year = sinan.format(file)
+            stage = 'prelim' if 'PRELIM' in file.path else 'final'
+
+            if not stage in f_stage:
+                f_stage[stage] = [year]
+            else:
+                f_stage[stage].append(year)
+
+        for year in f_stage['final']:
+            # Check if final is already in DB
+            with create_engine(egh_conn['URI']).connect() as conn:
+                cur = conn.execute(text(
+                    f'SELECT COUNT(*) FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = False"
+                ))
+                count = cur.fetchone()[0]
+
+            logging.info(f"Final year {year}: {count}")
+
+            if not count:
+                # Check on prelims
+                with create_engine(egh_conn['URI']).connect() as conn:
+                    cur = conn.execute(text(
+                        f'SELECT COUNT(*) FROM brasil.{tablename}'
+                        f" WHERE year = '{year}' AND prelim = True"
+                    ))
+                    count = cur.fetchone()[0]
+
+                    if count:
+                        # Update prelim to final
+                        cur = conn.execute(text(
+                            f'DELETE FROM brasil.{tablename}'
+                            f" WHERE year = '{year}' AND prelim = True"
+                        ))
+
+            parquets = sinan.download(sinan.get_files(dis_code, year))
+            insert_parquets(parquets.path, year, False)
+
+        for year in f_stage['prelim']:
+            with create_engine(egh_conn['URI']).connect() as conn:
+                # Update prelim
+                cur = conn.execute(text(
+                    f'DELETE FROM brasil.{tablename}'
+                    f" WHERE year = '{year}' AND prelim = True"
+                ))
+
+            parquets = sinan.download(sinan.get_files(dis_code, year))
+            insert_parquets(parquets.path, year, True)
+
+    update_chik(CONN)
diff --git a/containers/compose-airflow.yaml b/containers/compose-airflow.yaml
index 9fc3f9fd..428d8874 100644
--- a/containers/compose-airflow.yaml
+++ b/containers/compose-airflow.yaml
@@ -5,11 +5,7 @@ x-airflow-common:
     context: ..
     dockerfile: containers/airflow/Dockerfile
     args:
-      POSTGRES_EPIGRAPH_HOST: ${POSTGRES_EPIGRAPH_HOST}
-      POSTGRES_EPIGRAPH_PORT: ${POSTGRES_EPIGRAPH_PORT}
-      POSTGRES_EPIGRAPH_USER: ${POSTGRES_EPIGRAPH_USER}
-      POSTGRES_EPIGRAPH_PASSWORD: ${POSTGRES_EPIGRAPH_PASSWORD}
-      POSTGRES_EPIGRAPH_DB: ${POSTGRES_EPIGRAPH_DB}
+      DB_URI: ${DB_URI}
   environment:
     &airflow-common-env
     AIRFLOW_HOME: /opt/airflow
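How the three DAGs receive the connection, end to end: env.tpl renders
AIRFLOW_VAR_EGH_CONN as a JSON string, compose-airflow.yaml passes it into the
containers, and Airflow resolves Variable.get('egh_conn') from the
AIRFLOW_VAR_<NAME> environment variable. A minimal sketch of that last step —
the URI below is hypothetical:

    import os

    # Normally set by compose-airflow.yaml from env.tpl; set here for the sketch
    os.environ["AIRFLOW_VAR_EGH_CONN"] = '{"URI": "postgresql://user:pass@host:5432/db"}'

    from airflow.models import Variable

    # deserialize_json=True parses the JSON payload into a dict
    egh_conn = Variable.get("egh_conn", deserialize_json=True)
    print(egh_conn["URI"])  # postgresql://user:pass@host:5432/db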