Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cria flow de materializacao de transacao_madonna #676

Closed
wants to merge 13 commits into from
1 change: 1 addition & 0 deletions pipelines/rj_smtr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
from pipelines.rj_smtr.br_rj_riodejaneiro_stu.flows import *
from pipelines.rj_smtr.br_rj_riodejaneiro_diretorios.flows import *
from pipelines.rj_smtr.br_rj_riodejaneiro_viagem_zirix.flows import *
from pipelines.rj_smtr.dashboard_bilhetagem_madonna.flows import *
Empty file.
116 changes: 116 additions & 0 deletions pipelines/rj_smtr/dashboard_bilhetagem_madonna/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
"""
Flows for dashboard_bilhetagem_madonna
"""


from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

from pipelines.utils.decorators import Flow

from pipelines.utils.tasks import (
get_current_flow_labels,
get_current_flow_mode,
)

from pipelines.utils.utils import set_default_parameters


from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows import bilhetagem_recaptura
from pipelines.rj_smtr.flows import (
default_materialization_flow,
)

from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client

from pipelines.constants import constants as emd_constants
from pipelines.rj_smtr.constants import constants

from pipelines.rj_smtr.schedules import every_10_minutes_dev

from pipelines.utils.execute_dbt_model.tasks import run_dbt_model

bilhetagem_materializacao_madonna = deepcopy(default_materialization_flow)
bilhetagem_materializacao_madonna.name = (
"SMTR: Bilhetagem Madonna - Materialização (subflow)"
)
bilhetagem_materializacao_madonna.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_materializacao_madonna.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
# labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)

bilhetagem_materializacao_madonna_parameters = {
"dataset_id": "dashboard_bilhetagem_madonna",
"table_id": "transacao_gentileza",
# "table_id": "transacao_madonna",
"upstream": True,
}

bilhetagem_materializacao_madonna = set_default_parameters(
flow=bilhetagem_materializacao_madonna,
default_parameters=bilhetagem_materializacao_madonna_parameters,
)


with Flow("SMTR: Bilhetagem Madonna - Tratamento") as bilhetagem_madonna:
LABELS = get_current_flow_labels()

## RECAPTURA ##

run_recaptura_transacao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
# project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
project_name="staging",
labels=emd_constants.RJ_SMTR_AGENT_LABEL.value,
parameters=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
)

wait_recaptura_transacao = wait_for_flow_run(
run_recaptura_transacao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

## MATERIALIZAÇÃO ##

# run_materializacao_madonna = create_flow_run(
# flow_name=bilhetagem_materializacao_madonna.name,
# # project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
# project_name="staging",
# labels=LABELS,
# # upstream_tasks=[
# # wait_recaptura_transacao,
# # ],
# )

# wait_materializacao_madonna = wait_for_flow_run(
# run_materializacao_madonna,
# stream_states=True,
# stream_logs=True,
# raise_final_state=True,
# )

MODE = get_current_flow_mode(LABELS)
dbt_client = get_k8s_dbt_client(mode=MODE)
RUNS = run_dbt_model(
dbt_client=dbt_client,
dataset_id="dashboard_bilhetagem_madonna",
table_id="transacao_madonna",
upstream=True,
)

bilhetagem_madonna.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_madonna.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
# labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)

bilhetagem_madonna.schedule = every_10_minutes_dev
13 changes: 13 additions & 0 deletions pipelines/rj_smtr/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@
]
)

every_10_minutes_dev = Schedule(
clocks=[
IntervalClock(
interval=timedelta(minutes=10),
start_date=datetime(
2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
labels=[
emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value,
],
),
]
)

every_hour = Schedule(
clocks=[
Expand Down
Loading