Skip to content

Commit

Permalink
Merge branch 'main' into add_format_sql
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Dec 19, 2024
2 parents 359f68c + af22590 commit f761dcb
Show file tree
Hide file tree
Showing 121 changed files with 5,012 additions and 1,194 deletions.
6 changes: 6 additions & 0 deletions pipelines/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - pipelines

## [1.0.1] - 2024-12-13

### Adicionado

- Adiciona task `check_fail` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/359)

## [1.0.0] - 2024-09-09

### Alterado
Expand Down
18 changes: 18 additions & 0 deletions pipelines/capture/jae/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Changelog - source_jae

## [1.1.0] - 2024-12-19

### Adicionado

- Cria flow `verificacao_ip` para alertar sobre falhas de conexão com o banco de dados da Jaé (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/366)

## [1.0.1] - 2024-12-16

### Alterado
- Altera IP do banco de tracking da Jaé (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/362)

## [1.0.0] - 2024-11-25

### Adicionado

- Cria flow de captura da relação entre transação e ordem de pagamento
Empty file.
89 changes: 89 additions & 0 deletions pipelines/capture/jae/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
"""
Valores constantes para captura de dados da Jaé
"""

from datetime import datetime
from enum import Enum

from pipelines.schedules import create_daily_cron
from pipelines.utils.gcp.bigquery import SourceTable


class constants(Enum): # pylint: disable=c0103
    """
    Constant values for capturing Jaé data.

    Every assignment in this body becomes an Enum member, so consumers read
    the underlying value through ``constants.<NAME>.value``.
    """

    # Source identifier; passed as ``source_name`` to the SourceTable below.
    JAE_SOURCE_NAME = "jae"

    # Connection settings for each Jaé database, keyed by database name.
    # Each entry holds the database engine and the host address to connect to.
    JAE_DATABASE_SETTINGS = {
        "principal_db": {
            "engine": "mysql",
            "host": "10.5.114.227",
        },
        "tarifa_db": {
            "engine": "postgresql",
            "host": "10.5.113.254",
        },
        "transacao_db": {
            "engine": "postgresql",
            "host": "10.5.115.1",
        },
        "tracking_db": {
            "engine": "postgresql",
            "host": "10.5.12.67",
        },
        "ressarcimento_db": {
            "engine": "postgresql",
            "host": "10.5.12.50",
        },
        "gratuidade_db": {
            "engine": "postgresql",
            "host": "10.5.12.107",
        },
        "fiscalizacao_db": {
            "engine": "postgresql",
            "host": "10.5.115.29",
        },
    }

    # Name of the secret that holds the Jaé access data.
    # NOTE(review): presumably contains "user" and "password" keys - confirm
    # against the secret store.
    JAE_SECRET_PATH = "smtr_jae_access_data"

    # Table id of the transaction <-> payment order relation capture.
    TRANSACAO_ORDEM_TABLE_ID = "transacao_ordem"

    # Capture parameters keyed by table id. Each entry has:
    #   "query":    SQL template with `{start}` and `{end}` placeholders;
    #   "database": a key of JAE_DATABASE_SETTINGS naming the database to query.
    # Inside the class body TRANSACAO_ORDEM_TABLE_ID still refers to the plain
    # string, so it can be used directly as the dictionary key.
    # The query text is a runtime string and is kept exactly as found.
    JAE_TABLE_CAPTURE_PARAMS = {
        TRANSACAO_ORDEM_TABLE_ID: {
            "query": """
SELECT
id,
id_ordem_ressarcimento,
data_processamento,
data_transacao
FROM
transacao
WHERE
DATE(data_processamento) >= DATE('{start}')
AND DATE(data_processamento) <= DATE('{end}')
AND id_ordem_ressarcimento IS NOT NULL
""",
            "database": "transacao_db",
        }
    }

    # SourceTable describing the transacao_ordem capture.
    # NOTE(review): create_daily_cron(hour=6) presumably yields a once-a-day
    # cron at hour 6 - confirm the timezone in pipelines.schedules.
    TRANSACAO_ORDEM_SOURCE = SourceTable(
        source_name=JAE_SOURCE_NAME,
        table_id=TRANSACAO_ORDEM_TABLE_ID,
        first_timestamp=datetime(2024, 11, 21, 0, 0, 0),
        schedule_cron=create_daily_cron(hour=6),
        partition_date_only=True,
        max_recaptures=5,
        primary_keys=[
            "id",
            "id_ordem_ressarcimento",
            "data_processamento",
            "data_transacao",
        ],
    )

    # Webhook key used when sending Discord alerts about Jaé.
    ALERT_WEBHOOK = "alertas_bilhetagem"
49 changes: 49 additions & 0 deletions pipelines/capture/jae/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
"""Flows de captura dos dados da Jaé"""
from prefect import case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_utils.custom import Flow
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.capture.jae.constants import constants
from pipelines.capture.jae.tasks import (
create_database_error_discord_message,
create_jae_general_extractor,
test_jae_databases_connections,
)
from pipelines.capture.templates.flows import create_default_capture_flow
from pipelines.constants import constants as smtr_constants
from pipelines.schedules import every_hour
from pipelines.tasks import log_discord
from pipelines.utils.prefect import set_default_parameters

# Capture flow for the transacao_ordem table, built from the default capture
# flow template with the generic Jaé extractor task.
CAPTURA_TRANSACAO_ORDEM = create_default_capture_flow(
    flow_name="jae: transacao_ordem - captura",
    source=constants.TRANSACAO_ORDEM_SOURCE.value,
    create_extractor_task=create_jae_general_extractor,
    agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
)
# NOTE(review): presumably makes `recapture` default to True for this flow -
# confirm against pipelines.utils.prefect.set_default_parameters.
set_default_parameters(CAPTURA_TRANSACAO_ORDEM, {"recapture": True})

# Flow that tests the connection to every Jaé database and sends a Discord
# alert listing the databases that could not be reached.
with Flow("jae: verifica ip do banco de dados") as verificacao_ip:
    # The task is declared with nout=2, so its result unpacks into two outputs.
    success, failed_connections = test_jae_databases_connections()
    # Tasks inside this block only run when `success` is False.
    with case(success, False):
        message = create_database_error_discord_message(failed_connections=failed_connections)
        send_discord_message = log_discord(
            message=message,
            key=constants.ALERT_WEBHOOK.value,
            dados_tag=True,
        )
    # The final state of the flow is derived from these two tasks.
    verificacao_ip.set_reference_tasks(tasks=[send_discord_message, success])

# Deployment configuration: flow storage, Kubernetes run config, state
# handlers and schedule.
verificacao_ip.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
verificacao_ip.run_config = KubernetesRun(
    image=smtr_constants.DOCKER_IMAGE.value,
    labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value],
)
verificacao_ip.state_handlers = [handler_inject_bd_credentials, handler_initialize_sentry]
# NOTE(review): `every_hour` is imported from pipelines.schedules; presumably
# an hourly schedule - confirm there.
verificacao_ip.schedule = every_hour
88 changes: 88 additions & 0 deletions pipelines/capture/jae/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
"""Tasks de captura dos dados da Jaé"""
from datetime import datetime, timedelta
from functools import partial

from prefect import task
from pytz import timezone

from pipelines.capture.jae.constants import constants
from pipelines.constants import constants as smtr_constants
from pipelines.utils.database import test_database_connection
from pipelines.utils.extractors.db import get_raw_db
from pipelines.utils.gcp.bigquery import SourceTable
from pipelines.utils.secret import get_secret


@task(
    max_retries=smtr_constants.MAX_RETRIES.value,
    retry_delay=timedelta(seconds=smtr_constants.RETRY_DELAY.value),
)
def create_jae_general_extractor(source: SourceTable, timestamp: datetime):
    """Build the extraction callable for a Jaé table.

    Args:
        source (SourceTable): Table being captured; its ``table_id`` selects
            the entry of ``JAE_TABLE_CAPTURE_PARAMS`` to use.
        timestamp (datetime): Capture timestamp; used as the end of the
            queried window.

    Returns:
        functools.partial: ``get_raw_db`` with the formatted query and the
        connection arguments of the target database already bound.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"

    credentials = get_secret(constants.JAE_SECRET_PATH.value)
    capture_params = constants.JAE_TABLE_CAPTURE_PARAMS.value[source.table_id]

    # Both window bounds are converted to UTC before being written into the query.
    window_start = source.get_last_scheduled_timestamp(timestamp=timestamp).astimezone(
        tz=timezone("UTC")
    )
    window_end = timestamp.astimezone(tz=timezone("UTC"))

    formatted_query = capture_params["query"].format(
        start=window_start.strftime(timestamp_format),
        end=window_end.strftime(timestamp_format),
    )

    # The "database" entry names the key of JAE_DATABASE_SETTINGS to connect to.
    target_database = capture_params["database"]
    connection = constants.JAE_DATABASE_SETTINGS.value[target_database]

    return partial(
        get_raw_db,
        query=formatted_query,
        engine=connection["engine"],
        host=connection["host"],
        user=credentials["user"],
        password=credentials["password"],
        database=target_database,
    )


@task(nout=2)
def test_jae_databases_connections() -> tuple[bool, list[str]]:
    """
    Test the connection to every Jaé database.

    Returns:
        bool: Whether all connections succeeded
        list[str]: Names of the databases whose connection failed
    """
    secret = get_secret(constants.JAE_SECRET_PATH.value)

    def _is_reachable(name, settings):
        # Only the success flag of the connection test is used.
        reachable, _ = test_database_connection(
            engine=settings["engine"],
            host=settings["host"],
            user=secret["user"],
            password=secret["password"],
            database=name,
        )
        return reachable

    unreachable = [
        name
        for name, settings in constants.JAE_DATABASE_SETTINGS.value.items()
        if not _is_reachable(name, settings)
    ]

    return not unreachable, unreachable


@task
def create_database_error_discord_message(failed_connections: list[str]) -> str:
    """
    Build the Discord message sent when there are connection problems
    with the Jaé databases.

    Args:
        failed_connections (list[str]): Names of the databases whose connection failed

    Returns:
        str: Header line followed by one database name per line, ending with a newline
    """
    header = "Falha de conexão com o(s) banco(s) de dados:\n"
    database_lines = "\n".join(failed_connections)
    return f"{header}{database_lines}\n"
11 changes: 11 additions & 0 deletions pipelines/capture/rioonibus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog - source_rioonibus

## [1.2.0] - 2024-11-28

### Alterado
- Altera a captura da viagem_informada para pegar o range D-2 até D+0 (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/337)

## [1.0.1] - 2024-11-25

### Alterado

- Substitui variável de expressão cron pela função `create_daily_cron` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/333)

## [1.0.0] - 2024-10-21

### Adicionado
Expand Down
4 changes: 2 additions & 2 deletions pipelines/capture/rioonibus/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime
from enum import Enum

from pipelines.schedules import cron_every_day_hour_7
from pipelines.schedules import create_daily_cron
from pipelines.utils.gcp.bigquery import SourceTable


Expand All @@ -23,7 +23,7 @@ class constants(Enum): # pylint: disable=c0103
source_name=RIO_ONIBUS_SOURCE_NAME,
table_id="viagem_informada",
first_timestamp=datetime(2024, 10, 16, 0, 0, 0),
schedule_cron=cron_every_day_hour_7,
schedule_cron=create_daily_cron(hour=7),
partition_date_only=True,
max_recaptures=5,
primary_keys=["id_viagem"],
Expand Down
33 changes: 20 additions & 13 deletions pipelines/capture/rioonibus/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from datetime import datetime, timedelta
from functools import partial

import pandas as pd
from prefect import task

from pipelines.capture.rioonibus.constants import constants
from pipelines.constants import constants as smtr_constants
from pipelines.utils.extractors.api import get_raw_api
from pipelines.utils.extractors.api import get_raw_api_params_list
from pipelines.utils.gcp.bigquery import SourceTable
from pipelines.utils.secret import get_secret

Expand All @@ -16,20 +17,26 @@
max_retries=smtr_constants.MAX_RETRIES.value,
retry_delay=timedelta(seconds=smtr_constants.RETRY_DELAY.value),
)
def create_viagem_informada_extractor(source: SourceTable, timestamp: datetime):
def create_viagem_informada_extractor(
source: SourceTable, # pylint: disable=W0613
timestamp: datetime,
):
"""Cria a extração de viagens informadas na api da Rio Ônibus"""

extraction_day = timestamp.date() - timedelta(days=2)
params = {
"guidIdentificacao": get_secret(constants.RIO_ONIBUS_SECRET_PATH.value)[
"guididentificacao"
],
"datetime_processamento_inicio": extraction_day.isoformat() + "T00:00:00",
"datetime_processamento_fim": extraction_day.isoformat() + "T23:59:59",
}
end_date = timestamp.date()
start_date = end_date - timedelta(days=2)
api_key = get_secret(constants.RIO_ONIBUS_SECRET_PATH.value)["guididentificacao"]
params = [
{
"guidIdentificacao": api_key,
"datetime_processamento_inicio": d.date().isoformat() + "T00:00:00",
"datetime_processamento_fim": d.date().isoformat() + "T23:59:59",
}
for d in pd.date_range(start_date, end_date)
]

return partial(
get_raw_api,
get_raw_api_params_list,
url=constants.VIAGEM_INFORMADA_BASE_URL.value,
params=params,
raw_filetype=source.raw_filetype,
params_list=params,
)
7 changes: 7 additions & 0 deletions pipelines/capture/sonda/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog - source_sonda

## [1.0.0] - 2024-11-28

### Adicionado

- Cria flow de captura de viagens informadas do BRT (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/337)
Empty file.
31 changes: 31 additions & 0 deletions pipelines/capture/sonda/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
"""
Valores constantes para captura de dados da SONDA
"""

from datetime import datetime
from enum import Enum

from pipelines.schedules import create_daily_cron
from pipelines.utils.gcp.bigquery import SourceTable


class constants(Enum): # pylint: disable=c0103
    """
    Constant values for capturing SONDA data.

    Every assignment in this body becomes an Enum member, so consumers read
    the underlying value through ``constants.<NAME>.value``.
    """

    # Source identifier; passed as ``source_name`` to the SourceTable below.
    SONDA_SOURCE_NAME = "sonda"
    # Name of the secret holding the SONDA API access data.
    SONDA_SECRET_PATH = "sonda_api"
    # NOTE(review): the login endpoint uses plain http while the data endpoint
    # uses https - confirm whether an https login URL is available.
    VIAGEM_INFORMADA_LOGIN_URL = "http://consultaviagem.m2mfrota.com.br/AutenticarUsuario"
    VIAGEM_INFORMADA_BASE_URL = "https://zn4.sinopticoplus.com/servico-dados/api/v1/obterDadosGTFS"
    VIAGEM_INFORMADA_TABLE_ID = "viagem_informada"
    # SourceTable describing the viagem_informada capture.
    # NOTE(review): create_daily_cron(hour=7, minute=10) presumably yields a
    # once-a-day cron at 07:10 - confirm the timezone in pipelines.schedules.
    VIAGEM_INFORMADA_SOURCE = SourceTable(
        source_name=SONDA_SOURCE_NAME,
        table_id=VIAGEM_INFORMADA_TABLE_ID,
        first_timestamp=datetime(2024, 9, 10, 0, 0, 0),
        schedule_cron=create_daily_cron(hour=7, minute=10),
        partition_date_only=True,
        max_recaptures=5,
        primary_keys=["id_viagem"],
    )
Loading

0 comments on commit f761dcb

Please sign in to comment.