Skip to content

Commit

Permalink
Merge branch 'main' into view-viagem-transacao-monitoramento
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Dec 30, 2024
2 parents 6134cde + ee19094 commit 16835ad
Show file tree
Hide file tree
Showing 30 changed files with 866 additions and 501 deletions.
1 change: 1 addition & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@
from pipelines.treatment.datario.flows import * # noqa
from pipelines.treatment.monitoramento.flows import * # noqa
from pipelines.treatment.planejamento.flows import * # noqa
from pipelines.treatment.validacao_dados_jae.flows import * # noqa
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog - br_rj_riodejaneiro_bilhetagem

## [1.4.9] - 2024-12-30

### Alterado
- Desativa schedule do flow `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [1.4.8] - 2024-12-16

### Alterado
Expand Down
5 changes: 2 additions & 3 deletions pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@
rename_current_flow_run_now_time,
)
from pipelines.migration.utils import set_default_parameters
from pipelines.schedules import (
from pipelines.schedules import ( # every_day_hour_seven,
every_5_minutes,
every_day_hour_five,
every_day_hour_seven,
every_hour,
every_minute,
)
Expand Down Expand Up @@ -354,7 +353,7 @@
handler_skip_if_running,
]

bilhetagem_validacao_jae.schedule = every_day_hour_seven
# bilhetagem_validacao_jae.schedule = every_day_hour_seven


# RECAPTURA #
Expand Down
4 changes: 4 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog - gtfs

## [1.1.8] - 2024-12-30

### Alterado
- Exclui modelo `matriz_integracao` da materialização (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [1.1.7] - 2024-12-13

Expand Down
3 changes: 2 additions & 1 deletion pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@
+ " "
+ constants.PLANEJAMENTO_MATERIALIZACAO_DATASET_ID.value,
_vars=dbt_vars,
exclude="calendario aux_calendario_manual viagem_planejada_planejamento",
exclude="calendario aux_calendario_manual viagem_planejada_planejamento \
matriz_integracao",
).set_upstream(task=wait_captura)

run_dbt_failed = task_value_is_none(wait_run_dbt_model)
Expand Down
6 changes: 6 additions & 0 deletions pipelines/treatment/datario/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - datario

## [1.0.1] - 2024-12-30

### Alterado

- Remove parâmetro `schedule_cron` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [1.0.0] - 2024-12-16

### Adicionado
Expand Down
2 changes: 0 additions & 2 deletions pipelines/treatment/datario/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from datetime import datetime
from enum import Enum

from pipelines.schedules import create_daily_cron
from pipelines.treatment.templates.utils import DBTSelector


Expand All @@ -17,6 +16,5 @@ class constants(Enum): # pylint: disable=c0103

DATARIO_SELECTOR = DBTSelector(
name="datario",
schedule_cron=create_daily_cron(hour=0),
initial_datetime=datetime(2024, 12, 16),
)
6 changes: 6 additions & 0 deletions pipelines/treatment/planejamento/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - planejamento

## [1.1.0] - 2024-12-30

### Adicionado

- Cria flow de materialização da matriz de integração da SMTR (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [1.0.1] - 2024-11-25

### Alterado
Expand Down
5 changes: 5 additions & 0 deletions pipelines/treatment/planejamento/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ class constants(Enum): # pylint: disable=c0103
schedule_cron=create_daily_cron(hour=1),
initial_datetime=datetime(2024, 9, 1, 0, 0, 0),
)

MATRIZ_INTEGRACAO_SMTR_SELECTOR = DBTSelector(
name="matriz_integracao_smtr",
initial_datetime=datetime(2024, 12, 30, 0, 0, 0),
)
7 changes: 7 additions & 0 deletions pipelines/treatment/planejamento/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,10 @@
selector=constants.PLANEJAMENTO_DIARIO_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
)

MATRIZ_INTEGRACAO_SMTR_MATERIALIZACAO = create_default_materialization_flow(
flow_name="matriz_integracao_smtr - materializacao",
selector=constants.MATRIZ_INTEGRACAO_SMTR_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
generate_schedule=False,
)
6 changes: 5 additions & 1 deletion pipelines/treatment/templates/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class DBTSelector:
def __init__(
self,
name: str,
schedule_cron: str,
initial_datetime: datetime,
schedule_cron: str = None,
incremental_delay_hours: int = 0,
):
self.name = name
Expand Down Expand Up @@ -98,6 +98,8 @@ def is_up_to_date(self, env: str, timestamp: datetime) -> bool:
Returns:
bool: se está atualizado ou não
"""
if self.schedule_cron is None:
raise ValueError("O selector não possui agendamento")
last_materialization = self.get_last_materialized_datetime(env=env)

last_schedule = cron_get_last_date(cron_expr=self.schedule_cron, timestamp=timestamp)
Expand All @@ -115,6 +117,8 @@ def get_next_schedule_datetime(self, timestamp: datetime) -> datetime:
Returns:
datetime: próximo datetime do cron
"""
if self.schedule_cron is None:
raise ValueError("O selector não possui agendamento")
return cron_get_next_date(cron_expr=self.schedule_cron, timestamp=timestamp)

def set_redis_materialized_datetime(self, env: str, timestamp: datetime):
Expand Down
7 changes: 7 additions & 0 deletions pipelines/treatment/validacao_dados_jae/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog - validacao_dados_jae

## [1.0.0] - 2024-12-30

### Adicionado

- Cria flow de materialização da validação dos dados da Jaé (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)
Empty file.
22 changes: 22 additions & 0 deletions pipelines/treatment/validacao_dados_jae/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Valores constantes para materialização da validação dos dados da Jaé
"""

from datetime import datetime
from enum import Enum

from pipelines.schedules import create_daily_cron
from pipelines.treatment.templates.utils import DBTSelector


class constants(Enum): # pylint: disable=c0103
"""
Valores constantes para materialização da validação dos dados da Jaé
"""

VALIDACAO_DADOS_JAE_SELECTOR = DBTSelector(
name="validacao_dados_jae",
schedule_cron=create_daily_cron(hour=7),
initial_datetime=datetime(2024, 12, 30, 0, 0, 0),
)
46 changes: 46 additions & 0 deletions pipelines/treatment/validacao_dados_jae/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
"""
Flows de tratamento dos dados da validação dos dados da Jaé
"""

from pipelines.constants import constants as smtr_constants
from pipelines.migration.br_rj_riodejaneiro_bilhetagem.constants import (
constants as bilhetagem_constants,
)
from pipelines.schedules import create_daily_cron, create_hourly_cron
from pipelines.treatment.templates.flows import create_default_materialization_flow
from pipelines.treatment.validacao_dados_jae.constants import constants

transacao_materializacao_params = (
bilhetagem_constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value
)

integracao_materializacao_params = (
bilhetagem_constants.BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS.value
)

VALIDACAO_DADOS_JAE_MATERIALIZACAO = create_default_materialization_flow(
flow_name="validacao_dados_jae - materializacao",
selector=constants.VALIDACAO_DADOS_JAE_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
wait=[
{
"redis_key": f"{integracao_materializacao_params['dataset_id']}\
.{integracao_materializacao_params['table_id']}",
"dict_key": "last_run_timestamp",
"datetime_format": "%Y-%m-%dT%H:%M:%S",
"delay_hours": integracao_materializacao_params["dbt_vars"]["date_range"][
"delay_hours"
],
"schedule_cron": create_daily_cron(hour=5),
},
{
"redis_key": f"{transacao_materializacao_params['dataset_id']}\
.{transacao_materializacao_params['table_id']}",
"dict_key": "last_run_timestamp",
"datetime_format": "%Y-%m-%dT%H:%M:%S",
"delay_hours": transacao_materializacao_params["dbt_vars"]["date_range"]["delay_hours"],
"schedule_cron": create_hourly_cron(),
},
],
)
9 changes: 9 additions & 0 deletions queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog - bilhetagem

## [2.0.2] - 2024-12-30

### Removido
- Move `matriz_integracao.sql` para o planejamento (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)


### Alterado
- Muda deduplicação de integração para considerar integrações diferentes que compartilham transações nos modelos `transacao.sql` e `integracao.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371)

## [2.0.1] - 2024-12-12

### Alterado
Expand Down
Loading

0 comments on commit 16835ad

Please sign in to comment.