Create flow to alert on connection failures with the Jaé database / Remove the `SMTR: ` filter from the janitor flow / Disable the `viagem_planejada_planejamento` materialization (#366)

* change the location of the log_discord task

* create a flow to check the connection with the Jaé database

* move the log_discord task

* create a function to test database connections

* remove the prefix from the flow filter

* flow config

* adjust the Discord message

* disable the viagem_planejada_planejamento materialization

* add changelog

* link PR

* handle null data
pixuimpou authored Dec 19, 2024
1 parent 03bdbd9 commit af22590
Showing 14 changed files with 206 additions and 43 deletions.
6 changes: 6 additions & 0 deletions pipelines/capture/jae/CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog - source_jae

## [1.1.0] - 2024-12-19

### Added

- Create flow `verificacao_ip` to alert about connection failures with the Jaé database (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/366)

## [1.0.1] - 2024-12-16

### Changed
2 changes: 2 additions & 0 deletions pipelines/capture/jae/constants.py
@@ -85,3 +85,5 @@ class constants(Enum): # pylint: disable=c0103
"data_transacao",
],
)

ALERT_WEBHOOK = "alertas_bilhetagem"
36 changes: 35 additions & 1 deletion pipelines/capture/jae/flows.py
@@ -1,9 +1,24 @@
# -*- coding: utf-8 -*-
"""Flows de captura dos dados da Jaé"""
from prefect import case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_utils.custom import Flow
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.capture.jae.constants import constants
from pipelines.capture.jae.tasks import create_jae_general_extractor
from pipelines.capture.jae.tasks import (
create_database_error_discord_message,
create_jae_general_extractor,
test_jae_databases_connections,
)
from pipelines.capture.templates.flows import create_default_capture_flow
from pipelines.constants import constants as smtr_constants
from pipelines.schedules import every_hour
from pipelines.tasks import log_discord
from pipelines.utils.prefect import set_default_parameters

CAPTURA_TRANSACAO_ORDEM = create_default_capture_flow(
@@ -13,3 +28,22 @@
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
)
set_default_parameters(CAPTURA_TRANSACAO_ORDEM, {"recapture": True})

with Flow("jae: verifica ip do banco de dados") as verificacao_ip:
success, failed_connections = test_jae_databases_connections()
with case(success, False):
message = create_database_error_discord_message(failed_connections=failed_connections)
send_discord_message = log_discord(
message=message,
key=constants.ALERT_WEBHOOK.value,
dados_tag=True,
)
verificacao_ip.set_reference_tasks(tasks=[send_discord_message, success])

verificacao_ip.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
verificacao_ip.run_config = KubernetesRun(
image=smtr_constants.DOCKER_IMAGE.value,
labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value],
)
verificacao_ip.state_handlers = [handler_inject_bd_credentials, handler_initialize_sentry]
verificacao_ip.schedule = every_hour
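
For readers unfamiliar with Prefect 1.x conditionals: the `case(success, False)` block above only schedules its tasks when `success` resolves to `False`, and `set_reference_tasks` derives the final flow state from the listed tasks, so a skipped alert branch does not mark the run as failed. A minimal, self-contained sketch of the same pattern (task names here are illustrative stand-ins, not from the repository):

from prefect import Flow, case, task

@task(nout=2)
def check_connections() -> tuple[bool, list[str]]:
    # Stand-in for test_jae_databases_connections
    return False, ["transacao_db"]

@task
def build_alert(failed: list[str]) -> str:
    # Stand-in for create_database_error_discord_message
    return "Failed databases: " + ", ".join(failed)

with Flow("case-pattern-demo") as demo:
    success, failed = check_connections()
    with case(success, False):  # branch runs only when success is False
        message = build_alert(failed)
    # Skipped reference tasks count as successes for the flow state
    demo.set_reference_tasks([message, success])

if __name__ == "__main__":
    demo.run()  # local run: the alert branch executes because success is False
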
43 changes: 43 additions & 0 deletions pipelines/capture/jae/tasks.py
@@ -8,6 +8,7 @@

from pipelines.capture.jae.constants import constants
from pipelines.constants import constants as smtr_constants
from pipelines.utils.database import test_database_connection
from pipelines.utils.extractors.db import get_raw_db
from pipelines.utils.gcp.bigquery import SourceTable
from pipelines.utils.secret import get_secret
@@ -43,3 +44,45 @@ def create_jae_general_extractor(source: SourceTable, timestamp: datetime):
password=credentials["password"],
database=database_name,
)


@task(nout=2)
def test_jae_databases_connections() -> tuple[bool, list[str]]:
"""
    Tests the connections to the Jaé databases
    Returns:
        bool: Whether every connection succeeded
        list[str]: Names of the databases whose connection failed
"""
credentials = get_secret(constants.JAE_SECRET_PATH.value)
failed_connections = []
for database_name, database in constants.JAE_DATABASE_SETTINGS.value.items():
success, _ = test_database_connection(
engine=database["engine"],
host=database["host"],
user=credentials["user"],
password=credentials["password"],
database=database_name,
)
if not success:
failed_connections.append(database_name)

return len(failed_connections) == 0, failed_connections


@task
def create_database_error_discord_message(failed_connections: list[str]) -> str:
"""
    Builds the message sent to Discord when there are
    connection problems with the Jaé databases
    Args:
        failed_connections (list[str]): Names of the databases whose connection failed
    Returns:
        str: The message
"""
message = "Falha de conexão com o(s) banco(s) de dados:\n"
failed_connections = "\n".join(failed_connections)
message += failed_connections
return message + "\n"
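
A quick sanity check of the message format (a Prefect 1.x task created with `@task` exposes the wrapped function through `.run()`); the database names below are placeholders:

msg = create_database_error_discord_message.run(
    failed_connections=["principal_db", "transacao_db"]
)
# msg == "Falha de conexão com o(s) banco(s) de dados:\nprincipal_db\ntransacao_db\n"
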
11 changes: 5 additions & 6 deletions pipelines/janitor/tasks.py
@@ -14,12 +14,11 @@


@task
def query_active_flow_names(prefix="%SMTR%", prefect_client=None, prefect_project="production"):
def query_active_flow_names(prefect_client=None, prefect_project="production"):
query = """
query ($prefix: String, $offset: Int, $project_name: String){
query ($offset: Int, $project_name: String){
flow(
where: {
name: {_like: $prefix},
archived: {_eq: false},
project: {name:{_eq: $project_name}}
}
@@ -32,7 +31,7 @@ def query_active_flow_names(prefix="%SMTR%", prefect_client=None, prefect_project="production"):
"""
if not prefect_client:
prefect_client = Client()
variables = {"prefix": prefix, "offset": 0, "project_name": prefect_project}
variables = {"offset": 0, "project_name": prefect_project}
# flow_names = []
response = prefect_client.graphql(query=query, variables=variables)["data"]
active_flows = []
@@ -137,8 +136,8 @@ def get_prefect_client():


@task
def get_active_flow_names(prefix="%SMTR%"):
flow_names = query_active_flow_names(prefix=prefix)
def get_active_flow_names():
flow_names = query_active_flow_names()
log(f"Got flow_names\n{flow_names[:10]}\n...\n{flow_names[-10:-1]}")
return flow_names

7 changes: 2 additions & 5 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
@@ -50,14 +50,11 @@
from pipelines.tasks import (
check_fail,
get_scheduled_timestamp,
log_discord,
parse_timestamp_to_string,
task_value_is_none,
)
from pipelines.treatment.templates.tasks import (
dbt_data_quality_checks,
log_discord,
run_dbt_tests,
)
from pipelines.treatment.templates.tasks import dbt_data_quality_checks, run_dbt_tests

# from pipelines.capture.templates.flows import create_default_capture_flow

5 changes: 4 additions & 1 deletion pipelines/migration/tasks.py
@@ -1558,7 +1558,10 @@ def transform_raw_to_nested_structure(
content_columns = [c for c in data.columns if c not in primary_key]
data["content"] = data.apply(
lambda row: json.dumps(
row[content_columns].to_dict(),
{
key: value if not pd.isna(value) else None
for key, value in row[content_columns].to_dict().items()
},
ensure_ascii=(
constants.CONTROLE_FINANCEIRO_DATASET_ID.value not in raw_filepath
),
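
The dict comprehension replacing `row[content_columns].to_dict()` matters because `json.dumps` serializes `float("nan")` as the bare token `NaN`, which is not valid JSON; converting NaN to `None` yields `null` instead. A minimal illustration:

import json

import pandas as pd

row = pd.Series({"id": 1, "valor": float("nan")})
json.dumps(row.to_dict())  # '{"id": 1, "valor": NaN}' -- not valid JSON
json.dumps({k: v if not pd.isna(v) else None for k, v in row.to_dict().items()})
# '{"id": 1, "valor": null}'
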
19 changes: 19 additions & 0 deletions pipelines/tasks.py
@@ -12,7 +12,9 @@
from pytz import timezone

from pipelines.constants import constants
from pipelines.utils.discord import send_discord_message
from pipelines.utils.prefect import FailedSubFlow, create_subflow_run, wait_subflow_run
from pipelines.utils.secret import get_secret
from pipelines.utils.utils import convert_timezone


@@ -233,3 +235,20 @@ def check_fail(results: Union[list, str]):
return any(isinstance(result, FAIL) for result in results)
else:
return isinstance(results, FAIL)


@task
def log_discord(message: str, key: str, dados_tag: bool = False):
"""Logs message to discord channel specified
Args:
message (str): Message to post on the channel
key (str): Key to secret path storing the webhook to channel.
dados_tag (bool): Indicates whether the message will tag the data team
"""
if dados_tag:
message = (
message + f" - <@&{constants.OWNERS_DISCORD_MENTIONS.value['dados_smtr']['user_id']}>\n"
)
url = get_secret(secret_path=constants.WEBHOOKS_SECRET_PATH.value)[key]
send_discord_message(message=message, webhook_url=url)
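
A hypothetical call to the relocated task, assuming the `alertas_bilhetagem` key exists under the webhooks secret path (it is registered as `ALERT_WEBHOOK` in the Jaé constants above):

log_discord.run(
    message="Falha de conexão com o(s) banco(s) de dados:\nprincipal_db\n",
    key="alertas_bilhetagem",
    dados_tag=True,
)
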
2 changes: 1 addition & 1 deletion pipelines/treatment/planejamento/flows.py
@@ -2,7 +2,7 @@
"""
Planning data treatment flows
DBT 2024-12-16
DBT 2024-12-19
"""

from pipelines.constants import constants as smtr_constants
19 changes: 1 addition & 18 deletions pipelines/treatment/templates/tasks.py
@@ -22,7 +22,7 @@
send_dataplex_discord_message,
)
from pipelines.utils.dataplex import DataQuality, DataQualityCheckArgs
from pipelines.utils.discord import format_send_discord_message, send_discord_message
from pipelines.utils.discord import format_send_discord_message
from pipelines.utils.gcp.bigquery import SourceTable
from pipelines.utils.prefect import flow_is_running_local, rename_current_flow_run
from pipelines.utils.secret import get_secret
@@ -601,20 +601,3 @@ def dbt_data_quality_checks(

if not test_check:
raise FAIL


@task
def log_discord(message: str, key: str, dados_tag: bool = False):
"""Logs message to discord channel specified
Args:
message (str): Message to post on the channel
key (str): Key to secret path storing the webhook to channel.
dados_tag (bool): Indicates whether the message will tag the data team
"""
if dados_tag:
message = (
message + f" - <@&{constants.OWNERS_DISCORD_MENTIONS.value['dados_smtr']['user_id']}>\n"
)
url = get_secret(secret_path=constants.WEBHOOKS_SECRET_PATH.value)[key]
send_discord_message(message=message, webhook_url=url)
74 changes: 74 additions & 0 deletions pipelines/utils/database.py
@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
from prefeitura_rio.pipelines_utils.logging import log
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError

ENGINE_MAPPING = {
"mysql": {"driver": "pymysql", "port": "3306"},
"postgresql": {"driver": "psycopg2", "port": "5432"},
}


def create_database_url(
engine: str,
host: str,
user: str,
password: str,
database: str,
) -> str:
"""
    Builds the URL used to connect to a database
    Args:
        engine (str): The database type (postgresql or mysql)
        host (str): The database host
        user (str): The user to connect as
        password (str): The user's password
        database (str): The database (schema) name
    Returns:
        str: The connection URL
"""
engine_info = ENGINE_MAPPING[engine]
driver = engine_info["driver"]
port = engine_info["port"]
return f"{engine}+{driver}://{user}:{password}@{host}:{port}/{database}"


def test_database_connection(
engine: str,
host: str,
user: str,
password: str,
database: str,
) -> tuple[bool, str]:
"""
    Tests whether a connection to a database can be established
    Args:
        engine (str): The database type (postgresql or mysql)
        host (str): The database host
        user (str): The user to connect as
        password (str): The user's password
        database (str): The database (schema) name
    Returns:
        bool: Whether the connection succeeded
        str: The error message, if the connection failed
"""
url = create_database_url(
engine=engine,
host=host,
user=user,
password=password,
database=database,
)
connection = create_engine(url)
log(f"Tentando conexão com o banco de dados {database}")
try:
with connection.connect() as _:
log("Conexão bem-sucedida!")
return True, None
except OperationalError as e:
log("Conexão falhou", level="warning")
return False, str(e)
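
A hypothetical usage sketch of the helper above, with placeholder connection values:

ok, error = test_database_connection(
    engine="postgresql",
    host="10.0.0.1",
    user="reader",
    password="change-me",
    database="transacao_db",
)
if not ok:
    print(f"Connection failed: {error}")
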
18 changes: 10 additions & 8 deletions pipelines/utils/extractors/db.py
@@ -4,6 +4,8 @@
from prefeitura_rio.pipelines_utils.logging import log
from sqlalchemy import create_engine

from pipelines.utils.database import create_database_url


def get_raw_db(
query: str,
@@ -27,15 +29,15 @@ def get_raw_db(
Returns:
list[str]: Dados em formato JSON
"""
engine_mapping = {
"mysql": {"driver": "pymysql", "port": "3306"},
"postgresql": {"driver": "psycopg2", "port": "5432"},
}

engine_details = engine_mapping[engine]
driver = engine_details["driver"]
port = engine_details["port"]
connection = create_engine(f"{engine}+{driver}://{user}:{password}@{host}:{port}/{database}")
url = create_database_url(
engine=engine,
host=host,
user=user,
password=password,
database=database,
)
connection = create_engine(url)
max_retries = 10
for retry in range(1, max_retries + 1):
try:
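
With the mapping in pipelines/utils/database.py, the refactored `get_raw_db` builds the same URL string as the inlined version it replaces; illustrative placeholder values:

create_database_url(
    engine="mysql",
    host="db.example.com",
    user="reader",
    password="s3cret",
    database="transacao_db",
)
# -> "mysql+pymysql://reader:s3cret@db.example.com:3306/transacao_db"
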
5 changes: 4 additions & 1 deletion pipelines/utils/pretreatment.py
@@ -21,7 +21,10 @@ def transform_to_nested_structure(data: pd.DataFrame, primary_keys: list) -> pd.DataFrame:
content_columns = [c for c in data.columns if c not in primary_keys]
data["content"] = data.apply(
lambda row: json.dumps(
row[content_columns].to_dict(),
{
key: value if not pd.isna(value) else None
for key, value in row[content_columns].to_dict().items()
}
),
axis=1,
)
2 changes: 0 additions & 2 deletions queries/selectors.yml
@@ -56,8 +56,6 @@ selectors:
value: aux_calendario_manual
- method: fqn
value: calendario
- method: fqn
value: viagem_planejada_planejamento

- name: transacao_ordem
description: Materialization of the auxiliary table relating payment orders to transactions
