diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index d7f44e3b9..568f96154 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -7,26 +7,46 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS +from prefect.tasks.prefect import create_flow_run, wait_for_flow_run +from prefect.utilities.edges import unmapped # EMD Imports # from pipelines.constants import constants as emd_constants +from pipelines.utils.decorators import Flow +from pipelines.utils.tasks import ( + rename_current_flow_run_now_time, + get_current_flow_labels, +) + + +from pipelines.utils.utils import set_default_parameters # SMTR Imports # -from pipelines.rj_smtr.flows import default_capture_flow +from pipelines.rj_smtr.flows import ( + default_capture_flow, + default_materialization_flow, +) + +from pipelines.rj_smtr.tasks import ( + get_current_timestamp, +) from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.schedules import ( - bilhetagem_principal_schedule, bilhetagem_transacao_schedule, ) +from pipelines.rj_smtr.constants import constants + +from pipelines.rj_smtr.schedules import every_hour + # Flows # # BILHETAGEM TRANSAÇÃO - CAPTURA A CADA MINUTO # bilhetagem_transacao_captura = deepcopy(default_capture_flow) -bilhetagem_transacao_captura.name = "SMTR: Bilhetagem Transação (captura)" +bilhetagem_transacao_captura.name = "SMTR: Bilhetagem Transação - Captura" bilhetagem_transacao_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) bilhetagem_transacao_captura.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, @@ -34,13 +54,91 @@ ) bilhetagem_transacao_captura.schedule = bilhetagem_transacao_schedule -# BILHETAGEM PRINCIPAL - CAPTURA DIÁRIA DE DIVERSAS TABELAS # +# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO # + +bilhetagem_auxiliar_captura = deepcopy(default_capture_flow) +bilhetagem_auxiliar_captura.name = "SMTR: Bilhetagem Auxiliar - Captura (subflow)" +bilhetagem_auxiliar_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +bilhetagem_auxiliar_captura.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) + +bilhetagem_auxiliar_captura = set_default_parameters( + flow=bilhetagem_auxiliar_captura, + default_parameters={ + "dataset_id": constants.BILHETAGEM_DATASET_ID.value, + "secret_path": constants.BILHETAGEM_SECRET_PATH.value, + "source_type": constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"], + }, +) + +# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO +bilhetagem_materializacao = deepcopy(default_materialization_flow) +bilhetagem_materializacao.name = "SMTR: Bilhetagem Transação - Materialização (subflow)" +bilhetagem_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +bilhetagem_materializacao.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) + +bilhetagem_materializacao_parameters = { + "dataset_id": constants.BILHETAGEM_DATASET_ID.value +} | constants.BILHETAGEM_MATERIALIZACAO_PARAMS.value + +bilhetagem_materializacao = set_default_parameters( + flow=bilhetagem_materializacao, + default_parameters=bilhetagem_materializacao_parameters, +) + +# TRATAMENTO - RODA DE HORA EM HORA, CAPTURA AUXILIAR + MATERIALIZAÇÃO +with Flow( + "SMTR: Bilhetagem Transação - Tratamento", + code_owners=["caio", "fernanda", "boris", "rodrigo"], +) as bilhetagem_transacao_tratamento: + timestamp = get_current_timestamp() + + rename_flow_run = rename_current_flow_run_now_time( + prefix=bilhetagem_transacao_tratamento.name + " ", + now_time=timestamp, + ) + + LABELS = get_current_flow_labels() + + # Captura + runs_captura = create_flow_run.map( + flow_name=unmapped(bilhetagem_auxiliar_captura.name), + project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, + labels=unmapped(LABELS), + ) + + wait_captura = wait_for_flow_run.map( + runs_captura, + stream_states=unmapped(True), + stream_logs=unmapped(True), + raise_final_state=unmapped(True), + ) + + # Materialização + run_materializacao = create_flow_run( + flow_name=bilhetagem_materializacao.name, + project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, + labels=LABELS, + upstream_tasks=[wait_captura], + ) + + wait_materializacao = wait_for_flow_run( + run_materializacao, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) -bilhetagem_principal_captura = deepcopy(default_capture_flow) -bilhetagem_principal_captura.name = "SMTR: Bilhetagem Principal (captura)" -bilhetagem_principal_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) -bilhetagem_principal_captura.run_config = KubernetesRun( +bilhetagem_transacao_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +bilhetagem_transacao_tratamento.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) -bilhetagem_principal_captura.schedule = bilhetagem_principal_schedule +bilhetagem_transacao_tratamento.schedule = every_hour +# bilhetagem_materializacao.schedule = bilhetagem_materializacao_schedule diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py index 2f7804811..c2ee21164 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py @@ -15,27 +15,10 @@ generate_execute_schedules, ) -bilhetagem_principal_clocks = generate_execute_schedules( - clock_interval=timedelta( - **constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["principal_run_interval"] - ), - labels=[ - emd_constants.RJ_SMTR_AGENT_LABEL.value, - ], - table_parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, - dataset_id=constants.BILHETAGEM_DATASET_ID.value, - secret_path=constants.BILHETAGEM_SECRET_PATH.value, - source_type=constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"], - runs_interval_minutes=constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value[ - "principal_runs_interval_minutes" - ], -) - -bilhetagem_principal_schedule = Schedule(clocks=untuple(bilhetagem_principal_clocks)) - +BILHETAGEM_TRANSACAO_INTERVAL = timedelta(minutes=1) bilhetagem_transacao_clocks = generate_execute_schedules( clock_interval=timedelta( - **constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["transacao_run_interval"] + **constants.BILHETAGEM_CAPTURE_RUN_INTERVAL.value["transacao_run_interval"] ), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index 52e30d9f8..ee8a22cd2 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -183,12 +183,15 @@ class constants(Enum): # pylint: disable=c0103 }, "vpn_url": "http://vpn-jae.mobilidade.rio/", "source_type": "api-json", - "transacao_run_interval": {"minutes": 1}, - "principal_run_interval": {"days": 1}, "transacao_runs_interval_minutes": 0, "principal_runs_interval_minutes": 5, } + BILHETAGEM_CAPTURE_RUN_INTERVAL = { + "transacao_run_interval": {"minutes": 1}, + "principal_run_interval": {"days": 1}, + } + BILHETAGEM_TRANSACAO_CAPTURE_PARAMS = { "table_id": "transacao", "partition_date_only": False, @@ -203,11 +206,13 @@ class constants(Enum): # pylint: disable=c0103 data_processamento BETWEEN '{start}' AND '{end}' """, - "run_interval": BILHETAGEM_GENERAL_CAPTURE_PARAMS["transacao_run_interval"], + "run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL["transacao_run_interval"], }, "primary_key": ["id"], # id column to nest data on } + BILHETAGEM_SECRET_PATH = "smtr_jae_access_data" + BILHETAGEM_CAPTURE_PARAMS = [ { "table_id": "linha", @@ -222,7 +227,7 @@ class constants(Enum): # pylint: disable=c0103 WHERE DT_INCLUSAO >= '{start}' """, - "run_interval": BILHETAGEM_GENERAL_CAPTURE_PARAMS[ + "run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[ "principal_run_interval" ], }, @@ -241,7 +246,7 @@ class constants(Enum): # pylint: disable=c0103 WHERE DT_INCLUSAO >= '{start}' """, - "run_interval": BILHETAGEM_GENERAL_CAPTURE_PARAMS[ + "run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[ "principal_run_interval" ], }, @@ -260,7 +265,7 @@ class constants(Enum): # pylint: disable=c0103 WHERE DT_INCLUSAO >= '{start}' """, - "run_interval": BILHETAGEM_GENERAL_CAPTURE_PARAMS[ + "run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[ "principal_run_interval" ], }, @@ -279,7 +284,7 @@ class constants(Enum): # pylint: disable=c0103 WHERE dt_inclusao >= '{start}' """, - "run_interval": BILHETAGEM_GENERAL_CAPTURE_PARAMS[ + "run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[ "principal_run_interval" ], }, @@ -289,4 +294,15 @@ class constants(Enum): # pylint: disable=c0103 ], # id column to nest data on }, ] - BILHETAGEM_SECRET_PATH = "smtr_jae_access_data" + + BILHETAGEM_MATERIALIZACAO_PARAMS = { + "table_id": BILHETAGEM_TRANSACAO_CAPTURE_PARAMS["table_id"], + "upstream": True, + "dbt_vars": { + "date_range": { + "table_run_datetime_column_name": "datetime_transacao", + "delay_hours": 1, + }, + "version": {}, + }, + } diff --git a/pipelines/rj_smtr/flows.py b/pipelines/rj_smtr/flows.py index 4860c6d07..0efb69b17 100644 --- a/pipelines/rj_smtr/flows.py +++ b/pipelines/rj_smtr/flows.py @@ -5,7 +5,8 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS -from prefect import Parameter +from prefect import case, Parameter +from prefect.utilities.edges import unmapped # EMD Imports # @@ -13,7 +14,11 @@ from pipelines.utils.decorators import Flow from pipelines.utils.tasks import ( rename_current_flow_run_now_time, + get_now_time, + get_current_flow_labels, + get_current_flow_mode, ) +from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client # SMTR Imports # @@ -22,13 +27,17 @@ create_local_partition_path, get_current_timestamp, parse_timestamp_to_string, + transform_raw_to_nested_structure, + create_dbt_run_vars, + set_last_run_timestamp, + coalesce_task, upload_raw_data_to_gcs, upload_staging_data_to_gcs, - transform_raw_to_nested_structure, get_raw_from_sources, create_request_params, ) +from pipelines.utils.execute_dbt_model.tasks import run_dbt_model with Flow( "SMTR: Captura", @@ -114,3 +123,74 @@ image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) + +with Flow( + "SMTR: Materialização", + code_owners=["caio", "fernanda", "boris", "rodrigo"], +) as default_materialization_flow: + # SETUP # + + dataset_id = Parameter("dataset_id", default=None) + table_id = Parameter("table_id", default=None) + raw_table_id = Parameter("raw_table_id", default=None) + dbt_alias = Parameter("dbt_alias", default=False) + upstream = Parameter("upstream", default=None) + downstream = Parameter("downstream", default=None) + exclude = Parameter("exclude", default=None) + flags = Parameter("flags", default=None) + dbt_vars = Parameter("dbt_vars", default=dict()) + + # treated_table_params = treat_dbt_table_params(table_params=table_params) + + LABELS = get_current_flow_labels() + MODE = get_current_flow_mode(LABELS) + + _vars, date_var, flag_date_range = create_dbt_run_vars( + dataset_id=dataset_id, + dbt_vars=dbt_vars, + table_id=table_id, + raw_dataset_id=dataset_id, + raw_table_id=raw_table_id, + mode=MODE, + ) + + # Rename flow run + + flow_name_prefix = coalesce_task([table_id, dataset_id]) + + flow_name_now_time = coalesce_task([date_var, get_now_time()]) + + rename_flow_run = rename_current_flow_run_now_time( + prefix=default_materialization_flow.name + " " + flow_name_prefix + ": ", + now_time=flow_name_now_time, + ) + + dbt_client = get_k8s_dbt_client(mode=MODE, wait=rename_flow_run) + + RUNS = run_dbt_model.map( + dbt_client=unmapped(dbt_client), + dataset_id=unmapped(dataset_id), + table_id=unmapped(table_id), + _vars=_vars, + dbt_alias=unmapped(dbt_alias), + upstream=unmapped(upstream), + downstream=unmapped(downstream), + exclude=unmapped(exclude), + flags=unmapped(flags), + ) + + with case(flag_date_range, True): + set_last_run_timestamp( + dataset_id=dataset_id, + table_id=table_id, + timestamp=date_var["date_range_end"], + wait=RUNS, + mode=MODE, + ) + + +default_materialization_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +default_materialization_flow.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index a846851b5..f7d687dea 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -8,7 +8,7 @@ import os from pathlib import Path import traceback -from typing import Dict, List, Union +from typing import Dict, List, Union, Iterable import io from basedosdados import Storage, Table @@ -28,6 +28,7 @@ get_last_run_timestamp, log_critical, data_info_str, + dict_contains_keys, get_raw_data_api, get_raw_data_gcs, upload_run_logs_to_bq, @@ -1095,3 +1096,117 @@ def transform_raw_to_nested_structure( log(f"[CATCHED] Task failed with error: \n{error}", level="error") return error, filepath + + +@task(checkpoint=False) +def coalesce_task(value_list: Iterable): + """ + Task to get the first non None value of a list + + Args: + value_list (Iterable): a iterable object with the values + Returns: + any: value_list's first non None item + """ + + try: + return next(value for value in value_list if value is not None) + except StopIteration: + return + + +@task(checkpoint=False, nout=3) +def create_dbt_run_vars( + dataset_id: str, + dbt_vars: dict, + table_id: str, + raw_dataset_id: str, + raw_table_id: str, + mode: str, +) -> tuple[list[dict], Union[list[dict], dict, None], bool]: + """ + Create the variables to be used in dbt materialization based on a dict + + Args: + dataset_id (str): the dataset_id to get the variables + dbt_vars (dict): dict containing the parameters + table_id (str): the table_id get the date_range variable + raw_dataset_id (str): the raw_dataset_id get the date_range variable + raw_table_id (str): the raw_table_id get the date_range variable + mode (str): the mode to get the date_range variable + + Returns: + tuple[list[dict]: the variables to be used in DBT + Union[list[dict], dict, None]: the date variable (date_range or run_date) + bool: a flag that indicates if the date_range variable came from Redis + """ + + log(f"Creating DBT variables. Parameter received: {dbt_vars}") + + if (not dbt_vars) or (not table_id): + log("dbt_vars or table_id are blank. Skiping task") + return [None], None, False + + final_vars = [] + date_var = None + flag_date_range = False + + if "date_range" in dbt_vars.keys(): + log("Creating date_range variable") + + # Set date_range variable manually + if dict_contains_keys( + dbt_vars["date_range"], ["date_range_start", "date_range_end"] + ): + date_var = { + "date_range_start": dbt_vars["date_range"]["date_range_start"], + "date_range_end": dbt_vars["date_range"]["date_range_end"], + } + # Create date_range using Redis + else: + raw_table_id = raw_table_id or table_id + + date_var = get_materialization_date_range.run( + dataset_id=dataset_id, + table_id=table_id, + raw_dataset_id=raw_dataset_id, + raw_table_id=raw_table_id, + table_run_datetime_column_name=dbt_vars["date_range"].get( + "table_run_datetime_column_name" + ), + mode=mode, + delay_hours=dbt_vars["date_range"].get("delay_hours", 0), + ) + + flag_date_range = True + + final_vars.append(date_var.copy()) + + log(f"date_range created: {date_var}") + + elif "run_date" in dbt_vars.keys(): + log("Creating run_date variable") + + date_var = get_run_dates.run( + dbt_vars["run_date"].get("date_range_start"), + dbt_vars["run_date"].get("date_range_end"), + ) + final_vars.append([d.copy() for d in date_var]) + + log(f"run_date created: {date_var}") + + if "version" in dbt_vars.keys(): + log("Creating version variable") + dataset_sha = fetch_dataset_sha.run(dataset_id=dataset_id) + + # if there are other variables inside the list, update each item adding the version variable + if final_vars: + final_vars = get_join_dict.run(dict_list=final_vars, new_dict=dataset_sha) + else: + final_vars.append(dataset_sha) + + log(f"version created: {dataset_sha}") + + log(f"All variables was created, final value is: {final_vars}") + + return final_vars, date_var, flag_date_range diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py index 1d71dd3dd..f9b98afab 100644 --- a/pipelines/rj_smtr/utils.py +++ b/pipelines/rj_smtr/utils.py @@ -434,7 +434,6 @@ def generate_execute_schedules( # pylint: disable=too-many-arguments,too-many-l clocks = [] for count, parameters in enumerate(table_parameters): parameter_defaults = parameters | general_flow_params - log(f"parameter_defaults: {parameter_defaults}") clocks.append( IntervalClock( @@ -448,6 +447,19 @@ def generate_execute_schedules( # pylint: disable=too-many-arguments,too-many-l return clocks +def dict_contains_keys(input_dict: dict, keys: list[str]) -> bool: + """ + Test if the input dict has all keys present in the list + + Args: + input_dict (dict): the dict to test if has the keys + keys (list[str]): the list containing the keys to check + Returns: + bool: True if the input_dict has all the keys otherwise False + """ + return all(x in input_dict.keys() for x in keys) + + def save_raw_local_func( data: Union[dict, str], filepath: str, mode: str = "raw", filetype: str = "json" ) -> str: