diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
index 096e5d3e3..04f6eb61f 100644
--- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
+++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
@@ -30,7 +30,7 @@
     default_materialization_flow,
 )
 
-from pipelines.rj_smtr.tasks import get_current_timestamp
+from pipelines.rj_smtr.tasks import get_rounded_timestamp
 
 from pipelines.rj_smtr.constants import constants
 
@@ -63,6 +63,23 @@
 
 bilhetagem_transacao_captura.schedule = every_minute
 
+# BILHETAGEM GPS
+
+bilhetagem_tracking_captura = deepcopy(default_capture_flow)
+bilhetagem_tracking_captura.name = "SMTR: Bilhetagem GPS Validador - Captura"
+bilhetagem_tracking_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+bilhetagem_tracking_captura.run_config = KubernetesRun(
+    image=emd_constants.DOCKER_IMAGE.value,
+    labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
+)
+
+bilhetagem_tracking_captura = set_default_parameters(
+    flow=bilhetagem_tracking_captura,
+    default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS
+    | constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
+)
+
+bilhetagem_tracking_captura.schedule = every_minute
 
 # BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO #
 
@@ -114,7 +131,9 @@
 ) as bilhetagem_transacao_tratamento:
     # Configuração #
 
-    timestamp = get_current_timestamp()
+    timestamp = get_rounded_timestamp(
+        interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
+    )
 
     rename_flow_run = rename_current_flow_run_now_time(
         prefix=bilhetagem_transacao_tratamento.name + " ",
@@ -123,7 +142,7 @@
 
     LABELS = get_current_flow_labels()
 
-    # Recapturas
+    # Recaptura Transação
 
     run_recaptura_trasacao = create_flow_run(
         flow_name=bilhetagem_recaptura.name,
@@ -139,34 +158,35 @@
         raise_final_state=True,
     )
 
-    runs_recaptura_auxiliar = create_flow_run.map(
-        flow_name=unmapped(bilhetagem_recaptura.name),
+    # Captura Auxiliar
+
+    runs_captura = create_flow_run.map(
+        flow_name=unmapped(bilhetagem_auxiliar_captura.name),
         project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
         parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
         labels=unmapped(LABELS),
     )
 
-    runs_recaptura_auxiliar.set_upstream(wait_recaptura_trasacao)
-
-    wait_recaptura_auxiliar = wait_for_flow_run.map(
-        runs_recaptura_auxiliar,
+    wait_captura = wait_for_flow_run.map(
+        runs_captura,
         stream_states=unmapped(True),
         stream_logs=unmapped(True),
         raise_final_state=unmapped(True),
     )
 
-    # Captura
-    runs_captura = create_flow_run.map(
-        flow_name=unmapped(bilhetagem_auxiliar_captura.name),
+    # Recaptura Auxiliar
+
+    runs_recaptura_auxiliar = create_flow_run.map(
+        flow_name=unmapped(bilhetagem_recaptura.name),
         project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
         parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
         labels=unmapped(LABELS),
     )
 
-    runs_captura.set_upstream(wait_recaptura_auxiliar)
+    runs_recaptura_auxiliar.set_upstream(wait_captura)
 
-    wait_captura = wait_for_flow_run.map(
-        runs_captura,
+    wait_recaptura_auxiliar = wait_for_flow_run.map(
+        runs_recaptura_auxiliar,
         stream_states=unmapped(True),
         stream_logs=unmapped(True),
         raise_final_state=unmapped(True),
@@ -193,3 +213,35 @@
     labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
 )
 bilhetagem_transacao_tratamento.schedule = every_hour
+
+
+with Flow(
+    "SMTR: Bilhetagem GPS Validador - Tratamento",
+    code_owners=["caio", "fernanda", "boris", "rodrigo"],
+) as bilhetagem_gps_tratamento:
+    timestamp = get_rounded_timestamp(
+        interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
+    )
+
+    rename_flow_run = rename_current_flow_run_now_time(
+        prefix=bilhetagem_gps_tratamento.name + " ",
+        now_time=timestamp,
+    )
+
+    LABELS = get_current_flow_labels()
+
+    # Recaptura GPS
+
+    run_recaptura_gps = create_flow_run(
+        flow_name=bilhetagem_recaptura.name,
+        project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
+        labels=LABELS,
+        parameters=constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
+    )
+
+    wait_recaptura_gps = wait_for_flow_run(
+        run_recaptura_gps,
+        stream_states=True,
+        stream_logs=True,
+        raise_final_state=True,
+    )
diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py
index ccf1c6c44..a4a8c375f 100644
--- a/pipelines/rj_smtr/constants.py
+++ b/pipelines/rj_smtr/constants.py
@@ -180,6 +180,10 @@ class constants(Enum):  # pylint: disable=c0103
                 "engine": "postgresql",
                 "host": "10.5.115.1",
             },
+            "tracking_db": {
+                "engine": "postgresql",
+                "host": "10.5.15.25",
+            },
         },
         "source_type": "db",
     }
@@ -203,8 +207,29 @@ class constants(Enum):  # pylint: disable=c0103
         "interval_minutes": 1,
     }
 
+    BILHETAGEM_TRACKING_CAPTURE_PARAMS = {
+        "table_id": "gps_validador",
+        "partition_date_only": False,
+        "extract_params": {
+            "database": "tracking_db",
+            "query": """
+                SELECT
+                    *
+                FROM
+                    tracking_detalhe
+                WHERE
+                    data_tracking BETWEEN '{start}'
+                    AND '{end}'
+            """,
+        },
+        "primary_key": ["id"],
+        "interval_minutes": 1,
+    }
+
     BILHETAGEM_SECRET_PATH = "smtr_jae_access_data"
 
+    BILHETAGEM_TRATAMENTO_INTERVAL = 60
+
     BILHETAGEM_CAPTURE_PARAMS = [
         {
             "table_id": "linha",
@@ -217,11 +242,13 @@ class constants(Enum):  # pylint: disable=c0103
                     FROM
                         LINHA
                     WHERE
-                        DT_INCLUSAO >= '{start}'
+                        DT_INCLUSAO BETWEEN '{start}'
+                        AND '{end}'
                 """,
             },
             "primary_key": ["CD_LINHA"],  # id column to nest data on
-            "interval_minutes": 60,
+            "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+            "truncate_hour": True,
         },
         {
             "table_id": "grupo",
@@ -234,11 +261,13 @@ class constants(Enum):  # pylint: disable=c0103
                     FROM
                         GRUPO
                     WHERE
-                        DT_INCLUSAO >= '{start}'
+                        DT_INCLUSAO BETWEEN '{start}'
+                        AND '{end}'
                 """,
             },
             "primary_key": ["CD_GRUPO"],  # id column to nest data on
-            "interval_minutes": 60,
+            "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+            "truncate_hour": True,
         },
         {
             "table_id": "grupo_linha",
@@ -251,11 +280,13 @@ class constants(Enum):  # pylint: disable=c0103
                     FROM
                         GRUPO_LINHA
                     WHERE
-                        DT_INCLUSAO >= '{start}'
+                        DT_INCLUSAO BETWEEN '{start}'
+                        AND '{end}'
                 """,
             },
             "primary_key": ["CD_GRUPO", "CD_LINHA"],
-            "interval_minutes": 60,
+            "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+            "truncate_hour": True,
         },
         {
             "table_id": "matriz_integracao",
@@ -268,14 +299,16 @@ class constants(Enum):  # pylint: disable=c0103
                     FROM
                         matriz_integracao
                     WHERE
-                        dt_inclusao >= '{start}'
+                        dt_inclusao BETWEEN '{start}'
+                        AND '{end}'
                 """,
             },
             "primary_key": [
                 "cd_versao_matriz",
                 "cd_integracao",
             ],  # id column to nest data on
-            "interval_minutes": 60,
+            "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+            "truncate_hour": True,
         },
     ]
 
diff --git a/pipelines/rj_smtr/flows.py b/pipelines/rj_smtr/flows.py
index 18a7fb1a3..0dddf166b 100644
--- a/pipelines/rj_smtr/flows.py
+++ b/pipelines/rj_smtr/flows.py
@@ -26,7 +26,7 @@
 from pipelines.rj_smtr.tasks import (
     create_date_hour_partition,
     create_local_partition_path,
-    get_current_timestamp,
+    get_rounded_timestamp,
     parse_timestamp_to_string,
     transform_raw_to_nested_structure,
     create_dbt_run_vars,
@@ -70,16 +70,19 @@
         checkpoint=False,
     )
 
+    current_timestamp = get_rounded_timestamp(interval_minutes=interval_minutes)
+
     with case(recapture, True):
         _, recapture_timestamps, recapture_previous_errors = query_logs(
             dataset_id=dataset_id,
             table_id=table_id,
+            datetime_filter=current_timestamp,
             interval_minutes=interval_minutes,
             recapture_window_days=recapture_window_days,
         )
 
     with case(recapture, False):
-        capture_timestamp = [get_current_timestamp()]
+        capture_timestamp = [current_timestamp]
         capture_previous_errors = task(
             lambda: [None], checkpoint=False, name="assign_none_to_previous_errors"
         )()
diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py
index 79cd84751..236988282 100644
--- a/pipelines/rj_smtr/tasks.py
+++ b/pipelines/rj_smtr/tasks.py
@@ -242,6 +242,44 @@ def create_dbt_run_vars(
 ###############
 
 
+@task
+def get_rounded_timestamp(
+    timestamp: Union[str, datetime, None] = None,
+    interval_minutes: Union[int, None] = None,
+) -> datetime:
+    """
+    Calculate rounded timestamp for flow run.
+
+    Args:
+        timestamp (Union[str, datetime, None]): timestamp to be used as reference
+        interval_minutes (Union[int, None], optional): interval in minutes between each recapture
+
+    Returns:
+        datetime: timestamp for flow run
+    """
+    if isinstance(timestamp, str):
+        timestamp = datetime.fromisoformat(timestamp)
+
+    if not timestamp:
+        timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))
+
+    timestamp = timestamp.replace(second=0, microsecond=0)
+
+    if interval_minutes:
+        if interval_minutes >= 60:
+            hours = interval_minutes / 60
+            interval_minutes = round(((hours) % 1) * 60)
+
+        if interval_minutes == 0:
+            rounded_minutes = interval_minutes
+        else:
+            rounded_minutes = (timestamp.minute // interval_minutes) * interval_minutes
+
+        timestamp = timestamp.replace(minute=rounded_minutes)
+
+    return timestamp
+
+
 @task
 def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datetime:
     """