Add a spreadsheet dump pipeline under pipelines/rj_sms/dump_sheets.

  * pipelines/rj_sms/__init__.py: star-import the new dump_sheets.flows module
    alongside the other rj_sms flow modules.
  * dump_sheets/constants.py: new `constants` Enum holding
    DATASET_ID = "brutos_sheets".
  * dump_sheets/flows.py: deep-copies dump_url_flow into unidade_saude_flow,
    sets its name, GCS storage and KubernetesRun config, applies default
    parameters (url_type "google_sheet", table "unidade_saude_auxiliar",
    dump_mode "overwrite") and attaches the schedule below.
  * dump_sheets/schedules.py: every_sunday_at_six_am, a Schedule with a single
    IntervalClock (7-day interval, start 2023-10-08 06:00 America/Sao_Paulo).

NOTE(review): both the run config and the clock use RJ_SMS_DEV_AGENT_LABEL;
confirm that a dev agent label is intended for a scheduled flow.

diff --git a/pipelines/rj_sms/__init__.py b/pipelines/rj_sms/__init__.py
index f83071671..5fc17a62b 100644
--- a/pipelines/rj_sms/__init__.py
+++ b/pipelines/rj_sms/__init__.py
@@ -8,4 +8,5 @@
 from pipelines.rj_sms.dump_api_prontuario_vitai.flows import *
 from pipelines.rj_sms.dump_azureblob_estoque_tpc.flows import *
 from pipelines.rj_sms.dump_ftp_cnes.flows import *
+from pipelines.rj_sms.dump_sheets.flows import *
 from pipelines.rj_sms.materialize_datalake.flows import *
diff --git a/pipelines/rj_sms/dump_sheets/constants.py b/pipelines/rj_sms/dump_sheets/constants.py
new file mode 100644
index 000000000..7ecc6ce2c
--- /dev/null
+++ b/pipelines/rj_sms/dump_sheets/constants.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+# pylint: disable=C0103
+"""
+Constants for sheets dump.
+"""
+from enum import Enum
+
+
+class constants(Enum):
+    """
+    Constant values for the dump sheets flows
+    """
+    DATASET_ID = "brutos_sheets"
diff --git a/pipelines/rj_sms/dump_sheets/flows.py b/pipelines/rj_sms/dump_sheets/flows.py
new file mode 100644
index 000000000..dbcd1fc20
--- /dev/null
+++ b/pipelines/rj_sms/dump_sheets/flows.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+"""
+Database dumping flows for sheets dump.
+"""
+
+from copy import deepcopy
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+from pipelines.constants import constants
+from pipelines.utils.dump_url.flows import dump_url_flow
+from pipelines.utils.utils import set_default_parameters
+from pipelines.rj_sms.dump_sheets.constants import constants as sheets_constants
+from pipelines.rj_sms.dump_sheets.schedules import every_sunday_at_six_am
+
+
+unidade_saude_flow = deepcopy(dump_url_flow)
+unidade_saude_flow.name = "SMS: Dump Unidades de Saude - Ingerir tabela auxiliar"
+unidade_saude_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+unidade_saude_flow.run_config = KubernetesRun(
+    image=constants.DOCKER_IMAGE.value,
+    labels=[
+        constants.RJ_SMS_DEV_AGENT_LABEL.value,
+    ],
+)
+
+unidade_saude_flow_parameters = {
+    "url": "https://docs.google.com/spreadsheets/d/1EkYfxuN2bWD_q4OhHL8hJvbmQKmQKFrk0KLf6D7nKS4/edit?usp=sharing",  # noqa: E501
+    "url_type": "google_sheet",
+    "gsheets_sheet_name": "Sheet1",
+    "table_id": "unidade_saude_auxiliar",
+    "dataset_id": sheets_constants.DATASET_ID.value,
+    "dump_mode": "overwrite",
+}
+
+unidade_saude_flow = set_default_parameters(
+    unidade_saude_flow, default_parameters=unidade_saude_flow_parameters
+)
+unidade_saude_flow.schedule = every_sunday_at_six_am
diff --git a/pipelines/rj_sms/dump_sheets/schedules.py b/pipelines/rj_sms/dump_sheets/schedules.py
new file mode 100644
index 000000000..6c09cba83
--- /dev/null
+++ b/pipelines/rj_sms/dump_sheets/schedules.py
@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+# pylint: disable=C0103
+"""
+Schedules for the sheets dump pipeline
+"""
+
+from datetime import timedelta
+import pendulum
+from prefect.schedules import Schedule
+from prefect.schedules.clocks import IntervalClock
+from pipelines.constants import constants
+
+every_sunday_at_six_am = Schedule(
+    clocks=[
+        IntervalClock(
+            interval=timedelta(days=7),
+            start_date=pendulum.datetime(2023, 10, 8, 6, 0, 0, tz="America/Sao_Paulo"),
+            labels=[
+                constants.RJ_SMS_DEV_AGENT_LABEL.value,
+            ],
+        )
+    ]
+)