Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Criar Captura e Materialização Ressarcimento #541

Merged
merged 34 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
4f5007c
adicionar tabelas de ressarcimento
pixuimpou Oct 26, 2023
b0107fe
criar flow ressarcimento
pixuimpou Oct 26, 2023
b234c00
alterar partition
pixuimpou Oct 26, 2023
2bbbfa6
mudar projeto para staging
pixuimpou Oct 26, 2023
8309ceb
adicionar serializacao de date
pixuimpou Oct 26, 2023
431ff17
corrigir parametros das tabelas
pixuimpou Oct 26, 2023
a9c0d2d
Merge branch 'master' into staging/bilhetagem-jae-liquidacao
pixuimpou Oct 27, 2023
994bb44
adicionar tabela auxiliar consorcio
pixuimpou Oct 27, 2023
a4097fb
Merge branch 'staging/bilhetagem-jae-liquidacao' of https://github.co…
pixuimpou Oct 27, 2023
a361b96
ajustar default params
pixuimpou Oct 27, 2023
ffed6cd
Merge branch 'master' into staging/bilhetagem-jae-liquidacao
mergify[bot] Oct 27, 2023
59ea93a
adicionar pessoa juridica e operadora transporte
pixuimpou Nov 1, 2023
1887123
Merge branch 'master' into staging/bilhetagem-jae-liquidacao
mergify[bot] Nov 1, 2023
c41ca0f
adicionar tabela linha consorcio
pixuimpou Nov 1, 2023
29609d4
Merge branch 'staging/bilhetagem-jae-liquidacao' of https://github.co…
pixuimpou Nov 1, 2023
4c98fee
Merge branch 'master' into staging/bilhetagem-jae-liquidacao
mergify[bot] Nov 1, 2023
f58147f
adicionar incremental na captura liquidação
pixuimpou Nov 1, 2023
dd3e8fe
criar flow liquidação
pixuimpou Nov 1, 2023
dc587e5
Merge branch 'staging/bilhetagem-jae-liquidacao' of https://github.co…
pixuimpou Nov 1, 2023
370acba
adicionar parametros de controle de run
pixuimpou Nov 1, 2023
1c68bff
comentar source tables para teste
pixuimpou Nov 1, 2023
cd7781c
alterar agent para dev
pixuimpou Nov 1, 2023
4eddb6e
adicionar parametros default materialização
pixuimpou Nov 1, 2023
f185477
usar campo de data no daterange
pixuimpou Nov 1, 2023
4a22f37
descomentar parametros teste
pixuimpou Nov 6, 2023
feb0856
corrigir nout
pixuimpou Nov 6, 2023
40da664
adicionar log para testes
pixuimpou Nov 6, 2023
ab0bc7a
corrigir nome subflow
pixuimpou Nov 6, 2023
32a3b52
adicionar schedule e reference tasks
pixuimpou Nov 7, 2023
9dbfb15
ajuste função reference task
pixuimpou Nov 7, 2023
21b5f6f
mudar ambiente para prd
pixuimpou Nov 7, 2023
9a1334e
alterar query logs para escritas concorrentes
pixuimpou Nov 8, 2023
6d7b156
alterações review
pixuimpou Nov 8, 2023
7a3ad6e
remover imports não utilizados
pixuimpou Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 174 additions & 19 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

from pipelines.rj_smtr.constants import constants

from pipelines.rj_smtr.schedules import every_hour, every_minute
from pipelines.rj_smtr.schedules import every_hour, every_minute, every_day_hour_five

# Flows #

Expand All @@ -58,7 +58,7 @@

bilhetagem_transacao_captura.schedule = every_minute

# BILHETAGEM GPS
# BILHETAGEM GPS - CAPTURA A CADA MINUTO #

bilhetagem_tracking_captura = deepcopy(default_capture_flow)
bilhetagem_tracking_captura.name = "SMTR: Bilhetagem GPS Validador - Captura"
Expand All @@ -76,6 +76,23 @@

bilhetagem_tracking_captura.schedule = every_minute

# BILHETAGEM RESSARCIMENTO - SUBFLOW PARA RODAR DIARIAMENTE #

bilhetagem_ressarcimento_captura = deepcopy(default_capture_flow)
bilhetagem_ressarcimento_captura.name = (
"SMTR: Bilhetagem Ressarcimento - Captura (subflow)"
)
bilhetagem_ressarcimento_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_ressarcimento_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_ressarcimento_captura = set_default_parameters(
flow=bilhetagem_ressarcimento_captura,
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value,
)

# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO #

bilhetagem_auxiliar_captura = deepcopy(default_capture_flow)
Expand All @@ -91,16 +108,20 @@
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value,
)

# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO
bilhetagem_materializacao = deepcopy(default_materialization_flow)
bilhetagem_materializacao.name = "SMTR: Bilhetagem Transação - Materialização (subflow)"
bilhetagem_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_materializacao.run_config = KubernetesRun(
# MATERIALIZAÇÃO #

# Transação
bilhetagem_materializacao_transacao = deepcopy(default_materialization_flow)
bilhetagem_materializacao_transacao.name = (
"SMTR: Bilhetagem Transação - Materialização (subflow)"
)
bilhetagem_materializacao_transacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_materializacao_transacao.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_materializacao_parameters = {
bilhetagem_materializacao_transacao_parameters = {
"source_table_ids": [
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"]
]
Expand All @@ -109,15 +130,52 @@
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"]
]
+ [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value],
"dataset_id": constants.BILHETAGEM_DATASET_ID.value,
} | constants.BILHETAGEM_MATERIALIZACAO_PARAMS.value
} | constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value

bilhetagem_materializacao = set_default_parameters(
flow=bilhetagem_materializacao,
default_parameters=bilhetagem_materializacao_parameters,
bilhetagem_materializacao_transacao = set_default_parameters(
flow=bilhetagem_materializacao_transacao,
default_parameters=bilhetagem_materializacao_transacao_parameters,
)

# RECAPTURA
# Ordem Pagamento

bilhetagem_materializacao_ordem_pagamento = deepcopy(default_materialization_flow)
bilhetagem_materializacao_ordem_pagamento.name = (
"SMTR: Bilhetagem Ordem Pagamento - Materialização (subflow)"
)
bilhetagem_materializacao_ordem_pagamento.storage = GCS(
emd_constants.GCS_FLOWS_BUCKET.value
)
bilhetagem_materializacao_ordem_pagamento.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_materializacao_ordem_pagamento_parameters = {
"source_table_ids": [
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"]
]
+ [d["table_id"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value]
+ [
d["table_id"] for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value
],
"capture_intervals_minutes": [
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"]
]
+ [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value]
+ [
d["interval_minutes"]
for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value
],
} | constants.BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS.value

bilhetagem_materializacao_ordem_pagamento = set_default_parameters(
flow=bilhetagem_materializacao_ordem_pagamento,
default_parameters=bilhetagem_materializacao_ordem_pagamento_parameters,
)


# RECAPTURA #

bilhetagem_recaptura = deepcopy(default_capture_flow)
bilhetagem_recaptura.name = "SMTR: Bilhetagem - Recaptura (subflow)"
Expand All @@ -128,10 +186,11 @@
| {"recapture": True},
)

# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO
# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO #

with Flow(
"SMTR: Bilhetagem Transação - Tratamento",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"],
) as bilhetagem_transacao_tratamento:
# Configuração #

Expand Down Expand Up @@ -220,7 +279,7 @@
with case(materialize, True):
# Materialização
run_materializacao = create_flow_run(
flow_name=bilhetagem_materializacao.name,
flow_name=bilhetagem_materializacao_transacao.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[
Expand All @@ -229,7 +288,9 @@
wait_recaptura_transacao,
],
parameters={
"timestamp": get_current_timestamp(timestamp=timestamp, return_str=True)
"timestamp": get_current_timestamp(
timestamp=timestamp, return_str=True
),
},
)

Expand All @@ -250,7 +311,7 @@

with Flow(
"SMTR: Bilhetagem GPS Validador - Tratamento",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"],
) as bilhetagem_gps_tratamento:
timestamp = get_rounded_timestamp(
interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
Expand Down Expand Up @@ -286,3 +347,97 @@
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
bilhetagem_gps_tratamento.schedule = every_hour

with Flow(
"SMTR: Bilhetagem Ordem Pagamento - Captura/Tratamento",
code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"],
) as bilhetagem_ordem_pagamento_captura_tratamento:
capture = Parameter("capture", default=True)
materialize = Parameter("materialize", default=True)

timestamp = get_rounded_timestamp(
interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
)

rename_flow_run = rename_current_flow_run_now_time(
prefix=bilhetagem_ordem_pagamento_captura_tratamento.name + " ",
now_time=timestamp,
)

LABELS = get_current_flow_labels()

# Captura #
with case(capture, True):
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_ressarcimento_captura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

wait_captura = wait_for_flow_run.map(
runs_captura,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Recaptura #

runs_recaptura = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

runs_recaptura.set_upstream(wait_captura)

wait_recaptura_true = wait_for_flow_run.map(
runs_recaptura,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

with case(capture, False):
wait_recaptura_false = task(lambda: None, name="assign_none_to_recapture")()

wait_recaptura = merge(wait_recaptura_true, wait_recaptura_false)

# Materialização #

with case(materialize, True):
run_materializacao = create_flow_run(
flow_name=bilhetagem_materializacao_ordem_pagamento.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[wait_recaptura],
parameters={
"timestamp": get_current_timestamp(
timestamp=timestamp, return_str=True
),
},
)

wait_materializacao = wait_for_flow_run(
run_materializacao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

bilhetagem_ordem_pagamento_captura_tratamento.set_reference_tasks(
[wait_materializacao, wait_recaptura]
)

bilhetagem_ordem_pagamento_captura_tratamento.storage = GCS(
emd_constants.GCS_FLOWS_BUCKET.value
)
bilhetagem_ordem_pagamento_captura_tratamento.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)


bilhetagem_ordem_pagamento_captura_tratamento.schedule = every_day_hour_five
Loading