diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
index 9f79560df..860a4f7ca 100644
--- a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
+++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
@@ -42,7 +42,8 @@
     upload_raw_data_to_gcs,
     upload_staging_data_to_gcs,
 )
-from pipelines.schedules import every_5_minutes
+
+# from pipelines.schedules import every_5_minutes
 from pipelines.tasks import get_scheduled_timestamp, parse_timestamp_to_string

 # from pipelines.capture.templates.flows import create_default_capture_flow
diff --git a/pipelines/migration/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md
index 2a34272d7..3b9eb8f36 100644
--- a/pipelines/migration/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md
+++ b/pipelines/migration/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog - br_rj_riodejaneiro_onibus_gps

+## [1.0.3] - 2024-10-29
+
+### Alterado
+
+- Altera o flow `materialize_sppo` para utilizar as tasks que rodam os testes do DBT (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/288)
+
 ## [1.0.2] - 2024-08-25

 ### Adicionado
diff --git a/pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py
index a6ecd62b6..10b5c939f 100644
--- a/pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py
+++ b/pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py
@@ -25,6 +25,9 @@
 from pipelines.constants import constants
 from pipelines.constants import constants as emd_constants
+from pipelines.migration.br_rj_riodejaneiro_onibus_gps.constants import (
+    constants as gps_constants,
+)
 from pipelines.migration.br_rj_riodejaneiro_onibus_gps.tasks import (
     clean_br_rj_riodejaneiro_onibus_gps,
     create_api_url_onibus_gps,
@@ -61,6 +64,11 @@
     every_hour_minute_six,
     every_minute,
 )
+from pipelines.treatment.templates.tasks import (
+    check_dbt_test_run,
+    dbt_data_quality_checks,
+    run_dbt_tests,
+)

 # from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client
@@ -164,6 +172,8 @@
         default=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
     )
     truncate_minutes = Parameter("truncate_minutes", default=True)
+    test_only = Parameter("test_only", default=False)
+    run_time_test = Parameter("run_time_test", default="01:00:00")

     LABELS = get_current_flow_labels()
     MODE = get_current_flow_mode()
@@ -174,85 +184,118 @@
     # dbt_client = get_local_dbt_client(host="localhost", port=3001)

     # Set specific run parameters #
-    with case(rematerialization, False):
-        date_range_false = get_materialization_date_range(
+    with case(test_only, False):
+        with case(rematerialization, False):
+            date_range_false = get_materialization_date_range(
+                dataset_id=dataset_id,
+                table_id=table_id,
+                raw_dataset_id=raw_dataset_id,
+                raw_table_id=raw_table_id,
+                table_run_datetime_column_name="timestamp_gps",
+                mode=MODE,
+                delay_hours=materialize_delay_hours,
+                truncate_minutes=truncate_minutes,
+            )
+
+            RUN_CLEAN_FALSE = task(
+                lambda: [None],
+                checkpoint=False,
+                name="assign_none_to_previous_runs",
+            )()
+        with case(rematerialization, True):
+            date_range_true = task(
+                lambda start, end: {
+                    "date_range_start": start,
+                    "date_range_end": end,
+                }
+            )(start=date_range_start, end=date_range_end)
+
+            RUN_CLEAN_TRUE = clean_br_rj_riodejaneiro_onibus_gps(date_range_true)
+
+        RUN_CLEAN = merge(RUN_CLEAN_TRUE, RUN_CLEAN_FALSE)
+
+        date_range = merge(date_range_true, date_range_false)
+
+        dataset_sha = fetch_dataset_sha(
             dataset_id=dataset_id,
-            table_id=table_id,
-            raw_dataset_id=raw_dataset_id,
-            raw_table_id=raw_table_id,
-            table_run_datetime_column_name="timestamp_gps",
-            mode=MODE,
-            delay_hours=materialize_delay_hours,
-            truncate_minutes=truncate_minutes,
+            upstream_tasks=[RUN_CLEAN],
         )

-        RUN_CLEAN_FALSE = task(
-            lambda: [None],
-            checkpoint=False,
-            name="assign_none_to_previous_runs",
-        )()
-    with case(rematerialization, True):
-        date_range_true = task(
-            lambda start, end: {
-                "date_range_start": start,
-                "date_range_end": end,
-            }
-        )(start=date_range_start, end=date_range_end)
+        # Run materialization #
+        with case(rebuild, True):
+            RUN_TRUE = run_dbt_model(
+                # dbt_client=dbt_client,
+                dataset_id=dataset_id,
+                table_id=table_id,
+                upstream=True,
+                exclude="+data_versao_efetiva",
+                _vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
+                flags="--full-refresh",
+            )

-        RUN_CLEAN_TRUE = clean_br_rj_riodejaneiro_onibus_gps(date_range_true)
+        with case(rebuild, False):
+            RUN_FALSE = run_dbt_model(
+                # dbt_client=dbt_client,
+                dataset_id=dataset_id,
+                table_id=table_id,
+                exclude="+data_versao_efetiva",
+                _vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
+                upstream=True,
+            )

-    RUN_CLEAN = merge(RUN_CLEAN_TRUE, RUN_CLEAN_FALSE)
+        RUN_TEST, datetime_start, datetime_end = check_dbt_test_run(
+            date_range_start, date_range_end, run_time_test, upstream_tasks=[RUN_FALSE]
+        )

-    date_range = merge(date_range_true, date_range_false)
+        _vars = {"date_range_start": datetime_start, "date_range_end": datetime_end}
+
+        with case(RUN_TEST, True):
+            gps_sppo_data_quality = run_dbt_tests(
+                dataset_id=dataset_id,
+                table_id=table_id,
+                _vars=_vars,
+            )
+            GPS_SPPO_DATA_QUALITY_RESULTS = dbt_data_quality_checks(
+                gps_sppo_data_quality,
+                gps_constants.GPS_DATA_CHECKS_LIST.value,
+                _vars,
+            )
+
+        RUN = merge(RUN_TRUE, RUN_FALSE)
+
+        with case(rematerialization, False):
+            SET_FALSE = set_last_run_timestamp(
+                dataset_id=dataset_id,
+                table_id=table_id,
+                timestamp=date_range["date_range_end"],
+                wait=RUN,
+                mode=MODE,
+            )

-    dataset_sha = fetch_dataset_sha(
-        dataset_id=dataset_id,
-        upstream_tasks=[RUN_CLEAN],
-    )
+        with case(rematerialization, True):
+            SET_TRUE = task(
+                lambda: [None],
+                checkpoint=False,
+                name="assign_none_to_previous_runs",
+            )()

-    # Run materialization #
-    with case(rebuild, True):
-        RUN_TRUE = run_dbt_model(
-            # dbt_client=dbt_client,
-            dataset_id=dataset_id,
-            table_id=table_id,
-            upstream=True,
-            exclude="+data_versao_efetiva",
-            _vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
-            flags="--full-refresh",
-        )
+        SET = merge(SET_TRUE, SET_FALSE)

-    with case(rebuild, False):
-        RUN_FALSE = run_dbt_model(
-            # dbt_client=dbt_client,
-            dataset_id=dataset_id,
-            table_id=table_id,
-            exclude="+data_versao_efetiva",
-            _vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
-            upstream=True,
-        )
+        materialize_sppo.set_reference_tasks([RUN, RUN_CLEAN, SET])
+    with case(test_only, True):

-    RUN = merge(RUN_TRUE, RUN_FALSE)
+        _vars = {"date_range_start": date_range_start, "date_range_end": date_range_end}

-    with case(rematerialization, False):
-        SET_FALSE = set_last_run_timestamp(
+        gps_sppo_data_quality = run_dbt_tests(
             dataset_id=dataset_id,
             table_id=table_id,
-            timestamp=date_range["date_range_end"],
-            wait=RUN,
-            mode=MODE,
+            _vars=_vars,
+        )
+        GPS_SPPO_DATA_QUALITY_RESULTS = dbt_data_quality_checks(
+            gps_sppo_data_quality,
+            gps_constants.GPS_DATA_CHECKS_LIST.value,
+            _vars,
         )
-
-    with case(rematerialization, True):
-        SET_TRUE = task(
-            lambda: [None],
-            checkpoint=False,
-            name="assign_none_to_previous_runs",
-        )()
-
-    SET = merge(SET_TRUE, SET_FALSE)
-
-    materialize_sppo.set_reference_tasks([RUN, RUN_CLEAN, SET])

 materialize_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
 materialize_sppo.run_config = KubernetesRun(
diff --git a/pipelines/migration/projeto_subsidio_sppo/tasks.py b/pipelines/migration/projeto_subsidio_sppo/tasks.py
index 1b540a90e..867c09ffb 100644
--- a/pipelines/migration/projeto_subsidio_sppo/tasks.py
+++ b/pipelines/migration/projeto_subsidio_sppo/tasks.py
@@ -10,10 +10,8 @@
 from pipelines.constants import constants as smtr_constants
 from pipelines.migration.projeto_subsidio_sppo.constants import constants
-from pipelines.migration.tasks import (  # perform_check,
-    format_send_discord_message,
-    perform_checks_for_table,
-)
+from pipelines.migration.tasks import perform_checks_for_table  # perform_check,
+from pipelines.utils.discord import format_send_discord_message
 from pipelines.utils.secret import get_secret
 from pipelines.utils.utils import log
diff --git a/pipelines/migration/tasks.py b/pipelines/migration/tasks.py
index 3b3c6f30f..19a88ff47 100644
--- a/pipelines/migration/tasks.py
+++ b/pipelines/migration/tasks.py
@@ -44,7 +44,6 @@
     read_raw_data,
     save_raw_local_func,
     save_treated_local_func,
-    send_discord_message,
     upload_run_logs_to_bq,
 )
 from pipelines.utils.secret import get_secret
@@ -1660,44 +1659,6 @@ def perform_checks_for_table(
     return checks


-def format_send_discord_message(formatted_messages: list, webhook_url: str):
-    """
-    Format and send a message to discord
-
-    Args:
-        formatted_messages (list): The formatted messages
-        webhook_url (str): The webhook url
-
-    Returns:
-        None
-    """
-    formatted_message = "".join(formatted_messages)
-    log(formatted_message)
-    msg_ext = len(formatted_message)
-    if msg_ext > 2000:
-        log(f"** Message too long ({msg_ext} characters), will be split into multiple messages **")
-        # Split message into lines
-        lines = formatted_message.split("\n")
-        message_chunks = []
-        chunk = ""
-        for line in lines:
-            if len(chunk) + len(line) + 1 > 2000:  # +1 for the newline character
-                message_chunks.append(chunk)
-                chunk = ""
-            chunk += line + "\n"
-        message_chunks.append(chunk)  # Append the last chunk
-        for chunk in message_chunks:
-            send_discord_message(
-                message=chunk,
-                webhook_url=webhook_url,
-            )
-    else:
-        send_discord_message(
-            message=formatted_message,
-            webhook_url=webhook_url,
-        )
-
-
 ###############
 #
 # Utilitary tasks
diff --git a/pipelines/migration/utils.py b/pipelines/migration/utils.py
index a44e27fd0..0a4504d74 100644
--- a/pipelines/migration/utils.py
+++ b/pipelines/migration/utils.py
@@ -37,6 +37,7 @@
 from pytz import timezone

 from pipelines.constants import constants
+from pipelines.utils.discord import send_discord_message
 from pipelines.utils.implicit_ftp import ImplicitFtpTls
 from pipelines.utils.secret import get_secret

@@ -54,19 +55,6 @@ def set_default_parameters(flow: prefect.Flow, default_parameters: dict) -> pref
     return flow


-def send_discord_message(
-    message: str,
-    webhook_url: str,
-) -> None:
-    """
-    Sends a message to a Discord channel.
-    """
-    requests.post(
-        webhook_url,
-        data={"content": message},
-    )
-
-
 def log_critical(message: str, secret_path: str = constants.CRITICAL_SECRET_PATH.value):
     """Logs message to critical discord channel specified
diff --git a/pipelines/treatment/templates/CHANGELOG.md b/pipelines/treatment/templates/CHANGELOG.md
index 140c1a162..b5fcc880a 100644
--- a/pipelines/treatment/templates/CHANGELOG.md
+++ b/pipelines/treatment/templates/CHANGELOG.md
@@ -1,11 +1,18 @@
 # Changelog - treatment

-## [1.0.1] - 2024-11-08
+## [1.0.2] - 2024-11-08

 ### Alterado

 - Adiciona lógica para verificar fontes de dados no padrão antigo (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/237)

+## [1.0.1] - 2024-10-29
+
+### Adicionado
+
+- Adiciona as tasks `check_dbt_test_run`, `run_dbt_tests` e `dbt_data_quality_checks` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/288)
+- Adiciona a função `parse_dbt_test_output` no `utils.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/288)
+
 ## [1.0.0] - 2024-10-21
diff --git a/pipelines/treatment/templates/tasks.py b/pipelines/treatment/templates/tasks.py
index 6de89e83d..9fc15b3b8 100644
--- a/pipelines/treatment/templates/tasks.py
+++ b/pipelines/treatment/templates/tasks.py
@@ -1,9 +1,10 @@
 # -*- coding: utf-8 -*-
 import time
 from datetime import datetime, timedelta
-from typing import Union
+from typing import Dict, List, Union

 import basedosdados as bd
+import prefect
 import requests
 from prefect import task
 from prefeitura_rio.pipelines_utils.logging import log
@@ -15,11 +16,14 @@
     DBTSelector,
     IncompleteDataError,
     create_dataplex_log_message,
+    parse_dbt_test_output,
     send_dataplex_discord_message,
 )
 from pipelines.utils.dataplex import DataQuality, DataQualityCheckArgs
+from pipelines.utils.discord import format_send_discord_message
 from pipelines.utils.gcp.bigquery import SourceTable
 from pipelines.utils.prefect import flow_is_running_local, rename_current_flow_run
+from pipelines.utils.secret import get_secret
 from pipelines.utils.utils import convert_timezone, cron_get_last_date

 # from pipelines.utils.utils import get_last_materialization_redis_key
@@ -355,3 +359,181 @@ def run_data_quality_checks(
         timestamp=datetime.now(tz=timezone(constants.TIMEZONE.value)),
         partitions=partitions,
     )
+
+
+@task(nout=3)
+def check_dbt_test_run(
+    date_range_start: str, date_range_end: str, run_time: str
+) -> tuple[bool, str, str]:
+    """
+    Compares the specified run time with the start date's time component.
+    If they match, it calculates and returns the start and end date strings
+    for the previous day in ISO format.
+
+    Args:
+        date_range_start (str): The start date of the range.
+        date_range_end (str): The end date of the range.
+        run_time (str): The time to check against in the format "HH:MM:SS".
+
+    Returns:
+        Tuple[bool, str, str]: A tuple containing the following elements:
+            - bool: True if the run time matches the start date's time; otherwise, False.
+            - str: The start date of the previous day in ISO format if the time matches.
+            - str: The end date of the previous day in ISO format if the time matches.
+ """ + + datetime_start = datetime.fromisoformat(date_range_start) + datetime_end = datetime.fromisoformat(date_range_end) + + run_time = datetime.strptime(run_time, "%H:%M:%S").time() + + if datetime_start.time() == run_time: + datetime_start_str = (datetime_start - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S") + datetime_end_str = (datetime_end - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S") + return True, datetime_start_str, datetime_end_str + return False, None, None + + +@task +def run_dbt_tests( + dataset_id: str = None, + table_id: str = None, + model: str = None, + upstream: bool = None, + downstream: bool = None, + test_name: str = None, + exclude: str = None, + flags: str = None, + _vars: Union[dict, List[Dict]] = None, +) -> str: + """ + Runs a DBT test + + Args: + dataset_id (str, optional): Dataset ID of the dbt model. Defaults to None. + table_id (str, optional): Table ID of the dbt model. Defaults to None. + model (str, optional): model to be tested. Defaults to None. + upstream (bool, optional): If True, includes upstream models. Defaults to None. + downstream (bool, optional): If True, includes downstream models. Defaults to None. + test_name (str, optional): The name of the specific test to be executed. Defaults to None. + exclude (str, optional): Models to be excluded from the test execution. Defaults to None. + flags (str, optional): Additional flags for the `dbt test` command. Defaults to None. + _vars (Union[dict, List[Dict]], optional): Variables to pass to dbt. Defaults to None. + + Returns: + str: Logs resulting from the execution of the `dbt test` command. + """ + run_command = "dbt test" + + if not model: + model = dataset_id + if table_id: + model += f".{table_id}" + + if model: + run_command += " --select " + if upstream: + run_command += "+" + run_command += model + if downstream: + run_command += "+" + if test_name: + model += f",test_name:{test_name}" + + if exclude: + run_command += f" --exclude {exclude}" + + if _vars: + if isinstance(_vars, list): + vars_dict = {} + for elem in _vars: + vars_dict.update(elem) + vars_str = f'"{vars_dict}"' + run_command += f" --vars {vars_str}" + else: + vars_str = f'"{_vars}"' + run_command += f" --vars {vars_str}" + + if flags: + run_command += f" {flags}" + + root_path = get_root_path() + queries_dir = str(root_path / "queries") + + if flow_is_running_local(): + run_command += f' --profiles-dir "{queries_dir}/dev"' + + log(f"Running dbt with command: {run_command}") + dbt_task = DbtShellTask( + profiles_dir=queries_dir, + helper_script=f'cd "{queries_dir}"', + log_stderr=True, + return_all=True, + command=run_command, + ) + dbt_logs = dbt_task.run() + + log("\n".join(dbt_logs)) + dbt_logs = "\n".join(dbt_logs) + return dbt_logs + + +@task +def dbt_data_quality_checks(dbt_logs: str, checks_list: dict, params: dict): + """ + Extracts the results of DBT tests and sends a message with the information to Discord. + + Args: + dbt_logs (str): Logs from DBT containing the test results. + checks_list (dict): Dictionary with the names of the tests and their descriptions. + date_range (dict): Dictionary representing a date range. 
+ """ + + checks_results = parse_dbt_test_output(dbt_logs) + + webhook_url = get_secret(secret_path=constants.WEBHOOKS_SECRET_PATH.value)["dataplex"] + + dados_tag = f" - <@&{constants.OWNERS_DISCORD_MENTIONS.value['dados_smtr']['user_id']}>\n" + + test_check = all(test["result"] == "PASS" for test in checks_results.values()) + + date_range = ( + params["date_range_start"] + if params["date_range_start"] == params["date_range_end"] + else f'{params["date_range_start"]} a {params["date_range_end"]}' + ) + + if "(target='dev')" in dbt_logs or "(target='hmg')" in dbt_logs: + formatted_messages = [ + ":green_circle: " if test_check else ":red_circle: ", + f"**[DEV] Data Quality Checks - {prefect.context.get('flow_name')} - {date_range}**\n\n", # noqa + ] + else: + formatted_messages = [ + ":green_circle: " if test_check else ":red_circle: ", + f"**Data Quality Checks - {prefect.context.get('flow_name')} - {date_range}**\n\n", + ] + + for table_id, tests in checks_list.items(): + formatted_messages.append( + f"*{table_id}:*\n" + + "\n".join( + f'{":white_check_mark:" if checks_results[test_id]["result"] == "PASS" else ":x:"} ' + f'{test["description"]}' + for test_id, test in tests.items() + ) + ) + + formatted_messages.append("\n\n") + formatted_messages.append( + ":tada: **Status:** Sucesso" + if test_check + else ":warning: **Status:** Testes falharam. Necessidade de revisão dos dados finais!\n" + ) + + formatted_messages.append(dados_tag) + try: + format_send_discord_message(formatted_messages, webhook_url) + except Exception as e: + log(f"Falha ao enviar mensagem para o Discord: {e}", level="error") + raise diff --git a/pipelines/treatment/templates/utils.py b/pipelines/treatment/templates/utils.py index 8058fcffd..f4ab5038d 100644 --- a/pipelines/treatment/templates/utils.py +++ b/pipelines/treatment/templates/utils.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +import re from datetime import datetime, timedelta from google.cloud.dataplex_v1 import DataScanJob @@ -205,3 +206,66 @@ def send_dataplex_discord_message( embed_messages=embed, timestamp=timestamp, ) + + +def parse_dbt_test_output(dbt_logs: str) -> dict: + """Parses DBT test output and returns a list of test results. + + Args: + dbt_logs: The DBT test output as a string. + + Returns: + A list of dictionaries, each representing a test result with the following keys: + - name: The test name. + - result: "PASS", "FAIL" or "ERROR". + - query: Query to see test failures. + - error: Message error. 
+ """ + + # Remover sequências ANSI + dbt_logs = re.sub(r"\x1B[@-_][0-?]*[ -/]*[@-~]", "", dbt_logs) + + results = {} + result_pattern = r"\d+ of \d+ (PASS|FAIL|ERROR) (\d+ )?([\w_]+) .* \[(PASS|FAIL|ERROR) .*\]" + fail_pattern = r"Failure in test ([\w_]+) .*\n.*\n.*\n.* compiled Code at (.*)\n" + error_pattern = r"Error in test ([\w_]+) \(.*schema.yaml\)\n (.*)\n" + + for match in re.finditer(result_pattern, dbt_logs): + groups = match.groups() + test_name = groups[2] + results[test_name] = {"result": groups[3]} + + for match in re.finditer(fail_pattern, dbt_logs): + groups = match.groups() + test_name = groups[0] + file = groups[1] + + with open(file, "r") as arquivo: + query = arquivo.read() + + query = re.sub(r"\n+", "\n", query) + results[test_name]["query"] = query + + for match in re.finditer(error_pattern, dbt_logs): + groups = match.groups() + test_name = groups[0] + error = groups[1] + results[test_name]["error"] = error + + log_message = "" + for test, info in results.items(): + result = info["result"] + log_message += f"Test: {test} Status: {result}\n" + + if result == "FAIL": + log_message += "Query:\n" + log_message += f"{info['query']}\n" + + if result == "ERROR": + log_message += f"Error: {info['error']}\n" + + log_message += "\n" + + log(log_message) + + return results diff --git a/pipelines/utils/discord.py b/pipelines/utils/discord.py index edbdf02c6..634db29e5 100644 --- a/pipelines/utils/discord.py +++ b/pipelines/utils/discord.py @@ -8,6 +8,19 @@ from pipelines.utils.secret import get_secret +def send_discord_message( + message: str, + webhook_url: str, +) -> None: + """ + Sends a message to a Discord channel. + """ + requests.post( + webhook_url, + data={"content": message}, + ) + + def send_discord_embed_message( webhook_secret_key: str, content: str, @@ -40,3 +53,41 @@ def send_discord_embed_message( log(response.text) log(response.status_code) + + +def format_send_discord_message(formatted_messages: list, webhook_url: str): + """ + Format and send a message to discord + + Args: + formatted_messages (list): The formatted messages + webhook_url (str): The webhook url + + Returns: + None + """ + formatted_message = "".join(formatted_messages) + log(formatted_message) + msg_ext = len(formatted_message) + if msg_ext > 2000: + log(f"** Message too long ({msg_ext} characters), will be split into multiple messages **") + # Split message into lines + lines = formatted_message.split("\n") + message_chunks = [] + chunk = "" + for line in lines: + if len(chunk) + len(line) + 1 > 2000: # +1 for the newline character + message_chunks.append(chunk) + chunk = "" + chunk += line + "\n" + message_chunks.append(chunk) # Append the last chunk + for chunk in message_chunks: + send_discord_message( + message=chunk, + webhook_url=webhook_url, + ) + else: + send_discord_message( + message=formatted_message, + webhook_url=webhook_url, + ) diff --git a/queries/CHANGELOG.md b/queries/CHANGELOG.md new file mode 100644 index 000000000..6bbcbf006 --- /dev/null +++ b/queries/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - queries + +## [1.0.0] - 2024-10-29 + +### Adicionado + +- Adiciona package: `dbt-labs/dbt_utils` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/288) \ No newline at end of file diff --git a/queries/dev/utils.py b/queries/dev/utils.py index 38dac7824..6539c4849 100644 --- a/queries/dev/utils.py +++ b/queries/dev/utils.py @@ -1,11 +1,18 @@ # -*- coding: utf-8 -*- +# flake8: noqa import os - -# from datetime import datetime as dt -# from datetime import timedelta +import 
re +import subprocess from typing import Dict, List, Union import requests +from prefect import context + +# from datetime import datetime as dt +# from datetime import timedelta +from pipelines.constants import constants as smtr_constants +from pipelines.utils.discord import format_send_discord_message +from pipelines.utils.secret import get_secret # import pandas as pd @@ -78,3 +85,161 @@ def fetch_dataset_sha(dataset_id: str): dataset_version = response.json()[0]["sha"] return {"version": dataset_version} + + +def run_dbt_tests( + dataset_id: str = None, + table_id: str = None, + model: str = None, + upstream: bool = None, + downstream: bool = None, + test_name: str = None, + exclude: str = None, + flags: str = None, + _vars: Union[dict, List[Dict]] = None, +): + """ + Run DBT test + """ + run_command = "dbt test" + + common_flags = "--profiles-dir ./dev" + + if flags: + flags = f"{common_flags} {flags}" + else: + flags = common_flags + + if not model: + model = dataset_id + if table_id: + model += f".{table_id}" + + if model: + run_command += " --select " + if upstream: + run_command += "+" + run_command += model + if downstream: + run_command += "+" + if test_name: + model += f",test_name:{test_name}" + + if exclude: + run_command += f" --exclude {exclude}" + + if _vars: + if isinstance(_vars, list): + vars_dict = {} + for elem in _vars: + vars_dict.update(elem) + vars_str = f'"{vars_dict}"' + run_command += f" --vars {vars_str}" + else: + vars_str = f'"{_vars}"' + run_command += f" --vars {vars_str}" + + if flags: + run_command += f" {flags}" + + print(f"\n>>> RUNNING: {run_command}\n") + + project_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + os.chdir(project_dir) + dbt_logs = subprocess.run(run_command, shell=True, capture_output=True, text=True) + + print(dbt_logs.stdout) + return dbt_logs.stdout + + +def parse_dbt_test_output(dbt_logs: str) -> dict: + # Remover sequências ANSI + dbt_logs = re.sub(r"\x1B[@-_][0-?]*[ -/]*[@-~]", "", dbt_logs) + + results = {} + result_pattern = r"\d+ of \d+ (PASS|FAIL|ERROR) (\d+ )?([\w_]+) .* \[(PASS|FAIL|ERROR) .*\]" + fail_pattern = r"Failure in test ([\w_]+) .*\n.*\n.*\n.* compiled Code at (.*)\n" + error_pattern = r"Error in test ([\w_]+) \(.*schema.yaml\)\n (.*)\n" + + for match in re.finditer(result_pattern, dbt_logs): + groups = match.groups() + test_name = groups[2] + results[test_name] = {"result": groups[3]} + + for match in re.finditer(fail_pattern, dbt_logs): + groups = match.groups() + test_name = groups[0] + file = groups[1] + + with open(file, "r") as arquivo: + query = arquivo.read() + + query = re.sub(r"\n+", "\n", query) + results[test_name]["query"] = query + + for match in re.finditer(error_pattern, dbt_logs): + groups = match.groups() + test_name = groups[0] + error = groups[1] + results[test_name]["error"] = error + + log_message = "" + for test, info in results.items(): + result = info["result"] + log_message += f"Test: {test} Status: {result}\n" + + if result == "FAIL": + log_message += "Query:\n" + log_message += f"{info['query']}\n" + + if result == "ERROR": + log_message += f"Error: {info['error']}\n" + + log_message += "\n" + + print(log_message) + + return results + + +def dbt_data_quality_checks( + checks_list: dict, checks_results: dict, params: dict, webhook_url: str = None +) -> bool: + + if webhook_url is None: + webhook_url = get_secret(secret_path=smtr_constants.WEBHOOKS_SECRET_PATH.value)["dataplex"] + + dados_tag = f" - 
<@&{smtr_constants.OWNERS_DISCORD_MENTIONS.value['dados_smtr']['user_id']}>\n" + + test_check = all(test["result"] == "PASS" for test in checks_results.values()) + + date_range = ( + params["date_range_start"] + if params["date_range_start"] == params["date_range_end"] + else f'{params["date_range_start"]} a {params["date_range_end"]}' + ) + + formatted_messages = [ + ":green_circle: " if test_check else ":red_circle: ", + f"**[DEV]Data Quality Checks - {context.get('flow_name')} - {date_range}**\n\n", + ] + + for table_id, tests in checks_list.items(): + formatted_messages.append( + f"*{table_id}:*\n" + + "\n".join( + f'{":white_check_mark:" if checks_results[test_id]["result"] == "PASS" else ":x:"} ' + f'{test["description"]}' + for test_id, test in tests.items() + ) + ) + + formatted_messages.append("\n\n") + formatted_messages.append( + ":tada: **Status:** Sucesso" + if test_check + else ":warning: **Status:** Testes falharam. Necessidade de revisão dos dados finais!\n" + ) + + formatted_messages.append(dados_tag) + format_send_discord_message(formatted_messages, webhook_url) diff --git a/queries/models/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md new file mode 100644 index 000000000..086285d32 --- /dev/null +++ b/queries/models/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - br_rj_riodejaneiro_onibus_gps + +## [1.0.1] - 2024-10-25 + +#### Alterado + +- Altera lógica do filtro do modelo `sppo_aux_registros_realocacao.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/287) \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md index 46f965c70..e5df2aaa8 100644 --- a/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md +++ b/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - br_rj_riodejaneiro_veiculos +## [1.0.4] - 2024-10-25 + +#### Adicionado + +- Adiciona testes do DBT no schema (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/287) + ## [1.0.3] - 2024-08-09 ### Adicionado diff --git a/queries/models/veiculo/CHANGELOG.md b/queries/models/veiculo/CHANGELOG.md index 6577475e9..c3ea77f1d 100644 --- a/queries/models/veiculo/CHANGELOG.md +++ b/queries/models/veiculo/CHANGELOG.md @@ -1,15 +1,5 @@ # Changelog - veiculo -## [1.1.3] - 2024-10-25 - -#### Alterado - -- Altera lógica do filtro do modelo `sppo_aux_registros_realocacao.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/287) - -#### Adicionado - -- Adiciona testes do DBT no schema (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/287) - ## [1.1.2] - 2024-04-25 #### Adicionado
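Note on the log-parsing approach introduced in `parse_dbt_test_output`: the `result_pattern` regex pulls the test name and final status out of dbt's console summary lines. The sketch below is illustrative only — the sample log lines are invented and real `dbt test` output may be formatted differently — but it shows how the capture groups are used to build the results dictionary.

```python
# Minimal sketch of the result_pattern parsing used by parse_dbt_test_output.
# The sample lines are fabricated for illustration; real dbt output may differ.
import re

result_pattern = r"\d+ of \d+ (PASS|FAIL|ERROR) (\d+ )?([\w_]+) .* \[(PASS|FAIL|ERROR) .*\]"

sample_logs = "\n".join(
    [
        "1 of 2 PASS unique_sppo_veiculo_dia_id_veiculo ............ [PASS in 1.32s]",
        "2 of 2 FAIL 3 not_null_sppo_veiculo_dia_placa ............. [FAIL 3 in 2.05s]",
    ]
)

results = {}
for match in re.finditer(result_pattern, sample_logs):
    groups = match.groups()
    # groups[2] is the test name, groups[3] the final status (PASS/FAIL/ERROR)
    results[groups[2]] = {"result": groups[3]}

print(results)
# {'unique_sppo_veiculo_dia_id_veiculo': {'result': 'PASS'},
#  'not_null_sppo_veiculo_dia_placa': {'result': 'FAIL'}}
```

For context, with hypothetical arguments such as `dataset_id="veiculo"`, `table_id="sppo_veiculo_dia"` and `_vars={"date_range_start": "2024-10-28T01:00:00", "date_range_end": "2024-10-29T01:00:00"}`, the `run_dbt_tests` task would assemble a command along the lines of `dbt test --select veiculo.sppo_veiculo_dia --vars "{'date_range_start': '2024-10-28T01:00:00', 'date_range_end': '2024-10-29T01:00:00'}"`, and its stdout is what `parse_dbt_test_output` consumes.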