Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Staging/sms estoque #580

Merged
merged 30 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7b28fb8
Add new files and update imports
ThiagoTrabach Nov 25, 2023
478a290
Refactor code and reorder columns in CSV file
ThiagoTrabach Nov 26, 2023
0f4b5f4
Update API endpoints in constants.py
ThiagoTrabach Nov 26, 2023
430075e
Uncomment rename_flow_task in
ThiagoTrabach Nov 26, 2023
1650104
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 26, 2023
41b7d5b
Merge branch 'master' into staging/sms-estoque
mergify[bot] Nov 26, 2023
aef7e47
Update constants for Vitai and Vitacare
ThiagoTrabach Nov 26, 2023
d2bc5e3
Update API endpoints in constants.py
ThiagoTrabach Nov 27, 2023
01bf526
Add Google Sheets dumping functionality
ThiagoTrabach Nov 28, 2023
5a7d546
Update schedules for vitacare and tpc data dumps
ThiagoTrabach Nov 28, 2023
f258929
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 28, 2023
519af12
Update schedule for TPC daily update
ThiagoTrabach Nov 28, 2023
0e2ea2c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 28, 2023
84dc346
Update schedules for daily data dump
ThiagoTrabach Nov 28, 2023
fe5f6e7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 28, 2023
27c046b
Update schedules.py with new start dates
ThiagoTrabach Nov 28, 2023
93c368c
Merge branch 'master' into staging/sms-estoque
mergify[bot] Nov 28, 2023
9695f26
Merge branch 'master' into staging/sms-estoque
mergify[bot] Nov 28, 2023
e485ef9
Merge branch 'master' into staging/sms-estoque
mergify[bot] Nov 29, 2023
53b0f88
Update GCP dataset_id and start_date in RJ SMS
ThiagoTrabach Nov 29, 2023
089da71
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 29, 2023
09dec0a
Merge branch 'master' into staging/sms-estoque
mergify[bot] Nov 29, 2023
dde5d48
Update code owners and schedules
ThiagoTrabach Nov 29, 2023
eb3d983
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 29, 2023
1ac32b4
Remove "danilo" from code_owners list
ThiagoTrabach Nov 29, 2023
7488ada
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 29, 2023
80f65b0
Update start time for SMS clocks
ThiagoTrabach Nov 30, 2023
53a18ca
Add .DS_Store file to pipelines/rj_sms directory
ThiagoTrabach Nov 30, 2023
855a3e1
Remove .DS_Store files and update .gitignore
ThiagoTrabach Nov 30, 2023
d158c9b
Add "danilo" as a code owner in multiple flows
ThiagoTrabach Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ test/*.csv
setup.py
.vscode/*
*.hdf
*.DS_Store
.idea/*
*.DS_Store


# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down Expand Up @@ -148,4 +149,5 @@ dmypy.json

# Backfill stuff
backfill.csv
REVIEW.md
REVIEW.md
.DS_Store
4 changes: 4 additions & 0 deletions pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,8 @@ class constants(Enum): # pylint: disable=c0103
"user_id": "222842688117014528",
"type": "user_nickname",
},
"danilo": {
"user_id": "1147152438487416873",
"type": "user_nickname",
},
}
Binary file not shown.
3 changes: 3 additions & 0 deletions pipelines/rj_sms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@

from pipelines.rj_sms.dump_db_sivep.flows import *
from pipelines.rj_sms.dump_ftp_cnes.flows import *
from pipelines.rj_sms.dump_azureblob_estoque_tpc.flows import *
from pipelines.rj_sms.dump_api_prontuario_vitai.flows import *
from pipelines.rj_sms.dump_api_prontuario_vitacare.flows import *
from pipelines.rj_sms.dump_sheets.flows import *
31 changes: 31 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitacare/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Constants for Vitacare.
"""
from enum import Enum


class constants(Enum):
"""
Constant values for the dump vitai flows
"""

VAULT_PATH = "prontuario_vitacare"
DATASET_ID = "brutos_prontuario_vitacare"
BASE_URL = {
"10": "http://consolidado-ap10.pepvitacare.com:8088",
"21": "http://consolidado-ap21.pepvitacare.com:8090",
"22": "http://consolidado-ap22.pepvitacare.com:8091",
"31": "http://consolidado-ap31.pepvitacare.com:8089",
"32": "http://consolidado-ap32.pepvitacare.com:8090",
"33": "http://consolidado-ap33.pepvitacare.com:8089",
"40": "http://consolidado-ap40.pepvitacare.com:8089",
"51": "http://consolidado-ap51.pepvitacare.com:8091",
"52": "http://consolidado-ap52.pepvitacare.com:8088",
"53": "http://consolidado-ap53.pepvitacare.com:8092",
}
ENDPOINT = {
"posicao": "/reports/pharmacy/stocks",
"movimento": "/reports/pharmacy/movements",
}
138 changes: 138 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitacare/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Vitacare healthrecord dumping flows
"""

from prefect import Parameter, case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.utils.decorators import Flow
from pipelines.constants import constants
from pipelines.rj_sms.dump_api_prontuario_vitacare.constants import (
constants as vitacare_constants,
)
from pipelines.rj_sms.tasks import (
get_secret,
create_folders,
cloud_function_request,
create_partitions,
upload_to_datalake,
)
from pipelines.rj_sms.dump_api_prontuario_vitacare.tasks import (
rename_flow,
build_url,
build_params,
create_filename,
save_data_to_file,
)

from pipelines.rj_sms.dump_api_prontuario_vitacare.schedules import (
vitacare_daily_update_schedule,
)


with Flow(
name="SMS: Dump VitaCare - Ingerir dados do prontuário VitaCare",
code_owners=[
"thiago",
"andre",
"danilo",
],
) as dump_vitacare:
#####################################
# Parameters
#####################################

# Flow
RENAME_FLOW = Parameter("rename_flow", default=True)

# Vault
VAULT_PATH = vitacare_constants.VAULT_PATH.value

# Vitacare API
AP = Parameter("ap", required=True, default="10")
ENDPOINT = Parameter("endpoint", required=True)
DATE = Parameter("date", default="today")

# GCP
DATASET_ID = Parameter("dataset_id", default=vitacare_constants.DATASET_ID.value)
TABLE_ID = Parameter("table_id", required=True)

#####################################
# Rename flow run
####################################

with case(RENAME_FLOW, True):
rename_flow_task = rename_flow(table_id=TABLE_ID, ap=AP)

####################################
# Tasks section #1 - Get data
#####################################

get_secret_task = get_secret(secret_path=VAULT_PATH)

create_folders_task = create_folders()
create_folders_task.set_upstream(get_secret_task) # pylint: disable=E1101

build_url_task = build_url(ap=AP, endpoint=ENDPOINT)

build_params_task = build_params(date_param=DATE)
build_params_task.set_upstream(create_folders_task) # pylint: disable=E1101

file_name_task = create_filename(table_id=TABLE_ID, ap=AP)
file_name_task.set_upstream(build_params_task)

download_task = cloud_function_request(
url=build_url_task,
credential=get_secret_task,
request_type="GET",
body_params=None,
query_params=build_params_task,
env="prod",
)
download_task.set_upstream(file_name_task) # pylint: disable=E1101

save_data_task = save_data_to_file(
data=download_task,
file_folder=create_folders_task["raw"],
table_id=TABLE_ID,
ap=AP,
add_load_date_to_filename=True,
load_date=build_params_task["date"],
)
save_data_task.set_upstream(download_task) # pylint: disable=E1101

#####################################
# Tasks section #2 - Transform data and Create table
#####################################

with case(save_data_task, True):
create_partitions_task = create_partitions(
data_path=create_folders_task["raw"],
partition_directory=create_folders_task["partition_directory"],
)
create_partitions_task.set_upstream(save_data_task)

upload_to_datalake_task = upload_to_datalake(
input_path=create_folders_task["partition_directory"],
dataset_id=DATASET_ID,
table_id=TABLE_ID,
if_exists="replace",
csv_delimiter=";",
if_storage_data_exists="replace",
biglake_table=True,
dataset_is_public=False,
)
upload_to_datalake_task.set_upstream(create_partitions_task)


dump_vitacare.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
dump_vitacare.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SMS_AGENT_LABEL.value,
],
)

dump_vitacare.schedule = vitacare_daily_update_schedule
58 changes: 58 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitacare/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Schedules for the vitacare dump pipeline
"""

from datetime import timedelta, datetime

from prefect.schedules import Schedule
import pytz


from pipelines.constants import constants
from pipelines.rj_sms.dump_api_prontuario_vitacare.constants import (
constants as vitacare_constants,
)
from pipelines.utils.utils import untuple_clocks as untuple
from pipelines.rj_sms.utils import generate_dicts, generate_dump_api_schedules


posicao_parameters = generate_dicts(
dict_template={
"dataset_id": vitacare_constants.DATASET_ID.value,
"table_id": "estoque_posicao",
"ap": "",
"endpoint": "posicao",
"date": "today",
},
key="ap",
values=["10", "21", "22", "31", "32", "33", "40", "51", "52", "53"],
)

movimento_parameters = generate_dicts(
dict_template={
"dataset_id": vitacare_constants.DATASET_ID.value,
"table_id": "estoque_movimento",
"ap": "",
"endpoint": "movimento",
"date": "yesterday",
},
key="ap",
values=["10", "21", "22", "31", "32", "33", "40", "51", "52", "53"],
)

flow_parameters = posicao_parameters + movimento_parameters


vitacare_clocks = generate_dump_api_schedules(
interval=timedelta(days=1),
start_date=datetime(2023, 1, 1, 5, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[
constants.RJ_SMS_AGENT_LABEL.value,
],
flow_run_parameters=flow_parameters,
runs_interval_minutes=1,
)

vitacare_daily_update_schedule = Schedule(clocks=untuple(vitacare_clocks))
Loading