diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md
index 0f04c7542..b690b47eb 100644
--- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md
+++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog - br_rj_riodejaneiro_bilhetagem
 
+## [1.0.2] - 2024-05-28
+
+### Alterado
+
+- Remove schedule do flow `bilhetagem_validacao_jae`, por conta da arquitetura das tabelas que será alterada (https://github.com/prefeitura-rio/pipelines/pull/691)
+
 ## [1.0.1] - 2024-05-22
 
 ### Corrigido
diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
index 4746bdcc9..917ea44d2 100644
--- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
+++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
@@ -40,7 +40,7 @@
     every_hour,
     every_minute,
     every_day_hour_five,
-    every_day_hour_seven,
+    # every_day_hour_seven,
     every_5_minutes,
 )
 
@@ -322,7 +322,7 @@
     default_parameters=constants.BILHETAGEM_MATERIALIZACAO_VALIDACAO_JAE_PARAMS.value,
 )
 
-bilhetagem_validacao_jae.schedule = every_day_hour_seven
+# bilhetagem_validacao_jae.schedule = every_day_hour_seven
 
 # RECAPTURA
 #
diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py
index 83208bb11..9187820f8 100644
--- a/pipelines/rj_smtr/constants.py
+++ b/pipelines/rj_smtr/constants.py
@@ -1561,21 +1561,3 @@ class constants(Enum):  # pylint: disable=c0103
     ZIRIX_BASE_URL = "https://integration.systemsatx.com.br/Globalbus/SMTR"
 
     CONTROLE_FINANCEIRO_DATASET_ID = "controle_financeiro"
-
-    CONTROLE_FINANCEIRO_BASE_URL = "https://docs.google.com/spreadsheets/d/1QVfa9b8jzpQr3gac0FIlozmTaVeArtJROA343A2lMVM/\
-export?format=csv&gid="
-
-    CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS = {
-        "dataset_id": CONTROLE_FINANCEIRO_DATASET_ID,
-        "source_type": "api-csv",
-        "partition_date_only": True,
-    }
-
-    CONTROLE_FINANCEIRO_CB_CAPTURE_PARAMS = {
-        "extract_params": {"sheet_id": "454453523"},
-        "table_id": "cb",
-    }
-    CONTROLE_FINANCEIRO_CETT_CAPTURE_PARAMS = {
-        "extract_params": {"sheet_id": "0"},
-        "table_id": "cett",
-    }
diff --git a/pipelines/rj_smtr/controle_financeiro/CHANGELOG.md b/pipelines/rj_smtr/controle_financeiro/CHANGELOG.md
index 829f0554f..d594800ac 100644
--- a/pipelines/rj_smtr/controle_financeiro/CHANGELOG.md
+++ b/pipelines/rj_smtr/controle_financeiro/CHANGELOG.md
@@ -1,5 +1,16 @@
 # Changelog - controle_financeiro
 
+
+## [1.1.0] - 2024-05-28
+
+### Adicionado
+
+- Cria flow de captura do arquivo de retorno da Caixa, enviado via API pela CCT (https://github.com/prefeitura-rio/pipelines/pull/691)
+
+### Modificado
+
+- Move as constantes dos flows `controle_cct_cb_captura` e `controle_cct_cett_captura` para o arquivo de constantes dentro da pasta do dataset (https://github.com/prefeitura-rio/pipelines/pull/691)
+
 ## [1.0.0] - 2024-05-22
 
 ### Adicionado
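Note: the new dataset-level constants module (next file) follows the repo's pattern of a lowercase `constants` Enum whose members are merged into flow defaults with Python's dict-union operator. Below is a minimal, self-contained sketch of that merge pattern; only the member names come from this diff, and the dataset id is inlined as a literal for illustration.

```python
from enum import Enum


class constants(Enum):  # lowercase Enum name, mirroring the repo convention
    """Illustrative subset of the dataset-level constants."""

    SHEETS_CAPTURE_DEFAULT_PARAMS = {
        "dataset_id": "controle_financeiro",
        "source_type": "api-csv",
        "partition_date_only": True,
    }
    SHEETS_CB_CAPTURE_PARAMS = {
        "extract_params": {"sheet_id": "454453523"},
        "table_id": "cb",
    }


# Dict union (Python 3.9+): right-hand keys win on conflict.  This is the same
# expression flows.py uses to build default_parameters for each capture flow.
default_parameters = (
    constants.SHEETS_CAPTURE_DEFAULT_PARAMS.value
    | constants.SHEETS_CB_CAPTURE_PARAMS.value
)
print(default_parameters["table_id"])  # -> "cb"
```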
"https://docs.google.com/spreadsheets/d/\ +1QVfa9b8jzpQr3gac0FIlozmTaVeArtJROA343A2lMVM/export?format=csv&gid=" + + SHEETS_CAPTURE_DEFAULT_PARAMS = { + "dataset_id": smtr_constants.CONTROLE_FINANCEIRO_DATASET_ID.value, + "source_type": "api-csv", + "partition_date_only": True, + } + + SHEETS_CB_CAPTURE_PARAMS = { + "extract_params": { + "sheet_id": "454453523", + "base_url": SHEETS_BASE_URL, + }, + "table_id": "cb", + } + SHEETS_CETT_CAPTURE_PARAMS = { + "extract_params": { + "sheet_id": "0", + "base_url": SHEETS_BASE_URL, + }, + "table_id": "cett", + } + + ARQUIVO_RETORNO_TABLE_ID = "arquivo_retorno" + + CCT_API_SECRET_PATH = "cct_api" + CCT_API_BASE_URL = "https://api.cct.mobilidade.rio/api/v1" diff --git a/pipelines/rj_smtr/controle_financeiro/flows.py b/pipelines/rj_smtr/controle_financeiro/flows.py index 3b3b6b40d..a7d9cdb24 100644 --- a/pipelines/rj_smtr/controle_financeiro/flows.py +++ b/pipelines/rj_smtr/controle_financeiro/flows.py @@ -8,21 +8,46 @@ from copy import deepcopy from prefect.run_configs import KubernetesRun from prefect.storage import GCS +from prefect import Parameter # EMD Imports # from pipelines.constants import constants as emd_constants - +from pipelines.utils.decorators import Flow +from pipelines.utils.tasks import ( + rename_current_flow_run_now_time, + get_now_time, + get_current_flow_labels, + get_current_flow_mode, +) +from pipelines.utils.utils import set_default_parameters # SMTR Imports # from pipelines.rj_smtr.flows import ( default_capture_flow, ) -from pipelines.rj_smtr.constants import constants +from pipelines.rj_smtr.tasks import ( + get_current_timestamp, + create_date_hour_partition, + parse_timestamp_to_string, + create_local_partition_path, + upload_raw_data_to_gcs, + transform_raw_to_nested_structure, + upload_staging_data_to_gcs, +) -from pipelines.utils.utils import set_default_parameters -from pipelines.rj_smtr.schedules import every_day +from pipelines.rj_smtr.constants import constants as smtr_constants + +from pipelines.rj_smtr.schedules import every_day, every_friday_seven_thirty + +from pipelines.rj_smtr.controle_financeiro.constants import constants +from pipelines.rj_smtr.controle_financeiro.tasks import ( + get_cct_arquivo_retorno_redis_key, + create_cct_arquivo_retorno_params, + get_raw_cct_arquivo_retorno, + cct_arquivo_retorno_save_redis, +) # Flows # @@ -37,8 +62,8 @@ controle_cct_cb_captura = set_default_parameters( flow=controle_cct_cb_captura, - default_parameters=constants.CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS.value - | constants.CONTROLE_FINANCEIRO_CB_CAPTURE_PARAMS.value, + default_parameters=constants.SHEETS_CAPTURE_DEFAULT_PARAMS.value + | constants.SHEETS_CB_CAPTURE_PARAMS.value, ) controle_cct_cb_captura.schedule = every_day @@ -52,7 +77,91 @@ controle_cct_cett_captura = set_default_parameters( flow=controle_cct_cett_captura, - default_parameters=constants.CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS.value - | constants.CONTROLE_FINANCEIRO_CETT_CAPTURE_PARAMS.value, + default_parameters=constants.SHEETS_CAPTURE_DEFAULT_PARAMS.value + | constants.SHEETS_CETT_CAPTURE_PARAMS.value, ) controle_cct_cett_captura.schedule = every_day + + +with Flow( + "SMTR: Controle Financeiro Arquivo Retorno - Captura", + code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"], +) as arquivo_retorno_captura: + start_date = Parameter("start_date", default=None) + end_date = Parameter("end_date", default=None) + + rename_flow_run = rename_current_flow_run_now_time( + prefix=f"SMTR: Captura 
diff --git a/pipelines/rj_smtr/controle_financeiro/flows.py b/pipelines/rj_smtr/controle_financeiro/flows.py
index 3b3b6b40d..a7d9cdb24 100644
--- a/pipelines/rj_smtr/controle_financeiro/flows.py
+++ b/pipelines/rj_smtr/controle_financeiro/flows.py
@@ -8,21 +8,46 @@
 from copy import deepcopy
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
+from prefect import Parameter
 
 # EMD Imports #
 
 from pipelines.constants import constants as emd_constants
-
+from pipelines.utils.decorators import Flow
+from pipelines.utils.tasks import (
+    rename_current_flow_run_now_time,
+    get_now_time,
+    get_current_flow_labels,
+    get_current_flow_mode,
+)
+from pipelines.utils.utils import set_default_parameters
 
 # SMTR Imports #
 
 from pipelines.rj_smtr.flows import (
     default_capture_flow,
 )
 
-from pipelines.rj_smtr.constants import constants
+from pipelines.rj_smtr.tasks import (
+    get_current_timestamp,
+    create_date_hour_partition,
+    parse_timestamp_to_string,
+    create_local_partition_path,
+    upload_raw_data_to_gcs,
+    transform_raw_to_nested_structure,
+    upload_staging_data_to_gcs,
+)
 
-from pipelines.utils.utils import set_default_parameters
-from pipelines.rj_smtr.schedules import every_day
+from pipelines.rj_smtr.constants import constants as smtr_constants
+
+from pipelines.rj_smtr.schedules import every_day, every_friday_seven_thirty
+
+from pipelines.rj_smtr.controle_financeiro.constants import constants
+from pipelines.rj_smtr.controle_financeiro.tasks import (
+    get_cct_arquivo_retorno_redis_key,
+    create_cct_arquivo_retorno_params,
+    get_raw_cct_arquivo_retorno,
+    cct_arquivo_retorno_save_redis,
+)
 
 # Flows #
@@ -37,8 +62,8 @@
 
 controle_cct_cb_captura = set_default_parameters(
     flow=controle_cct_cb_captura,
-    default_parameters=constants.CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS.value
-    | constants.CONTROLE_FINANCEIRO_CB_CAPTURE_PARAMS.value,
+    default_parameters=constants.SHEETS_CAPTURE_DEFAULT_PARAMS.value
+    | constants.SHEETS_CB_CAPTURE_PARAMS.value,
 )
 
 controle_cct_cb_captura.schedule = every_day
@@ -52,7 +77,91 @@
 controle_cct_cett_captura = set_default_parameters(
     flow=controle_cct_cett_captura,
-    default_parameters=constants.CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS.value
-    | constants.CONTROLE_FINANCEIRO_CETT_CAPTURE_PARAMS.value,
+    default_parameters=constants.SHEETS_CAPTURE_DEFAULT_PARAMS.value
+    | constants.SHEETS_CETT_CAPTURE_PARAMS.value,
 )
 
 controle_cct_cett_captura.schedule = every_day
+
+
+with Flow(
+    "SMTR: Controle Financeiro Arquivo Retorno - Captura",
+    code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"],
+) as arquivo_retorno_captura:
+    start_date = Parameter("start_date", default=None)
+    end_date = Parameter("end_date", default=None)
+
+    rename_flow_run = rename_current_flow_run_now_time(
+        prefix=f"SMTR: Captura {constants.ARQUIVO_RETORNO_TABLE_ID.value}: ",
+        now_time=get_now_time(),
+    )
+    LABELS = get_current_flow_labels()
+    MODE = get_current_flow_mode(LABELS)
+
+    timestamp = get_current_timestamp()
+
+    REDIS_KEY = get_cct_arquivo_retorno_redis_key(mode=MODE)
+
+    headers, params = create_cct_arquivo_retorno_params(
+        redis_key=REDIS_KEY,
+        start_date=start_date,
+        end_date=end_date,
+    )
+
+    partitions = create_date_hour_partition(
+        timestamp,
+        partition_date_only=True,
+    )
+
+    filename = parse_timestamp_to_string(timestamp)
+
+    filepath = create_local_partition_path(
+        dataset_id=smtr_constants.CONTROLE_FINANCEIRO_DATASET_ID.value,
+        table_id=constants.ARQUIVO_RETORNO_TABLE_ID.value,
+        filename=filename,
+        partitions=partitions,
+    )
+
+    raw_filepath = get_raw_cct_arquivo_retorno(
+        headers=headers,
+        params=params,
+        local_filepath=filepath,
+    )
+
+    error = upload_raw_data_to_gcs(
+        error=None,
+        raw_filepath=raw_filepath,
+        table_id=constants.ARQUIVO_RETORNO_TABLE_ID.value,
+        dataset_id=smtr_constants.CONTROLE_FINANCEIRO_DATASET_ID.value,
+        partitions=partitions,
+    )
+
+    error, staging_filepath = transform_raw_to_nested_structure(
+        raw_filepath=raw_filepath,
+        filepath=filepath,
+        error=error,
+        timestamp=timestamp,
+        primary_key=["id"],
+    )
+
+    staging_upload = upload_staging_data_to_gcs(
+        error=error,
+        staging_filepath=staging_filepath,
+        timestamp=timestamp,
+        table_id=constants.ARQUIVO_RETORNO_TABLE_ID.value,
+        dataset_id=smtr_constants.CONTROLE_FINANCEIRO_DATASET_ID.value,
+        partitions=partitions,
+    )
+
+    cct_arquivo_retorno_save_redis(
+        redis_key=REDIS_KEY,
+        raw_filepath=raw_filepath,
+        upstream_tasks=[staging_upload],
+    )
+
+arquivo_retorno_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+arquivo_retorno_captura.run_config = KubernetesRun(
+    image=emd_constants.DOCKER_IMAGE.value,
+    labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
+)
+
+arquivo_retorno_captura.schedule = every_friday_seven_thirty
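A rough sketch of how the new flow could be exercised locally with Prefect 1.x (this assumes valid Vault, Redis and GCS credentials are available; passing both dates makes `create_cct_arquivo_retorno_params` skip the Redis lookup and query a single fixed range):

```python
from pipelines.rj_smtr.controle_financeiro.flows import arquivo_retorno_captura

# With explicit dates, the capture hits the CCT API for one fixed data_ordem
# range instead of deriving ranges from the Redis control state.
state = arquivo_retorno_captura.run(
    parameters={"start_date": "2024-05-09", "end_date": "2024-05-10"}
)
print(state.is_successful())
```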
diff --git a/pipelines/rj_smtr/controle_financeiro/tasks.py b/pipelines/rj_smtr/controle_financeiro/tasks.py
new file mode 100644
index 000000000..93acf34ae
--- /dev/null
+++ b/pipelines/rj_smtr/controle_financeiro/tasks.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+"""
+Tasks for rj_smtr.controle_financeiro
+"""
+
+from typing import Union
+from datetime import date
+import requests
+import pandas as pd
+from prefect import task
+from pipelines.utils.utils import log, get_redis_client, get_vault_secret
+
+from pipelines.rj_smtr.constants import constants as smtr_constants
+from pipelines.rj_smtr.controle_financeiro.constants import constants
+from pipelines.rj_smtr.utils import save_raw_local_func
+
+from pipelines.rj_smtr.controle_financeiro.utils import get_date_ranges
+
+
+@task
+def get_cct_arquivo_retorno_redis_key(mode: str) -> str:
+    """
+    Gets the key to search and store pending dates on Redis
+
+    Args:
+        mode (str): dev or prod
+
+    Returns:
+        str: Redis key
+    """
+    return (
+        mode
+        + "."
+        + smtr_constants.CONTROLE_FINANCEIRO_DATASET_ID.value
+        + "."
+        + constants.ARQUIVO_RETORNO_TABLE_ID.value
+    )
+
+
+@task(nout=2)
+def create_cct_arquivo_retorno_params(
+    redis_key: str, start_date: Union[str, None], end_date: Union[str, None]
+) -> tuple[dict, list[dict]]:
+    """
+    Create parameters to get data from the CCT API's arquivoPublicacao endpoint
+
+    Args:
+        redis_key (str): Redis key to get pending dates
+        start_date (Union[str, None]): Initial data_ordem to filter
+        end_date (Union[str, None]): Final data_ordem to filter
+
+    Returns:
+        dict: headers
+        list[dict]: parameters
+    """
+    auth_resp = requests.post(
+        f"{constants.CCT_API_BASE_URL.value}/auth/admin/email/login",
+        data=get_vault_secret(constants.CCT_API_SECRET_PATH.value)["data"],
+    )
+    auth_resp.raise_for_status()
+    headers = {"Authorization": f"Bearer {auth_resp.json()['token']}"}
+
+    if start_date is not None and end_date is not None:
+        return headers, [
+            {
+                "dt_inicio": start_date,
+                "dt_fim": end_date,
+            }
+        ]
+
+    redis_client = get_redis_client()
+
+    log(f"Getting pending dates on Redis. key = {redis_key}")
+    redis_return = redis_client.get(redis_key)
+    log(f"Got value from Redis: {redis_return}")
+
+    if redis_return is None:
+        params = [
+            {
+                "dt_inicio": "2024-05-09",
+                "dt_fim": date.today().isoformat(),
+            }
+        ]
+
+    else:
+        pending_dates = redis_return["pending_dates"]
+
+        params = get_date_ranges(
+            last_date=redis_return["last_date"],
+            pending_dates=pending_dates,
+        )
+
+    return headers, params
+
+
+@task
+def get_raw_cct_arquivo_retorno(
+    headers: dict, params: list[dict], local_filepath: str
+) -> str:
+    """
+    Get data from the CCT API's arquivoPublicacao endpoint
+
+    Args:
+        headers (dict): Request headers
+        params (list[dict]): List of request query params
+        local_filepath (str): Path to save the data
+
+    Returns:
+        str: filepath to raw data
+    """
+    data = []
+    url = f"{constants.CCT_API_BASE_URL.value}/cnab/arquivoPublicacao"
+    for param in params:
+        log(
+            f"""Getting raw data:
+            url: {url},
+            params: {param}"""
+        )
+        resp = requests.get(
+            url,
+            headers=headers,
+            params=param,
+        )
+
+        resp.raise_for_status()
+        new_data = resp.json()
+
+        data += new_data
+
+        log(f"returned {len(new_data)} rows")
+
+    return save_raw_local_func(data=data, filepath=local_filepath)
+
+
+@task
+def cct_arquivo_retorno_save_redis(redis_key: str, raw_filepath: str):
+    """
+    Set control info on Redis
+
+    Args:
+        redis_key (str): Key on Redis
+        raw_filepath (str): Filepath to raw data
+    """
+    df = pd.read_json(raw_filepath)
+    df["dataOrdem"] = pd.to_datetime(df["dataOrdem"]).dt.strftime("%Y-%m-%d")
+    all_returned_dates = df["dataOrdem"].unique().tolist()
+    df = (
+        df.groupby(  # pylint: disable=E1101
+            [
+                "idConsorcio",
+                "idOperadora",
+                "dataOrdem",
+            ]
+        )["isPago"]
+        .max()
+        .reset_index()
+    )
+    pending_dates = df.loc[~df["isPago"]]["dataOrdem"].unique().tolist()
+
+    log(f"The API returned the following dates: {sorted(all_returned_dates)}")
+    log(f"The following dates are not paid: {sorted(pending_dates)}")
+
+    redis_client = get_redis_client()
+    redis_return = redis_client.get(redis_key)
+
+    if redis_return is None:
+        redis_return = {}
+
+    redis_return["last_date"] = max(
+        [df["dataOrdem"].max(), redis_return.get("last_date", "2024-05-09")]
+    )
+
+    redis_return["pending_dates"] = pending_dates + [
+        d for d in redis_return.get("pending_dates", []) if d not in all_returned_dates
+    ]
+
+    log(
+        f"""
+        Saving values on redis
+        last_date: {redis_return["last_date"]}
+        pending_dates: {sorted(redis_return["pending_dates"])}
+        """
+    )
+
+    redis_client.set(redis_key, redis_return)
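To make the control logic in `cct_arquivo_retorno_save_redis` concrete, here is a minimal pandas reproduction of the pending-date computation. The rows are invented for illustration; only the field names (`idConsorcio`, `idOperadora`, `dataOrdem`, `isPago`) come from the task above.

```python
import pandas as pd

rows = [
    {"idConsorcio": "A", "idOperadora": "X", "dataOrdem": "2024-05-20T00:00:00", "isPago": True},
    {"idConsorcio": "A", "idOperadora": "X", "dataOrdem": "2024-05-21T00:00:00", "isPago": False},
    {"idConsorcio": "B", "idOperadora": "Y", "dataOrdem": "2024-05-21T00:00:00", "isPago": True},
]

df = pd.DataFrame(rows)
df["dataOrdem"] = pd.to_datetime(df["dataOrdem"]).dt.strftime("%Y-%m-%d")
all_returned_dates = df["dataOrdem"].unique().tolist()

# max() over the bool column marks a (consorcio, operadora, date) triple as paid
# if any of its rows is paid; dates with at least one unpaid triple stay pending.
paid = (
    df.groupby(["idConsorcio", "idOperadora", "dataOrdem"])["isPago"]
    .max()
    .reset_index()
)
pending_dates = paid.loc[~paid["isPago"]]["dataOrdem"].unique().tolist()

print(all_returned_dates)  # ['2024-05-20', '2024-05-21']
print(pending_dates)       # ['2024-05-21']
```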
diff --git a/pipelines/rj_smtr/controle_financeiro/utils.py b/pipelines/rj_smtr/controle_financeiro/utils.py
new file mode 100644
index 000000000..7c2006cbd
--- /dev/null
+++ b/pipelines/rj_smtr/controle_financeiro/utils.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+"""
+General purpose functions for rj_smtr.controle_financeiro
+"""
+
+from datetime import date, timedelta
+import pandas as pd
+
+
+def get_date_ranges(last_date: str, pending_dates: list[str]) -> list[dict]:
+    """
+    Group consecutive dates into date ranges
+
+    Args:
+        last_date (str): The last captured date (YYYY-mm-dd)
+        pending_dates (list[str]): A list of pending dates (YYYY-mm-dd)
+
+    Returns:
+        list[dict]: The list of date ranges
+    """
+    initial_date = date.fromisoformat(last_date) + timedelta(days=1)
+    final_date = date.today()
+
+    if len(pending_dates) == 0:
+        return [
+            {
+                "dt_inicio": initial_date.isoformat(),
+                "dt_fim": final_date.isoformat(),
+            }
+        ]
+
+    new_dates = [
+        d.date().isoformat()
+        for d in pd.date_range(
+            initial_date,
+            final_date,
+        )
+    ]
+
+    dates = sorted(list(set(new_dates + pending_dates)))
+
+    initial_date = dates[0]
+    final_date = dates[0]
+
+    ranges = []
+    if len(dates) > 1:
+        for i in range(1, len(dates)):
+            current_date = dates[i]
+            last_date = dates[i - 1]
+
+            if date.fromisoformat(current_date) == date.fromisoformat(
+                last_date
+            ) + timedelta(days=1):
+                final_date = current_date
+            else:
+                ranges.append({"dt_inicio": initial_date, "dt_fim": final_date})
+                initial_date = final_date = current_date
+
+    ranges.append({"dt_inicio": initial_date, "dt_fim": final_date})
+    return ranges
diff --git a/pipelines/rj_smtr/schedules.py b/pipelines/rj_smtr/schedules.py
index a8f8ec017..6323d5c98 100644
--- a/pipelines/rj_smtr/schedules.py
+++ b/pipelines/rj_smtr/schedules.py
@@ -163,3 +163,18 @@
         ),
     ]
 )
+
+
+every_friday_seven_thirty = Schedule(
+    clocks=[
+        CronClock(
+            cron="30 19 * * 5",
+            start_date=datetime(
+                2024, 5, 24, 19, 30, tzinfo=timezone(constants.TIMEZONE.value)
+            ),
+            labels=[
+                emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value,
+            ],
+        )
+    ]
+)
diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py
index c591dfe77..aaa478aef 100644
--- a/pipelines/rj_smtr/tasks.py
+++ b/pipelines/rj_smtr/tasks.py
@@ -817,9 +817,7 @@ def create_request_params(
             data_final: {request_params['data_final']}"""
         )
     elif dataset_id == constants.CONTROLE_FINANCEIRO_DATASET_ID.value:
-        request_url = (
-            constants.CONTROLE_FINANCEIRO_BASE_URL.value + extract_params["sheet_id"]
-        )
+        request_url = extract_params["base_url"] + extract_params["sheet_id"]
 
     return request_params, request_url
 
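Finally, a usage sketch for `get_date_ranges`: consecutive pending days are collapsed into a single range, and the window after `last_date` always closes on the run date. The expected output assumes the example runs after 2024-05-21; the `dt_fim` of the last range depends on `date.today()`.

```python
from pipelines.rj_smtr.controle_financeiro.utils import get_date_ranges

ranges = get_date_ranges(
    last_date="2024-05-20",
    pending_dates=["2024-05-15", "2024-05-16"],
)
# Expected shape:
# [{"dt_inicio": "2024-05-15", "dt_fim": "2024-05-16"},
#  {"dt_inicio": "2024-05-21", "dt_fim": "<today>"}]
print(ranges)
```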