Skip to content

Commit

Permalink
Cria validação dos dados de integração da Jaé com base na Matriz da S…
Browse files Browse the repository at this point in the history
…MTR (#371)

* cria nova validação de integrações

* add data_inicio_matriz

* cria tabela matriz_integracao com base na matriz da smtr

* corrige integração com transação duplicada

* cria validação das integrações usando a matriz da smtr

* cria flow de materialização da nova validação dos dados da Jaé

* schedule_cron opcional

* cria materialização da matriz de integração da smtr

* add matriz_integracao_smtr / add validacao_dados_jae

* add validacao_dados_jae

* wait integracao e transacao

* add change log pr 371

* ajusta schedule

* corrige mensagem de erro

* descomenta refs

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
pixuimpou and mergify[bot] authored Dec 30, 2024
1 parent 3081fd0 commit ee19094
Show file tree
Hide file tree
Showing 30 changed files with 866 additions and 501 deletions.
1 change: 1 addition & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@
from pipelines.treatment.datario.flows import * # noqa
from pipelines.treatment.monitoramento.flows import * # noqa
from pipelines.treatment.planejamento.flows import * # noqa
from pipelines.treatment.validacao_dados_jae.flows import * # noqa
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog - br_rj_riodejaneiro_bilhetagem

## [1.4.9] - 2024-12-30

### Alterado
- Desativa schedule do flow `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [1.4.8] - 2024-12-16

### Alterado
Expand Down
5 changes: 2 additions & 3 deletions pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@
rename_current_flow_run_now_time,
)
from pipelines.migration.utils import set_default_parameters
from pipelines.schedules import (
from pipelines.schedules import ( # every_day_hour_seven,
every_5_minutes,
every_day_hour_five,
every_day_hour_seven,
every_hour,
every_minute,
)
Expand Down Expand Up @@ -354,7 +353,7 @@
handler_skip_if_running,
]

bilhetagem_validacao_jae.schedule = every_day_hour_seven
# bilhetagem_validacao_jae.schedule = every_day_hour_seven


# RECAPTURA #
Expand Down
4 changes: 4 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog - gtfs

## [1.1.8] - 2024-12-30

### Alterado
- Exclui modelo `matriz_integracao` da materialização (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [1.1.7] - 2024-12-13

Expand Down
3 changes: 2 additions & 1 deletion pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@
+ " "
+ constants.PLANEJAMENTO_MATERIALIZACAO_DATASET_ID.value,
_vars=dbt_vars,
exclude="calendario aux_calendario_manual viagem_planejada_planejamento",
exclude="calendario aux_calendario_manual viagem_planejada_planejamento \
matriz_integracao",
).set_upstream(task=wait_captura)

run_dbt_failed = task_value_is_none(wait_run_dbt_model)
Expand Down
6 changes: 6 additions & 0 deletions pipelines/treatment/datario/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - datario

## [1.0.1] - 2024-12-30

### Alterado

- Remove parâmetro `schedule_cron` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [1.0.0] - 2024-12-16

### Adicionado
Expand Down
2 changes: 0 additions & 2 deletions pipelines/treatment/datario/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from datetime import datetime
from enum import Enum

from pipelines.schedules import create_daily_cron
from pipelines.treatment.templates.utils import DBTSelector


Expand All @@ -17,6 +16,5 @@ class constants(Enum): # pylint: disable=c0103

DATARIO_SELECTOR = DBTSelector(
name="datario",
schedule_cron=create_daily_cron(hour=0),
initial_datetime=datetime(2024, 12, 16),
)
6 changes: 6 additions & 0 deletions pipelines/treatment/planejamento/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - planejamento

## [1.1.0] - 2024-12-30

### Adicionado

- Cria flow de materialização da matriz de integração da SMTR (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [1.0.1] - 2024-11-25

### Alterado
Expand Down
5 changes: 5 additions & 0 deletions pipelines/treatment/planejamento/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ class constants(Enum): # pylint: disable=c0103
schedule_cron=create_daily_cron(hour=1),
initial_datetime=datetime(2024, 9, 1, 0, 0, 0),
)

MATRIZ_INTEGRACAO_SMTR_SELECTOR = DBTSelector(
name="matriz_integracao_smtr",
initial_datetime=datetime(2024, 12, 30, 0, 0, 0),
)
7 changes: 7 additions & 0 deletions pipelines/treatment/planejamento/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,10 @@
selector=constants.PLANEJAMENTO_DIARIO_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
)

MATRIZ_INTEGRACAO_SMTR_MATERIALIZACAO = create_default_materialization_flow(
flow_name="matriz_integracao_smtr - materializacao",
selector=constants.MATRIZ_INTEGRACAO_SMTR_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
generate_schedule=False,
)
6 changes: 5 additions & 1 deletion pipelines/treatment/templates/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class DBTSelector:
def __init__(
self,
name: str,
schedule_cron: str,
initial_datetime: datetime,
schedule_cron: str = None,
incremental_delay_hours: int = 0,
):
self.name = name
Expand Down Expand Up @@ -98,6 +98,8 @@ def is_up_to_date(self, env: str, timestamp: datetime) -> bool:
Returns:
bool: se está atualizado ou não
"""
if self.schedule_cron is None:
raise ValueError("O selector não possui agendamento")
last_materialization = self.get_last_materialized_datetime(env=env)

last_schedule = cron_get_last_date(cron_expr=self.schedule_cron, timestamp=timestamp)
Expand All @@ -115,6 +117,8 @@ def get_next_schedule_datetime(self, timestamp: datetime) -> datetime:
Returns:
datetime: próximo datetime do cron
"""
if self.schedule_cron is None:
raise ValueError("O selector não possui agendamento")
return cron_get_next_date(cron_expr=self.schedule_cron, timestamp=timestamp)

def set_redis_materialized_datetime(self, env: str, timestamp: datetime):
Expand Down
7 changes: 7 additions & 0 deletions pipelines/treatment/validacao_dados_jae/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog - validacao_dados_jae

## [1.0.0] - 2024-12-30

### Adicionado

- Cria flow de materialização da validação dos dados da Jaé (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)
Empty file.
22 changes: 22 additions & 0 deletions pipelines/treatment/validacao_dados_jae/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Valores constantes para materialização da validação dos dados da Jaé
"""

from datetime import datetime
from enum import Enum

from pipelines.schedules import create_daily_cron
from pipelines.treatment.templates.utils import DBTSelector


class constants(Enum): # pylint: disable=c0103
"""
Valores constantes para materialização da validação dos dados da Jaé
"""

VALIDACAO_DADOS_JAE_SELECTOR = DBTSelector(
name="validacao_dados_jae",
schedule_cron=create_daily_cron(hour=7),
initial_datetime=datetime(2024, 12, 30, 0, 0, 0),
)
46 changes: 46 additions & 0 deletions pipelines/treatment/validacao_dados_jae/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
"""
Flows de tratamento dos dados da validação dos dados da Jaé
"""

from pipelines.constants import constants as smtr_constants
from pipelines.migration.br_rj_riodejaneiro_bilhetagem.constants import (
constants as bilhetagem_constants,
)
from pipelines.schedules import create_daily_cron, create_hourly_cron
from pipelines.treatment.templates.flows import create_default_materialization_flow
from pipelines.treatment.validacao_dados_jae.constants import constants

transacao_materializacao_params = (
bilhetagem_constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value
)

integracao_materializacao_params = (
bilhetagem_constants.BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS.value
)

VALIDACAO_DADOS_JAE_MATERIALIZACAO = create_default_materialization_flow(
flow_name="validacao_dados_jae - materializacao",
selector=constants.VALIDACAO_DADOS_JAE_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
wait=[
{
"redis_key": f"{integracao_materializacao_params['dataset_id']}\
.{integracao_materializacao_params['table_id']}",
"dict_key": "last_run_timestamp",
"datetime_format": "%Y-%m-%dT%H:%M:%S",
"delay_hours": integracao_materializacao_params["dbt_vars"]["date_range"][
"delay_hours"
],
"schedule_cron": create_daily_cron(hour=5),
},
{
"redis_key": f"{transacao_materializacao_params['dataset_id']}\
.{transacao_materializacao_params['table_id']}",
"dict_key": "last_run_timestamp",
"datetime_format": "%Y-%m-%dT%H:%M:%S",
"delay_hours": transacao_materializacao_params["dbt_vars"]["date_range"]["delay_hours"],
"schedule_cron": create_hourly_cron(),
},
],
)
9 changes: 9 additions & 0 deletions queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog - bilhetagem

## [2.0.2] - 2024-12-30

### Removido
- Move `matriz_integracao.sql` para o planejamento (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)


### Alterado
- Muda deduplicação de integração para considerar integrações diferentes que compartilham transações nos modelos `transacao.sql` e `integracao.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [2.0.1] - 2024-12-12

### Alterado
Expand Down
Loading

0 comments on commit ee19094

Please sign in to comment.