Tests in DBT (#288)
* handle empty capture

* add package dbt-labs/dbt_utils

* add flag store_failures

* change reallocation handling

* Revert "add flag store_failures"

This reverts commit 30d123c.

* add dbt tests

* fix reallocation filter

* add test_only and logic for dbt tests

* add check_dbt_test_run, run_dbt_tests and dbt_data_quality_checks

* add parse_dbt_test_output and send_data_quality_discord_message

* add changelogs

* add changelogs

* adjust test config

* change test names

* fix test names

* add run_time_test parameter

* change dbt_data_quality_checks and check_dbt_test_run

* remove send_data_quality_discord_message

* add changelogs

* add dbt test functions

* add exception handling in dbt_data_quality_checks

* fix tests' where config and add custom_get_where_subquery macro

* fix changelogs

* import prefect

* move format_send_discord_message and send_discord_message functions to utils.discord

* comment out every_5_minutes schedule
---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
akaBotelho and mergify[bot] authored Nov 7, 2024
1 parent 4435122 commit 16ebeb8
Showing 15 changed files with 612 additions and 136 deletions.
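Taken together, these changes gate a daily dbt test stage behind two new flow parameters, `test_only` and `run_time_test`. As a purely illustrative example (the parameter names come from the `flows.py` diff below; the timestamp values and the way the run is launched are assumptions), a test-only run of `materialize_sppo` could be parameterized like this:

```python
# Hypothetical parameter set for a test-only run of materialize_sppo.
# The keys mirror the Parameter(...) definitions in the flows.py diff below;
# the timestamp values are made up for illustration.
parameters = {
    "test_only": True,  # skip materialization, only run the dbt tests
    "date_range_start": "2024-10-29T00:00:00",
    "date_range_end": "2024-10-29T01:00:00",
}
```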
3 changes: 2 additions & 1 deletion pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
@@ -42,7 +42,8 @@
upload_raw_data_to_gcs,
upload_staging_data_to_gcs,
)
from pipelines.schedules import every_5_minutes

# from pipelines.schedules import every_5_minutes
from pipelines.tasks import get_scheduled_timestamp, parse_timestamp_to_string

# from pipelines.capture.templates.flows import create_default_capture_flow
pipelines/migration/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog - br_rj_riodejaneiro_onibus_gps

## [1.0.3] - 2024-10-29

### Changed

- Changes the `materialize_sppo` flow to use the tasks that run the DBT tests (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/288)

## [1.0.2] - 2024-08-25

### Added
173 changes: 108 additions & 65 deletions pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py
@@ -25,6 +25,9 @@

from pipelines.constants import constants
from pipelines.constants import constants as emd_constants
from pipelines.migration.br_rj_riodejaneiro_onibus_gps.constants import (
constants as gps_constants,
)
from pipelines.migration.br_rj_riodejaneiro_onibus_gps.tasks import (
clean_br_rj_riodejaneiro_onibus_gps,
create_api_url_onibus_gps,
@@ -61,6 +64,11 @@
every_hour_minute_six,
every_minute,
)
from pipelines.treatment.templates.tasks import (
check_dbt_test_run,
dbt_data_quality_checks,
run_dbt_tests,
)

# from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client

@@ -164,6 +172,8 @@
default=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
)
truncate_minutes = Parameter("truncate_minutes", default=True)
test_only = Parameter("test_only", default=False)
run_time_test = Parameter("run_time_test", default="01:00:00")

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode()
@@ -174,85 +184,118 @@
# dbt_client = get_local_dbt_client(host="localhost", port=3001)

# Set specific run parameters #
with case(rematerialization, False):
date_range_false = get_materialization_date_range(
with case(test_only, False):
with case(rematerialization, False):
date_range_false = get_materialization_date_range(
dataset_id=dataset_id,
table_id=table_id,
raw_dataset_id=raw_dataset_id,
raw_table_id=raw_table_id,
table_run_datetime_column_name="timestamp_gps",
mode=MODE,
delay_hours=materialize_delay_hours,
truncate_minutes=truncate_minutes,
)

RUN_CLEAN_FALSE = task(
lambda: [None],
checkpoint=False,
name="assign_none_to_previous_runs",
)()
with case(rematerialization, True):
date_range_true = task(
lambda start, end: {
"date_range_start": start,
"date_range_end": end,
}
)(start=date_range_start, end=date_range_end)

RUN_CLEAN_TRUE = clean_br_rj_riodejaneiro_onibus_gps(date_range_true)

RUN_CLEAN = merge(RUN_CLEAN_TRUE, RUN_CLEAN_FALSE)

date_range = merge(date_range_true, date_range_false)

dataset_sha = fetch_dataset_sha(
dataset_id=dataset_id,
table_id=table_id,
raw_dataset_id=raw_dataset_id,
raw_table_id=raw_table_id,
table_run_datetime_column_name="timestamp_gps",
mode=MODE,
delay_hours=materialize_delay_hours,
truncate_minutes=truncate_minutes,
upstream_tasks=[RUN_CLEAN],
)

RUN_CLEAN_FALSE = task(
lambda: [None],
checkpoint=False,
name="assign_none_to_previous_runs",
)()
with case(rematerialization, True):
date_range_true = task(
lambda start, end: {
"date_range_start": start,
"date_range_end": end,
}
)(start=date_range_start, end=date_range_end)
# Run materialization #
with case(rebuild, True):
RUN_TRUE = run_dbt_model(
# dbt_client=dbt_client,
dataset_id=dataset_id,
table_id=table_id,
upstream=True,
exclude="+data_versao_efetiva",
_vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
flags="--full-refresh",
)

RUN_CLEAN_TRUE = clean_br_rj_riodejaneiro_onibus_gps(date_range_true)
with case(rebuild, False):
RUN_FALSE = run_dbt_model(
# dbt_client=dbt_client,
dataset_id=dataset_id,
table_id=table_id,
exclude="+data_versao_efetiva",
_vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
upstream=True,
)

RUN_CLEAN = merge(RUN_CLEAN_TRUE, RUN_CLEAN_FALSE)
RUN_TEST, datetime_start, datetime_end = check_dbt_test_run(
date_range_start, date_range_end, run_time_test, upstream_tasks=[RUN_FALSE]
)

date_range = merge(date_range_true, date_range_false)
_vars = {"date_range_start": datetime_start, "date_range_end": datetime_end}

with case(RUN_TEST, True):
gps_sppo_data_quality = run_dbt_tests(
dataset_id=dataset_id,
table_id=table_id,
_vars=_vars,
)
GPS_SPPO_DATA_QUALITY_RESULTS = dbt_data_quality_checks(
gps_sppo_data_quality,
gps_constants.GPS_DATA_CHECKS_LIST.value,
_vars,
)

RUN = merge(RUN_TRUE, RUN_FALSE)

with case(rematerialization, False):
SET_FALSE = set_last_run_timestamp(
dataset_id=dataset_id,
table_id=table_id,
timestamp=date_range["date_range_end"],
wait=RUN,
mode=MODE,
)

dataset_sha = fetch_dataset_sha(
dataset_id=dataset_id,
upstream_tasks=[RUN_CLEAN],
)
with case(rematerialization, True):
SET_TRUE = task(
lambda: [None],
checkpoint=False,
name="assign_none_to_previous_runs",
)()

# Run materialization #
with case(rebuild, True):
RUN_TRUE = run_dbt_model(
# dbt_client=dbt_client,
dataset_id=dataset_id,
table_id=table_id,
upstream=True,
exclude="+data_versao_efetiva",
_vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
flags="--full-refresh",
)
SET = merge(SET_TRUE, SET_FALSE)

with case(rebuild, False):
RUN_FALSE = run_dbt_model(
# dbt_client=dbt_client,
dataset_id=dataset_id,
table_id=table_id,
exclude="+data_versao_efetiva",
_vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
upstream=True,
)
materialize_sppo.set_reference_tasks([RUN, RUN_CLEAN, SET])
with case(test_only, True):

RUN = merge(RUN_TRUE, RUN_FALSE)
_vars = {"date_range_start": date_range_start, "date_range_end": date_range_end}

with case(rematerialization, False):
SET_FALSE = set_last_run_timestamp(
gps_sppo_data_quality = run_dbt_tests(
dataset_id=dataset_id,
table_id=table_id,
timestamp=date_range["date_range_end"],
wait=RUN,
mode=MODE,
_vars=_vars,
)
GPS_SPPO_DATA_QUALITY_RESULTS = dbt_data_quality_checks(
gps_sppo_data_quality,
gps_constants.GPS_DATA_CHECKS_LIST.value,
_vars,
)

with case(rematerialization, True):
SET_TRUE = task(
lambda: [None],
checkpoint=False,
name="assign_none_to_previous_runs",
)()

SET = merge(SET_TRUE, SET_FALSE)

materialize_sppo.set_reference_tasks([RUN, RUN_CLEAN, SET])

materialize_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
materialize_sppo.run_config = KubernetesRun(
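The new treatment tasks used above (`check_dbt_test_run`, `run_dbt_tests`, `dbt_data_quality_checks`) are defined in `pipelines/treatment/templates/tasks.py`, whose diff is not expanded on this page. As a rough sketch only — assuming, from the `run_time_test` parameter (a daily time such as `"01:00:00"`) and the way the task's outputs feed the test `_vars`, that tests are meant to fire once a day when the materialization window crosses that time — the gating logic might look like:

```python
# Hypothetical sketch; the committed implementation is in
# pipelines/treatment/templates/tasks.py and is not shown on this page.
from datetime import datetime, time

from prefect import task


@task(nout=3)
def check_dbt_test_run(date_range_start: str, date_range_end: str, run_time_test: str):
    """Return (should_run_tests, datetime_start, datetime_end)."""
    start = datetime.fromisoformat(date_range_start)
    end = datetime.fromisoformat(date_range_end)
    # Anchor the configured daily test time on the window's end date
    test_datetime = datetime.combine(end.date(), time.fromisoformat(run_time_test))
    # Trigger the tests only when the window contains that instant
    should_run = start <= test_datetime <= end
    return should_run, start.isoformat(), end.isoformat()
```

Whatever the real rule is, the flow only consumes a boolean: `run_dbt_tests` and `dbt_data_quality_checks` execute inside `case(RUN_TEST, True)`, so a `False` result leaves the materialization path untouched.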
6 changes: 2 additions & 4 deletions pipelines/migration/projeto_subsidio_sppo/tasks.py
@@ -10,10 +10,8 @@

from pipelines.constants import constants as smtr_constants
from pipelines.migration.projeto_subsidio_sppo.constants import constants
from pipelines.migration.tasks import ( # perform_check,
format_send_discord_message,
perform_checks_for_table,
)
from pipelines.migration.tasks import perform_checks_for_table # perform_check,
from pipelines.utils.discord import format_send_discord_message
from pipelines.utils.secret import get_secret
from pipelines.utils.utils import log

39 changes: 0 additions & 39 deletions pipelines/migration/tasks.py
@@ -44,7 +44,6 @@
read_raw_data,
save_raw_local_func,
save_treated_local_func,
send_discord_message,
upload_run_logs_to_bq,
)
from pipelines.utils.secret import get_secret
@@ -1660,44 +1659,6 @@ def perform_checks_for_table(
return checks


def format_send_discord_message(formatted_messages: list, webhook_url: str):
"""
Format and send a message to discord
Args:
formatted_messages (list): The formatted messages
webhook_url (str): The webhook url
Returns:
None
"""
formatted_message = "".join(formatted_messages)
log(formatted_message)
msg_ext = len(formatted_message)
if msg_ext > 2000:
log(f"** Message too long ({msg_ext} characters), will be split into multiple messages **")
# Split message into lines
lines = formatted_message.split("\n")
message_chunks = []
chunk = ""
for line in lines:
if len(chunk) + len(line) + 1 > 2000: # +1 for the newline character
message_chunks.append(chunk)
chunk = ""
chunk += line + "\n"
message_chunks.append(chunk) # Append the last chunk
for chunk in message_chunks:
send_discord_message(
message=chunk,
webhook_url=webhook_url,
)
else:
send_discord_message(
message=formatted_message,
webhook_url=webhook_url,
)


###############
#
# Utilitary tasks
14 changes: 1 addition & 13 deletions pipelines/migration/utils.py
@@ -37,6 +37,7 @@
from pytz import timezone

from pipelines.constants import constants
from pipelines.utils.discord import send_discord_message
from pipelines.utils.implicit_ftp import ImplicitFtpTls
from pipelines.utils.secret import get_secret

@@ -54,19 +55,6 @@ def set_default_parameters(flow: prefect.Flow, default_parameters: dict) -> pref
return flow


def send_discord_message(
message: str,
webhook_url: str,
) -> None:
"""
Sends a message to a Discord channel.
"""
requests.post(
webhook_url,
data={"content": message},
)


def log_critical(message: str, secret_path: str = constants.CRITICAL_SECRET_PATH.value):
"""Logs message to critical discord channel specified
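With `format_send_discord_message` and `send_discord_message` both moved to `pipelines/utils/discord.py`, call sites now import them from the shared module, as the `projeto_subsidio_sppo` and `utils.py` diffs above already show:

```python
# New import path after this commit (as seen in the diffs above)
from pipelines.utils.discord import format_send_discord_message, send_discord_message
```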
7 changes: 7 additions & 0 deletions pipelines/treatment/templates/CHANGELOG.md
@@ -1,5 +1,12 @@
# Changelog - treatment

## [1.0.1] - 2024-10-29

### Added

- Adds the `check_dbt_test_run`, `run_dbt_tests`, and `dbt_data_quality_checks` tasks (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/288)
- Adds the `parse_dbt_test_output` function to `utils.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/288)

## [1.0.0] - 2024-10-21

### Added
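The diff adding `parse_dbt_test_output` to `pipelines/treatment/templates/utils.py` is among the files not expanded on this page. As a hedged illustration only — the dbt log format and the return shape below are assumptions, not taken from this commit — a parser over the dbt CLI output could look like:

```python
# Illustrative sketch, not the committed implementation.
# Assumption: dbt prints one summary line per test, e.g.
#   "1 of 12 PASS not_null_gps_sppo_timestamp_gps ........ [PASS in 1.02s]"
#   "2 of 12 FAIL 3 unique_gps_sppo_id_veiculo ........... [FAIL 3 in 2.41s]"
import re


def parse_dbt_test_output(dbt_logs: str) -> dict:
    """Map each dbt test name to its final status (PASS/FAIL/ERROR/WARN)."""
    results = {}
    pattern = re.compile(r"\d+ of \d+ (PASS|FAIL \d+|ERROR|WARN \d+) (\S+)")
    for status, test_name in pattern.findall(dbt_logs):
        results[test_name] = {"status": status.split()[0]}
    return results
```

Presumably `dbt_data_quality_checks` consumes a structure like this when deciding what to report to Discord, but that wiring is likewise not visible here.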