Skip to content

Commit

Permalink
Merge branch 'master' into retry-get-raw-ftp
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored May 28, 2024
2 parents a97f0c4 + daf46be commit 8f59b43
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 13 deletions.
Empty file added git
Empty file.
2 changes: 1 addition & 1 deletion pipelines/rj_seconserva/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
Prefect flows for seconserva project
"""

from pipelines.rj_seconserva.dump_db_siscor.flows import *
# from pipelines.rj_seconserva.dump_db_siscor.flows import *
4 changes: 4 additions & 0 deletions pipelines/rj_segovi/dump_db_1746/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
labels=[
constants.RJ_SEGOVI_AGENT_LABEL.value,
],
cpu_limit="500m",
cpu_request="500m",
memory_limit="2Gi",
memory_request="2Gi",
)

_1746_default_parameters = {
Expand Down
18 changes: 18 additions & 0 deletions pipelines/rj_smtr/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Changelog - rj_smtr

## [1.0.1] - 2024-05-22

### Alterado

- Altera `primary_key` da constante `GTFS_TABLE_CAPTURE_PARAMS` relativa a `ordem_servico_trajeto_alternativo` (https://github.com/prefeitura-rio/pipelines/pull/690)

## [1.0.0] - 2024-05-21

### Alterado

- Inclui tratamento específico na task `transform_raw_to_nested_structure` relacionado aos flows `br_rj_riodejaneiro_gtfs` (https://github.com/prefeitura-rio/pipelines/pull/687)
- Altera constantes e parâmetros de captura e materialização relacionados aos flows `br_rj_riodejaneiro_gtfs` (https://github.com/prefeitura-rio/pipelines/pull/687)

### Corrigido

- Corrige erro `pipelines/rj_smtr/tasks.py:116:64: E226 missing whitespace around arithmetic operator` na task `build_incremental_model` (https://github.com/prefeitura-rio/pipelines/pull/687)
1 change: 1 addition & 0 deletions pipelines/rj_smtr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
from pipelines.rj_smtr.br_rj_riodejaneiro_stu.flows import *
from pipelines.rj_smtr.br_rj_riodejaneiro_diretorios.flows import *
from pipelines.rj_smtr.br_rj_riodejaneiro_viagem_zirix.flows import *
from pipelines.rj_smtr.controle_financeiro.flows import *
18 changes: 18 additions & 0 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Changelog - br_rj_riodejaneiro_bilhetagem

## [1.0.1] - 2024-05-22

### Corrigido

- Corrige exclude nos parâmetros da pipeline `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines/pull/689)

## [1.0.0] - 2024-05-17

### Alterado

- Adiciona +servicos no exclude geral dos pipelines de materialização da bilhetagem (https://github.com/prefeitura-rio/pipelines/pull/685)

### Corrigido

- Corrige parâmetros do flow `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines/pull/685)
- Adiciona tabelas de validação no exclude do flow `bilhetagem_materializacao_gps_validador` (https://github.com/prefeitura-rio/pipelines/pull/685)
2 changes: 1 addition & 1 deletion pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@

bilhetagem_validacao_jae = set_default_parameters(
flow=bilhetagem_validacao_jae,
default_parameters=constants.BILHETAGEM_MATERIALIZACAO_GPS_VALIDADOR_GENERAL_PARAMS.value,
default_parameters=constants.BILHETAGEM_MATERIALIZACAO_VALIDACAO_JAE_PARAMS.value,
)

bilhetagem_validacao_jae.schedule = every_day_hour_seven
Expand Down
34 changes: 27 additions & 7 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ class constants(Enum): # pylint: disable=c0103
},
]

BILHETAGEM_EXCLUDE = "+operadoras +consorcios"
BILHETAGEM_EXCLUDE = "+operadoras +consorcios +servicos"

BILHETAGEM_JAE_DASHBOARD_DATASET_ID = "dashboard_bilhetagem_jae"

Expand Down Expand Up @@ -1258,7 +1258,7 @@ class constants(Enum): # pylint: disable=c0103
"dataset_id": BILHETAGEM_DATASET_ID,
"upstream": True,
"downstream": True,
"exclude": BILHETAGEM_EXCLUDE,
"exclude": f"{BILHETAGEM_EXCLUDE} veiculo_validacao veiculo_indicadores_dia",
"dbt_vars": {
"date_range": {
"table_run_datetime_column_name": "datetime_captura",
Expand All @@ -1280,7 +1280,7 @@ class constants(Enum): # pylint: disable=c0103
BILHETAGEM_MATERIALIZACAO_VALIDACAO_JAE_PARAMS = {
"dataset_id": "validacao_dados_jae",
"upstream": True,
"exclude": "+gps_sppo +sppo_veiculo_dia +gps_validador +transacao\
"exclude": "+gps_sppo +sppo_veiculo_dia +gps_validador +transacao \
+ordem_pagamento_dia +integracao +servicos",
"dbt_vars": {
"run_date": {},
Expand All @@ -1289,7 +1289,7 @@ class constants(Enum): # pylint: disable=c0103
}

# GTFS
GTFS_DATASET_ID = "gtfs"
GTFS_DATASET_ID = "br_rj_riodejaneiro_gtfs"

GTFS_GENERAL_CAPTURE_PARAMS = {
"partition_date_only": True,
Expand Down Expand Up @@ -1346,12 +1346,12 @@ class constants(Enum): # pylint: disable=c0103
},
{
"table_id": "ordem_servico",
"primary_key": ["servico"],
"primary_key": ["servico", "tipo_os"],
"extract_params": {"filename": "ordem_servico"},
},
{
"table_id": "ordem_servico_trajeto_alternativo",
"primary_key": ["servico"],
"primary_key": ["servico", "tipo_os", "evento"],
"extract_params": {"filename": "ordem_servico_trajeto_alternativo"},
},
{
Expand All @@ -1361,7 +1361,7 @@ class constants(Enum): # pylint: disable=c0103
]

GTFS_MATERIALIZACAO_PARAMS = {
"dataset_id": GTFS_DATASET_ID,
"dataset_id": "gtfs",
"dbt_vars": {
"data_versao_gtfs": "",
"version": {},
Expand Down Expand Up @@ -1559,3 +1559,23 @@ class constants(Enum): # pylint: disable=c0103
ZIRIX_API_SECRET_PATH = "zirix_api"
VIAGEM_ZIRIX_RAW_DATASET_ID = "br_rj_riodejaneiro_viagem_zirix"
ZIRIX_BASE_URL = "https://integration.systemsatx.com.br/Globalbus/SMTR"

CONTROLE_FINANCEIRO_DATASET_ID = "controle_financeiro"

CONTROLE_FINANCEIRO_BASE_URL = "https://docs.google.com/spreadsheets/d/1QVfa9b8jzpQr3gac0FIlozmTaVeArtJROA343A2lMVM/\
export?format=csv&gid="

CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS = {
"dataset_id": CONTROLE_FINANCEIRO_DATASET_ID,
"source_type": "api-csv",
"partition_date_only": True,
}

CONTROLE_FINANCEIRO_CB_CAPTURE_PARAMS = {
"extract_params": {"sheet_id": "454453523"},
"table_id": "cb",
}
CONTROLE_FINANCEIRO_CETT_CAPTURE_PARAMS = {
"extract_params": {"sheet_id": "0"},
"table_id": "cett",
}
7 changes: 7 additions & 0 deletions pipelines/rj_smtr/controle_financeiro/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog - controle_financeiro

## [1.0.0] - 2024-05-22

### Adicionado

- Criados flows de captura das planilhas de controle financeiro `cb` e `cett` (https://github.com/prefeitura-rio/pipelines/pull/688)
Empty file.
58 changes: 58 additions & 0 deletions pipelines/rj_smtr/controle_financeiro/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# pylint: disable=W0511
"""
Capture flows for the controle_financeiro dataset.

Declares two module-level flows, ``controle_cct_cb_captura`` and
``controle_cct_cett_captura``. Each one is a deep copy of
``default_capture_flow`` that is given its own name, storage, run
configuration, default parameters and schedule.
"""


from copy import deepcopy
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS

# EMD Imports #

from pipelines.constants import constants as emd_constants


# SMTR Imports #

from pipelines.rj_smtr.flows import (
default_capture_flow,
)
from pipelines.rj_smtr.constants import constants

from pipelines.utils.utils import set_default_parameters
from pipelines.rj_smtr.schedules import every_day


# Flows #

# --- CB capture flow ---
# Work on a deep copy so the attribute assignments below do not modify the
# shared `default_capture_flow` object that other modules also copy from.
controle_cct_cb_captura = deepcopy(default_capture_flow)
controle_cct_cb_captura.name = "SMTR: Controle Financeiro CB - Captura"
controle_cct_cb_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
controle_cct_cb_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

# Default parameters are the dataset-wide capture defaults merged with the
# CB-specific ones; with `|` the right-hand dict wins on duplicate keys.
# The flow name is rebound to whatever `set_default_parameters` returns.
controle_cct_cb_captura = set_default_parameters(
flow=controle_cct_cb_captura,
default_parameters=constants.CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS.value
| constants.CONTROLE_FINANCEIRO_CB_CAPTURE_PARAMS.value,
)
# NOTE(review): `every_day` is presumably a once-a-day schedule; confirm its
# exact clock in pipelines.rj_smtr.schedules.
controle_cct_cb_captura.schedule = every_day

# --- CETT capture flow ---
# Same construction as the CB flow above; only the flow name and the
# table-specific parameter constant differ.
controle_cct_cett_captura = deepcopy(default_capture_flow)
controle_cct_cett_captura.name = "SMTR: Controle Financeiro CETT - Captura"
controle_cct_cett_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
controle_cct_cett_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

# Dataset-wide defaults merged with the CETT-specific parameters (the
# right-hand dict wins on duplicate keys).
controle_cct_cett_captura = set_default_parameters(
flow=controle_cct_cett_captura,
default_parameters=constants.CONTROLE_FINANCEIRO_CAPTURE_DEFAULT_PARAMS.value
| constants.CONTROLE_FINANCEIRO_CETT_CAPTURE_PARAMS.value,
)
controle_cct_cett_captura.schedule = every_day
19 changes: 17 additions & 2 deletions pipelines/rj_smtr/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def build_incremental_model( # pylint: disable=too-many-arguments

if refresh:
log("Running in full refresh mode")
log(f"DBT will run the following command:\n{run_command+' --full-refresh'}")
log(f"DBT will run the following command:\n{run_command + ' --full-refresh'}")
dbt_client.cli(run_command + " --full-refresh", sync=True)
last_mat_date = get_table_min_max_value(
query_project_id, dataset_id, mat_table_id, field_name, "max"
Expand Down Expand Up @@ -816,6 +816,10 @@ def create_request_params(
data_inicial: {request_params['data_inicial']}
data_final: {request_params['data_final']}"""
)
elif dataset_id == constants.CONTROLE_FINANCEIRO_DATASET_ID.value:
request_url = (
constants.CONTROLE_FINANCEIRO_BASE_URL.value + extract_params["sheet_id"]
)

return request_params, request_url

Expand Down Expand Up @@ -1455,6 +1459,13 @@ def transform_raw_to_nested_structure(
for col in data.columns[data.dtypes == "object"].to_list():
data[col] = data[col].str.strip()

if (
constants.GTFS_DATASET_ID.value in raw_filepath
and "ordem_servico" in raw_filepath
and "tipo_os" not in data.columns
):
data["tipo_os"] = "Regular"

log(
f"Finished cleaning! Data:\n{data_info_str(data)}", level="info"
)
Expand All @@ -1466,7 +1477,11 @@ def transform_raw_to_nested_structure(
data.groupby(pk_cols)
.apply(
lambda x: x[data.columns.difference(pk_cols)].to_json(
orient="records"
orient="records",
force_ascii=(
constants.CONTROLE_FINANCEIRO_DATASET_ID.value
not in raw_filepath
),
)
)
.str.strip("[]")
Expand Down
12 changes: 10 additions & 2 deletions pipelines/rj_smtr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,12 @@ def save_raw_local_func(
json.dump(data, fi, default=custom_serialization)

if filetype in ("txt", "csv"):
with open(_filepath, "w", encoding="utf-8") as file:
if constants.CONTROLE_FINANCEIRO_DATASET_ID.value in _filepath:
encoding = "Windows-1252"
else:
encoding = "utf-8"

with open(_filepath, "w", encoding=encoding) as file:
file.write(data)

log(f"Raw data saved to: {_filepath}")
Expand Down Expand Up @@ -923,7 +928,10 @@ def save_treated_local_func(
_filepath = filepath.format(mode=mode, filetype="csv")
Path(_filepath).parent.mkdir(parents=True, exist_ok=True)
if error is None:
data.to_csv(_filepath, index=False)
data.to_csv(
_filepath,
index=False,
)
log(f"Treated data saved to: {_filepath}")
return _filepath

Expand Down

0 comments on commit 8f59b43

Please sign in to comment.