Commit b8dfb4e

Merge pull request #580 from prefeitura-rio/staging/sms-estoque
Staging/sms estoque
ThiagoTrabach authored Nov 30, 2023
2 parents 6ac03ed + d158c9b commit b8dfb4e
Showing 24 changed files with 1,685 additions and 677 deletions.
Binary file added .DS_Store
6 changes: 4 additions & 2 deletions .gitignore
@@ -12,8 +12,9 @@ test/*.csv
 setup.py
 .vscode/*
 *.hdf
-*.DS_Store
 .idea/*
+*.DS_Store
+

 # Byte-compiled / optimized / DLL files
 __pycache__/
@@ -148,4 +149,5 @@ dmypy.json
 
 # Backfill stuff
 backfill.csv
-REVIEW.md
+REVIEW.md
+.DS_Store
4 changes: 4 additions & 0 deletions pipelines/constants.py
@@ -150,4 +150,8 @@ class constants(Enum):  # pylint: disable=c0103
         "user_id": "222842688117014528",
         "type": "user_nickname",
     },
+    "danilo": {
+        "user_id": "1147152438487416873",
+        "type": "user_nickname",
+    },
 }
3 changes: 3 additions & 0 deletions pipelines/rj_sms/__init__.py
@@ -5,4 +5,7 @@
 
 from pipelines.rj_sms.dump_db_sivep.flows import *
 from pipelines.rj_sms.dump_ftp_cnes.flows import *
+from pipelines.rj_sms.dump_azureblob_estoque_tpc.flows import *
+from pipelines.rj_sms.dump_api_prontuario_vitai.flows import *
+from pipelines.rj_sms.dump_api_prontuario_vitacare.flows import *
 from pipelines.rj_sms.dump_sheets.flows import *
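
The star-imports follow the repo convention of re-exporting every flow at the package top level, presumably so a registration step can discover them by importing pipelines.rj_sms. A rough sketch of that idea (hypothetical, not code from this PR, and it assumes the custom Flow decorator yields prefect.Flow instances):

# Hypothetical illustration of why the star-imports matter: once each
# module's flows are re-exported, a script can enumerate them by inspection.
import prefect

import pipelines.rj_sms as sms

flows = [obj for obj in vars(sms).values() if isinstance(obj, prefect.Flow)]
print([flow.name for flow in flows])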
31 changes: 31 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitacare/constants.py
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Constants for Vitacare.
"""
from enum import Enum


class constants(Enum):
    """
    Constant values for the dump vitacare flows
    """

    VAULT_PATH = "prontuario_vitacare"
    DATASET_ID = "brutos_prontuario_vitacare"
    BASE_URL = {
        "10": "http://consolidado-ap10.pepvitacare.com:8088",
        "21": "http://consolidado-ap21.pepvitacare.com:8090",
        "22": "http://consolidado-ap22.pepvitacare.com:8091",
        "31": "http://consolidado-ap31.pepvitacare.com:8089",
        "32": "http://consolidado-ap32.pepvitacare.com:8090",
        "33": "http://consolidado-ap33.pepvitacare.com:8089",
        "40": "http://consolidado-ap40.pepvitacare.com:8089",
        "51": "http://consolidado-ap51.pepvitacare.com:8091",
        "52": "http://consolidado-ap52.pepvitacare.com:8088",
        "53": "http://consolidado-ap53.pepvitacare.com:8092",
    }
    ENDPOINT = {
        "posicao": "/reports/pharmacy/stocks",
        "movimento": "/reports/pharmacy/movements",
    }
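
For reference, a request URL is just the per-AP base joined with the report path. A minimal sketch, assuming the build_url task (defined in this package's tasks.py, not shown here) does the equivalent:

# Hypothetical sketch -- the real logic lives in the package's tasks.py,
# which is not shown in this diff.
from pipelines.rj_sms.dump_api_prontuario_vitacare.constants import (
    constants as vitacare_constants,
)


def build_request_url(ap: str, endpoint: str) -> str:
    """Join the per-AP base URL with the chosen pharmacy report path."""
    base = vitacare_constants.BASE_URL.value[ap]
    path = vitacare_constants.ENDPOINT.value[endpoint]
    return base + path


# build_request_url("10", "posicao")
# -> "http://consolidado-ap10.pepvitacare.com:8088/reports/pharmacy/stocks"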
138 changes: 138 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitacare/flows.py
@@ -0,0 +1,138 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Vitacare health record dumping flows
"""

from prefect import Parameter, case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.utils.decorators import Flow
from pipelines.constants import constants
from pipelines.rj_sms.dump_api_prontuario_vitacare.constants import (
    constants as vitacare_constants,
)
from pipelines.rj_sms.tasks import (
    get_secret,
    create_folders,
    cloud_function_request,
    create_partitions,
    upload_to_datalake,
)
from pipelines.rj_sms.dump_api_prontuario_vitacare.tasks import (
    rename_flow,
    build_url,
    build_params,
    create_filename,
    save_data_to_file,
)

from pipelines.rj_sms.dump_api_prontuario_vitacare.schedules import (
    vitacare_daily_update_schedule,
)


with Flow(
    name="SMS: Dump VitaCare - Ingerir dados do prontuário VitaCare",
    code_owners=[
        "thiago",
        "andre",
        "danilo",
    ],
) as dump_vitacare:
    #####################################
    # Parameters
    #####################################

    # Flow
    RENAME_FLOW = Parameter("rename_flow", default=True)

    # Vault
    VAULT_PATH = vitacare_constants.VAULT_PATH.value

    # Vitacare API
    AP = Parameter("ap", required=True, default="10")
    ENDPOINT = Parameter("endpoint", required=True)
    DATE = Parameter("date", default="today")

    # GCP
    DATASET_ID = Parameter("dataset_id", default=vitacare_constants.DATASET_ID.value)
    TABLE_ID = Parameter("table_id", required=True)

    #####################################
    # Rename flow run
    #####################################

    with case(RENAME_FLOW, True):
        rename_flow_task = rename_flow(table_id=TABLE_ID, ap=AP)

    #####################################
    # Tasks section #1 - Get data
    #####################################

    get_secret_task = get_secret(secret_path=VAULT_PATH)

    create_folders_task = create_folders()
    create_folders_task.set_upstream(get_secret_task)  # pylint: disable=E1101

    build_url_task = build_url(ap=AP, endpoint=ENDPOINT)

    build_params_task = build_params(date_param=DATE)
    build_params_task.set_upstream(create_folders_task)  # pylint: disable=E1101

    file_name_task = create_filename(table_id=TABLE_ID, ap=AP)
    file_name_task.set_upstream(build_params_task)

    download_task = cloud_function_request(
        url=build_url_task,
        credential=get_secret_task,
        request_type="GET",
        body_params=None,
        query_params=build_params_task,
        env="prod",
    )
    download_task.set_upstream(file_name_task)  # pylint: disable=E1101

    save_data_task = save_data_to_file(
        data=download_task,
        file_folder=create_folders_task["raw"],
        table_id=TABLE_ID,
        ap=AP,
        add_load_date_to_filename=True,
        load_date=build_params_task["date"],
    )
    save_data_task.set_upstream(download_task)  # pylint: disable=E1101

    #####################################
    # Tasks section #2 - Transform data and create table
    #####################################

    with case(save_data_task, True):
        create_partitions_task = create_partitions(
            data_path=create_folders_task["raw"],
            partition_directory=create_folders_task["partition_directory"],
        )
        create_partitions_task.set_upstream(save_data_task)

        upload_to_datalake_task = upload_to_datalake(
            input_path=create_folders_task["partition_directory"],
            dataset_id=DATASET_ID,
            table_id=TABLE_ID,
            if_exists="replace",
            csv_delimiter=";",
            if_storage_data_exists="replace",
            biglake_table=True,
            dataset_is_public=False,
        )
        upload_to_datalake_task.set_upstream(create_partitions_task)


dump_vitacare.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
dump_vitacare.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value,
    labels=[
        constants.RJ_SMS_AGENT_LABEL.value,
    ],
)

dump_vitacare.schedule = vitacare_daily_update_schedule
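
For a quick manual check, a Prefect 1.x flow like this can be run locally with explicit parameter values. A hypothetical smoke test (not part of the PR; it assumes the Vault secret and the GCP resources the tasks touch are reachable from your environment):

# Hypothetical local smoke test -- requires Vault and GCP access.
dump_vitacare.schedule = None  # skip the daily schedule for a one-off run
state = dump_vitacare.run(
    parameters={
        "rename_flow": False,
        "ap": "10",
        "endpoint": "posicao",
        "table_id": "estoque_posicao",
        "date": "today",
    }
)
print(state.is_successful())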
58 changes: 58 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitacare/schedules.py
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Schedules for the vitacare dump pipeline
"""

from datetime import timedelta, datetime

from prefect.schedules import Schedule
import pytz


from pipelines.constants import constants
from pipelines.rj_sms.dump_api_prontuario_vitacare.constants import (
    constants as vitacare_constants,
)
from pipelines.utils.utils import untuple_clocks as untuple
from pipelines.rj_sms.utils import generate_dicts, generate_dump_api_schedules


posicao_parameters = generate_dicts(
    dict_template={
        "dataset_id": vitacare_constants.DATASET_ID.value,
        "table_id": "estoque_posicao",
        "ap": "",
        "endpoint": "posicao",
        "date": "today",
    },
    key="ap",
    values=["10", "21", "22", "31", "32", "33", "40", "51", "52", "53"],
)

movimento_parameters = generate_dicts(
    dict_template={
        "dataset_id": vitacare_constants.DATASET_ID.value,
        "table_id": "estoque_movimento",
        "ap": "",
        "endpoint": "movimento",
        "date": "yesterday",
    },
    key="ap",
    values=["10", "21", "22", "31", "32", "33", "40", "51", "52", "53"],
)

flow_parameters = posicao_parameters + movimento_parameters


vitacare_clocks = generate_dump_api_schedules(
    interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1, 5, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
    labels=[
        constants.RJ_SMS_AGENT_LABEL.value,
    ],
    flow_run_parameters=flow_parameters,
    runs_interval_minutes=1,
)

vitacare_daily_update_schedule = Schedule(clocks=untuple(vitacare_clocks))
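
generate_dicts and generate_dump_api_schedules come from pipelines/rj_sms/utils.py, which this page does not show. From the call sites, generate_dicts presumably expands the template into one parameter dict per AP, and runs_interval_minutes=1 presumably staggers the resulting 20 daily runs a minute apart so they do not all hit the agent at once. A sketch of the assumed generate_dicts behavior:

# Assumed behavior of generate_dicts, inferred from the call sites above;
# the real helper lives in pipelines/rj_sms/utils.py.
def generate_dicts(dict_template: dict, key: str, values: list) -> list:
    """Return one copy of the template per value, with `key` overridden."""
    return [{**dict_template, key: value} for value in values]


# generate_dicts({"ap": "", "endpoint": "posicao"}, key="ap", values=["10", "21"])
# -> [{'ap': '10', 'endpoint': 'posicao'}, {'ap': '21', 'endpoint': 'posicao'}]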