diff --git a/.gitignore b/.gitignore
index 2a51ce841..988c67eda 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,7 +13,7 @@ setup.py
 .vscode/*
 *.hdf
 *.DS_Store
-
+.idea/*
 
 # Byte-compiled / optimized / DLL files
 __pycache__/
diff --git a/pipelines/constants.py b/pipelines/constants.py
index 900e2ebf9..6891825c6 100644
--- a/pipelines/constants.py
+++ b/pipelines/constants.py
@@ -146,4 +146,8 @@ class constants(Enum):  # pylint: disable=c0103
         "user_id": "620000269392019469",
         "type": "user_nickname",
     },
+    "igorlaltuf": {
+        "user_id": "87545892319531008",
+        "type": "user_nickname",
+    },
 }
diff --git a/pipelines/rj_smtr/__init__.py b/pipelines/rj_smtr/__init__.py
index 8061ae9dc..6c415b5dd 100644
--- a/pipelines/rj_smtr/__init__.py
+++ b/pipelines/rj_smtr/__init__.py
@@ -14,6 +14,7 @@
 from pipelines.rj_smtr.materialize_to_datario.flows import *
 from pipelines.rj_smtr.registros_ocr_rir.flows import *
 from pipelines.rj_smtr.projeto_subsidio_sppo.flows import *
+from pipelines.rj_smtr.br_rj_riodejaneiro_recurso.flows import *
 from pipelines.rj_smtr.veiculo.flows import *
 from pipelines.rj_smtr.example.flows import *
 from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows import *
diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_recurso/__init__.py b/pipelines/rj_smtr/br_rj_riodejaneiro_recurso/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_recurso/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_recurso/flows.py
new file mode 100644
index 000000000..672ba9308
--- /dev/null
+++ b/pipelines/rj_smtr/br_rj_riodejaneiro_recurso/flows.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+"""
+Flows for br_rj_riodejaneiro_recurso
+"""
+from copy import deepcopy
+
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from prefect import Parameter, case, task
+from prefect.tasks.control_flow import merge
+from prefect.utilities.edges import unmapped
+
+# EMD Imports #
+
+from pipelines.constants import constants as emd_constants
+from pipelines.utils.utils import set_default_parameters
+from pipelines.utils.decorators import Flow
+from pipelines.utils.tasks import (
+    rename_current_flow_run_now_time,
+    get_current_flow_labels,
+)
+
+# SMTR Imports #
+
+from pipelines.rj_smtr.constants import constants
+from pipelines.rj_smtr.tasks import get_current_timestamp
+
+from pipelines.rj_smtr.flows import default_capture_flow
+
+
+# SETUP #
+
+sppo_recurso_captura = deepcopy(default_capture_flow)
+sppo_recurso_captura.name = "SMTR: Subsídio SPPO Recursos - Captura"
+sppo_recurso_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+sppo_recurso_captura.run_config = KubernetesRun(
+    image=emd_constants.DOCKER_IMAGE.value,
+    labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
+)
+sppo_recurso_captura_params = set_default_parameters(
+    flow=sppo_recurso_captura,
+    default_parameters=constants.SUBSIDIO_SPPO_RECURSO_DEFAULT_PARAM.value,
+)
+
+with Flow(
+    "SMTR: Subsídio Recursos Viagens Individuais - Captura",
+    code_owners=["carolinagomes", "igorlaltuf"],
+) as subsidio_sppo_recurso:
+    capture = Parameter("capture", default=True)
+    timestamp = get_current_timestamp()
+
+    rename_flow_run = rename_current_flow_run_now_time(
+        prefix=subsidio_sppo_recurso.name + " ",
+        now_time=timestamp,
+    )
+
+    LABELS = get_current_flow_labels()
+
+    with case(capture, True):
+        run_captura = create_flow_run.map(
+            flow_name=unmapped(sppo_recurso_captura.name),
+            project_name=unmapped("staging"),
+            parameters=sppo_recurso_captura_params,
+            labels=unmapped(LABELS),
+        )
+
+        wait_captura_true = wait_for_flow_run.map(
+            run_captura,
+            stream_states=True,
+            stream_logs=True,
+            raise_final_state=True,
+        )
+
+    with case(capture, False):
+        wait_captura_false = task(
+            lambda: [None], checkpoint=False, name="assign_none_to_previous_runs"
+        )()
+
+    wait_captura = merge(wait_captura_true, wait_captura_false)
+
+subsidio_sppo_recurso.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+subsidio_sppo_recurso.run_config = KubernetesRun(
+    image=emd_constants.DOCKER_IMAGE.value,
+    labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
+)
diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py
index 8a7252ab8..a2d9efba2 100644
--- a/pipelines/rj_smtr/constants.py
+++ b/pipelines/rj_smtr/constants.py
@@ -400,3 +400,26 @@ class constants(Enum):  # pylint: disable=c0103
             "version": {},
         },
     }
+
+    # SUBSÍDIO RECURSOS VIAGENS INDIVIDUAIS
+    SUBSIDIO_SPPO_RECURSOS_DATASET_ID = "br_rj_riodejaneiro_recurso"
+    SUBSIDIO_SPPO_RECURSOS_TABLE_ID = "recurso_sppo"
+    SUBSIDIO_SPPO_RECURSO_API_BASE_URL = "https://api.movidesk.com/public/v1/tickets?"
+    SUBSIDIO_SPPO_RECURSO_API_SECRET_PATH = "sppo_subsidio_recursos_api"
+    SUBSIDIO_SPPO_RECURSO_DEFAULT_PARAM = {"date_range_end": "%Y-%m-%dT%H:%M:%S.%fZ"}
+    SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS = {
+        "partition_date_only": True,
+        "source_type": "movidesk",
+        "dataset_id": "br_rj_riodejaneiro_recurso",
+        "extract_params": {
+            "$service": "serviceFull eq 'SPPO'",
+            "$select": "'id', 'protocol','createdDate'",
+            "$filter": "{dates} and serviceFull/any(serviceFull: {service})",
+            "$expand": "customFieldValues",
+            "$customFieldValues($expand=items)": "customFieldValues($expand=items)",
+            "$actions($select=id,description)": "actions($select=id,description)",
+        },
+        "interval_minutes": "{timestamp}",
+    }
+    # TIMEOUT = 10  # in seconds
+    # BACKOFF_FACTOR = 1.5
diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py
index f927a02f9..a667ba671 100644
--- a/pipelines/rj_smtr/tasks.py
+++ b/pipelines/rj_smtr/tasks.py
@@ -31,6 +31,7 @@
     get_raw_data_api,
     get_raw_data_gcs,
     get_raw_data_db,
+    get_raw_recursos,
     upload_run_logs_to_bq,
     get_datetime_range,
     read_raw_data,
@@ -252,7 +253,7 @@ def create_dbt_run_vars(
 
 ###############
 #
-# Local file managment
+# Local file management
 #
 ###############
 
@@ -668,6 +669,20 @@ def create_request_params(
     elif dataset_id == constants.GTFS_DATASET_ID.value:
         request_params = extract_params["filename"]
 
+    elif dataset_id == constants.SUBSIDIO_SPPO_RECURSOS_DATASET_ID.value:
+        end = datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
+        dates = f"createdDate le {end}"
+        request_params = extract_params.copy()
+        request_params["$filter"] = request_params["$filter"].format(
+            dates=dates, service=request_params["$service"]
+        )
+
+        request_params["token"] = get_vault_secret(
+            constants.SUBSIDIO_SPPO_RECURSO_API_SECRET_PATH.value
+        )["data"]["token"]
+
+        request_url = constants.SUBSIDIO_SPPO_RECURSO_API_BASE_URL.value
+
     return request_params, request_url
@@ -725,6 +740,10 @@ def get_raw_from_sources(
         error, data, filetype = get_raw_data_db(
             host=source_path, secret_path=secret_path, **request_params
         )
+    elif source_type == "movidesk":
+        error, data, filetype = get_raw_recursos(
+            request_url=source_path, request_params=request_params
+        )
     else:
         raise NotImplementedError(f"{source_type} not supported")
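For reference, a minimal sketch of what the new Movidesk branch of `create_request_params` produces, assuming the `extract_params` template from `SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS` above; the timestamp is illustrative and the vault token lookup is omitted:

```python
# Sketch only: mirrors the Movidesk branch of create_request_params.
# The $filter template uses *named* placeholders, so str.format must
# receive `dates` and `service` as keyword arguments.
from datetime import datetime

extract_params = {
    "$service": "serviceFull eq 'SPPO'",
    "$filter": "{dates} and serviceFull/any(serviceFull: {service})",
}

timestamp = datetime(2023, 10, 1, 12, 30)  # illustrative value
end = datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
dates = f"createdDate le {end}"

request_params = extract_params.copy()
request_params["$filter"] = request_params["$filter"].format(
    dates=dates, service=request_params["$service"]
)

print(request_params["$filter"])
# createdDate le 2023-10-01T12:30:00.000000Z and
# serviceFull/any(serviceFull: serviceFull eq 'SPPO')
```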
diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py
index 389a31d54..2d73a3f45 100644
--- a/pipelines/rj_smtr/utils.py
+++ b/pipelines/rj_smtr/utils.py
@@ -22,7 +22,8 @@
 import pymysql
 import psycopg2
 import psycopg2.extras
-
+import time
+from typing import Dict
 from prefect.schedules.clocks import IntervalClock
@@ -828,3 +829,55 @@ def read_raw_data(filepath: str, csv_args: dict = None) -> tuple[str, pd.DataFrame]:
         log(f"[CATCHED] Task failed with error: \n{error}", level="error")
 
     return error, data
+
+
+def get_raw_recursos(request_url: str, request_params: dict) -> tuple[str, list, str]:
+    """
+    Returns raw recursos data from the Movidesk API as an (error, data, filetype) tuple.
+    """
+    all_records = False
+    top = 1000
+    skip = 0
+    error = None
+    filetype = "json"
+    data = []
+
+    while not all_records:
+        try:
+            # request_params["$top"] = top
+            # request_params["$skip"] = skip
+
+            log(f"Request params: {request_params}")
+
+            response = requests.get(
+                request_url,
+                params=request_params,
+                timeout=constants.MAX_TIMEOUT_SECONDS.value,
+            )
+            response.raise_for_status()
+
+            paginated_data = response.json()
+
+            log(f"Initial data: {paginated_data}")
+
+            if isinstance(paginated_data, dict):
+                paginated_data = [paginated_data]
+
+            if len(paginated_data) == top:
+                skip += top
+                time.sleep(20)
+            else:
+                all_records = True
+
+            data += paginated_data
+            log(f"Paginated records so far: {len(data)}")
+
+        except Exception:  # pylint: disable=broad-except
+            error = traceback.format_exc()
+            log(f"[CATCHED] Task failed with error: \n{error}", level="error")
+            data = []
+            break
+
+    log(f"Request complete; total records: {len(data)}.")
+
+    return error, data, filetype
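A minimal sketch of the `$top`/`$skip` pagination contract that `get_raw_recursos` is built around: a page holding exactly `top` records means more data may remain, while a shorter page ends the loop. It assumes the `$top`/`$skip` request parameters (commented out in the hunk above) are actually sent; `fetch_page` is a hypothetical stand-in for the Movidesk GET request:

```python
# Sketch only: the pagination loop of get_raw_recursos, with the HTTP
# request replaced by a local stub so the contract can be tested offline.
from typing import List

RECORDS = [{"id": i} for i in range(2500)]  # fake Movidesk dataset


def fetch_page(top: int, skip: int) -> List[dict]:
    """Hypothetical stand-in for requests.get(...).json()."""
    return RECORDS[skip : skip + top]


def get_all_records(top: int = 1000) -> List[dict]:
    data: List[dict] = []
    skip = 0
    while True:
        page = fetch_page(top=top, skip=skip)
        data += page
        if len(page) < top:  # short page: nothing left to fetch
            break
        skip += top  # full page: advance to the next slice
    return data


assert len(get_all_records()) == 2500  # pages of 1000 + 1000 + 500
```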