Skip to content

Commit

Permalink
add sheets dump
Browse files Browse the repository at this point in the history
  • Loading branch information
ThiagoTrabach committed Oct 14, 2023
1 parent 753b76d commit 1551a68
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 0 deletions.
1 change: 1 addition & 0 deletions pipelines/rj_sms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
from pipelines.rj_sms.dump_api_prontuario_vitai.flows import *
from pipelines.rj_sms.dump_azureblob_estoque_tpc.flows import *
from pipelines.rj_sms.dump_ftp_cnes.flows import *
from pipelines.rj_sms.dump_sheets.flows import *
from pipelines.rj_sms.materialize_datalake.flows import *
13 changes: 13 additions & 0 deletions pipelines/rj_sms/dump_sheets/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Constants for sheets dump.
"""
from enum import Enum


class constants(Enum):
    """
    Constant values for the dump sheets flows.

    Each member wraps a plain string; read it through ``.value``.
    """

    # Dataset identifier; the dump sheets flows pass it as the default
    # value of their "dataset_id" parameter.
    DATASET_ID = "brutos_sheets"
38 changes: 38 additions & 0 deletions pipelines/rj_sms/dump_sheets/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
"""
Flows that dump Google Sheets spreadsheets (sheets dump).
"""

from copy import deepcopy
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.constants import constants
from pipelines.utils.dump_url.flows import dump_url_flow
from pipelines.utils.utils import set_default_parameters
from pipelines.rj_sms.dump_sheets.constants import constants as sheets_constants
from pipelines.rj_sms.dump_sheets.schedules import every_sunday_at_six_am


# Default parameter values applied to the flow below: the source spreadsheet
# (URL and sheet name) and the destination table/dataset with its dump mode.
unidade_saude_flow_parameters = dict(
    url="https://docs.google.com/spreadsheets/d/1EkYfxuN2bWD_q4OhHL8hJvbmQKmQKFrk0KLf6D7nKS4/edit?usp=sharing",  # noqa: E501
    url_type="google_sheet",
    gsheets_sheet_name="Sheet1",
    table_id="unidade_saude_auxiliar",
    dataset_id=sheets_constants.DATASET_ID.value,
    dump_mode="overwrite",
)

# Configure a deep copy so the imported `dump_url_flow` object itself is
# never mutated by the assignments below.
unidade_saude_flow = deepcopy(dump_url_flow)
unidade_saude_flow.run_config = KubernetesRun(
    labels=[constants.RJ_SMS_DEV_AGENT_LABEL.value],
    image=constants.DOCKER_IMAGE.value,
)
unidade_saude_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
unidade_saude_flow.name = "SMS: Dump Unidades de Saude - Ingerir tabela auxiliar"

# Install the defaults declared above, then attach the weekly schedule to the
# flow object returned by `set_default_parameters`.
unidade_saude_flow = set_default_parameters(
    unidade_saude_flow,
    default_parameters=unidade_saude_flow_parameters,
)
unidade_saude_flow.schedule = every_sunday_at_six_am
23 changes: 23 additions & 0 deletions pipelines/rj_sms/dump_sheets/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Schedules for the sheets dump pipeline
"""

from datetime import timedelta
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from pipelines.constants import constants

# Weekly schedule with a single interval clock, anchored on Sunday
# 2023-10-08 at 06:00 (America/Sao_Paulo) and repeating every seven days.
every_sunday_at_six_am = Schedule(
    clocks=[
        IntervalClock(
            start_date=pendulum.datetime(
                year=2023,
                month=10,
                day=8,
                hour=6,
                minute=0,
                second=0,
                tz="America/Sao_Paulo",
            ),
            interval=timedelta(weeks=1),
            # Label attached to the clock (the SMS dev agent label constant).
            labels=[constants.RJ_SMS_DEV_AGENT_LABEL.value],
        )
    ]
)

0 comments on commit 1551a68

Please sign in to comment.