Create Recapture Logic / Bilhetagem (Ticketing) Site-to-Site (#530)
* remove unused partition task

* unify date and hour partition tasks

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix conditional

* change capture flow

* change generic capture flow

* update default flow schema

* change default capture flow structure

* change generic capture flow

* adjust constant structure

* change bilhetagem to new capture flow structure

* fix get_storage_blob function

* fix get_storage_blob call

* organize constants order

* fix get_raw_from_sources function call

* change transform_raw_to_json to read_raw_data

* transform transform_raw_data_to_json to read_raw_data

* fix nout task parameter

* fix timedelta instantiation

* set upstream tasks

* declare raw_filepath

* update docstrings

* adjust get_raw_from_sources return

* fix errors

* change agent label to dev

* refactor source values

* update constants

* update agent

* update schedule params

* update interval

* fix get_datetime_range interval

* remove order by from queries

* fix get_raw_data_api

* change json read function

* update read_raw_data

* update save_raw_local_func

* log error

* change raw api extraction for json

* change read json function

* print log traceback

* skip pre treatment if empty df

* skip save staging if dataframe is empty / save raw

* remove skip upload if empty dataframe

* update docstring and returned values

* reorganize task order

* fix tuple

* change zip logic

* remove skip

* create gtfs zip constant

* add gtfs zip file name

* add csv to save raw / change filetype logic

* remove comments

* fix csv_args default value

* change docstring get raw api

* change raw data gcs docstring

* remove commented task

* change quadro primary key to list

* update GTFS constants

* change upload folder structure

* undo silencing of notification failures

* remove test parameters (gtfs)

* Update pipelines/rj_smtr/constants.py

Co-authored-by: Fernanda Scovino <[email protected]>

* fix error chaining in the flow

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* remove header treatment

* change agent from dev to prd

* change agent from dev to prd

* adjust function return values

* update documentation

* add return value to get_upload_storage_blob

* update documentation

* update string

* add recapture to the generic flow

* change labels to dev

* add recapture logic

* create database connection

* create database connection

* create function for mapping multiple return values

* remove unmapped from filepaths

* log for debugging

* remove unmapped from partitions

* add unmapped to the recapture parameter

* add psycopg2

* parameter comments

* add postgresql connection

* change bilhetagem to extract from the db

* standardize argument naming

* change schedule label to dev

* fix bilhetagem postgresql db constant

* change naming for recapture runs

* adjust connector

* change IP to DNS

* Serialize datetime objects / read sql with pandas

* change run name logic

* create bilhetagem recapture

* change host to IP / add interval_minutes

* add interval minutes parameter

* remove commented-out line

* remove bilhetagem schedules file

* generalize query logs function

* adjustment: remove custom schedule

* unmap interval_minutes

* change write folder for testing

* test removing timezone

* change timezone

* fix recapture logic

* add option to recapture more days

* adjust recapture_window_days default

* add recapture_window to the query_logs task

* merge previous_errors

* remove test log

* adjust recapture log

* add auxiliary recapture

* create recapture parameters for auxiliary tables

* comment out materialization

* test log

* change bilhetagem recapture logic

* unmapped upstream tasks

* change how upstream is set

* remove test changes

* change agent to prd

* fix project_name

* stop passing query_logs_func

* fix project_name

* remove comments

* remove query_logs_func

* increase max_recaptures

---------

Co-authored-by: fernandascovino <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: eng-rodrigocunha <[email protected]>
Co-authored-by: Carolina Gomes <[email protected]>
Co-authored-by: Rodrigo Cunha <[email protected]>
7 people authored Oct 19, 2023
1 parent 95c577b commit 9c8a092
Showing 8 changed files with 619 additions and 282 deletions.
83 changes: 67 additions & 16 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
@@ -10,6 +10,7 @@
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.utilities.edges import unmapped


# EMD Imports #

from pipelines.constants import constants as emd_constants
@@ -29,17 +30,18 @@
default_materialization_flow,
)

from pipelines.rj_smtr.tasks import (
get_current_timestamp,
)

from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.schedules import (
bilhetagem_transacao_schedule,
)
from pipelines.rj_smtr.tasks import get_current_timestamp

from pipelines.rj_smtr.constants import constants

from pipelines.rj_smtr.schedules import every_hour
from pipelines.rj_smtr.schedules import every_hour, every_minute


GENERAL_CAPTURE_DEFAULT_PARAMS = {
"dataset_id": constants.BILHETAGEM_DATASET_ID.value,
"secret_path": constants.BILHETAGEM_SECRET_PATH.value,
"source_type": constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"],
}

# Flows #

@@ -52,7 +54,15 @@
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
bilhetagem_transacao_captura.schedule = bilhetagem_transacao_schedule

bilhetagem_transacao_captura = set_default_parameters(
flow=bilhetagem_transacao_captura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS
| constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
)

bilhetagem_transacao_captura.schedule = every_minute
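
The `|` used above to build `default_parameters` is Python's dict union operator (Python 3.9+). A minimal sketch of its behaviour follows; the dictionary contents are placeholder values chosen for illustration, not the real constants from `pipelines/rj_smtr/constants.py`.

```python
# Placeholder stand-ins for the constants referenced in the diff (assumed values).
GENERAL_CAPTURE_DEFAULT_PARAMS = {
    "dataset_id": "example_dataset",
    "secret_path": "example_secret",
    "source_type": "db",
}
TRANSACAO_CAPTURE_PARAMS = {
    "table_id": "transacao",
    "primary_key": ["id"],
    "interval_minutes": 1,
}

# Dict union returns a new dict; on a key clash the right-hand operand wins.
transacao_defaults = GENERAL_CAPTURE_DEFAULT_PARAMS | TRANSACAO_CAPTURE_PARAMS
recapture_defaults = GENERAL_CAPTURE_DEFAULT_PARAMS | {"recapture": True}

# The shared base dict is left untouched, so it can be reused by every flow.
print(sorted(transacao_defaults))
print(recapture_defaults["recapture"])
```

Because the union does not mutate its operands, the same `GENERAL_CAPTURE_DEFAULT_PARAMS` can safely back the capture, auxiliary capture and recapture flows.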


# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO #

@@ -66,11 +76,7 @@

bilhetagem_auxiliar_captura = set_default_parameters(
flow=bilhetagem_auxiliar_captura,
default_parameters={
"dataset_id": constants.BILHETAGEM_DATASET_ID.value,
"secret_path": constants.BILHETAGEM_SECRET_PATH.value,
"source_type": constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"],
},
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS,
)

# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO
@@ -91,11 +97,23 @@
default_parameters=bilhetagem_materializacao_parameters,
)

# TRATAMENTO - RODA DE HORA EM HORA, CAPTURA AUXILIAR + MATERIALIZAÇÃO
# RECAPTURA

bilhetagem_recaptura = deepcopy(default_capture_flow)
bilhetagem_recaptura.name = "SMTR: Bilhetagem - Recaptura (subflow)"
bilhetagem_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_recaptura = set_default_parameters(
flow=bilhetagem_recaptura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS | {"recapture": True},
)

# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO
with Flow(
"SMTR: Bilhetagem Transação - Tratamento",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as bilhetagem_transacao_tratamento:
# Configuração #

timestamp = get_current_timestamp()

rename_flow_run = rename_current_flow_run_now_time(
@@ -105,6 +123,38 @@

LABELS = get_current_flow_labels()

# Recapturas

run_recaptura_trasacao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
)

wait_recaptura_trasacao = wait_for_flow_run(
run_recaptura_trasacao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

runs_recaptura_auxiliar.set_upstream(wait_recaptura_trasacao)

wait_recaptura_auxiliar = wait_for_flow_run.map(
runs_recaptura_auxiliar,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Captura
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
@@ -113,6 +163,8 @@
labels=unmapped(LABELS),
)

runs_captura.set_upstream(wait_recaptura_auxiliar)

wait_captura = wait_for_flow_run.map(
runs_captura,
stream_states=unmapped(True),
@@ -141,4 +193,3 @@
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
bilhetagem_transacao_tratamento.schedule = every_hour
# bilhetagem_materializacao.schedule = bilhetagem_materializacao_schedule
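
The `set_upstream` calls in this file chain the subflows so that each stage starts only after the previous one has finished. The sketch below models that ordering with the standard-library `graphlib` rather than Prefect. The step names are illustrative labels that collapse each `create_flow_run` / `wait_for_flow_run` pair into one step, and the position of materialization is an assumption, since that part of the flow is not shown in the diff.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of steps that must finish before it can start.
# Labels are illustrative; they are not identifiers from the repository.
dependencies = {
    "recapture_transacao": set(),
    "recapture_auxiliar": {"recapture_transacao"},
    "capture_auxiliar": {"recapture_auxiliar"},
    # Assumed: materialization runs after the auxiliary capture.
    "materialization": {"capture_auxiliar"},
}

# static_order() yields every step after all of its prerequisites.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Since the chain is linear there is only one valid order, which is why a failure in an early recapture (with `raise_final_state=True`) would stop the later stages from running.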
33 changes: 0 additions & 33 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py

This file was deleted.

40 changes: 13 additions & 27 deletions pipelines/rj_smtr/constants.py
@@ -170,24 +170,18 @@ class constants(Enum): # pylint: disable=c0103
"databases": {
"principal_db": {
"engine": "mysql",
"host": "principal-database-replica.internal",
"host": "10.5.114.121",
},
"tarifa_db": {
"engine": "postgres",
"host": "tarifa-database-replica.internal",
"engine": "postgresql",
"host": "10.5.113.254",
},
"transacao_db": {
"engine": "postgres",
"host": "transacao-database-replica.internal",
"engine": "postgresql",
"host": "10.5.115.1",
},
},
"vpn_url": "http://vpn-jae.mobilidade.rio/",
"source_type": "api-json",
}

BILHETAGEM_CAPTURE_RUN_INTERVAL = {
"transacao_run_interval": {"minutes": 1},
"principal_run_interval": {"hours": 1},
"source_type": "db",
}

BILHETAGEM_TRANSACAO_CAPTURE_PARAMS = {
@@ -204,9 +198,9 @@ class constants(Enum): # pylint: disable=c0103
data_processamento BETWEEN '{start}'
AND '{end}'
""",
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL["transacao_run_interval"],
},
"primary_key": ["id"], # id column to nest data on
"primary_key": ["id"],
"interval_minutes": 1,
}

BILHETAGEM_SECRET_PATH = "smtr_jae_access_data"
@@ -225,11 +219,9 @@ class constants(Enum): # pylint: disable=c0103
WHERE
DT_INCLUSAO >= '{start}'
""",
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
"primary_key": ["CD_LINHA"], # id column to nest data on
"interval_minutes": 60,
},
{
"table_id": "grupo",
@@ -244,11 +236,9 @@ class constants(Enum): # pylint: disable=c0103
WHERE
DT_INCLUSAO >= '{start}'
""",
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
"primary_key": ["CD_GRUPO"], # id column to nest data on
"interval_minutes": 60,
},
{
"table_id": "grupo_linha",
@@ -263,11 +253,9 @@ class constants(Enum): # pylint: disable=c0103
WHERE
DT_INCLUSAO >= '{start}'
""",
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
"primary_key": ["CD_GRUPO", "CD_LINHA"], # id column to nest data on
"primary_key": ["CD_GRUPO", "CD_LINHA"],
"interval_minutes": 60,
},
{
"table_id": "matriz_integracao",
Expand All @@ -282,14 +270,12 @@ class constants(Enum): # pylint: disable=c0103
WHERE
dt_inclusao >= '{start}'
""",
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
"primary_key": [
"cd_versao_matriz",
"cd_integracao",
], # id column to nest data on
"interval_minutes": 60,
},
]
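
Each capture entry above pairs a query template containing `{start}` (and, for transacao, `{end}`) placeholders with an `interval_minutes` value. One way such a pair could be turned into a concrete query is sketched below. `render_capture_query`, the timestamp format and the example `SELECT` are all hypothetical and are not taken from the repository's tasks.

```python
from datetime import datetime, timedelta


def render_capture_query(template: str, end: datetime, interval_minutes: int) -> str:
    """Fill a query template with a window that ends at `end`.

    Hypothetical helper: the window start is `end` minus `interval_minutes`.
    """
    start = end - timedelta(minutes=interval_minutes)
    fmt = "%Y-%m-%d %H:%M:%S"  # assumed timestamp format
    # str.format ignores unused keyword arguments, so templates that only
    # reference {start} work as well.
    return template.format(start=start.strftime(fmt), end=end.strftime(fmt))


# Illustrative template; the real SELECT clause is elided in the diff.
QUERY = (
    "SELECT * FROM transacao "
    "WHERE data_processamento BETWEEN '{start}' AND '{end}'"
)

rendered = render_capture_query(QUERY, datetime(2023, 10, 19, 12, 0), 1)
print(rendered)
```

With `interval_minutes` set to 60, the same helper would produce the hourly window used by the auxiliary tables.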
