diff --git a/git b/git new file mode 100644 index 000000000..e69de29bb diff --git a/pipelines/rj_seconserva/__init__.py b/pipelines/rj_seconserva/__init__.py index d1d292936..042230c88 100644 --- a/pipelines/rj_seconserva/__init__.py +++ b/pipelines/rj_seconserva/__init__.py @@ -3,4 +3,4 @@ Prefect flows for seconserva project """ -from pipelines.rj_seconserva.dump_db_siscor.flows import * +# from pipelines.rj_seconserva.dump_db_siscor.flows import * diff --git a/pipelines/rj_segovi/dump_db_1746/flows.py b/pipelines/rj_segovi/dump_db_1746/flows.py index aab0b90be..32f18db19 100644 --- a/pipelines/rj_segovi/dump_db_1746/flows.py +++ b/pipelines/rj_segovi/dump_db_1746/flows.py @@ -25,6 +25,10 @@ labels=[ constants.RJ_SEGOVI_AGENT_LABEL.value, ], + cpu_limit="500m", + cpu_request="500m", + memory_limit="2Gi", + memory_request="2Gi", ) _1746_default_parameters = { diff --git a/pipelines/rj_smtr/CHANGELOG.md b/pipelines/rj_smtr/CHANGELOG.md new file mode 100644 index 000000000..3c003d919 --- /dev/null +++ b/pipelines/rj_smtr/CHANGELOG.md @@ -0,0 +1,18 @@ +# Changelog - rj_smtr + +## [1.0.1] - 2024-05-22 + +### Alterado + +- Altera `primary_key` da constante `GTFS_TABLE_CAPTURE_PARAMS` relativa a `ordem_servico_trajeto_alternativo` (https://github.com/prefeitura-rio/pipelines/pull/690) + +## [1.0.0] - 2024-05-21 + +### Alterado + +- Inclui tratamento específico na task `transform_raw_to_nested_structure` relacionado aos flows `br_rj_riodejaneiro_gtfs` (https://github.com/prefeitura-rio/pipelines/pull/687) +- Altera constantes e parâmetros de captura e materialização relacionados aos flows `br_rj_riodejaneiro_gtfs` (https://github.com/prefeitura-rio/pipelines/pull/687) + +### Corrigido + +- Corrige erro `pipelines/rj_smtr/tasks.py:116:64: E226 missing whitespace around arithmetic operator` na task `build_incremental_model` (https://github.com/prefeitura-rio/pipelines/pull/687) \ No newline at end of file diff --git a/pipelines/rj_smtr/__init__.py b/pipelines/rj_smtr/__init__.py index 43dd7f178..0e3ddbbfe 100644 --- a/pipelines/rj_smtr/__init__.py +++ b/pipelines/rj_smtr/__init__.py @@ -23,3 +23,4 @@ from pipelines.rj_smtr.br_rj_riodejaneiro_stu.flows import * from pipelines.rj_smtr.br_rj_riodejaneiro_diretorios.flows import * from pipelines.rj_smtr.br_rj_riodejaneiro_viagem_zirix.flows import * +from pipelines.rj_smtr.controle_financeiro.flows import * diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md new file mode 100644 index 000000000..0f04c7542 --- /dev/null +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md @@ -0,0 +1,18 @@ +# Changelog - br_rj_riodejaneiro_bilhetagem + +## [1.0.1] - 2024-05-22 + +### Corrigido + +- Corrige exclude nos parâmetros da pipeline `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines/pull/689) + +## [1.0.0] - 2024-05-17 + +### Alterado + +- Adiciona +servicos no exclude geral dos pipelines de materialização da bilhetagem (https://github.com/prefeitura-rio/pipelines/pull/685) + +### Corrigido + +- Corrige parametros do flow `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines/pull/685) +- Adiciona tabelas de validação no exclude do flow `bilhetagem_materializacao_gps_validador` (https://github.com/prefeitura-rio/pipelines/pull/685) \ No newline at end of file diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index cf5580e61..4746bdcc9 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -319,7 +319,7 @@ bilhetagem_validacao_jae = set_default_parameters( flow=bilhetagem_validacao_jae, - default_parameters=constants.BILHETAGEM_MATERIALIZACAO_GPS_VALIDADOR_GENERAL_PARAMS.value, + default_parameters=constants.BILHETAGEM_MATERIALIZACAO_VALIDACAO_JAE_PARAMS.value, ) bilhetagem_validacao_jae.schedule = every_day_hour_seven diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index a6a424597..83208bb11 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -1197,7 +1197,7 @@ class constants(Enum): # pylint: disable=c0103 }, ] - BILHETAGEM_EXCLUDE = "+operadoras +consorcios" + BILHETAGEM_EXCLUDE = "+operadoras +consorcios +servicos" BILHETAGEM_JAE_DASHBOARD_DATASET_ID = "dashboard_bilhetagem_jae" @@ -1258,7 +1258,7 @@ class constants(Enum): # pylint: disable=c0103 "dataset_id": BILHETAGEM_DATASET_ID, "upstream": True, "downstream": True, - "exclude": BILHETAGEM_EXCLUDE, + "exclude": f"{BILHETAGEM_EXCLUDE} veiculo_validacao veiculo_indicadores_dia", "dbt_vars": { "date_range": { "table_run_datetime_column_name": "datetime_captura", @@ -1280,7 +1280,7 @@ class constants(Enum): # pylint: disable=c0103 BILHETAGEM_MATERIALIZACAO_VALIDACAO_JAE_PARAMS = { "dataset_id": "validacao_dados_jae", "upstream": True, - "exclude": "+gps_sppo +sppo_veiculo_dia +gps_validador +transacao\ + "exclude": "+gps_sppo +sppo_veiculo_dia +gps_validador +transacao \ +ordem_pagamento_dia +integracao +servicos", "dbt_vars": { "run_date": {}, @@ -1289,7 +1289,7 @@ class constants(Enum): # pylint: disable=c0103 } # GTFS - GTFS_DATASET_ID = "gtfs" + GTFS_DATASET_ID = "br_rj_riodejaneiro_gtfs" GTFS_GENERAL_CAPTURE_PARAMS = { "partition_date_only": True, @@ -1346,12 +1346,12 @@ class constants(Enum): # pylint: disable=c0103 }, { "table_id": "ordem_servico", - "primary_key": ["servico"], + "primary_key": ["servico", "tipo_os"], "extract_params": {"filename": "ordem_servico"}, }, { "table_id": "ordem_servico_trajeto_alternativo", - "primary_key": ["servico"], + "primary_key": ["servico", "tipo_os", "evento"], "extract_params": {"filename": "ordem_servico_trajeto_alternativo"}, }, { @@ -1361,7 +1361,7 @@ class constants(Enum): # pylint: disable=c0103 ] GTFS_MATERIALIZACAO_PARAMS = { - "dataset_id": GTFS_DATASET_ID, + "dataset_id": "gtfs", "dbt_vars": { "data_versao_gtfs": "", "version": {}, @@ -1559,3 +1559,23 @@ class constants(Enum): # pylint: disable=c0103 ZIRIX_API_SECRET_PATH = "zirix_api" VIAGEM_ZIRIX_RAW_DATASET_ID = "br_rj_riodejaneiro_viagem_zirix" ZIRIX_BASE_URL = "https://integration.systemsatx.com.br/Globalbus/SMTR" + + CONTROLE_FINANCEIRO_DATASET_ID = "controle_financeiro" + + CONTROLE_FINANCEIRO_BASE_URL = "https://docs.google.com/spreadsheets/d/1QVfa9b8jzpQr3gac0FIlozmTaVeArtJROA343A2lMVM/\ +export?format=csv&gid=" + + CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS = { + "dataset_id": CONTROLE_FINANCEIRO_DATASET_ID, + "source_type": "api-csv", + "partition_date_only": True, + } + + CONTROLE_FINANCEIRO_CB_CAPTURE_PARAMS = { + "extract_params": {"sheet_id": "454453523"}, + "table_id": "cb", + } + CONTROLE_FINANCEIRO_CETT_CAPTURE_PARAMS = { + "extract_params": {"sheet_id": "0"}, + "table_id": "cett", + } diff --git a/pipelines/rj_smtr/controle_financeiro/CHANGELOG.md b/pipelines/rj_smtr/controle_financeiro/CHANGELOG.md new file mode 100644 index 000000000..829f0554f --- /dev/null +++ b/pipelines/rj_smtr/controle_financeiro/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - controle_financeiro + +## [1.0.0] - 2024-05-22 + +### Adicionado + +- Criados flows de captura das planilhas de controle financeiro `cb` e `cett` (https://github.com/prefeitura-rio/pipelines/pull/688) \ No newline at end of file diff --git a/pipelines/rj_smtr/controle_financeiro/__init__.py b/pipelines/rj_smtr/controle_financeiro/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pipelines/rj_smtr/controle_financeiro/flows.py b/pipelines/rj_smtr/controle_financeiro/flows.py new file mode 100644 index 000000000..3b3b6b40d --- /dev/null +++ b/pipelines/rj_smtr/controle_financeiro/flows.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# pylint: disable=W0511 +""" +Flows for veiculos +""" + + +from copy import deepcopy +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS + +# EMD Imports # + +from pipelines.constants import constants as emd_constants + + +# SMTR Imports # + +from pipelines.rj_smtr.flows import ( + default_capture_flow, +) +from pipelines.rj_smtr.constants import constants + +from pipelines.utils.utils import set_default_parameters +from pipelines.rj_smtr.schedules import every_day + + +# Flows # + +controle_cct_cb_captura = deepcopy(default_capture_flow) +controle_cct_cb_captura.name = "SMTR: Controle Financeiro CB - Captura" +controle_cct_cb_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +controle_cct_cb_captura.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) + +controle_cct_cb_captura = set_default_parameters( + flow=controle_cct_cb_captura, + default_parameters=constants.CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS.value + | constants.CONTROLE_FINANCEIRO_CB_CAPTURE_PARAMS.value, +) +controle_cct_cb_captura.schedule = every_day + +controle_cct_cett_captura = deepcopy(default_capture_flow) +controle_cct_cett_captura.name = "SMTR: Controle Financeiro CETT - Captura" +controle_cct_cett_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +controle_cct_cett_captura.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) + +controle_cct_cett_captura = set_default_parameters( + flow=controle_cct_cett_captura, + default_parameters=constants.CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS.value + | constants.CONTROLE_FINANCEIRO_CETT_CAPTURE_PARAMS.value, +) +controle_cct_cett_captura.schedule = every_day diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index d1f86d6bc..c591dfe77 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -113,7 +113,7 @@ def build_incremental_model( # pylint: disable=too-many-arguments if refresh: log("Running in full refresh mode") - log(f"DBT will run the following command:\n{run_command+' --full-refresh'}") + log(f"DBT will run the following command:\n{run_command + ' --full-refresh'}") dbt_client.cli(run_command + " --full-refresh", sync=True) last_mat_date = get_table_min_max_value( query_project_id, dataset_id, mat_table_id, field_name, "max" @@ -816,6 +816,10 @@ def create_request_params( data_inicial: {request_params['data_inicial']} data_final: {request_params['data_final']}""" ) + elif dataset_id == constants.CONTROLE_FINANCEIRO_DATASET_ID.value: + request_url = ( + constants.CONTROLE_FINANCEIRO_BASE_URL.value + extract_params["sheet_id"] + ) return request_params, request_url @@ -1455,6 +1459,13 @@ def transform_raw_to_nested_structure( for col in data.columns[data.dtypes == "object"].to_list(): data[col] = data[col].str.strip() + if ( + constants.GTFS_DATASET_ID.value in raw_filepath + and "ordem_servico" in raw_filepath + and "tipo_os" not in data.columns + ): + data["tipo_os"] = "Regular" + log( f"Finished cleaning! Data:\n{data_info_str(data)}", level="info" ) @@ -1466,7 +1477,11 @@ def transform_raw_to_nested_structure( data.groupby(pk_cols) .apply( lambda x: x[data.columns.difference(pk_cols)].to_json( - orient="records" + orient="records", + force_ascii=( + constants.CONTROLE_FINANCEIRO_DATASET_ID.value + not in raw_filepath + ), ) ) .str.strip("[]") diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py index 47275bc61..735b80c14 100644 --- a/pipelines/rj_smtr/utils.py +++ b/pipelines/rj_smtr/utils.py @@ -627,7 +627,12 @@ def save_raw_local_func( json.dump(data, fi, default=custom_serialization) if filetype in ("txt", "csv"): - with open(_filepath, "w", encoding="utf-8") as file: + if constants.CONTROLE_FINANCEIRO_DATASET_ID.value in _filepath: + encoding = "Windows-1252" + else: + encoding = "utf-8" + + with open(_filepath, "w", encoding=encoding) as file: file.write(data) log(f"Raw data saved to: {_filepath}") @@ -923,7 +928,10 @@ def save_treated_local_func( _filepath = filepath.format(mode=mode, filetype="csv") Path(_filepath).parent.mkdir(parents=True, exist_ok=True) if error is None: - data.to_csv(_filepath, index=False) + data.to_csv( + _filepath, + index=False, + ) log(f"Treated data saved to: {_filepath}") return _filepath