Merge branch 'add_format_sql' of https://github.com/prefeitura-rio/pipelines_rj_smtr into add_format_sql
Hellcassius committed Nov 8, 2024
2 parents af71a0b + 14aa5e4 commit bf58da8
Showing 27 changed files with 1,116 additions and 280 deletions.
2 changes: 1 addition & 1 deletion pipelines/constants.py
@@ -160,7 +160,7 @@ class constants(Enum): # pylint: disable=c0103
"databases": {
"principal_db": {
"engine": "mysql",
"host": "10.5.114.121",
"host": "10.5.114.227",
},
"tarifa_db": {
"engine": "postgresql",
6 changes: 6 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog - gtfs
 
+## [1.1.3] - 2024-10-30
+
+### Changed
+
+- Updated `utils.py` to handle the new time windows (faixas horárias) (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/295)
+
 ## [1.1.2] - 2024-10-21
 
 ### Changed
12 changes: 9 additions & 3 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
@@ -2,7 +2,7 @@
"""
Flows for gtfs
DBT 2024-09-24
DBT 2024-11-07
"""

from prefect import Parameter, case, task
@@ -42,7 +42,8 @@
     upload_raw_data_to_gcs,
     upload_staging_data_to_gcs,
 )
-from pipelines.schedules import every_5_minutes
+
+# from pipelines.schedules import every_5_minutes
 from pipelines.tasks import get_scheduled_timestamp, parse_timestamp_to_string
 
 # from pipelines.capture.templates.flows import create_default_capture_flow
@@ -145,11 +146,16 @@
         filename=unmapped(filename),
     )
 
+    data_versao_gtfs_str = parse_timestamp_to_string(
+        timestamp=data_versao_gtfs_task, pattern="%Y-%m-%d"
+    )
+
     raw_filepaths, primary_keys = get_raw_gtfs_files(
         os_control=os_control,
         local_filepath=local_filepaths,
         regular_sheet_index=regular_sheet_index,
         upload_from_gcs=upload_from_gcs,
+        data_versao_gtfs=data_versao_gtfs_str,
     )
 
     transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map(
@@ -242,7 +248,7 @@
     handler_initialize_sentry,
     handler_skip_if_running,
 ]
-gtfs_captura_nova.schedule = every_5_minutes
+# gtfs_captura_nova.schedule = every_5_minutes
 
 
 # with Flow(
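The new `data_versao_gtfs_str` value feeds the GTFS version date into `get_raw_gtfs_files` as a plain string. A minimal sketch of what `parse_timestamp_to_string` presumably does (the real task lives in `pipelines.tasks` and may differ, e.g. in its Prefect task wrapping):

# Sketch only; assumes the helper is a thin strftime wrapper.
from datetime import datetime

def parse_timestamp_to_string(timestamp: datetime, pattern: str = "%Y-%m-%d") -> str:
    """Format a timestamp using the given strftime pattern."""
    return timestamp.strftime(pattern)

# e.g. parse_timestamp_to_string(datetime(2024, 11, 7), "%Y-%m-%d") -> "2024-11-07"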
7 changes: 6 additions & 1 deletion pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py
@@ -155,7 +155,11 @@ def get_os_info(last_captured_os: str = None, data_versao_gtfs: str = None) -> d

 @task(nout=2)
 def get_raw_gtfs_files(
-    os_control, local_filepath: list, regular_sheet_index: int = None, upload_from_gcs: bool = False
+    os_control,
+    local_filepath: list,
+    regular_sheet_index: int = None,
+    upload_from_gcs: bool = False,
+    data_versao_gtfs: str = None,
 ):
     """
     Downloads raw files and processes them.
@@ -241,6 +245,7 @@ def get_raw_gtfs_files(
             file_bytes=file_bytes_os,
             local_filepath=local_filepath,
             raw_filepaths=raw_filepaths,
+            data_versao_gtfs=data_versao_gtfs,
         )
 
     else:
326 changes: 240 additions & 86 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md
@@ -1,5 +1,21 @@
 # Changelog - br_rj_riodejaneiro_onibus_gps
 
+## [1.0.3] - 2024-10-29
+
+### Changed
+
+- Changed the `materialize_sppo` flow to use the tasks that run the DBT tests (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/288)
+
+## [1.0.2] - 2024-08-25
+
+### Added
+
+- Added the `constants.py` file (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/287)
+
+### Changed
+
+- Changed the `get_raw` task to check whether the capture is empty (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/287)
+
 ## [1.0.1] - 2024-08-19
 
 ### Changed
36 changes: 36 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_onibus_gps/constants.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+"""
+Constant values for rj_smtr br_rj_riodejaneiro_onibus_gps
+"""
+
+from enum import Enum
+
+
+class constants(Enum):  # pylint: disable=c0103
+    """
+    Constant values for rj_smtr br_rj_riodejaneiro_onibus_gps
+    """
+
+    GPS_DATA_CHECKS_LIST = {
+        "gps_sppo": {
+            "unique_columns__gps_sppo": {"description": "Todos os registros são únicos"},
+            "not_null__timestamp_gps__gps_sppo": {
+                "description": "Todos os registros possuem timestamp_gps não nulo"
+            },
+            "not_null__id_veiculo__gps_sppo": {
+                "description": "Todos os registros possuem id_veiculo não nulo"
+            },
+            "not_null__servico__gps_sppo": {
+                "description": "Todos os registros possuem servico não nulo"
+            },
+            "not_null__latitude__gps_sppo": {
+                "description": "Todos os registros possuem latitude não nula"
+            },
+            "not_null__longitude__gps_sppo": {
+                "description": "Todos os registros possuem longitude não nula"
+            },
+            "not_null__status__gps_sppo": {
+                "description": "Todos os registros possuem status não nulo"
+            },
+        }
+    }
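Each key in `GPS_DATA_CHECKS_LIST` names a dbt test and maps it to a human-readable description. A hypothetical illustration of how such a map can be joined with test results to build a report (the `results` shape and the `summarize_checks` helper are assumptions, not part of the diff):

def summarize_checks(checks: dict, results: dict) -> str:
    """Pair each described dbt test with its pass/fail result."""
    lines = []
    for table, tests in checks.items():
        lines.append(f"Checks for {table}:")
        for test_name, meta in tests.items():
            status = "PASS" if results.get(test_name, {}).get("success") else "FAIL"
            lines.append(f"  [{status}] {meta['description']}")
    return "\n".join(lines)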
173 changes: 108 additions & 65 deletions pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py
@@ -25,6 +25,9 @@

 from pipelines.constants import constants
 from pipelines.constants import constants as emd_constants
+from pipelines.migration.br_rj_riodejaneiro_onibus_gps.constants import (
+    constants as gps_constants,
+)
 from pipelines.migration.br_rj_riodejaneiro_onibus_gps.tasks import (
     clean_br_rj_riodejaneiro_onibus_gps,
     create_api_url_onibus_gps,
@@ -61,6 +64,11 @@
     every_hour_minute_six,
     every_minute,
 )
+from pipelines.treatment.templates.tasks import (
+    check_dbt_test_run,
+    dbt_data_quality_checks,
+    run_dbt_tests,
+)
 
 # from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client
 
@@ -164,6 +172,8 @@
         default=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
     )
     truncate_minutes = Parameter("truncate_minutes", default=True)
+    test_only = Parameter("test_only", default=False)
+    run_time_test = Parameter("run_time_test", default="01:00:00")
 
     LABELS = get_current_flow_labels()
     MODE = get_current_flow_mode()
@@ -174,85 +184,118 @@
     # dbt_client = get_local_dbt_client(host="localhost", port=3001)
 
     # Set specific run parameters #
-    with case(rematerialization, False):
-        date_range_false = get_materialization_date_range(
-            dataset_id=dataset_id,
-            table_id=table_id,
-            raw_dataset_id=raw_dataset_id,
-            raw_table_id=raw_table_id,
-            table_run_datetime_column_name="timestamp_gps",
-            mode=MODE,
-            delay_hours=materialize_delay_hours,
-            truncate_minutes=truncate_minutes,
-        )
-
-        RUN_CLEAN_FALSE = task(
-            lambda: [None],
-            checkpoint=False,
-            name="assign_none_to_previous_runs",
-        )()
-    with case(rematerialization, True):
-        date_range_true = task(
-            lambda start, end: {
-                "date_range_start": start,
-                "date_range_end": end,
-            }
-        )(start=date_range_start, end=date_range_end)
-
-        RUN_CLEAN_TRUE = clean_br_rj_riodejaneiro_onibus_gps(date_range_true)
-
-    RUN_CLEAN = merge(RUN_CLEAN_TRUE, RUN_CLEAN_FALSE)
-
-    date_range = merge(date_range_true, date_range_false)
-
-    dataset_sha = fetch_dataset_sha(
-        dataset_id=dataset_id,
-        upstream_tasks=[RUN_CLEAN],
-    )
-
-    # Run materialization #
-    with case(rebuild, True):
-        RUN_TRUE = run_dbt_model(
-            # dbt_client=dbt_client,
-            dataset_id=dataset_id,
-            table_id=table_id,
-            upstream=True,
-            exclude="+data_versao_efetiva",
-            _vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
-            flags="--full-refresh",
-        )
-
-    with case(rebuild, False):
-        RUN_FALSE = run_dbt_model(
-            # dbt_client=dbt_client,
-            dataset_id=dataset_id,
-            table_id=table_id,
-            exclude="+data_versao_efetiva",
-            _vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
-            upstream=True,
-        )
-
-    RUN = merge(RUN_TRUE, RUN_FALSE)
-
-    with case(rematerialization, False):
-        SET_FALSE = set_last_run_timestamp(
-            dataset_id=dataset_id,
-            table_id=table_id,
-            timestamp=date_range["date_range_end"],
-            wait=RUN,
-            mode=MODE,
-        )
-
-    with case(rematerialization, True):
-        SET_TRUE = task(
-            lambda: [None],
-            checkpoint=False,
-            name="assign_none_to_previous_runs",
-        )()
-
-    SET = merge(SET_TRUE, SET_FALSE)
-
-    materialize_sppo.set_reference_tasks([RUN, RUN_CLEAN, SET])
+    with case(test_only, False):
+        with case(rematerialization, False):
+            date_range_false = get_materialization_date_range(
+                dataset_id=dataset_id,
+                table_id=table_id,
+                raw_dataset_id=raw_dataset_id,
+                raw_table_id=raw_table_id,
+                table_run_datetime_column_name="timestamp_gps",
+                mode=MODE,
+                delay_hours=materialize_delay_hours,
+                truncate_minutes=truncate_minutes,
+            )
+
+            RUN_CLEAN_FALSE = task(
+                lambda: [None],
+                checkpoint=False,
+                name="assign_none_to_previous_runs",
+            )()
+        with case(rematerialization, True):
+            date_range_true = task(
+                lambda start, end: {
+                    "date_range_start": start,
+                    "date_range_end": end,
+                }
+            )(start=date_range_start, end=date_range_end)
+
+            RUN_CLEAN_TRUE = clean_br_rj_riodejaneiro_onibus_gps(date_range_true)
+
+        RUN_CLEAN = merge(RUN_CLEAN_TRUE, RUN_CLEAN_FALSE)
+
+        date_range = merge(date_range_true, date_range_false)
+
+        dataset_sha = fetch_dataset_sha(
+            dataset_id=dataset_id,
+            upstream_tasks=[RUN_CLEAN],
+        )
+
+        # Run materialization #
+        with case(rebuild, True):
+            RUN_TRUE = run_dbt_model(
+                # dbt_client=dbt_client,
+                dataset_id=dataset_id,
+                table_id=table_id,
+                upstream=True,
+                exclude="+data_versao_efetiva",
+                _vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
+                flags="--full-refresh",
+            )
+
+        with case(rebuild, False):
+            RUN_FALSE = run_dbt_model(
+                # dbt_client=dbt_client,
+                dataset_id=dataset_id,
+                table_id=table_id,
+                exclude="+data_versao_efetiva",
+                _vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
+                upstream=True,
+            )
+
+            RUN_TEST, datetime_start, datetime_end = check_dbt_test_run(
+                date_range_start, date_range_end, run_time_test, upstream_tasks=[RUN_FALSE]
+            )
+
+            _vars = {"date_range_start": datetime_start, "date_range_end": datetime_end}
+
+            with case(RUN_TEST, True):
+                gps_sppo_data_quality = run_dbt_tests(
+                    dataset_id=dataset_id,
+                    table_id=table_id,
+                    _vars=_vars,
+                )
+                GPS_SPPO_DATA_QUALITY_RESULTS = dbt_data_quality_checks(
+                    gps_sppo_data_quality,
+                    gps_constants.GPS_DATA_CHECKS_LIST.value,
+                    _vars,
+                )
+
+        RUN = merge(RUN_TRUE, RUN_FALSE)
+
+        with case(rematerialization, False):
+            SET_FALSE = set_last_run_timestamp(
+                dataset_id=dataset_id,
+                table_id=table_id,
+                timestamp=date_range["date_range_end"],
+                wait=RUN,
+                mode=MODE,
+            )
+
+        with case(rematerialization, True):
+            SET_TRUE = task(
+                lambda: [None],
+                checkpoint=False,
+                name="assign_none_to_previous_runs",
+            )()
+
+        SET = merge(SET_TRUE, SET_FALSE)
+
+        materialize_sppo.set_reference_tasks([RUN, RUN_CLEAN, SET])
+    with case(test_only, True):
+
+        _vars = {"date_range_start": date_range_start, "date_range_end": date_range_end}
+
+        gps_sppo_data_quality = run_dbt_tests(
+            dataset_id=dataset_id,
+            table_id=table_id,
+            _vars=_vars,
+        )
+        GPS_SPPO_DATA_QUALITY_RESULTS = dbt_data_quality_checks(
+            gps_sppo_data_quality,
+            gps_constants.GPS_DATA_CHECKS_LIST.value,
+            _vars,
+        )
 
 materialize_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
 materialize_sppo.run_config = KubernetesRun(
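The rewritten flow leans on Prefect 1.x conditional branching: `case` gates a subgraph on a parameter value and `merge` resolves whichever branch actually ran. A minimal self-contained sketch of the pattern (demo names only; not code from the repository):

from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge

@task
def run_branch(label: str) -> str:
    return label

with Flow("case-merge-demo") as demo_flow:
    test_only = Parameter("test_only", default=False)
    with case(test_only, False):
        full_run = run_branch("materialize, then maybe test")
    with case(test_only, True):
        test_run = run_branch("run dbt tests only")
    # merge() yields the result of the branch that executed at runtime.
    result = merge(full_run, test_run)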
6 changes: 2 additions & 4 deletions pipelines/migration/projeto_subsidio_sppo/tasks.py
@@ -10,10 +10,8 @@

 from pipelines.constants import constants as smtr_constants
 from pipelines.migration.projeto_subsidio_sppo.constants import constants
-from pipelines.migration.tasks import (  # perform_check,
-    format_send_discord_message,
-    perform_checks_for_table,
-)
+from pipelines.migration.tasks import perform_checks_for_table  # perform_check,
+from pipelines.utils.discord import format_send_discord_message
 from pipelines.utils.secret import get_secret
 from pipelines.utils.utils import log
 
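`format_send_discord_message` is now imported from `pipelines.utils.discord` instead of `pipelines.migration.tasks`; only the import location changes. A hypothetical sketch of the helper's shape, assuming a standard Discord webhook post (the signature and body are guesses for illustration, not the repository's implementation):

import requests

def format_send_discord_message(formatted_messages: list, webhook_url: str) -> None:
    """Join pre-formatted message chunks and post them to a Discord webhook."""
    requests.post(webhook_url, json={"content": "".join(formatted_messages)}, timeout=10)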
