Skip to content

Commit

Permalink
Merge pull request #549 from prefeitura-rio/staging/sms-vitai-flows
Browse files Browse the repository at this point in the history
Staging/sms vitai flows
  • Loading branch information
gabriel-milan authored Nov 1, 2023
2 parents 59cbe59 + 8d85881 commit 578a28b
Show file tree
Hide file tree
Showing 13 changed files with 890 additions and 275 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ replit.nix
test_local.py
pylint.txt
test.py
test.ipynb
test/*
test/*.ipynb
test/*.csv
Expand Down
2 changes: 1 addition & 1 deletion pipelines/rj_sms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
"""

from pipelines.rj_sms.dump_db_sivep.flows import *
from pipelines.rj_sms.pubsub.flows import *
from pipelines.rj_sms.dump_api_prontuario_vitai.flows import *
18 changes: 18 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitai/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Constants for utils.
"""
from enum import Enum


class constants(Enum):
"""
Constant values for the dump vitai flows
"""

VAULT_PATH = "estoque_vitai"
VAULT_KEY = "token"
DATASET_ID = "brutos_prontuario_vitai"
TABLE_POSICAO_ID = "estoque_posicao"
TABLE_MOVIMENTOS_ID = "estoque_movimento"
160 changes: 160 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitai/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Vitai healthrecord dumping flows
"""

from prefect import Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.utils.decorators import Flow
from pipelines.constants import constants
from pipelines.rj_sms.dump_api_prontuario_vitai.constants import (
constants as vitai_constants,
)
from pipelines.rj_sms.utils import (
create_folders,
from_json_to_csv,
download_from_api,
add_load_date_column,
create_partitions,
upload_to_datalake,
)
from pipelines.rj_sms.dump_api_prontuario_vitai.tasks import (
build_movimentos_date,
build_movimentos_url,
)
from pipelines.rj_sms.dump_api_prontuario_vitai.schedules import every_day_at_six_am


with Flow(
name="SMS: Dump Vitai - Captura Posição de Estoque", code_owners=["thiago"]
) as dump_vitai_posicao:
# Parameters
# Parameters for Vault
vault_path = vitai_constants.VAULT_PATH.value
vault_key = vitai_constants.VAULT_KEY.value
# Paramenters for GCP
dataset_id = vitai_constants.DATASET_ID.value
table_id = vitai_constants.TABLE_POSICAO_ID.value

# Start run
create_folders_task = create_folders()

download_task = download_from_api(
url="https://apidw.vitai.care/api/dw/v1/produtos/saldoAtual",
params=None,
file_folder=create_folders_task["raw"],
file_name=table_id,
vault_path=vault_path,
vault_key=vault_key,
add_load_date_to_filename=True,
)
download_task.set_upstream(create_folders_task)

conversion_task = from_json_to_csv(input_path=download_task, sep=";")
conversion_task.set_upstream(download_task)

add_load_date_column_task = add_load_date_column(
input_path=conversion_task, sep=";"
)
add_load_date_column_task.set_upstream(conversion_task)

create_partitions_task = create_partitions(
data_path=create_folders_task["raw"],
partition_directory=create_folders_task["partition_directory"],
)
create_partitions_task.set_upstream(add_load_date_column_task)

upload_to_datalake_task = upload_to_datalake(
input_path=create_folders_task["partition_directory"],
dataset_id=dataset_id,
table_id=table_id,
if_exists="replace",
csv_delimiter=";",
if_storage_data_exists="replace",
biglake_table=True,
)
upload_to_datalake_task.set_upstream(create_partitions_task)


dump_vitai_posicao.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
dump_vitai_posicao.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SMS_AGENT_LABEL.value,
],
)

dump_vitai_posicao.schedule = every_day_at_six_am


with Flow(
name="SMS: Dump Vitai - Captura Movimentos de Estoque", code_owners=["thiago"]
) as dump_vitai_movimentos:
# Parameters
# Parameters for Vault
vault_path = vitai_constants.VAULT_PATH.value
vault_key = vitai_constants.VAULT_KEY.value
# Paramenters for GCP
dataset_id = vitai_constants.DATASET_ID.value
table_id = vitai_constants.TABLE_MOVIMENTOS_ID.value
# Parameters for Vitai
date = Parameter("date", default=None)

# Start run
create_folders_task = create_folders()

build_date_task = build_movimentos_date(date_param=date)
build_date_task.set_upstream(create_folders_task)

build_url_task = build_movimentos_url(date_param=date)
build_url_task.set_upstream(build_date_task)

download_task = download_from_api(
url=build_url_task,
params=None,
file_folder=create_folders_task["raw"],
file_name=table_id,
vault_path=vault_path,
vault_key=vault_key,
add_load_date_to_filename=True,
load_date=build_date_task,
)
download_task.set_upstream(build_url_task)

conversion_task = from_json_to_csv(input_path=download_task, sep=";")
conversion_task.set_upstream(download_task)

add_load_date_column_task = add_load_date_column(
input_path=conversion_task, sep=";", load_date=build_date_task
)
add_load_date_column_task.set_upstream(conversion_task)

create_partitions_task = create_partitions(
data_path=create_folders_task["raw"],
partition_directory=create_folders_task["partition_directory"],
)
create_partitions_task.set_upstream(add_load_date_column_task)

upload_to_datalake_task = upload_to_datalake(
input_path=create_folders_task["partition_directory"],
dataset_id=dataset_id,
table_id=table_id,
if_exists="replace",
csv_delimiter=";",
if_storage_data_exists="replace",
biglake_table=True,
)
upload_to_datalake_task.set_upstream(create_partitions_task)


dump_vitai_movimentos.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
dump_vitai_movimentos.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SMS_AGENT_LABEL.value,
],
)

dump_vitai_movimentos.schedule = every_day_at_six_am
23 changes: 23 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitai/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Schedules for the database dump pipeline
"""

from datetime import timedelta
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from pipelines.constants import constants

every_day_at_six_am = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=pendulum.datetime(2023, 1, 1, 6, 0, 0, tz="America/Sao_Paulo"),
labels=[
constants.RJ_SMS_AGENT_LABEL.value,
],
)
]
)
46 changes: 46 additions & 0 deletions pipelines/rj_sms/dump_api_prontuario_vitai/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
"""
Tasks for dump_api_prontuario_vitai
"""

from datetime import date, timedelta
from prefect import task
from pipelines.utils.utils import log


@task
def build_movimentos_url(date_param=None):
"""
Builds a URL for querying product movements from the Vitai API.
Args:
date_param (str, optional): The date to query in the format "YYYY-MM-DD".
Defaults to yesterday's date.
Returns:
str: The URL for querying product movements from the Vitai API.
"""
if date_param is None:
date_param = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")

url = f"https://apidw.vitai.care/api/dw/v1/movimentacaoProduto/query/dataMovimentacao/{date_param}" # noqa: E501
log(f"URL built: {url}")
return url


@task
def build_movimentos_date(date_param=None):
"""
Builds a date string in the format '%Y-%m-%d' based on the given date_param or yesterday's
date if date_param is None.
Args:
date_param (str, optional): A date string in the format '%Y-%m-%d'. Defaults to None.
Returns:
str: A date string in the format '%Y-%m-%d'.
"""
if date_param is None:
date_param = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")

return date_param
Empty file.
44 changes: 0 additions & 44 deletions pipelines/rj_sms/pubsub/flows.py

This file was deleted.

36 changes: 0 additions & 36 deletions pipelines/rj_sms/pubsub/tasks.py

This file was deleted.

Loading

0 comments on commit 578a28b

Please sign in to comment.