Skip to content

Commit

Permalink
Cria flow generico de materialização + Adiciona tratamento transação …
Browse files Browse the repository at this point in the history
…Jaé (#513)

* create default materialization flow

* create tasks for default materialization flow

* make generate_execute_schedules more generic

* create bilhetagem materialization flow

* adapt bilhetagem schedules for the new model

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add run config and storage

* Update utils.py

* fix sub tasks

* fix fetch_dataset_sha run

* add run_date variable to materialization flow

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* remove discord notifications for testing

* add manual date_range / fix flow run name

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix missing table_id logic

* fix empty return

* fix empty return

* add flag_date_range when var_params is blank

* change rename logic when has date variables

* change return values of create_dbt_run_vars

* create dict aux function

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* remove *args from task

* change coalesce task

* fix rename task

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix task order

* add docstrings

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix line too long

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* pre-commit hook

* adjust tasks

* mudar estrutura do flow materializacao

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* adicionar schedule de bilhetagem

* adicionar schedule no flow de materialização

* ajuste nome da coluna de datetime

* ajustar nome coluna

* mudar coluna de data para datetime_transacao

* ajusta variavel date_range manual

* mudar nome parametro de variável dbt

* cria flow de orquestração materialização

* volta notificação do discord

* ajusta wait_flow_run

* mudar query para teste

* reverter query teste

* usar copy no dicionario de variaveis de data

* adjust constant run interval

* remover funcao comentada

* alterar padrão de nome dos flows

* remove imports comentados

* remove schedules nao utilizados

* remove task comentada

* mudar agent para produção

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Rodrigo Cunha <[email protected]>
  • Loading branch information
4 people authored Oct 5, 2023
1 parent 06f2c00 commit c689b4e
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 40 deletions.
116 changes: 107 additions & 9 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,138 @@

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.utilities.edges import unmapped

# EMD Imports #

from pipelines.constants import constants as emd_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.tasks import (
rename_current_flow_run_now_time,
get_current_flow_labels,
)


from pipelines.utils.utils import set_default_parameters

# SMTR Imports #

from pipelines.rj_smtr.flows import default_capture_flow
from pipelines.rj_smtr.flows import (
default_capture_flow,
default_materialization_flow,
)

from pipelines.rj_smtr.tasks import (
get_current_timestamp,
)

from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.schedules import (
bilhetagem_principal_schedule,
bilhetagem_transacao_schedule,
)

from pipelines.rj_smtr.constants import constants

from pipelines.rj_smtr.schedules import every_hour

# Flows #

# BILHETAGEM TRANSAÇÃO - CAPTURA A CADA MINUTO #

bilhetagem_transacao_captura = deepcopy(default_capture_flow)
bilhetagem_transacao_captura.name = "SMTR: Bilhetagem Transação (captura)"
bilhetagem_transacao_captura.name = "SMTR: Bilhetagem Transação - Captura"
bilhetagem_transacao_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_transacao_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
bilhetagem_transacao_captura.schedule = bilhetagem_transacao_schedule

# BILHETAGEM PRINCIPAL - CAPTURA DIÁRIA DE DIVERSAS TABELAS #
# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO #

bilhetagem_auxiliar_captura = deepcopy(default_capture_flow)
bilhetagem_auxiliar_captura.name = "SMTR: Bilhetagem Auxiliar - Captura (subflow)"
bilhetagem_auxiliar_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_auxiliar_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_auxiliar_captura = set_default_parameters(
flow=bilhetagem_auxiliar_captura,
default_parameters={
"dataset_id": constants.BILHETAGEM_DATASET_ID.value,
"secret_path": constants.BILHETAGEM_SECRET_PATH.value,
"source_type": constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"],
},
)

# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO
bilhetagem_materializacao = deepcopy(default_materialization_flow)
bilhetagem_materializacao.name = "SMTR: Bilhetagem Transação - Materialização (subflow)"
bilhetagem_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_materializacao.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_materializacao_parameters = {
"dataset_id": constants.BILHETAGEM_DATASET_ID.value
} | constants.BILHETAGEM_MATERIALIZACAO_PARAMS.value

bilhetagem_materializacao = set_default_parameters(
flow=bilhetagem_materializacao,
default_parameters=bilhetagem_materializacao_parameters,
)

# TRATAMENTO - RODA DE HORA EM HORA, CAPTURA AUXILIAR + MATERIALIZAÇÃO
with Flow(
"SMTR: Bilhetagem Transação - Tratamento",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as bilhetagem_transacao_tratamento:
timestamp = get_current_timestamp()

rename_flow_run = rename_current_flow_run_now_time(
prefix=bilhetagem_transacao_tratamento.name + " ",
now_time=timestamp,
)

LABELS = get_current_flow_labels()

# Captura
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

wait_captura = wait_for_flow_run.map(
runs_captura,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Materialização
run_materializacao = create_flow_run(
flow_name=bilhetagem_materializacao.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[wait_captura],
)

wait_materializacao = wait_for_flow_run(
run_materializacao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

bilhetagem_principal_captura = deepcopy(default_capture_flow)
bilhetagem_principal_captura.name = "SMTR: Bilhetagem Principal (captura)"
bilhetagem_principal_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_principal_captura.run_config = KubernetesRun(
bilhetagem_transacao_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_transacao_tratamento.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
bilhetagem_principal_captura.schedule = bilhetagem_principal_schedule
bilhetagem_transacao_tratamento.schedule = every_hour
# bilhetagem_materializacao.schedule = bilhetagem_materializacao_schedule
21 changes: 2 additions & 19 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,10 @@
generate_execute_schedules,
)

bilhetagem_principal_clocks = generate_execute_schedules(
clock_interval=timedelta(
**constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["principal_run_interval"]
),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
table_parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
dataset_id=constants.BILHETAGEM_DATASET_ID.value,
secret_path=constants.BILHETAGEM_SECRET_PATH.value,
source_type=constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"],
runs_interval_minutes=constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value[
"principal_runs_interval_minutes"
],
)

bilhetagem_principal_schedule = Schedule(clocks=untuple(bilhetagem_principal_clocks))

BILHETAGEM_TRANSACAO_INTERVAL = timedelta(minutes=1)
bilhetagem_transacao_clocks = generate_execute_schedules(
clock_interval=timedelta(
**constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["transacao_run_interval"]
**constants.BILHETAGEM_CAPTURE_RUN_INTERVAL.value["transacao_run_interval"]
),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
Expand Down
32 changes: 24 additions & 8 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,15 @@ class constants(Enum): # pylint: disable=c0103
},
"vpn_url": "http://vpn-jae.mobilidade.rio/",
"source_type": "api-json",
"transacao_run_interval": {"minutes": 1},
"principal_run_interval": {"days": 1},
"transacao_runs_interval_minutes": 0,
"principal_runs_interval_minutes": 5,
}

BILHETAGEM_CAPTURE_RUN_INTERVAL = {
"transacao_run_interval": {"minutes": 1},
"principal_run_interval": {"days": 1},
}

BILHETAGEM_TRANSACAO_CAPTURE_PARAMS = {
"table_id": "transacao",
"partition_date_only": False,
Expand All @@ -203,11 +206,13 @@ class constants(Enum): # pylint: disable=c0103
data_processamento BETWEEN '{start}'
AND '{end}'
""",
"run_interval": BILHETAGEM_GENERAL_CAPTURE_PARAMS["transacao_run_interval"],
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL["transacao_run_interval"],
},
"primary_key": ["id"], # id column to nest data on
}

BILHETAGEM_SECRET_PATH = "smtr_jae_access_data"

BILHETAGEM_CAPTURE_PARAMS = [
{
"table_id": "linha",
Expand All @@ -222,7 +227,7 @@ class constants(Enum): # pylint: disable=c0103
WHERE
DT_INCLUSAO >= '{start}'
""",
"run_interval": BILHETAGEM_GENERAL_CAPTURE_PARAMS[
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
Expand All @@ -241,7 +246,7 @@ class constants(Enum): # pylint: disable=c0103
WHERE
DT_INCLUSAO >= '{start}'
""",
"run_interval": BILHETAGEM_GENERAL_CAPTURE_PARAMS[
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
Expand All @@ -260,7 +265,7 @@ class constants(Enum): # pylint: disable=c0103
WHERE
DT_INCLUSAO >= '{start}'
""",
"run_interval": BILHETAGEM_GENERAL_CAPTURE_PARAMS[
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
Expand All @@ -279,7 +284,7 @@ class constants(Enum): # pylint: disable=c0103
WHERE
dt_inclusao >= '{start}'
""",
"run_interval": BILHETAGEM_GENERAL_CAPTURE_PARAMS[
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
Expand All @@ -289,4 +294,15 @@ class constants(Enum): # pylint: disable=c0103
], # id column to nest data on
},
]
BILHETAGEM_SECRET_PATH = "smtr_jae_access_data"

BILHETAGEM_MATERIALIZACAO_PARAMS = {
"table_id": BILHETAGEM_TRANSACAO_CAPTURE_PARAMS["table_id"],
"upstream": True,
"dbt_vars": {
"date_range": {
"table_run_datetime_column_name": "datetime_transacao",
"delay_hours": 1,
},
"version": {},
},
}
84 changes: 82 additions & 2 deletions pipelines/rj_smtr/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect import Parameter
from prefect import case, Parameter
from prefect.utilities.edges import unmapped

# EMD Imports #

from pipelines.constants import constants as emd_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.tasks import (
rename_current_flow_run_now_time,
get_now_time,
get_current_flow_labels,
get_current_flow_mode,
)
from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client

# SMTR Imports #

Expand All @@ -22,13 +27,17 @@
create_local_partition_path,
get_current_timestamp,
parse_timestamp_to_string,
transform_raw_to_nested_structure,
create_dbt_run_vars,
set_last_run_timestamp,
coalesce_task,
upload_raw_data_to_gcs,
upload_staging_data_to_gcs,
transform_raw_to_nested_structure,
get_raw_from_sources,
create_request_params,
)

from pipelines.utils.execute_dbt_model.tasks import run_dbt_model

with Flow(
"SMTR: Captura",
Expand Down Expand Up @@ -114,3 +123,74 @@
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

with Flow(
"SMTR: Materialização",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as default_materialization_flow:
# SETUP #

dataset_id = Parameter("dataset_id", default=None)
table_id = Parameter("table_id", default=None)
raw_table_id = Parameter("raw_table_id", default=None)
dbt_alias = Parameter("dbt_alias", default=False)
upstream = Parameter("upstream", default=None)
downstream = Parameter("downstream", default=None)
exclude = Parameter("exclude", default=None)
flags = Parameter("flags", default=None)
dbt_vars = Parameter("dbt_vars", default=dict())

# treated_table_params = treat_dbt_table_params(table_params=table_params)

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)

_vars, date_var, flag_date_range = create_dbt_run_vars(
dataset_id=dataset_id,
dbt_vars=dbt_vars,
table_id=table_id,
raw_dataset_id=dataset_id,
raw_table_id=raw_table_id,
mode=MODE,
)

# Rename flow run

flow_name_prefix = coalesce_task([table_id, dataset_id])

flow_name_now_time = coalesce_task([date_var, get_now_time()])

rename_flow_run = rename_current_flow_run_now_time(
prefix=default_materialization_flow.name + " " + flow_name_prefix + ": ",
now_time=flow_name_now_time,
)

dbt_client = get_k8s_dbt_client(mode=MODE, wait=rename_flow_run)

RUNS = run_dbt_model.map(
dbt_client=unmapped(dbt_client),
dataset_id=unmapped(dataset_id),
table_id=unmapped(table_id),
_vars=_vars,
dbt_alias=unmapped(dbt_alias),
upstream=unmapped(upstream),
downstream=unmapped(downstream),
exclude=unmapped(exclude),
flags=unmapped(flags),
)

with case(flag_date_range, True):
set_last_run_timestamp(
dataset_id=dataset_id,
table_id=table_id,
timestamp=date_var["date_range_end"],
wait=RUNS,
mode=MODE,
)


default_materialization_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
default_materialization_flow.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
Loading

0 comments on commit c689b4e

Please sign in to comment.