From 830c52f98179b800a816981a367c7f4ab3c6da10 Mon Sep 17 00:00:00 2001 From: Rafael Date: Mon, 23 Oct 2023 11:53:08 -0300 Subject: [PATCH] ajuste na logica de recaptura bilhetagem auxiliar --- .../br_rj_riodejaneiro_bilhetagem/flows.py | 57 +++++++++------ pipelines/rj_smtr/constants.py | 4 + pipelines/rj_smtr/flows.py | 10 ++- pipelines/rj_smtr/tasks.py | 73 ++++++++++++++++++- 4 files changed, 117 insertions(+), 27 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 096e5d3e3..dd1c9865d 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -30,7 +30,7 @@ default_materialization_flow, ) -from pipelines.rj_smtr.tasks import get_current_timestamp +from pipelines.rj_smtr.tasks import get_rounded_timestamp, join_dicts from pipelines.rj_smtr.constants import constants @@ -114,7 +114,7 @@ ) as bilhetagem_transacao_tratamento: # Configuração # - timestamp = get_current_timestamp() + timestamp = get_rounded_timestamp(interval_minutes=60) rename_flow_run = rename_current_flow_run_now_time( prefix=bilhetagem_transacao_tratamento.name + " ", @@ -123,13 +123,17 @@ LABELS = get_current_flow_labels() - # Recapturas - + # Recaptura Transação + transacao_recapture_params = join_dicts( + original_dict=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value, + dict_to_join={"timestamp": timestamp}, + ) run_recaptura_trasacao = create_flow_run( flow_name=bilhetagem_recaptura.name, - project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, + # project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, + project_name="staging", labels=LABELS, - parameters=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value, + parameters=transacao_recapture_params, ) wait_recaptura_trasacao = wait_for_flow_run( @@ -139,34 +143,41 @@ raise_final_state=True, ) - runs_recaptura_auxiliar = create_flow_run.map( - flow_name=unmapped(bilhetagem_recaptura.name), - project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), - parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, + # Captura Auxiliar + + auxiliar_capture_params = join_dicts( + original_dict=constants.BILHETAGEM_CAPTURE_PARAMS.value, + dict_to_join={"timestamp": timestamp}, + ) + runs_captura = create_flow_run.map( + flow_name=unmapped(bilhetagem_auxiliar_captura.name), + # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + project_name=unmapped("staging"), + parameters=auxiliar_capture_params, labels=unmapped(LABELS), ) - runs_recaptura_auxiliar.set_upstream(wait_recaptura_trasacao) - - wait_recaptura_auxiliar = wait_for_flow_run.map( - runs_recaptura_auxiliar, + wait_captura = wait_for_flow_run.map( + runs_captura, stream_states=unmapped(True), stream_logs=unmapped(True), raise_final_state=unmapped(True), ) - # Captura - runs_captura = create_flow_run.map( - flow_name=unmapped(bilhetagem_auxiliar_captura.name), - project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), - parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, + # Recaptura Auxiliar + + runs_recaptura_auxiliar = create_flow_run.map( + flow_name=unmapped(bilhetagem_recaptura.name), + # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + project_name=unmapped("staging"), + parameters=auxiliar_capture_params, labels=unmapped(LABELS), ) - runs_captura.set_upstream(wait_recaptura_auxiliar) + runs_recaptura_auxiliar.set_upstream(wait_captura) - wait_captura = wait_for_flow_run.map( - 
runs_captura, + wait_recaptura_auxiliar = wait_for_flow_run.map( + runs_recaptura_auxiliar, stream_states=unmapped(True), stream_logs=unmapped(True), raise_final_state=unmapped(True), @@ -190,6 +201,6 @@ bilhetagem_transacao_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) bilhetagem_transacao_tratamento.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, - labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], + labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], ) bilhetagem_transacao_tratamento.schedule = every_hour diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index 0f1c8dff0..63a959419 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -223,6 +223,7 @@ class constants(Enum): # pylint: disable=c0103 }, "primary_key": ["CD_LINHA"], # id column to nest data on "interval_minutes": 60, + "truncate_hour": True, }, { "table_id": "grupo", @@ -241,6 +242,7 @@ class constants(Enum): # pylint: disable=c0103 }, "primary_key": ["CD_GRUPO"], # id column to nest data on "interval_minutes": 60, + "truncate_hour": True, }, { "table_id": "grupo_linha", @@ -259,6 +261,7 @@ class constants(Enum): # pylint: disable=c0103 }, "primary_key": ["CD_GRUPO", "CD_LINHA"], "interval_minutes": 60, + "truncate_hour": True, }, { "table_id": "matriz_integracao", @@ -280,6 +283,7 @@ class constants(Enum): # pylint: disable=c0103 "cd_integracao", ], # id column to nest data on "interval_minutes": 60, + "truncate_hour": True, }, ] diff --git a/pipelines/rj_smtr/flows.py b/pipelines/rj_smtr/flows.py index 18a7fb1a3..7340e6a3b 100644 --- a/pipelines/rj_smtr/flows.py +++ b/pipelines/rj_smtr/flows.py @@ -26,7 +26,7 @@ from pipelines.rj_smtr.tasks import ( create_date_hour_partition, create_local_partition_path, - get_current_timestamp, + get_rounded_timestamp, parse_timestamp_to_string, transform_raw_to_nested_structure, create_dbt_run_vars, @@ -52,6 +52,7 @@ table_id = Parameter("table_id", default=None) dataset_id = Parameter("dataset_id", default=None) partition_date_only = Parameter("partition_date_only", default=None) + timestamp = Parameter("timestamp", default=None) # Parâmetros Captura # extract_params = Parameter("extract_params", default=None) @@ -70,16 +71,21 @@ checkpoint=False, ) + current_timestamp = get_rounded_timestamp( + timestamp=timestamp, interval_minutes=interval_minutes + ) + with case(recapture, True): _, recapture_timestamps, recapture_previous_errors = query_logs( dataset_id=dataset_id, table_id=table_id, + datetime_filter=current_timestamp, interval_minutes=interval_minutes, recapture_window_days=recapture_window_days, ) with case(recapture, False): - capture_timestamp = [get_current_timestamp()] + capture_timestamp = [current_timestamp] capture_previous_errors = task( lambda: [None], checkpoint=False, name="assign_none_to_previous_errors" )() diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 79cd84751..4bb7e481d 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -243,13 +243,54 @@ def create_dbt_run_vars( @task -def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datetime: +def get_rounded_timestamp( + timestamp: Union[str, datetime, None] = None, + interval_minutes: Union[int, None] = None, +) -> datetime: + """ + Calculate rounded timestamp for flow run. 
+ + Args: + timestamp (Union[str, datetime, None]): timestamp to be used as reference + interval_minutes (Union[int, None], optional): interval in minutes between each recapture + + Returns: + datetime: timestamp for flow run + """ + if isinstance(timestamp, str): + timestamp = datetime.fromisoformat(timestamp) + + if not timestamp: + timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value)) + + timestamp = timestamp.replace(second=0, microsecond=0) + + if interval_minutes: + if interval_minutes >= 60: + hours = interval_minutes / 60 + interval_minutes = round(((hours) % 1) * 60) + + if interval_minutes == 0: + rounded_minutes = interval_minutes + else: + rounded_minutes = (timestamp.minute // interval_minutes) * interval_minutes + + timestamp = timestamp.replace(minute=rounded_minutes) + + return timestamp + + +@task +def get_current_timestamp( + timestamp=None, truncate_minute: bool = True, truncate_hour: bool = False +) -> datetime: """ Get current timestamp for flow run. Args: timestamp: timestamp to be used as reference (optionally, it can be a string) truncate_minute: whether to truncate the timestamp to the minute or not + truncate_hour: whether to truncate the timestamp to the hour or not Returns: datetime: timestamp for flow run @@ -259,7 +300,9 @@ def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datet if not timestamp: timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value)) if truncate_minute: - return timestamp.replace(second=0, microsecond=0) + timestamp = timestamp.replace(second=0, microsecond=0) + if truncate_hour: + timestamp = timestamp.replace(minute=0) return timestamp @@ -385,6 +428,7 @@ def query_logs( max_recaptures (int, optional): maximum number of recaptures to be done interval_minutes (int, optional): interval in minutes between each recapture recapture_window_days (int, optional): Number of days to query for erros + truncate_hour: whether to truncate the timestamp to the hour or not Returns: lists: errors (bool), @@ -1265,3 +1309,28 @@ def unpack_mapped_results_nout2( """ return [r[0] for r in mapped_results], [r[1] for r in mapped_results] + + +@task(checkpoint=False) +def join_dicts( + original_dict: Union[dict, list[dict]], dict_to_join: dict +) -> Union[dict, list[dict]]: + """ + Task to join a dict or list of dicts with another dict + + Args: + original_dict (Union[dict, list[dict]]): The input dict or list of dicts + dict_to_join (dict): The dict to be joined with original_dict + + Returns: + Union[dict, list[dict]]: The joined value + """ + + if isinstance(original_dict, list): + return [d | dict_to_join for d in original_dict] + elif isinstance(original_dict, dict): + return original_dict | dict_to_join + else: + raise ValueError( + f"original_dict must be dict or list, received: {type(original_dict)}" + )
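
The hunks above make the transação capture, the auxiliary captures and both recapture runs share a single hour-aligned reference: get_rounded_timestamp floors the run time to the capture interval, and join_dicts injects that value into every entry of the capture parameter lists before the sub-flows are created. Below is a minimal plain-Python sketch of that behaviour; it drops the Prefect @task decorators and the pipeline's pytz timezone constant, and the two-entry params list is only an illustrative stand-in for BILHETAGEM_CAPTURE_PARAMS, not the real constant.

    from datetime import datetime
    from typing import Optional, Union


    def round_timestamp(
        timestamp: Union[str, datetime, None] = None,
        interval_minutes: Optional[int] = None,
    ) -> datetime:
        """Mirror of the rounding in get_rounded_timestamp (timezone handling omitted)."""
        if isinstance(timestamp, str):
            timestamp = datetime.fromisoformat(timestamp)
        if not timestamp:
            timestamp = datetime.now()

        # Drop seconds/microseconds so the value aligns with capture partitions
        timestamp = timestamp.replace(second=0, microsecond=0)

        if interval_minutes:
            if interval_minutes >= 60:
                # Keep only the sub-hour remainder: 60 -> 0, 90 -> 30, 120 -> 0
                hours = interval_minutes / 60
                interval_minutes = round((hours % 1) * 60)

            if interval_minutes == 0:
                # Hourly (or multi-hour) intervals collapse to the top of the hour
                rounded_minutes = 0
            else:
                # Floor the minute to the previous multiple of the interval
                rounded_minutes = (timestamp.minute // interval_minutes) * interval_minutes

            timestamp = timestamp.replace(minute=rounded_minutes)

        return timestamp


    def join_dicts(original, to_join):
        """Merge one dict into a dict or into every dict of a list (as in tasks.join_dicts).

        Uses the dict union operator, so it assumes Python 3.9+.
        """
        if isinstance(original, list):
            return [d | to_join for d in original]
        if isinstance(original, dict):
            return original | to_join
        raise ValueError(f"original must be dict or list, received: {type(original)}")


    if __name__ == "__main__":
        ts = round_timestamp("2023-10-23T11:53:08", interval_minutes=60)
        print(ts)  # 2023-10-23 11:00:00 -> whole hour shared by capture and recapture

        # Hypothetical capture params, standing in for BILHETAGEM_CAPTURE_PARAMS
        capture_params = [{"table_id": "linha"}, {"table_id": "grupo"}]
        print(join_dicts(capture_params, {"timestamp": ts.isoformat()}))
        # [{'table_id': 'linha', 'timestamp': '2023-10-23T11:00:00'}, ...]

Because every sub-flow is keyed to the same rounded timestamp, the reordering in br_rj_riodejaneiro_bilhetagem/flows.py (runs_recaptura_auxiliar is now set downstream of wait_captura instead of running before the capture) means the auxiliary recapture only queries for errors after the capture for that hour has finished.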