Staging/sms farmacia estoque refactoring #507

Merged
merged 141 commits into from
Nov 1, 2023
Commits
7544d44
initial commit
ThiagoTrabach Aug 10, 2023
fd4fc68
first working extract and load
ThiagoTrabach Aug 12, 2023
47838a4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 12, 2023
61a276a
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 12, 2023
3761e3c
remove teste
ThiagoTrabach Aug 12, 2023
7176ff5
improve run file for local tests
ThiagoTrabach Aug 12, 2023
035d916
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 12, 2023
7f48dd5
add filter by lastupdate to list files function
ThiagoTrabach Aug 12, 2023
14af485
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 12, 2023
c2bb8e4
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 14, 2023
6c5044a
wip fix credentials access
ThiagoTrabach Aug 14, 2023
ed03869
fix build issue
ThiagoTrabach Aug 14, 2023
8ee9181
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 14, 2023
49194d7
git add agent label
ThiagoTrabach Aug 14, 2023
5c2ca2d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 14, 2023
93d2aff
fix file path
ThiagoTrabach Aug 14, 2023
8e318ae
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 14, 2023
66fbf47
fix download path
ThiagoTrabach Aug 14, 2023
8a48516
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 16, 2023
f69404a
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 16, 2023
2122380
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 16, 2023
e1a2995
add flow captura tpc
ThiagoTrabach Aug 16, 2023
edcc5c0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 16, 2023
2e262de
fix captura_tpc flow
ThiagoTrabach Aug 16, 2023
476296f
add captura vitai
ThiagoTrabach Aug 17, 2023
e5228b1
change file name saved in cloud storage
ThiagoTrabach Aug 17, 2023
6b75118
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 17, 2023
c49cc0d
wip upload organizations
ThiagoTrabach Aug 17, 2023
ced7473
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 17, 2023
825e5cb
move dump cnes to a different project
ThiagoTrabach Aug 18, 2023
ea4f03f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 18, 2023
bb52552
change dump mode for estoque
ThiagoTrabach Aug 18, 2023
5569acc
change dump mode for tpc
ThiagoTrabach Aug 18, 2023
7a957f5
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 18, 2023
a2501db
fix cnes data type
ThiagoTrabach Aug 18, 2023
df1834c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 18, 2023
a10992e
change import
ThiagoTrabach Aug 20, 2023
e0f1e7a
initial commit
ThiagoTrabach Aug 21, 2023
fbe2807
fix vitai import
ThiagoTrabach Aug 21, 2023
2f6d507
fix typo
ThiagoTrabach Aug 21, 2023
818e652
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 21, 2023
1e19b0c
remove farmacia project
ThiagoTrabach Aug 21, 2023
af89d1c
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 21, 2023
a2e9c0b
Merge branch 'master' into staging/sms-farmacia-refactor-to-parquet
mergify[bot] Aug 21, 2023
aac0e31
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 21, 2023
243fc49
Merge branch 'master' into staging/sms-farmacia-refactor-to-parquet
mergify[bot] Aug 21, 2023
d4529f8
add todo
ThiagoTrabach Aug 22, 2023
c143c81
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 22, 2023
b034237
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 22, 2023
6e2b6cd
Merge branch 'master' into staging/sms-farmacia-refactor-to-parquet
mergify[bot] Aug 22, 2023
9e36e89
wip change to csv
ThiagoTrabach Aug 22, 2023
fc2a4ea
refactor tpc dump
ThiagoTrabach Aug 23, 2023
398df79
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 23, 2023
179059a
refactor project dump vitai
ThiagoTrabach Aug 23, 2023
b076665
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 23, 2023
c2caadc
remove unecessary files
ThiagoTrabach Aug 23, 2023
86df7b9
fix init
ThiagoTrabach Aug 23, 2023
daa008c
refactor dump vitai project
ThiagoTrabach Aug 23, 2023
7b3cf33
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 23, 2023
b921e94
add to payload import date
ThiagoTrabach Aug 23, 2023
07b153c
fix missing import
ThiagoTrabach Aug 23, 2023
349568f
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 23, 2023
9ad64df
Merge branch 'master' into staging/sms-farmacia-refactor-to-parquet
mergify[bot] Aug 23, 2023
85757ff
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 25, 2023
61d42ca
Merge branch 'master' into staging/sms-farmacia-refactor-to-parquet
mergify[bot] Aug 25, 2023
e5d3ad3
fix log error message
ThiagoTrabach Aug 28, 2023
de91168
add scheduler
ThiagoTrabach Aug 28, 2023
b9d6e5f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 28, 2023
bd245e9
Merge branch 'staging/sms-farmacia-refactor-to-parquet' into staging/…
ThiagoTrabach Aug 29, 2023
4f4f6a2
remove unused class
ThiagoTrabach Aug 29, 2023
f0fde5e
change dump mode to prefect parameter
ThiagoTrabach Aug 29, 2023
2f657c8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 29, 2023
49cbecd
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Aug 30, 2023
16a62d6
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Sep 4, 2023
5ef96df
initial commit flow vitai
ThiagoTrabach Sep 11, 2023
792d690
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 11, 2023
753d514
fix filename for vitai and vitacare
ThiagoTrabach Sep 11, 2023
b30c906
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 11, 2023
70f1ac6
add vitacare flow to init file
ThiagoTrabach Sep 11, 2023
a57e70c
fix vitacare handling date parameter
ThiagoTrabach Sep 11, 2023
729ebbd
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 11, 2023
7ca3ec3
add print for log purpose
ThiagoTrabach Sep 11, 2023
ef588ad
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 11, 2023
52c29a8
change method to pass params to vitacare
ThiagoTrabach Sep 11, 2023
c13fee6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 11, 2023
f4c80ab
add log to build_params task
ThiagoTrabach Sep 11, 2023
461a982
change log method
ThiagoTrabach Sep 11, 2023
7b63c0c
add error log to download api
ThiagoTrabach Sep 11, 2023
057431f
add flow to check ip
ThiagoTrabach Sep 12, 2023
48c047e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 12, 2023
f14e12a
Merge branch 'master' into staging/sms-farmacia-estoque
mergify[bot] Sep 12, 2023
f83c9a1
wip fix separator issue
ThiagoTrabach Sep 13, 2023
3b6d8f9
initial commit
ThiagoTrabach Sep 13, 2023
e7fa2e1
feat: add working flow for vitacare
ThiagoTrabach Sep 14, 2023
118b12f
change vitacare flow name
ThiagoTrabach Sep 15, 2023
583724a
flow vitai initial commit
ThiagoTrabach Sep 15, 2023
bd4a501
wip flow vitai movimentos
ThiagoTrabach Sep 15, 2023
5ff0817
wip
ThiagoTrabach Sep 15, 2023
b219922
add working version vitai movimentos flow
ThiagoTrabach Sep 15, 2023
58f227d
Merge branch 'master' into staging/sms-farmacia-estoque-refactoring
mergify[bot] Sep 15, 2023
52494fa
change table name for vitai movimentos flow
ThiagoTrabach Sep 15, 2023
3fc4449
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 15, 2023
e503d7b
add vitai flows to init
ThiagoTrabach Sep 15, 2023
91c917e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 15, 2023
2599383
fix log
ThiagoTrabach Sep 15, 2023
b54b07e
Merge branch 'master' into staging/sms-farmacia-estoque-refactoring
mergify[bot] Sep 16, 2023
bfd0e4c
Merge branch 'master' into staging/sms-farmacia-estoque-refactoring
mergify[bot] Sep 18, 2023
2fb8dfb
Merge branch 'master' into staging/sms-farmacia-estoque-refactoring
mergify[bot] Sep 19, 2023
fb923f2
remove pubsub flow
ThiagoTrabach Oct 5, 2023
2ef8a36
Merge branch 'master' into staging/sms-farmacia-estoque-refactoring
mergify[bot] Oct 5, 2023
aedcf6b
Merge branch 'master' into staging/sms-farmacia-estoque-refactoring
mergify[bot] Oct 5, 2023
f645cc9
add dbt materialization to vitai posicao flow
ThiagoTrabach Oct 5, 2023
44bb28d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 5, 2023
9409216
change how dbt materialization handle alias
ThiagoTrabach Oct 5, 2023
cd2840d
Merge branch 'master' into staging/sms-farmacia-estoque-refactoring
mergify[bot] Oct 5, 2023
2497446
Merge branch 'master' into staging/sms-farmacia-estoque-refactoring
mergify[bot] Oct 5, 2023
d687635
return dbt alias handle
ThiagoTrabach Oct 5, 2023
4438157
change vitai flow parameters
ThiagoTrabach Oct 8, 2023
2639815
change how temp folder is created
ThiagoTrabach Oct 8, 2023
acb6d6f
change agent to dev
ThiagoTrabach Oct 9, 2023
8a97bf3
change missing agent to dev
ThiagoTrabach Oct 9, 2023
e11f7da
Merge branch 'master' into staging/sms-farmacia-estoque-refactoring
mergify[bot] Oct 10, 2023
c70294d
refactor vitai posicao
ThiagoTrabach Oct 11, 2023
8d77f1e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 11, 2023
ebabcd2
refact vitai movimento flow
ThiagoTrabach Oct 11, 2023
7350470
add refactored flows
ThiagoTrabach Oct 13, 2023
d357616
add working version of dump_ftp_cnes
ThiagoTrabach Oct 14, 2023
7ec5c09
feat: v1 o dump cnes flow
ThiagoTrabach Oct 14, 2023
dee29a3
add missing docstring to files and functions
ThiagoTrabach Oct 14, 2023
285aa04
add missing docstring
ThiagoTrabach Oct 14, 2023
35c7344
conform to flake
ThiagoTrabach Oct 14, 2023
3ccc624
update gitignore
ThiagoTrabach Oct 14, 2023
753b76d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 14, 2023
1551a68
add sheets dump
ThiagoTrabach Oct 14, 2023
39f7048
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 14, 2023
46f5d09
remove pycurl package
ThiagoTrabach Oct 14, 2023
6712fe4
remove main py
ThiagoTrabach Oct 14, 2023
7b86489
fix pycurl import
ThiagoTrabach Oct 14, 2023
bf62a2b
change table-id value constant
ThiagoTrabach Oct 15, 2023
1cb69b7
change vitacare flow to handle multiple endpoints
ThiagoTrabach Oct 30, 2023
26a5b67
wip sigtap flow
ThiagoTrabach Oct 30, 2023
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@ replit.nix
test_local.py
pylint.txt
test.py
test*.ipynb
test/*
test/*.ipynb
test/*.csv
8 changes: 7 additions & 1 deletion pipelines/rj_sms/__init__.py
@@ -4,4 +4,10 @@
 """

 from pipelines.rj_sms.dump_db_sivep.flows import *
-from pipelines.rj_sms.pubsub.flows import *
+from pipelines.rj_sms.dump_api_prontuario_vitacare.flows import *
+from pipelines.rj_sms.dump_api_prontuario_vitai.flows import *
+from pipelines.rj_sms.dump_azureblob_estoque_tpc.flows import *
+from pipelines.rj_sms.dump_ftp_cnes.flows import *
+from pipelines.rj_sms.dump_ftp_sigtap.flows import *
+from pipelines.rj_sms.dump_sheets.flows import *
+from pipelines.rj_sms.materialize_datalake.flows import *
31 changes: 31 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitacare/constants.py
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Constants for utils.
"""
from enum import Enum


class constants(Enum):
    """
    Constant values for the dump vitacare flows
    """

    VAULT_PATH = "estoque_vitai"
    VAULT_KEY = "token"
    DATASET_ID = "brutos_prontuario_vitacare"
    TABLE_POSICAO_ID = "estoque_posicao"
    TABLE_MOVIMENTOS_ID = "estoque_movimento"
    ENDPOINT_BASE_URL = [
        "http://consolidado-ap10.pepvitacare.com:8088",
        "http://consolidado-ap21.pepvitacare.com:8088",
        "http://consolidado-ap22.pepvitacare.com:8088",
        "http://consolidado-ap31.pepvitacare.com:8089",
        "http://consolidado-ap32.pepvitacare.com:8088",
        "http://consolidado-ap33.pepvitacare.com:8089",
        "http://consolidado-ap40.pepvitacare.com:8089",
        "http://consolidado-ap51.pepvitacare.com:8089",
        "http://consolidado-ap52.pepvitacare.com:8088",
        "http://consolidado-ap53.pepvitacare.com:8090",
    ]
    ENDPOINT_POSICAO = "/reports/pharmacy/stocks"
    ENDPOINT_MOVIMENTOS = "/reports/pharmacy/movements"
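Review note: a minimal sketch, not part of this PR, of how the base URLs and an endpoint path combine into one request target per AP. It reuses the `ap\d{2}` pattern from `tasks.py`; `build_targets`, `BASE_URLS`, and `AP_PATTERN` are hypothetical names chosen for illustration, and the URL list is a two-item subset of `ENDPOINT_BASE_URL`.

```python
import re

# Two entries copied from ENDPOINT_BASE_URL; the subset is for illustration only.
BASE_URLS = [
    "http://consolidado-ap10.pepvitacare.com:8088",
    "http://consolidado-ap31.pepvitacare.com:8089",
]
ENDPOINT_POSICAO = "/reports/pharmacy/stocks"

# Same pattern the download task uses to label each file by AP.
AP_PATTERN = re.compile(r"ap\d{2}")


def build_targets(base_urls, endpoint):
    """Return (ap, full_url) pairs; raise if a base URL carries no AP code."""
    targets = []
    for base_url in base_urls:
        match = AP_PATTERN.search(base_url)
        if match is None:
            # Hypothetical guard: the PR code indexes findall()[0] without checking.
            raise ValueError(f"No AP code found in {base_url!r}")
        targets.append((match.group(0), f"{base_url}{endpoint}"))
    return targets


targets = build_targets(BASE_URLS, ENDPOINT_POSICAO)
for ap, url in targets:
    print(ap, url)
```

Raising on a URL without an AP code is an assumption about the desired behavior; the task in this PR would fail with an `IndexError` in the same situation.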
140 changes: 140 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitacare/flows.py
@@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
from prefect import Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.utils.decorators import Flow
from pipelines.constants import constants
from pipelines.rj_sms.dump_api_prontuario_vitacare.constants import (
    constants as vitacare_constants,
)
from pipelines.rj_sms.utils import (
    create_folders,
    from_json_to_csv,
    download_from_api,
    add_load_date_column,
    create_partitions,
    upload_to_datalake,
)
from pipelines.rj_sms.dump_api_prontuario_vitacare.tasks import (
    build_params,
    download_multiple_files,
)

from pipelines.rj_sms.dump_api_prontuario_vitai.schedules import every_day_at_six_am


with Flow(
    name="SMS: Dump VitaCare - Captura Posição de Estoque", code_owners=["thiago"]
) as dump_vitacare_posicao:
    # Set Parameters
    #  Vault
    vault_path = vitacare_constants.VAULT_PATH.value
    vault_key = vitacare_constants.VAULT_KEY.value
    #  GCP
    dataset_id = vitacare_constants.DATASET_ID.value
    table_id = vitacare_constants.TABLE_POSICAO_ID.value

    #  Vitacare API
    endpoint_base_url = vitacare_constants.ENDPOINT_BASE_URL.value
    endpoint_posicao = vitacare_constants.ENDPOINT_POSICAO.value
    date = Parameter("date", default="today")

    # Start run
    create_folders_task = create_folders()

    build_params_task = build_params(date_param=date)
    build_params_task.set_upstream(create_folders_task)

    download_multiple_files_task = download_multiple_files(
        base_urls=endpoint_base_url,
        endpoint=endpoint_posicao,
        params=build_params_task,
        table_id=table_id,
        vault_path=vault_path,
        vault_key=vault_key,
    )
    download_multiple_files_task.set_upstream(build_params_task)

    create_partitions_task = create_partitions(
        data_path="./data/raw", partition_directory="./data/partition_directory"
    )
    create_partitions_task.set_upstream(download_multiple_files_task)

    upload_to_datalake_task = upload_to_datalake(
        input_path="./data/partition_directory",
        dataset_id=dataset_id,
        table_id=table_id,
        if_exists="replace",
        csv_delimiter=";",
        if_storage_data_exists="replace",
        biglake_table=True,
    )
    upload_to_datalake_task.set_upstream(create_partitions_task)


dump_vitacare_posicao.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
dump_vitacare_posicao.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value,
    labels=[
        constants.RJ_SMS_DEV_AGENT_LABEL.value,
    ],
)

dump_vitacare_posicao.schedule = every_day_at_six_am


with Flow(
    name="SMS: Dump VitaCare - Captura Movimento de Estoque", code_owners=["thiago"]
) as dump_vitacare_movimento:
    # Set Parameters
    #  Vault
    vault_path = vitacare_constants.VAULT_PATH.value
    vault_key = vitacare_constants.VAULT_KEY.value
    #  GCP
    dataset_id = vitacare_constants.DATASET_ID.value
    table_id = vitacare_constants.TABLE_MOVIMENTOS_ID.value

    #  Vitacare API
    endpoint_base_url = vitacare_constants.ENDPOINT_BASE_URL.value
    endpoint_movimentos = vitacare_constants.ENDPOINT_MOVIMENTOS.value
    date = Parameter("date", default="yesterday")

    # Start run
    create_folders_task = create_folders()

    build_params_task = build_params(date_param=date)
    build_params_task.set_upstream(create_folders_task)

    download_multiple_files_task = download_multiple_files(
        base_urls=endpoint_base_url,
        endpoint=endpoint_movimentos,
        params=build_params_task,
        table_id=table_id,
        vault_path=vault_path,
        vault_key=vault_key,
    )
    download_multiple_files_task.set_upstream(build_params_task)

    create_partitions_task = create_partitions(
        data_path="./data/raw", partition_directory="./data/partition_directory"
    )
    create_partitions_task.set_upstream(download_multiple_files_task)

    upload_to_datalake_task = upload_to_datalake(
        input_path="./data/partition_directory",
        dataset_id=dataset_id,
        table_id=table_id,
        if_exists="replace",
        csv_delimiter=";",
        if_storage_data_exists="replace",
        biglake_table=True,
    )
    upload_to_datalake_task.set_upstream(create_partitions_task)


dump_vitacare_movimento.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
dump_vitacare_movimento.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value,
    labels=[
        constants.RJ_SMS_DEV_AGENT_LABEL.value,
    ],
)

dump_vitacare_movimento.schedule = every_day_at_six_am
63 changes: 63 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitacare/tasks.py
@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
"""
Tasks for dump_api_prontuario_vitacare
"""

from datetime import date, timedelta
import re
from prefect import task
from pipelines.utils.utils import log
from pipelines.rj_sms.utils import (
    from_json_to_csv,
    download_from_api,
    add_load_date_column,
)


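@task
def build_params(date_param="today"):
    """Build the API query params from "today", "yesterday", or an explicit date string."""
    if date_param == "today":
        params = {"date": str(date.today())}
    elif date_param == "yesterday":
        params = {"date": str(date.today() - timedelta(days=1))}
    else:
        # Any other value is passed through unchanged as the date.
        params = {"date": date_param}
    log(f"Params built: {params}")
    return params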


@task
def download_multiple_files(
    base_urls: list,
    endpoint: str,
    params: dict,
    table_id: str,
    vault_path: str,
    vault_key: str,
):
    """Download one file per base URL and convert each non-empty JSON to CSV."""
    pattern = r"ap\d{2}"

    for n, base_url in enumerate(base_urls):
        # The AP code embedded in the host name (e.g. "ap10") labels the file.
        ap = re.findall(pattern, base_url)[0]

        log(f"Downloading {ap} ({n+1}/{len(base_urls)})")

        download_task = download_from_api.run(
            url=f"{base_url}{endpoint}",
            params=params,
            file_folder="./data/raw",
            file_name=f"{table_id}-{ap}",
            vault_path=vault_path,
            vault_key=vault_key,
            add_load_date_to_filename=True,
            load_date=params["date"],
        )

        # Skip conversion when the API returned an empty JSON array.
        with open(download_task, "r", encoding="UTF-8") as f:
            first_line = f.readline().strip()

        if first_line == "[]":
            log("The json content is empty.")
        else:
            conversion_task = from_json_to_csv.run(input_path=download_task, sep=";")

            add_load_date_column.run(input_path=conversion_task, sep=";")
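Review note: a minimal sketch, not part of this PR, of the date resolution in `build_params` written as a plain function so it can be checked without a Prefect runtime. `resolve_date_params` and its `today` argument are hypothetical names. Validating explicit dates with `date.fromisoformat` is an added assumption; the task in this PR passes the string through unchanged.

```python
from datetime import date, timedelta


def resolve_date_params(date_param="today", today=None):
    """Map "today", "yesterday", or an ISO date string to the API params dict."""
    # Injecting `today` keeps the function deterministic in tests.
    today = today or date.today()
    if date_param == "today":
        resolved = today
    elif date_param == "yesterday":
        resolved = today - timedelta(days=1)
    else:
        # Assumption: explicit dates are ISO formatted (YYYY-MM-DD);
        # fromisoformat raises ValueError for anything it cannot parse.
        resolved = date.fromisoformat(date_param)
    return {"date": resolved.isoformat()}


reference_day = date(2023, 11, 1)
print(resolve_date_params("today", today=reference_day))
print(resolve_date_params("yesterday", today=reference_day))
print(resolve_date_params("2023-10-15"))
```

Every branch assigns the result before it is returned, which is the property the original `else` branch was missing.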
18 changes: 18 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitai/constants.py
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Constants for utils.
"""
from enum import Enum


class constants(Enum):
    """
    Constant values for the dump vitai flows
    """

    VAULT_PATH = "estoque_vitai"
    VAULT_KEY = "token"
    DATASET_ID = "brutos_prontuario_vitai"
    TABLE_POSICAO_ID = "estoque_posicao"
    TABLE_MOVIMENTOS_ID = "estoque_movimento"