Skip to content

Commit

Permalink
Merge branch 'main' into staging/flow-serpro
Browse files Browse the repository at this point in the history
  • Loading branch information
akaBotelho authored Jan 6, 2025
2 parents b11da43 + 753f4d5 commit 047345d
Show file tree
Hide file tree
Showing 82 changed files with 2,763 additions and 659 deletions.
6 changes: 6 additions & 0 deletions pipelines/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - pipelines

## [1.0.1] - 2024-12-13

### Adicionado

- Adiciona task `check_fail` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/359)

## [1.0.0] - 2024-09-09

### Alterado
Expand Down
6 changes: 6 additions & 0 deletions pipelines/capture/jae/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - source_jae

## [1.1.0] - 2024-12-19

### Adicionado

- Cria flow `verificacao_ip` para alertar sobre falhas de conexão com o banco de dados da Jaé (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/366)

## [1.0.1] - 2024-12-16

### Alterado
Expand Down
2 changes: 2 additions & 0 deletions pipelines/capture/jae/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,5 @@ class constants(Enum): # pylint: disable=c0103
"data_transacao",
],
)

ALERT_WEBHOOK = "alertas_bilhetagem"
36 changes: 35 additions & 1 deletion pipelines/capture/jae/flows.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
# -*- coding: utf-8 -*-
"""Flows de captura dos dados da Jaé"""
from prefect import case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_utils.custom import Flow
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.capture.jae.constants import constants
from pipelines.capture.jae.tasks import create_jae_general_extractor
from pipelines.capture.jae.tasks import (
create_database_error_discord_message,
create_jae_general_extractor,
test_jae_databases_connections,
)
from pipelines.capture.templates.flows import create_default_capture_flow
from pipelines.constants import constants as smtr_constants
from pipelines.schedules import every_hour
from pipelines.tasks import log_discord
from pipelines.utils.prefect import set_default_parameters

CAPTURA_TRANSACAO_ORDEM = create_default_capture_flow(
Expand All @@ -13,3 +28,22 @@
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
)
set_default_parameters(CAPTURA_TRANSACAO_ORDEM, {"recapture": True})

# Standalone alert flow: checks connectivity to every Jaé database and, when
# at least one connection fails, posts a Discord message listing the failing
# databases. Scheduled hourly (see schedule assignment below).
with Flow("jae: verifica ip do banco de dados") as verificacao_ip:
    # nout=2 task: (success: all connections ok, failed_connections: names that failed)
    success, failed_connections = test_jae_databases_connections()
    # Branch taken only when success is False (some connection failed).
    with case(success, False):
        message = create_database_error_discord_message(failed_connections=failed_connections)
        send_discord_message = log_discord(
            message=message,
            key=constants.ALERT_WEBHOOK.value,  # webhook name for the alert channel
            dados_tag=True,  # presumably mentions the data team in the alert — confirm
        )
    # Flow state is derived from the connection test and (when triggered) the
    # Discord alert, so a skipped alert branch does not fail the run.
    verificacao_ip.set_reference_tasks(tasks=[send_discord_message, success])

verificacao_ip.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
verificacao_ip.run_config = KubernetesRun(
    image=smtr_constants.DOCKER_IMAGE.value,
    labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value],
)
verificacao_ip.state_handlers = [handler_inject_bd_credentials, handler_initialize_sentry]
verificacao_ip.schedule = every_hour
43 changes: 43 additions & 0 deletions pipelines/capture/jae/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from pipelines.capture.jae.constants import constants
from pipelines.constants import constants as smtr_constants
from pipelines.utils.database import test_database_connection
from pipelines.utils.extractors.db import get_raw_db
from pipelines.utils.gcp.bigquery import SourceTable
from pipelines.utils.secret import get_secret
Expand Down Expand Up @@ -43,3 +44,45 @@ def create_jae_general_extractor(source: SourceTable, timestamp: datetime):
password=credentials["password"],
database=database_name,
)


@task(nout=2)
def test_jae_databases_connections() -> tuple[bool, list[str]]:
    """
    Test the connection to every Jaé database.

    Returns:
        bool: True when every connection succeeded, False otherwise
        list[str]: names of the databases whose connection failed
    """
    credentials = get_secret(constants.JAE_SECRET_PATH.value)
    # test_database_connection returns a (success, details) pair; keep the
    # names of every database whose first element is falsy.
    failed_connections = [
        database_name
        for database_name, settings in constants.JAE_DATABASE_SETTINGS.value.items()
        if not test_database_connection(
            engine=settings["engine"],
            host=settings["host"],
            user=credentials["user"],
            password=credentials["password"],
            database=database_name,
        )[0]
    ]
    return not failed_connections, failed_connections


@task
def create_database_error_discord_message(failed_connections: list[str]) -> str:
    """
    Build the Discord message reporting Jaé database connection failures.

    Args:
        failed_connections (list[str]): names of the databases whose connection failed

    Returns:
        str: the formatted alert message, ending with a newline
    """
    header = "Falha de conexão com o(s) banco(s) de dados:\n"
    return header + "\n".join(failed_connections) + "\n"
2 changes: 2 additions & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@
from pipelines.migration.veiculo.flows import * # noqa
from pipelines.serpro.flows import * # noqa
from pipelines.treatment.bilhetagem.flows import * # noqa
from pipelines.treatment.datario.flows import * # noqa
from pipelines.treatment.monitoramento.flows import * # noqa
from pipelines.treatment.planejamento.flows import * # noqa
from pipelines.treatment.validacao_dados_jae.flows import * # noqa
11 changes: 5 additions & 6 deletions pipelines/janitor/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@


@task
def query_active_flow_names(prefix="%SMTR%", prefect_client=None, prefect_project="production"):
def query_active_flow_names(prefect_client=None, prefect_project="production"):
query = """
query ($prefix: String, $offset: Int, $project_name: String){
query ($offset: Int, $project_name: String){
flow(
where: {
name: {_like: $prefix},
archived: {_eq: false},
project: {name:{_eq: $project_name}}
}
Expand All @@ -32,7 +31,7 @@ def query_active_flow_names(prefix="%SMTR%", prefect_client=None, prefect_projec
"""
if not prefect_client:
prefect_client = Client()
variables = {"prefix": prefix, "offset": 0, "project_name": prefect_project}
variables = {"offset": 0, "project_name": prefect_project}
# flow_names = []
response = prefect_client.graphql(query=query, variables=variables)["data"]
active_flows = []
Expand Down Expand Up @@ -137,8 +136,8 @@ def get_prefect_client():


@task
def get_active_flow_names(prefix="%SMTR%"):
flow_names = query_active_flow_names(prefix=prefix)
def get_active_flow_names():
flow_names = query_active_flow_names()
log(f"Got flow_names\n{flow_names[:10]}\n...\n{flow_names[-10:-1]}")
return flow_names

Expand Down
7 changes: 7 additions & 0 deletions pipelines/migration/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog - migration

## [1.0.3] - 2024-12-13

### Adicionado

- Adiciona a task `transform_raw_to_nested_structure_chunked` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/359)
- Adiciona parâmetros `log_param` e `args` na função `save_treated_local_func` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/359)

## [1.0.2] - 2024-09-11

### Adicionado
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog - br_rj_riodejaneiro_bilhetagem

## [1.4.9] - 2024-12-30

### Alterado
- Desativa schedule do flow `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [1.4.8] - 2024-12-16

### Alterado
Expand Down
7 changes: 3 additions & 4 deletions pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Flows for br_rj_riodejaneiro_bilhetagem
DBT: 2024-12-12
DBT: 2025-01-06
"""

from copy import deepcopy
Expand Down Expand Up @@ -33,10 +33,9 @@
rename_current_flow_run_now_time,
)
from pipelines.migration.utils import set_default_parameters
from pipelines.schedules import (
from pipelines.schedules import ( # every_day_hour_seven,
every_5_minutes,
every_day_hour_five,
every_day_hour_seven,
every_hour,
every_minute,
)
Expand Down Expand Up @@ -354,7 +353,7 @@
handler_skip_if_running,
]

bilhetagem_validacao_jae.schedule = every_day_hour_seven
# bilhetagem_validacao_jae.schedule = every_day_hour_seven


# RECAPTURA #
Expand Down
37 changes: 37 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,42 @@
# Changelog - gtfs

## [1.2.0] - 2025-01-03

### Adicionado
- Adicionado schedule de 5 minutos do flow de captura do gtfs (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/379)

- Adicionado parâmetros personalizados de execução no arquivo `flows.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/379)

### Removido
- Removido o teste de quantidade de abas na planilha da Ordem de Serviço (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/379)

## [1.1.9] - 2025-01-02

### Alterado
- Remove teste de verificação de quilometragem da task `processa_ordem_servico` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/377)

## [1.1.8] - 2024-12-30

### Alterado
- Exclui modelo `matriz_integracao` da materialização (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [1.1.7] - 2024-12-13

### Adicionado

- Cria arquivo `constants.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/359)
- Adiciona automação dos testes do DBT no arquivo `flows.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/359)
- Adiciona compatibilidade com padrão "KM" na função `processa_ordem_servico` no arquivo `utils.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/359)

### Alterado

- Remove parâmetros personalizados de execução no arquivo `flows.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/359)
- Troca task `transform_raw_to_nested_structure` pela `transform_raw_to_nested_structure_chunked` no arquivo `flows.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/359)

### Corrigido

- Corrige parâmetro `supportsAllDrives` na função `download_xlsx` arquivo `utils.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/359)

## [1.1.6] - 2024-12-04

- Adiciona o modelo `viagem_planejada_planejamento` no exclude da materialização (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/352)
Expand Down
36 changes: 36 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
"""
Constant values for rj_smtr br_rj_riodejaneiro_gtfs
"""

from enum import Enum


class constants(Enum): # pylint: disable=c0103
    """
    Constant values for rj_smtr br_rj_riodejaneiro_gtfs
    """

    # Mapping of dbt model name -> {dbt test identifier -> metadata}.
    # Each "description" is the human-readable (pt-BR) summary for that dbt
    # data-quality test — presumably surfaced when test results are reported;
    # confirm against the flow that consumes this constant.
    GTFS_DATA_CHECKS_LIST = {
        "calendar_gtfs": {
            "dbt_expectations.expect_column_values_to_match_regex__service_id__calendar_gtfs": {
                "description": "Todos os 'service\\_id' começam com 'U\\_', 'S\\_', 'D\\_' ou 'EXCEP'."  # noqa
            },
        },
        "ordem_servico_trajeto_alternativo_gtfs": {
            "dbt_expectations.expect_table_aggregation_to_equal_other_table__ordem_servico_trajeto_alternativo_gtfs": {  # noqa
                "description": "Todos os dados de 'feed_start_date' e 'tipo_os' correspondem 1:1 entre as tabelas 'ordem_servico_trajeto_alternativo_gtfs' e 'ordem_servico_gtfs'."  # noqa
            },
        },
        "ordem_servico_trips_shapes_gtfs": {
            "dbt_expectations.expect_table_aggregation_to_equal_other_table__ordem_servico_trips_shapes_gtfs": {  # noqa
                "description": "Todos os dados de 'feed_start_date', 'tipo_os', 'tipo_dia', 'servico' e 'faixa_horaria_inicio' correspondem 1:1 entre as tabelas 'ordem_servico_trips_shapes_gtfs' e 'ordem_servico_faixa_horaria'."  # noqa
            },
            "dbt_utils.unique_combination_of_columns__ordem_servico_trips_shapes_gtfs": {
                "description": "Todos os dados de 'feed_start_date', 'tipo_dia', 'tipo_os', 'servico', 'sentido', 'faixa_horaria_inicio' e 'shape_id' são únicos."  # noqa
            },
            "dbt_expectations.expect_table_row_count_to_be_between__ordem_servico_trips_shapes_gtfs": {  # noqa
                "description": "A quantidade de registros de 'feed_start_date', 'tipo_dia', 'tipo_os', 'servico', 'faixa_horaria_inicio' e 'shape_id' está dentro do intervalo esperado."  # noqa
            },
        },
    }
Loading

0 comments on commit 047345d

Please sign in to comment.