Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Cria pipeline de captura de recursos de viagens individuais SPPO #546

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
3c733ba
Realiza ajustes iniciais
lingsv Oct 27, 2023
b7bfdca
Merge branch 'master' into staging/smtr-subsidio-sppo-viagens-individ…
mergify[bot] Oct 27, 2023
a8e3d4c
update tasks principais
lingsv Oct 30, 2023
9ffbed6
update nome de função
lingsv Oct 30, 2023
778bcfc
retirei código sensível
lingsv Oct 30, 2023
664d5fd
atualiza parâmetro de função
lingsv Oct 30, 2023
a2207e7
criei arquivo de utils para separar funções
lingsv Oct 30, 2023
6686f70
Merge branch 'master' into staging/smtr-subsidio-sppo-viagens-individ…
mergify[bot] Oct 31, 2023
479f895
ajustes nos códigos
lingsv Oct 31, 2023
2232ab9
teste de extração
lingsv Oct 31, 2023
34c2fbb
alteração para testes no prefect
lingsv Oct 31, 2023
6d8d4ef
wip
lingsv Oct 31, 2023
1476240
wip
lingsv Oct 31, 2023
a999b49
Merge branch 'master' into staging/smtr-subsidio-sppo-viagens-individ…
mergify[bot] Nov 1, 2023
9da006a
configurei o fluxo de recurso para o fluxo default
lingsv Nov 1, 2023
70065be
update pipeline de recursos
lingsv Nov 6, 2023
8211741
wip teste de flow
lingsv Nov 6, 2023
57e0010
retirei função unmapped
lingsv Nov 6, 2023
88b3e20
tira map do flow, coloca top e skip como variável de função
lingsv Nov 7, 2023
184dccc
readequação de parâmetros
lingsv Nov 7, 2023
684eca5
alteração nome do flow
lingsv Nov 7, 2023
6ff0254
altera nome do flow
lingsv Nov 7, 2023
6dd66b4
altera nomes de arquivos antigos
lingsv Nov 7, 2023
7dbac76
excluí arquivos antigos
lingsv Nov 7, 2023
d90f5a6
update nome do flow
lingsv Nov 7, 2023
021b177
wip teste
lingsv Nov 7, 2023
73cd351
restaurando formato do flow
lingsv Nov 7, 2023
c5cb62c
alterei saída da função
lingsv Nov 7, 2023
3a38350
update constants
lingsv Nov 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ setup.py
.vscode/*
*.hdf
*.DS_Store

.idea/*

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
4 changes: 4 additions & 0 deletions pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,8 @@ class constants(Enum): # pylint: disable=c0103
"user_id": "620000269392019469",
"type": "user_nickname",
},
"igorlaltuf": {
"user_id": "87545892319531008",
"type": "user_nickname",
},
}
1 change: 1 addition & 0 deletions pipelines/rj_smtr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pipelines.rj_smtr.materialize_to_datario.flows import *
from pipelines.rj_smtr.registros_ocr_rir.flows import *
from pipelines.rj_smtr.projeto_subsidio_sppo.flows import *
from pipelines.rj_smtr.br_rj_riodejaneiro_recurso.flows import *
from pipelines.rj_smtr.veiculo.flows import *
from pipelines.rj_smtr.example.flows import *
from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows import *
Expand Down
Empty file.
86 changes: 86 additions & 0 deletions pipelines/rj_smtr/br_rj_riodejaneiro_recurso/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
"""
Flows for br_rj_riodejaneiro_recurso
"""
from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect import Parameter, case, task
from prefect.tasks.control_flow import merge
from prefect.utilities.edges import unmapped

# EMD Imports #

from pipelines.constants import constants as emd_constants
from pipelines.utils.utils import set_default_parameters
from pipelines.utils.decorators import Flow
from pipelines.utils.tasks import (
rename_current_flow_run_now_time,
get_current_flow_labels,
)

# SMTR Imports #

from pipelines.rj_smtr.constants import constants
from pipelines.rj_smtr.tasks import get_current_timestamp

from pipelines.rj_smtr.flows import default_capture_flow


# SETUP #

# Capture flow for SPPO "recursos": an independent deep copy of the shared
# default capture flow, so the attributes set below do not affect the original.
sppo_recurso_captura = deepcopy(default_capture_flow)
sppo_recurso_captura.name = "SMTR: Subsídio SPPO Recursos - Captura"
# Flow code is stored in the GCS bucket named by the shared EMD constants.
sppo_recurso_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
# Runs on Kubernetes with the shared Docker image, routed to the SMTR dev agent.
sppo_recurso_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)
# Applies the default parameter values from the SMTR constants to the capture
# flow.
# NOTE(review): the return value is later passed as `parameters=` to
# `create_flow_run`; confirm that `set_default_parameters` returns a parameters
# dict (and not, e.g., the flow object itself).
sppo_recurso_captura_params = set_default_parameters(
flow=sppo_recurso_captura,
default_parameters=constants.SUBSIDIO_SPPO_RECURSO_DEFAULT_PARAM.value,
)

with Flow(
    "SMTR: Subsídio Recursos Viagens Individuais - Captura",
    code_owners=["carolinagomes", "igorlaltuf"],
) as subsidio_sppo_recurso:
    # Orchestrator flow: optionally triggers the capture flow and waits for it.
    # `capture` toggles whether the capture flow run is created at all.
    capture = Parameter("capture", default=True)
    timestamp = get_current_timestamp()

    # Rename this flow run to "<flow name> <timestamp>".
    rename_flow_run = rename_current_flow_run_now_time(
        prefix=subsidio_sppo_recurso.name + " ",
        now_time=timestamp,
    )

    LABELS = get_current_flow_labels()

    with case(capture, True):
        # A single capture run is created, so the tasks are called directly.
        # With `.map`, every argument not wrapped in `unmapped` is treated as
        # an iterable to fan out over, which breaks for scalars such as the
        # flow name string or the boolean streaming flags.
        # NOTE(review): `parameters` must be a dict of flow parameters; verify
        # what `set_default_parameters` actually returns.
        run_captura = create_flow_run(
            flow_name=sppo_recurso_captura.name,
            project_name="staging",
            parameters=sppo_recurso_captura_params,
            labels=LABELS,
        )

        # Block until the child run finishes; a failed child fails this flow.
        wait_captura_true = wait_for_flow_run(
            run_captura,
            stream_states=True,
            stream_logs=True,
            raise_final_state=True,
        )

    with case(capture, False):
        # Placeholder result so `merge` has a value when capture is disabled.
        wait_captura_false = task(
            lambda: [None], checkpoint=False, name="assign_none_to_previous_runs"
        )()

    # Resolves to whichever branch actually ran.
    wait_captura = merge(wait_captura_true, wait_captura_false)

# Deployment settings for the orchestrator flow: code stored in the shared GCS
# flows bucket, executed on Kubernetes with the shared Docker image and routed
# to the SMTR dev agent label.
subsidio_sppo_recurso.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
subsidio_sppo_recurso.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)
23 changes: 23 additions & 0 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,26 @@ class constants(Enum): # pylint: disable=c0103
"version": {},
},
}

# SUBSIDY - INDIVIDUAL TRIP APPEALS ("recursos")
# Dataset/table identifiers and Movidesk API settings for the capture.
SUBSIDIO_SPPO_RECURSOS_DATASET_ID = "br_rj_riodejaneiro_recurso"
SUBSIDIO_SPPO_RECURSOS_TABLE_ID = "recurso_sppo"
SUBSIDIO_SPPO_RECURSO_API_BASE_URL = "https://api.movidesk.com/public/v1/tickets?"
# Vault path of the secret holding the API token.
SUBSIDIO_SPPO_RECURSO_API_SECRET_PATH = "sppo_subsidio_recursos_api"
# NOTE(review): the value is a strftime pattern, not a date, and `%M` (minutes)
# appears a second time after the seconds; confirm whether a fractional-seconds
# directive such as `%f` was intended.
SUBSIDIO_SPPO_RECURSO_DEFAULT_PARAM = {"date_range_end": "%Y-%m-%dT%H:%M:%S.%MZ"}
SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS = {
"partition_date_only": True,
"source_type": "movidesk",
# Same literal as SUBSIDIO_SPPO_RECURSOS_DATASET_ID above.
"dataset_id": "br_rj_riodejaneiro_recurso",
# Query-string parameters sent to the API.
"extract_params": {
"$service": "serviceFull eq 'SPPO'",
"$select": "'id', 'protocol','createdDate'",
# Template with NAMED placeholders `{dates}` and `{service}`; it must be
# filled with keyword arguments (`.format(dates=..., service=...)`).
"$filter": "{dates} and serviceFull/any(serviceFull: {service})",
"$expand": "customFieldValues",
"$customFieldValues($expand=items)": "customFieldValues($expand=items)",
"$actions($select=id,description)": "actions($select=id,description)",
},
# NOTE(review): string placeholder rather than a number of minutes; confirm
# where (and whether) "{timestamp}" gets substituted.
"interval_minutes": "{timestamp}",
}
# TIMEOUT = 10 # in seconds
# BACKOFF_FACTOR = 1.5
21 changes: 20 additions & 1 deletion pipelines/rj_smtr/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
get_raw_data_api,
get_raw_data_gcs,
get_raw_data_db,
get_raw_recursos,
upload_run_logs_to_bq,
get_datetime_range,
read_raw_data,
Expand Down Expand Up @@ -252,7 +253,7 @@ def create_dbt_run_vars(

###############
#
# Local file managment
# Local file management
#
###############

Expand Down Expand Up @@ -668,6 +669,20 @@ def create_request_params(
elif dataset_id == constants.GTFS_DATASET_ID.value:
request_params = extract_params["filename"]

elif dataset_id == constants.SUBSIDIO_SPPO_RECURSOS_DATASET_ID.value:
end = datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%MZ")
dates = f"createdDate le {end}"
request_params = extract_params.copy()
request_params["$filter"] = request_params["$filter"].format(
dates, request_params["$service"]
)

request_params["token"] = get_vault_secret(
constants.SUBSIDIO_SPPO_RECURSO_API_SECRET_PATH.value
)["data"]["token"]

request_url = constants.SUBSIDIO_SPPO_RECURSO_API_BASE_URL.value

return request_params, request_url


Expand Down Expand Up @@ -725,6 +740,10 @@ def get_raw_from_sources(
error, data, filetype = get_raw_data_db(
host=source_path, secret_path=secret_path, **request_params
)
elif source_type == "movidesk":
error, data, filetype = get_raw_recursos(
request_url=source_path, request_params=request_params
)
else:
raise NotImplementedError(f"{source_type} not supported")

Expand Down
55 changes: 54 additions & 1 deletion pipelines/rj_smtr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import pymysql
import psycopg2
import psycopg2.extras

import time
from typing import Dict

from prefect.schedules.clocks import IntervalClock

Expand Down Expand Up @@ -828,3 +829,55 @@ def read_raw_data(filepath: str, csv_args: dict = None) -> tuple[str, pd.DataFra
log(f"[CATCHED] Task failed with error: \n{error}", level="error")

return error, data


def get_raw_recursos(request_url: str, request_params: dict) -> tuple[str, list, str]:
    """
    Fetch all "recursos" records from the Movidesk API, page by page.

    Pages are requested with the ``$top``/``$skip`` query parameters until a
    page with a size different from ``$top`` is returned.

    Args:
        request_url (str): Endpoint to query.
        request_params (dict): Query-string parameters (filters, token, ...).
            The dict is not modified; paging keys are set on a copy.

    Returns:
        tuple[str, list, str]: ``(error, data, filetype)``. ``error`` is ``None``
        on success, or the formatted traceback if any request or parsing step
        failed. ``data`` is the list of accumulated records (reset to an empty
        list on failure). ``filetype`` is always ``"json"``.
    """
    top = 1000
    skip = 0
    error = None
    filetype = "json"
    data = []

    # Work on a copy so the caller's dict is not polluted with paging keys.
    params = dict(request_params or {})
    params["$top"] = top

    while True:
        # Without an advancing offset a full page would be re-requested forever.
        params["$skip"] = skip

        try:
            # Never write the API token to the logs.
            loggable_params = {
                key: ("***" if key == "token" else value)
                for key, value in params.items()
            }
            log(f"Request params: {loggable_params}")

            response = requests.get(
                request_url,
                params=params,
                timeout=constants.MAX_TIMEOUT_SECONDS.value,
            )
            response.raise_for_status()

            paginated_data = response.json()

            log(f"Dados (iniciais): {paginated_data}")

            # A single object is normalised to a one-element page.
            if isinstance(paginated_data, dict):
                paginated_data = [paginated_data]

            data += paginated_data
            log(f"Dados (paginados): {len(data)}")

            # Anything other than a full page means there is nothing left.
            if len(paginated_data) != top:
                break

            skip += top
            # Pause between pages (presumably to respect the API rate limit).
            time.sleep(20)

        # The exception is deliberately not bound with ``as error``: Python
        # unbinds that name when the handler exits, which would make the final
        # ``return`` raise UnboundLocalError instead of reporting the failure.
        except Exception:
            error = traceback.format_exc()
            log(f"[CATCHED] Task failed with error: \n{error}", level="error")
            data = []
            break

    log(f"Request concluído, com status: {data}.")

    return error, data, filetype