From e70db4e3946db2a9071d5c5f4903260a4510ae70 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 01:40:52 -0300 Subject: [PATCH 01/43] initial commit --- .../br_rj_riodejaneiro_onibus_gps/flows.py | 36 ++++++++++++- .../br_rj_riodejaneiro_onibus_gps/tasks.py | 53 ++++++++++++++++++- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 299c58d48..2efa76653 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -47,6 +47,7 @@ create_api_url_onibus_gps, create_api_url_onibus_realocacao, pre_treatment_br_rj_riodejaneiro_onibus_realocacao, + get_realocacao_recapture_timestamps, ) from pipelines.rj_smtr.schedules import ( @@ -79,7 +80,7 @@ rebuild = Parameter("rebuild", False) # SETUP - timestamp = get_current_timestamp() + timestamp = Parameter("timestamp", default=get_current_timestamp.run()) rename_flow_run = rename_current_flow_run_now_time( prefix="GPS SPPO - Realocação: ", now_time=timestamp @@ -130,11 +131,42 @@ realocacao_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) realocacao_sppo.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, - labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], + labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], ) realocacao_sppo.schedule = every_10_minutes +with Flow( + "[Teste] SMTR: GPS SPPO - Realocação (recaptura)", + code_owners=["rodrigo"], +) as realocacao_sppo_recaptura: + + start_date = Parameter("start_date", default="") + end_date = Parameter("end_date", default="") + + timestamps = get_realocacao_recapture_timestamps(start_date, end_date) + + GPS_SPPO_REALOCACAO_RUN = create_flow_run.map( + flow_name=unmapped(realocacao_sppo.name), + project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + run_name=unmapped(realocacao_sppo.name), + parameters=dict(timestamp=timestamps), + ) + + wait_for_flow_run( + GPS_SPPO_REALOCACAO_RUN, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + +realocacao_sppo_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +realocacao_sppo_recaptura.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], +) + + with Flow( "SMTR: GPS SPPO - Materialização", code_owners=["caio", "fernanda", "boris", "rodrigo"], diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index c562ae1f3..3f29bce4b 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -5,14 +5,14 @@ import traceback from datetime import datetime, timedelta -from typing import Dict +from typing import Dict, List import pandas as pd from prefect import task import pendulum # EMD Imports # -from pipelines.utils.utils import log, get_vault_secret +from pipelines.utils.utils import log, get_vault_secret, list_blobs_with_prefix # SMTR Imports # @@ -254,3 +254,52 @@ def pre_treatment_br_rj_riodejaneiro_onibus_gps( log(f"[CATCHED] Task failed with error: \n{error}", level="error") return {"data": df_gps, "error": error} + + +@task +def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: + """Get all timestamps from realocacao recapture files in a given date range. + + Args: + start_date (str): Start date in format YYYY-MM-DD + end_date (str): End date in format YYYY-MM-DD + + Returns: + timestamps (list): List of timestamps in format YYYY-MM-DD HH:MM:SS + """ + + timestamps = [] + + dates = pd.date_range(start=start_date, end=end_date) + + dataset_id = constants.GPS_SPPO_RAW_DATASET_ID.value + table_id = constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value + + dates = dates.strftime("%Y-%m-%d").to_list() + + for date in dates: + # horas = range(0,24) + horas = [0] + for hora in horas: + # minutos = range(0,60,10) + minutos = [0] + for minuto in minutos: + prefix = f"""raw/ + {dataset_id}/ + {table_id}/ + data={date}/ + hora={hora:02}/ + {date}-{hora:02}-{minuto:02}-00.json""" + blobs_list = list_blobs_with_prefix( + bucket_name="rj-smtr-dev", prefix=prefix, mode="staging" + ) + + if len(blobs_list) == 0: + timestamps.append(f"{date} {hora:02}:{minuto:02}:00") + + log( + f"""From {start_date} to {end_date}, there are {len(timestamps)} recapture timestamps: \n + {timestamps}""" + ) + + return timestamps From fedf057dd844e642f8dcd7023558d36d349bb45f Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 01:59:41 -0300 Subject: [PATCH 02/43] fix docs for get_realocacao_recapture_timestamps --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index 3f29bce4b..e02e3c06b 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -258,7 +258,7 @@ def pre_treatment_br_rj_riodejaneiro_onibus_gps( @task def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: - """Get all timestamps from realocacao recapture files in a given date range. + """Get timestamps with no file in realocacao to recapture it in a given date range. Args: start_date (str): Start date in format YYYY-MM-DD From 8bd0fe25df45fc9639d22336e3bf456be79f6e3a Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 14:28:12 -0300 Subject: [PATCH 03/43] add task in rj_smtr --- pipelines/rj_smtr/tasks.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 6ebd4ef79..6a5481754 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -770,3 +770,11 @@ def get_previous_date(days): now = pendulum.now(pendulum.timezone("America/Sao_Paulo")).subtract(days=days) return now.to_date_string() + + +@task +def check_param(param: str) -> bool: + """ + Check if param is None + """ + return param is None From 73c6c1d8cbaaaf121e7950b0588be3b397c63596 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 14:29:38 -0300 Subject: [PATCH 04/43] refactor timestamp param treating in realocacao --- .../br_rj_riodejaneiro_onibus_gps/flows.py | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 2efa76653..0ce7e2ce5 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -4,6 +4,7 @@ """ from prefect import Parameter, case +from prefect.tasks.control_flow import merge from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.prefect import create_flow_run, wait_for_flow_run @@ -41,6 +42,7 @@ set_last_run_timestamp, upload_logs_to_bq, bq_upload, + check_param, ) from pipelines.rj_smtr.br_rj_riodejaneiro_onibus_gps.tasks import ( pre_treatment_br_rj_riodejaneiro_onibus_gps, @@ -78,9 +80,18 @@ "table_id", default=constants.GPS_SPPO_REALOCACAO_TREATED_TABLE_ID.value ) rebuild = Parameter("rebuild", False) + timestamp_param = Parameter("timestamp", None) # SETUP - timestamp = Parameter("timestamp", default=get_current_timestamp.run()) + timestamp_cond = check_param(timestamp_param) + + with case(timestamp_cond, True): + timestamp_get = get_current_timestamp() + + with case(timestamp_cond, False): + timestamp_def = get_current_timestamp(timestamp_param) + + timestamp = merge(timestamp_get, timestamp_def) rename_flow_run = rename_current_flow_run_now_time( prefix="GPS SPPO - Realocação: ", now_time=timestamp @@ -135,15 +146,20 @@ ) realocacao_sppo.schedule = every_10_minutes - +REALOCACAO_SPPO_RECAPTURA_NAME = "[Teste] SMTR: GPS SPPO - Realocação (recaptura)" with Flow( - "[Teste] SMTR: GPS SPPO - Realocação (recaptura)", + REALOCACAO_SPPO_RECAPTURA_NAME, code_owners=["rodrigo"], ) as realocacao_sppo_recaptura: start_date = Parameter("start_date", default="") end_date = Parameter("end_date", default="") + rename_flow_run = rename_current_flow_run_now_time( + prefix=f"{REALOCACAO_SPPO_RECAPTURA_NAME}: ", + now_time=f"{start_date}-{end_date}", + ) + timestamps = get_realocacao_recapture_timestamps(start_date, end_date) GPS_SPPO_REALOCACAO_RUN = create_flow_run.map( From 9605a49d73f022693d3cc04062b61c625168e7b0 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 14:30:54 -0300 Subject: [PATCH 05/43] change task importing package --- pipelines/rj_smtr/projeto_subsidio_sppo/flows.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py index 61e3e05dc..ea480391e 100644 --- a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py +++ b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py @@ -32,6 +32,7 @@ get_run_dates, get_join_dict, get_previous_date, + check_param, # get_local_dbt_client, # set_last_run_timestamp, ) @@ -50,8 +51,6 @@ ) from pipelines.utils.execute_dbt_model.tasks import run_dbt_model -from pipelines.rj_smtr.projeto_subsidio_sppo.tasks import check_param - # Flows # with Flow( From bc532ea68a49bcf3d691e24b17ce392f3a71f22a Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 14:31:41 -0300 Subject: [PATCH 06/43] Remove task from projeto_subsidio_sppo --- pipelines/rj_smtr/projeto_subsidio_sppo/tasks.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pipelines/rj_smtr/projeto_subsidio_sppo/tasks.py b/pipelines/rj_smtr/projeto_subsidio_sppo/tasks.py index cfc685e97..dd91da1d4 100644 --- a/pipelines/rj_smtr/projeto_subsidio_sppo/tasks.py +++ b/pipelines/rj_smtr/projeto_subsidio_sppo/tasks.py @@ -2,13 +2,3 @@ """ Tasks for projeto_subsidio_sppo """ - -from prefect import task - - -@task -def check_param(param: str) -> bool: - """ - Check if param is None - """ - return param is None From f2c97d91175fe81708ae8ded39206522b8a227b3 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 14:55:43 -0300 Subject: [PATCH 07/43] update params in realocacao_sppo_recaptura --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 0ce7e2ce5..19b58c368 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -157,7 +157,7 @@ rename_flow_run = rename_current_flow_run_now_time( prefix=f"{REALOCACAO_SPPO_RECAPTURA_NAME}: ", - now_time=f"{start_date}-{end_date}", + now_time=f"{get_current_timestamp.run(start_date)}-{get_current_timestamp.run(end_date)}", ) timestamps = get_realocacao_recapture_timestamps(start_date, end_date) @@ -166,7 +166,7 @@ flow_name=unmapped(realocacao_sppo.name), project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), run_name=unmapped(realocacao_sppo.name), - parameters=dict(timestamp=timestamps), + parameters=timestamps, ) wait_for_flow_run( From c07497fa3ca8515b9635729dc63428a626661103 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 15:00:01 -0300 Subject: [PATCH 08/43] fix renaming flow realocacao_sppo_recaptura --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 19b58c368..769b1ed20 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -155,11 +155,6 @@ start_date = Parameter("start_date", default="") end_date = Parameter("end_date", default="") - rename_flow_run = rename_current_flow_run_now_time( - prefix=f"{REALOCACAO_SPPO_RECAPTURA_NAME}: ", - now_time=f"{get_current_timestamp.run(start_date)}-{get_current_timestamp.run(end_date)}", - ) - timestamps = get_realocacao_recapture_timestamps(start_date, end_date) GPS_SPPO_REALOCACAO_RUN = create_flow_run.map( From 69116e5dfd32beace74cf917709286fd9f203d32 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 15:04:53 -0300 Subject: [PATCH 09/43] update task get_realocacao_recapture_timestamps --- .../br_rj_riodejaneiro_onibus_gps/tasks.py | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index e02e3c06b..cb859180a 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -265,9 +265,15 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: end_date (str): End date in format YYYY-MM-DD Returns: - timestamps (list): List of timestamps in format YYYY-MM-DD HH:MM:SS + timestamps (list): List of dictionaries containing the timestamps + to recapture in format YYYY-MM-DD HH:MM:SS. """ + log( + f"Getting timestamps to recapture between {start_date} and {end_date}", + level="info", + ) + timestamps = [] dates = pd.date_range(start=start_date, end=end_date) @@ -279,10 +285,9 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: for date in dates: # horas = range(0,24) - horas = [0] + horas = [7] for hora in horas: - # minutos = range(0,60,10) - minutos = [0] + minutos = range(0, 60, 10) for minuto in minutos: prefix = f"""raw/ {dataset_id}/ @@ -291,7 +296,7 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: hora={hora:02}/ {date}-{hora:02}-{minuto:02}-00.json""" blobs_list = list_blobs_with_prefix( - bucket_name="rj-smtr-dev", prefix=prefix, mode="staging" + bucket_name="rj-smtr-staging", prefix=prefix, mode="staging" ) if len(blobs_list) == 0: @@ -299,7 +304,10 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: log( f"""From {start_date} to {end_date}, there are {len(timestamps)} recapture timestamps: \n - {timestamps}""" + {timestamps}""", + level="info", ) + timestamps = [{"timestamp": d} for d in timestamps] + return timestamps From bd0e644c1ae83edbf38fae1c65105c1b7513dc5a Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 15:20:24 -0300 Subject: [PATCH 10/43] update realocacao_sppo_recaptura --- .../rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 769b1ed20..e70ad6b5e 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -159,16 +159,18 @@ GPS_SPPO_REALOCACAO_RUN = create_flow_run.map( flow_name=unmapped(realocacao_sppo.name), - project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + project_name=unmapped( + "staging" + ), # unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), run_name=unmapped(realocacao_sppo.name), parameters=timestamps, ) - wait_for_flow_run( + wait_for_flow_run.map( GPS_SPPO_REALOCACAO_RUN, - stream_states=True, - stream_logs=True, - raise_final_state=True, + stream_states=unmapped(True), + stream_logs=unmapped(True), + raise_final_state=unmapped(True), ) realocacao_sppo_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) From 4ed6ffdbf8cc6422b92acdd940935ab5c8861456 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 15:21:33 -0300 Subject: [PATCH 11/43] update logs from recapture_timestamps task --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index cb859180a..b08b47368 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -295,10 +295,15 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: data={date}/ hora={hora:02}/ {date}-{hora:02}-{minuto:02}-00.json""" + + log(f"Getting blobs with prefix: {prefix}", level="info") + blobs_list = list_blobs_with_prefix( bucket_name="rj-smtr-staging", prefix=prefix, mode="staging" ) + log(f"Found {len(blobs_list)} blobs", level="info") + if len(blobs_list) == 0: timestamps.append(f"{date} {hora:02}:{minuto:02}:00") From b5d5c9bb38a2e91cac36d14a3eec65f050985622 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 18:03:12 -0300 Subject: [PATCH 12/43] update query_logs + get_current_timestamp --- pipelines/rj_smtr/tasks.py | 104 ++++++++++++++++++++++--------------- 1 file changed, 61 insertions(+), 43 deletions(-) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 6a5481754..d17a5c225 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -136,14 +136,14 @@ def build_incremental_model( # pylint: disable=too-many-arguments @task -def get_current_timestamp( - timestamp: datetime = None, truncate_minute: bool = True -) -> datetime: +def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datetime: """ Get current timestamp for flow run. """ if not timestamp: timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value)) + if isinstance(timestamp) is str: + timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") if truncate_minute: return timestamp.replace(second=0, microsecond=0) return timestamp @@ -276,7 +276,11 @@ def save_treated_local(file_path: str, status: dict, mode: str = "staging") -> s ############### @task(nout=3) def query_logs( - dataset_id: str, table_id: str, datetime_filter=None, max_recaptures: int = 60 + dataset_id: str, + table_id: str, + datetime_filter=None, + max_recaptures: int = 60, + interval_minutes: int = 1, ): """ Queries capture logs to check for errors @@ -287,9 +291,11 @@ def query_logs( datetime_filter (pendulum.datetime.DateTime, optional): filter passed to query. This task will query the logs table for the last 1 day before datetime_filter + max_recaptures (int, optional): maximum number of recaptures to be done + interval_minutes (int, optional): interval in minutes between each recapture Returns: - list: containing timestamps for which the capture failed + lists: errors, timestamps, previous_errors """ if not datetime_filter: @@ -298,49 +304,61 @@ def query_logs( ) query = f""" - with t as ( - select - datetime(timestamp_array) as timestamp_array - from - unnest(GENERATE_TIMESTAMP_ARRAY( - timestamp_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', interval 1 day), - timestamp('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'), - interval 1 minute) - ) as timestamp_array - where timestamp_array < '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' + WITH + t AS ( + SELECT + DATETIME(timestamp_array) AS timestamp_array + FROM + UNNEST( + GENERATE_TIMESTAMP_ARRAY( + TIMESTAMP_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', INTERVAL 1 day), + TIMESTAMP('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'), + INTERVAL {interval_minutes} minute) ) + AS timestamp_array + WHERE + timestamp_array < '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' ), + logs_table AS ( + SELECT + SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura), + "America/Sao_Paulo") AS DATETIME) timestamp_captura, + SAFE_CAST(sucesso AS BOOLEAN) sucesso, + SAFE_CAST(erro AS STRING) erro, + SAFE_CAST(DATA AS DATE) DATA + FROM + rj-smtr-staging.{dataset_id}_staging.{table_id}_logs AS t ), - logs as ( - select - *, - timestamp_trunc(timestamp_captura, minute) as timestamp_array - from - rj-smtr.{dataset_id}.{table_id}_logs - where - data between - date(datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', - interval 1 day)) - and date('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}') - and - timestamp_captura between - datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', interval 1 day) - and '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' - order by timestamp_captura - ) - select - case - when logs.timestamp_captura is not null then logs.timestamp_captura - else t.timestamp_array - end as timestamp_captura, + logs AS ( + SELECT + *, + TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array + FROM + logs_table + WHERE + DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', + INTERVAL 1 day)) + AND DATE('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}') + AND timestamp_captura BETWEEN + DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', INTERVAL 1 day) + AND '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' + ORDER BY + timestamp_captura ) + SELECT + CASE + WHEN logs.timestamp_captura IS NOT NULL THEN logs.timestamp_captura + ELSE + t.timestamp_array + END + AS timestamp_captura, logs.erro - from + FROM t - left join + LEFT JOIN logs - on + ON logs.timestamp_array = t.timestamp_array - where - logs.sucesso is not True - order by + WHERE + logs.sucesso IS NOT TRUE + ORDER BY timestamp_captura """ log(f"Run query to check logs:\n{query}") From 947cf7de38dec3c5ed1b0a692021359842a365c2 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 18:04:15 -0300 Subject: [PATCH 13/43] update realocacao_sppo flow --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index e70ad6b5e..93bfbbd7e 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -62,8 +62,8 @@ # Flows # with Flow( - "SMTR: GPS SPPO - Realocação (captura)", - code_owners=["caio", "fernanda", "boris", "rodrigo"], + "[Teste] SMTR: GPS SPPO - Realocação (captura)", + code_owners=["rodrigo"], # ["caio", "fernanda", "boris", "rodrigo"], ) as realocacao_sppo: # SETUP # @@ -81,6 +81,8 @@ ) rebuild = Parameter("rebuild", False) timestamp_param = Parameter("timestamp", None) + recapture = Parameter("recapture", False) + previous_error = Parameter("previous_error", None) # SETUP timestamp_cond = check_param(timestamp_param) @@ -137,6 +139,8 @@ parent_table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value, error=error, timestamp=timestamp, + previous_error=previous_error, + recapture=recapture, ) realocacao_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) From 027cfe35c957f531a3e55bbfa31bad825dbdbf27 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 18:05:51 -0300 Subject: [PATCH 14/43] update get_realocacao_recapture_timestamps task --- .../br_rj_riodejaneiro_onibus_gps/tasks.py | 62 ++++++++++--------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index b08b47368..f5c556ebe 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -12,11 +12,12 @@ # EMD Imports # -from pipelines.utils.utils import log, get_vault_secret, list_blobs_with_prefix +from pipelines.utils.utils import log, get_vault_secret # SMTR Imports # from pipelines.rj_smtr.constants import constants +from pipelines.rj_smtr.tasks import query_logs, get_current_timestamp # Tasks # @@ -265,8 +266,7 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: end_date (str): End date in format YYYY-MM-DD Returns: - timestamps (list): List of dictionaries containing the timestamps - to recapture in format YYYY-MM-DD HH:MM:SS. + List: List of dicts containing the timestamps to recapture """ log( @@ -274,38 +274,27 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: level="info", ) + errors = [] timestamps = [] + previous_errors = [] dates = pd.date_range(start=start_date, end=end_date) - dataset_id = constants.GPS_SPPO_RAW_DATASET_ID.value - table_id = constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value - dates = dates.strftime("%Y-%m-%d").to_list() for date in dates: - # horas = range(0,24) - horas = [7] - for hora in horas: - minutos = range(0, 60, 10) - for minuto in minutos: - prefix = f"""raw/ - {dataset_id}/ - {table_id}/ - data={date}/ - hora={hora:02}/ - {date}-{hora:02}-{minuto:02}-00.json""" - - log(f"Getting blobs with prefix: {prefix}", level="info") - - blobs_list = list_blobs_with_prefix( - bucket_name="rj-smtr-staging", prefix=prefix, mode="staging" - ) - - log(f"Found {len(blobs_list)} blobs", level="info") + datetime_filter = get_current_timestamp.run(f"{date} 00:00:00") + errors_temp, timestamps_temp, previous_errors_temp = query_logs.run( + dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value, + table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value, + datetime_filter=datetime_filter, + max_recaptures=2 ^ 63 - 1, + interval_minutes=10, + ) - if len(blobs_list) == 0: - timestamps.append(f"{date} {hora:02}:{minuto:02}:00") + errors = errors + errors_temp + timestamps = timestamps + timestamps_temp + previous_errors = previous_errors + previous_errors_temp log( f"""From {start_date} to {end_date}, there are {len(timestamps)} recapture timestamps: \n @@ -313,6 +302,21 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: level="info", ) - timestamps = [{"timestamp": d} for d in timestamps] + combined_data = [] + + for error, timestamp, previous_error in zip(errors, timestamps, previous_errors): + data = { + "error": error, + "timestamp": timestamp, + "previous_error": previous_error, + } + + combined_data.append(data) + + log( + f"""Combined data: \n + {combined_data}""", + level="info", + ) - return timestamps + return combined_data From 9977408677711eef4b553a2ea6e4ffa884a3366d Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 19:15:31 -0300 Subject: [PATCH 15/43] update get_current_timestamp --- pipelines/rj_smtr/tasks.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index d17a5c225..e36f21994 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -142,8 +142,10 @@ def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datet """ if not timestamp: timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value)) - if isinstance(timestamp) is str: - timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") + if isinstance(timestamp, str): + timestamp = timezone(constants.TIMEZONE.value).localize( + datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") + ) if truncate_minute: return timestamp.replace(second=0, microsecond=0) return timestamp From 3137f946251d233d7acd966455ef9f3e3b12d81e Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 19:33:12 -0300 Subject: [PATCH 16/43] fix get_realocacao_recapture_timestamps --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index f5c556ebe..3d63078ca 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -292,7 +292,7 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: interval_minutes=10, ) - errors = errors + errors_temp + errors = errors + [errors_temp] timestamps = timestamps + timestamps_temp previous_errors = previous_errors + previous_errors_temp From 9cf4211761fbabbaf4cb089a3ef092408f9174d4 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 19:36:53 -0300 Subject: [PATCH 17/43] fix get_realocacao_recapture_timestamps --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index 3d63078ca..f196e3956 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -292,7 +292,7 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: interval_minutes=10, ) - errors = errors + [errors_temp] + errors = errors + [errors_temp] * len(timestamps_temp) timestamps = timestamps + timestamps_temp previous_errors = previous_errors + previous_errors_temp From 2f6f0014f131567161d622fbcd0f610ccf993d2d Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 19:37:14 -0300 Subject: [PATCH 18/43] fix get_realocacao_recapture_timestamps --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index f196e3956..5414af433 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -292,7 +292,7 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: interval_minutes=10, ) - errors = errors + [errors_temp] * len(timestamps_temp) + errors = errors + ([errors_temp] * len(timestamps_temp)) timestamps = timestamps + timestamps_temp previous_errors = previous_errors + previous_errors_temp From 5eaf4a5a2433243feb23a1bc38e4e7704b49ba61 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 19:54:51 -0300 Subject: [PATCH 19/43] fix get_realocacao_recapture_timestamps --- .../rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index 5414af433..4498583f2 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -292,10 +292,19 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: interval_minutes=10, ) + if timestamps_temp == []: + log( + f"""From {date}, there are no recapture timestamps""", + level="info", + ) + continue + errors = errors + ([errors_temp] * len(timestamps_temp)) timestamps = timestamps + timestamps_temp previous_errors = previous_errors + previous_errors_temp + timestamps = [timestamp.strftime("%Y-%m-%d %H:%M:%S%z") for timestamp in timestamps] + log( f"""From {start_date} to {end_date}, there are {len(timestamps)} recapture timestamps: \n {timestamps}""", @@ -309,6 +318,7 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: "error": error, "timestamp": timestamp, "previous_error": previous_error, + "recapture": True, } combined_data.append(data) From afc3c0c329ef5bdffad10c82838218815133bdd5 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 20:06:07 -0300 Subject: [PATCH 20/43] fix get_realocacao_recapture_timestamps --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index 4498583f2..2fd95ead9 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -303,7 +303,7 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: timestamps = timestamps + timestamps_temp previous_errors = previous_errors + previous_errors_temp - timestamps = [timestamp.strftime("%Y-%m-%d %H:%M:%S%z") for timestamp in timestamps] + timestamps = [timestamp.strftime("%Y-%m-%d %H:%M:%S") for timestamp in timestamps] log( f"""From {start_date} to {end_date}, there are {len(timestamps)} recapture timestamps: \n From 60d800a11f86d98a84dc49cc9b873061bbd0f1cc Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 20:26:10 -0300 Subject: [PATCH 21/43] fix query_logs --- pipelines/rj_smtr/tasks.py | 54 +++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index e36f21994..05e98df65 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -321,37 +321,37 @@ def query_logs( timestamp_array < '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' ), logs_table AS ( SELECT - SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura), - "America/Sao_Paulo") AS DATETIME) timestamp_captura, - SAFE_CAST(sucesso AS BOOLEAN) sucesso, - SAFE_CAST(erro AS STRING) erro, - SAFE_CAST(DATA AS DATE) DATA + SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura), + "America/Sao_Paulo") AS DATETIME) timestamp_captura, + SAFE_CAST(sucesso AS BOOLEAN) sucesso, + SAFE_CAST(erro AS STRING) erro, + SAFE_CAST(DATA AS DATE) DATA FROM - rj-smtr-staging.{dataset_id}_staging.{table_id}_logs AS t + rj-smtr-staging.{dataset_id}_staging.{table_id}_logs AS t ), logs AS ( + SELECT + *, + TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array + FROM + logs_table + WHERE + DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', + INTERVAL 1 day)) + AND DATE('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}') + AND timestamp_captura BETWEEN + DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', INTERVAL 1 day) + AND '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' + ORDER BY + timestamp_captura ) SELECT - *, - TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array - FROM - logs_table - WHERE - DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', - INTERVAL 1 day)) - AND DATE('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}') - AND timestamp_captura BETWEEN - DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', INTERVAL 1 day) - AND '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' - ORDER BY - timestamp_captura ) - SELECT - CASE - WHEN logs.timestamp_captura IS NOT NULL THEN logs.timestamp_captura - ELSE - t.timestamp_array - END - AS timestamp_captura, - logs.erro + CASE + WHEN logs.timestamp_captura IS NOT NULL THEN logs.timestamp_captura + ELSE + t.timestamp_array + END + AS timestamp_captura, + logs.erro FROM t LEFT JOIN From ee9552a05e4236217c7af948ffa9e211563128d5 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 20:26:59 -0300 Subject: [PATCH 22/43] Change agent from realocacao_sppo + recaptura --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 93bfbbd7e..7eff07f78 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -146,7 +146,7 @@ realocacao_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) realocacao_sppo.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, - labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) realocacao_sppo.schedule = every_10_minutes @@ -180,7 +180,7 @@ realocacao_sppo_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) realocacao_sppo_recaptura.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, - labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) From c1ee1448fcaa7430aaa8fa6a04dc45f0262f87a2 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 20:28:06 -0300 Subject: [PATCH 23/43] change agent subsidio_sppo_apuracao for testing --- pipelines/rj_smtr/projeto_subsidio_sppo/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py index ea480391e..4e9549a30 100644 --- a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py +++ b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py @@ -233,6 +233,6 @@ subsidio_sppo_apuracao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) subsidio_sppo_apuracao.run_config = KubernetesRun( - image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMTR_AGENT_LABEL.value] + image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMTR_DEV_AGENT_LABEL.value] ) subsidio_sppo_apuracao.schedule = every_day_hour_five From b2b698601daef22fd8fb68fc4940a67862b4e660 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 20:54:22 -0300 Subject: [PATCH 24/43] fix query_logs --- pipelines/rj_smtr/tasks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 05e98df65..65084cd85 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -297,7 +297,9 @@ def query_logs( interval_minutes (int, optional): interval in minutes between each recapture Returns: - lists: errors, timestamps, previous_errors + lists: errors (bool), + timestamps (list of pendulum.datetime.DateTime), + previous_errors (list of previous errors) """ if not datetime_filter: From 4755d0d2af7ced1bff073d602a34b58726b5ea92 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 20:55:45 -0300 Subject: [PATCH 25/43] revert name + agent sppo_recaptura + realocacao --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 7eff07f78..3175d89a3 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -62,8 +62,8 @@ # Flows # with Flow( - "[Teste] SMTR: GPS SPPO - Realocação (captura)", - code_owners=["rodrigo"], # ["caio", "fernanda", "boris", "rodrigo"], + "SMTR: GPS SPPO - Realocação (captura)", + code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as realocacao_sppo: # SETUP # @@ -150,10 +150,10 @@ ) realocacao_sppo.schedule = every_10_minutes -REALOCACAO_SPPO_RECAPTURA_NAME = "[Teste] SMTR: GPS SPPO - Realocação (recaptura)" +REALOCACAO_SPPO_RECAPTURA_NAME = "SMTR: GPS SPPO - Realocação (recaptura)" with Flow( REALOCACAO_SPPO_RECAPTURA_NAME, - code_owners=["rodrigo"], + code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as realocacao_sppo_recaptura: start_date = Parameter("start_date", default="") From 6eb9097abdf0ee762a75f22fe99d4a52bf43d636 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 20:56:13 -0300 Subject: [PATCH 26/43] revert agent subsidio_sppo_apuracao --- pipelines/rj_smtr/projeto_subsidio_sppo/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py index 4e9549a30..ea480391e 100644 --- a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py +++ b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py @@ -233,6 +233,6 @@ subsidio_sppo_apuracao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) subsidio_sppo_apuracao.run_config = KubernetesRun( - image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMTR_DEV_AGENT_LABEL.value] + image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMTR_AGENT_LABEL.value] ) subsidio_sppo_apuracao.schedule = every_day_hour_five From b251da966fd59df5719e53a07dc4f05d741a003b Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 21:11:34 -0300 Subject: [PATCH 27/43] fix datetime filter > current timestamp --- .../br_rj_riodejaneiro_onibus_gps/tasks.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index 2fd95ead9..95880057b 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -277,6 +277,7 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: errors = [] timestamps = [] previous_errors = [] + flag_break = False dates = pd.date_range(start=start_date, end=end_date) @@ -284,6 +285,16 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: for date in dates: datetime_filter = get_current_timestamp.run(f"{date} 00:00:00") + + if datetime_filter > get_current_timestamp.run(): + flag_break = True + datetime_filter = get_current_timestamp.run() + log( + """Datetime filter is greater than current timestamp, + using current timestamp instead""", + level="warning", + ) + errors_temp, timestamps_temp, previous_errors_temp = query_logs.run( dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value, table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value, @@ -303,6 +314,13 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: timestamps = timestamps + timestamps_temp previous_errors = previous_errors + previous_errors_temp + if flag_break: + log( + "Breaking loop because datetime filter is greater than current timestamp", + level="warning", + ) + break + timestamps = [timestamp.strftime("%Y-%m-%d %H:%M:%S") for timestamp in timestamps] log( From 828592cc22b26b7d2328c7383942ba2e8f68a4ee Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 21:34:17 -0300 Subject: [PATCH 28/43] fix get_realocacao_recapture_timestamps round --- .../rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index 95880057b..c30e47b5f 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -286,9 +286,17 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: for date in dates: datetime_filter = get_current_timestamp.run(f"{date} 00:00:00") - if datetime_filter > get_current_timestamp.run(): + current_timestamp = get_current_timestamp.run() + + if datetime_filter > current_timestamp: flag_break = True - datetime_filter = get_current_timestamp.run() + + # Round down to the nearest 10 minutes + current_timestamp = current_timestamp.replace( + second=0, microsecond=0, minute=((current_timestamp.minute // 10) * 10) + ) + + datetime_filter = current_timestamp log( """Datetime filter is greater than current timestamp, using current timestamp instead""", From 9d336e126408e5750c5512044696812d7ce0c59f Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Fri, 4 Aug 2023 21:36:17 -0300 Subject: [PATCH 29/43] update log in get_realocacao_recapture_timestamps --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index c30e47b5f..f19c5818b 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -298,8 +298,8 @@ def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: datetime_filter = current_timestamp log( - """Datetime filter is greater than current timestamp, - using current timestamp instead""", + f"""Datetime filter is greater than current timestamp, + using current timestamp instead ({datetime_filter})""", level="warning", ) From c1dbd7a6ead99a00ef91dce0493cdfc5f2e8b664 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Sat, 5 Aug 2023 00:36:28 -0300 Subject: [PATCH 30/43] add get_project_name function --- pipelines/rj_smtr/utils.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py index 0cb52ed93..e207b70c8 100644 --- a/pipelines/rj_smtr/utils.py +++ b/pipelines/rj_smtr/utils.py @@ -383,3 +383,10 @@ def data_info_str(data: pd.DataFrame): buffer = io.StringIO() data.info(buf=buffer) return buffer.getvalue() + + +def get_project_name(project_name: str): + if project_name == "prod": + return constants.PREFECT_DEFAULT_PROJECT.value + else: + return "staging" From b386126afea142db5d603832da726a5e92e96b61 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Sat, 5 Aug 2023 00:37:41 -0300 Subject: [PATCH 31/43] update get_project_name --- pipelines/rj_smtr/utils.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py index e207b70c8..8bae36809 100644 --- a/pipelines/rj_smtr/utils.py +++ b/pipelines/rj_smtr/utils.py @@ -385,8 +385,18 @@ def data_info_str(data: pd.DataFrame): return buffer.getvalue() -def get_project_name(project_name: str): - if project_name == "prod": +def get_project_name(mode: str): + """ + Get project name based on mode + + Args: + mode (str): mode + + Returns: + str: project name + """ + + if mode == "prod": return constants.PREFECT_DEFAULT_PROJECT.value else: return "staging" From a25b8efcff5c5bfe7ee56ac3d2e0c01a6e05927f Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Sat, 5 Aug 2023 00:38:14 -0300 Subject: [PATCH 32/43] add get_project_name to realocacao_sppo_recaptura --- .../rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 3175d89a3..cb15fe3bd 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -44,6 +44,7 @@ bq_upload, check_param, ) + from pipelines.rj_smtr.br_rj_riodejaneiro_onibus_gps.tasks import ( pre_treatment_br_rj_riodejaneiro_onibus_gps, create_api_url_onibus_gps, @@ -57,8 +58,11 @@ every_minute, every_10_minutes, ) + from pipelines.utils.execute_dbt_model.tasks import run_dbt_model +from pipelines.rj_smtr.utils import get_project_name + # Flows # with Flow( @@ -159,13 +163,15 @@ start_date = Parameter("start_date", default="") end_date = Parameter("end_date", default="") + LABELS = get_current_flow_labels() + MODE = get_current_flow_mode(LABELS) + PROJECT_NAME = get_project_name(MODE) + timestamps = get_realocacao_recapture_timestamps(start_date, end_date) GPS_SPPO_REALOCACAO_RUN = create_flow_run.map( flow_name=unmapped(realocacao_sppo.name), - project_name=unmapped( - "staging" - ), # unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + project_name=unmapped(PROJECT_NAME), run_name=unmapped(realocacao_sppo.name), parameters=timestamps, ) From 1a5f3c6fbd8e8a32cac72a747674f9a1ccd865ff Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 5 Sep 2023 20:08:26 +0000 Subject: [PATCH 33/43] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index debde2756..11db4ccd3 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -158,7 +158,6 @@ REALOCACAO_SPPO_RECAPTURA_NAME, code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as realocacao_sppo_recaptura: - start_date = Parameter("start_date", default="") end_date = Parameter("end_date", default="") From 4f764ce97c65882edb958770fe63b3b4d264b65a Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Tue, 5 Sep 2023 17:43:53 -0300 Subject: [PATCH 34/43] change get_recapture_timestamps task --- .../br_rj_riodejaneiro_onibus_gps/tasks.py | 104 +-------- pipelines/rj_smtr/tasks.py | 218 ++++++++++-------- 2 files changed, 125 insertions(+), 197 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py index f19c5818b..c562ae1f3 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py @@ -5,7 +5,7 @@ import traceback from datetime import datetime, timedelta -from typing import Dict, List +from typing import Dict import pandas as pd from prefect import task import pendulum @@ -17,7 +17,6 @@ # SMTR Imports # from pipelines.rj_smtr.constants import constants -from pipelines.rj_smtr.tasks import query_logs, get_current_timestamp # Tasks # @@ -255,104 +254,3 @@ def pre_treatment_br_rj_riodejaneiro_onibus_gps( log(f"[CATCHED] Task failed with error: \n{error}", level="error") return {"data": df_gps, "error": error} - - -@task -def get_realocacao_recapture_timestamps(start_date: str, end_date: str) -> List: - """Get timestamps with no file in realocacao to recapture it in a given date range. - - Args: - start_date (str): Start date in format YYYY-MM-DD - end_date (str): End date in format YYYY-MM-DD - - Returns: - List: List of dicts containing the timestamps to recapture - """ - - log( - f"Getting timestamps to recapture between {start_date} and {end_date}", - level="info", - ) - - errors = [] - timestamps = [] - previous_errors = [] - flag_break = False - - dates = pd.date_range(start=start_date, end=end_date) - - dates = dates.strftime("%Y-%m-%d").to_list() - - for date in dates: - datetime_filter = get_current_timestamp.run(f"{date} 00:00:00") - - current_timestamp = get_current_timestamp.run() - - if datetime_filter > current_timestamp: - flag_break = True - - # Round down to the nearest 10 minutes - current_timestamp = current_timestamp.replace( - second=0, microsecond=0, minute=((current_timestamp.minute // 10) * 10) - ) - - datetime_filter = current_timestamp - log( - f"""Datetime filter is greater than current timestamp, - using current timestamp instead ({datetime_filter})""", - level="warning", - ) - - errors_temp, timestamps_temp, previous_errors_temp = query_logs.run( - dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value, - table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value, - datetime_filter=datetime_filter, - max_recaptures=2 ^ 63 - 1, - interval_minutes=10, - ) - - if timestamps_temp == []: - log( - f"""From {date}, there are no recapture timestamps""", - level="info", - ) - continue - - errors = errors + ([errors_temp] * len(timestamps_temp)) - timestamps = timestamps + timestamps_temp - previous_errors = previous_errors + previous_errors_temp - - if flag_break: - log( - "Breaking loop because datetime filter is greater than current timestamp", - level="warning", - ) - break - - timestamps = [timestamp.strftime("%Y-%m-%d %H:%M:%S") for timestamp in timestamps] - - log( - f"""From {start_date} to {end_date}, there are {len(timestamps)} recapture timestamps: \n - {timestamps}""", - level="info", - ) - - combined_data = [] - - for error, timestamp, previous_error in zip(errors, timestamps, previous_errors): - data = { - "error": error, - "timestamp": timestamp, - "previous_error": previous_error, - "recapture": True, - } - - combined_data.append(data) - - log( - f"""Combined data: \n - {combined_data}""", - level="info", - ) - - return combined_data diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 4225d8e89..84c0efec8 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -26,7 +26,7 @@ bq_project, get_table_min_max_value, get_last_run_timestamp, - log_critical, + query_logs_func, ) from pipelines.utils.execute_dbt_model.utils import get_dbt_client from pipelines.utils.utils import log, get_redis_client, get_vault_secret @@ -302,99 +302,13 @@ def query_logs( previous_errors (list of previous errors) """ - if not datetime_filter: - datetime_filter = pendulum.now(constants.TIMEZONE.value).replace( - second=0, microsecond=0 - ) - elif isinstance(datetime_filter, str): - datetime_filter = datetime.fromisoformat(datetime_filter).replace( - second=0, microsecond=0 - ) - - query = f""" - WITH - t AS ( - SELECT - DATETIME(timestamp_array) AS timestamp_array - FROM - UNNEST( - GENERATE_TIMESTAMP_ARRAY( - TIMESTAMP_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', INTERVAL 1 day), - TIMESTAMP('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'), - INTERVAL {interval_minutes} minute) ) - AS timestamp_array - WHERE - timestamp_array < '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' ), - logs_table AS ( - SELECT - SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura), - "America/Sao_Paulo") AS DATETIME) timestamp_captura, - SAFE_CAST(sucesso AS BOOLEAN) sucesso, - SAFE_CAST(erro AS STRING) erro, - SAFE_CAST(DATA AS DATE) DATA - FROM - rj-smtr-staging.{dataset_id}_staging.{table_id}_logs AS t - ), - logs AS ( - SELECT - *, - TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array - FROM - logs_table - WHERE - DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', - INTERVAL 1 day)) - AND DATE('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}') - AND timestamp_captura BETWEEN - DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', INTERVAL 1 day) - AND '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' - ORDER BY - timestamp_captura ) - SELECT - CASE - WHEN logs.timestamp_captura IS NOT NULL THEN logs.timestamp_captura - ELSE - t.timestamp_array - END - AS timestamp_captura, - logs.erro - FROM - t - LEFT JOIN - logs - ON - logs.timestamp_array = t.timestamp_array - WHERE - logs.sucesso IS NOT TRUE - ORDER BY - timestamp_captura - """ - log(f"Run query to check logs:\n{query}") - results = bd.read_sql(query=query, billing_project_id=bq_project()) - if len(results) > 0: - results["timestamp_captura"] = ( - pd.to_datetime(results["timestamp_captura"]) - .dt.tz_localize(constants.TIMEZONE.value) - .to_list() - ) - log(f"Recapture data for the following {len(results)} timestamps:\n{results}") - if len(results) > max_recaptures: - message = f""" - [SPPO - Recaptures] - Encontradas {len(results)} timestamps para serem recapturadas. - Essa run processará as seguintes: - ##### - {results[:max_recaptures]} - ##### - Sobraram as seguintes para serem recapturadas na próxima run: - ##### - {results[max_recaptures:]} - ##### - """ - log_critical(message) - results = results[:max_recaptures] - return True, results["timestamp_captura"].to_list(), results["erro"].to_list() - return False, [], [] + return query_logs_func( + dataset_id=dataset_id, + table_id=table_id, + datetime_filter=datetime_filter, + max_recaptures=max_recaptures, + interval_minutes=interval_minutes, + ) @task @@ -821,3 +735,119 @@ def check_param(param: str) -> bool: Check if param is None """ return param is None + + +@task +def get_recapture_timestamps( + current_timestamp: datetime, + start_date: str, + end_date: str, + dataset_id: str, + table_id: str, + interval_minutes: int = 10, +) -> List: + """Get timestamps with no file to recapture it in a given date range. + + Args: + current_timestamp (datetime): Current timestamp + start_date (str): Start date in format YYYY-MM-DD + end_date (str): End date in format YYYY-MM-DD + dataset_id (str): Dataset id + table_id (str): Table id + interval_minutes (int, optional): Interval in minutes to recapture. Defaults to 10. + + Returns: + List: List of dicts containing the timestamps to recapture + """ + + log( + f"Getting timestamps to recapture between {start_date} and {end_date}", + level="info", + ) + + errors = [] + timestamps = [] + previous_errors = [] + flag_break = False + + dates = pd.date_range(start=start_date, end=end_date) + + dates = dates.strftime("%Y-%m-%d").to_list() + + for date in dates: + datetime_filter = datetime.now(tz=timezone(constants.TIMEZONE.value)).replace( + second=0, microsecond=0 + ) + + if datetime_filter > current_timestamp: + flag_break = True + + # Round down to the nearest interval_minutes minutes + current_timestamp = current_timestamp.replace( + second=0, + microsecond=0, + minute=( + (current_timestamp.minute // interval_minutes) * interval_minutes + ), + ) + + datetime_filter = current_timestamp + log( + f"""Datetime filter is greater than current timestamp, + using current timestamp instead ({datetime_filter})""", + level="warning", + ) + + errors_temp, timestamps_temp, previous_errors_temp = query_logs_func( + dataset_id=dataset_id, + table_id=table_id, + datetime_filter=datetime_filter, + max_recaptures=2 ^ 63 - 1, + interval_minutes=interval_minutes, + ) + + if timestamps_temp == []: + log( + f"""From {date}, there are no recapture timestamps""", + level="info", + ) + continue + + errors = errors + ([errors_temp] * len(timestamps_temp)) + timestamps = timestamps + timestamps_temp + previous_errors = previous_errors + previous_errors_temp + + if flag_break: + log( + "Breaking loop because datetime filter is greater than current timestamp", + level="warning", + ) + break + + timestamps = [timestamp.strftime("%Y-%m-%d %H:%M:%S") for timestamp in timestamps] + + log( + f"""From {start_date} to {end_date}, there are {len(timestamps)} recapture timestamps: \n + {timestamps}""", + level="info", + ) + + combined_data = [] + + for error, timestamp, previous_error in zip(errors, timestamps, previous_errors): + data = { + "error": error, + "timestamp": timestamp, + "previous_error": previous_error, + "recapture": True, + } + + combined_data.append(data) + + log( + f"""Combined data: \n + {combined_data}""", + level="info", + ) + + return combined_data From 2d1cafbb1bb01bf693da1f897f2c97b2a704e809 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Tue, 5 Sep 2023 17:45:09 -0300 Subject: [PATCH 35/43] add function query_logs_func in utils --- pipelines/rj_smtr/utils.py | 122 +++++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py index bf7831899..386ad9c90 100644 --- a/pipelines/rj_smtr/utils.py +++ b/pipelines/rj_smtr/utils.py @@ -10,6 +10,8 @@ import basedosdados as bd from basedosdados import Table import pandas as pd +import pendulum +from datetime import datetime from pipelines.rj_smtr.implicit_ftp import ImplicitFtpTls from pipelines.utils.utils import log @@ -402,3 +404,123 @@ def get_project_name(mode: str): return constants.PREFECT_DEFAULT_PROJECT.value else: return "staging" + + +def query_logs_func( + dataset_id: str, + table_id: str, + datetime_filter=None, + max_recaptures: int = 60, + interval_minutes: int = 1, +): + """ + Queries capture logs to check for errors + + Args: + dataset_id (str): dataset_id on BigQuery + table_id (str): table_id on BigQuery + datetime_filter (pendulum.datetime.DateTime, optional): + filter passed to query. This task will query the logs table + for the last 1 day before datetime_filter + max_recaptures (int, optional): maximum number of recaptures to be done + interval_minutes (int, optional): interval in minutes between each recapture + + Returns: + lists: errors (bool), + timestamps (list of pendulum.datetime.DateTime), + previous_errors (list of previous errors) + """ + + if not datetime_filter: + datetime_filter = pendulum.now(constants.TIMEZONE.value).replace( + second=0, microsecond=0 + ) + elif isinstance(datetime_filter, str): + datetime_filter = datetime.fromisoformat(datetime_filter).replace( + second=0, microsecond=0 + ) + + query = f""" + WITH + t AS ( + SELECT + DATETIME(timestamp_array) AS timestamp_array + FROM + UNNEST( + GENERATE_TIMESTAMP_ARRAY( + TIMESTAMP_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', INTERVAL 1 day), + TIMESTAMP('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'), + INTERVAL {interval_minutes} minute) ) + AS timestamp_array + WHERE + timestamp_array < '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' ), + logs_table AS ( + SELECT + SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura), + "America/Sao_Paulo") AS DATETIME) timestamp_captura, + SAFE_CAST(sucesso AS BOOLEAN) sucesso, + SAFE_CAST(erro AS STRING) erro, + SAFE_CAST(DATA AS DATE) DATA + FROM + rj-smtr-staging.{dataset_id}_staging.{table_id}_logs AS t + ), + logs AS ( + SELECT + *, + TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array + FROM + logs_table + WHERE + DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', + INTERVAL 1 day)) + AND DATE('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}') + AND timestamp_captura BETWEEN + DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', INTERVAL 1 day) + AND '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' + ORDER BY + timestamp_captura ) + SELECT + CASE + WHEN logs.timestamp_captura IS NOT NULL THEN logs.timestamp_captura + ELSE + t.timestamp_array + END + AS timestamp_captura, + logs.erro + FROM + t + LEFT JOIN + logs + ON + logs.timestamp_array = t.timestamp_array + WHERE + logs.sucesso IS NOT TRUE + ORDER BY + timestamp_captura + """ + log(f"Run query to check logs:\n{query}") + results = bd.read_sql(query=query, billing_project_id=bq_project()) + if len(results) > 0: + results["timestamp_captura"] = ( + pd.to_datetime(results["timestamp_captura"]) + .dt.tz_localize(constants.TIMEZONE.value) + .to_list() + ) + log(f"Recapture data for the following {len(results)} timestamps:\n{results}") + if len(results) > max_recaptures: + message = f""" + [SPPO - Recaptures] + Encontradas {len(results)} timestamps para serem recapturadas. + Essa run processará as seguintes: + ##### + {results[:max_recaptures]} + ##### + Sobraram as seguintes para serem recapturadas na próxima run: + ##### + {results[max_recaptures:]} + ##### + """ + log_critical(message) + results = results[:max_recaptures] + return True, results["timestamp_captura"].to_list(), results["erro"].to_list() + return False, [], [] From 883a13fb65bc7eeee07914bfdf7305dbcd2eb3f6 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Tue, 5 Sep 2023 17:46:28 -0300 Subject: [PATCH 36/43] update realocacao_sppo_recaptura flow params --- .../rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 11db4ccd3..834c84882 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -43,6 +43,7 @@ upload_logs_to_bq, bq_upload, check_param, + get_recapture_timestamps, ) from pipelines.rj_smtr.br_rj_riodejaneiro_onibus_gps.tasks import ( @@ -50,7 +51,6 @@ create_api_url_onibus_gps, create_api_url_onibus_realocacao, pre_treatment_br_rj_riodejaneiro_onibus_realocacao, - get_realocacao_recapture_timestamps, ) from pipelines.rj_smtr.schedules import ( @@ -165,7 +165,15 @@ MODE = get_current_flow_mode(LABELS) PROJECT_NAME = get_project_name(MODE) - timestamps = get_realocacao_recapture_timestamps(start_date, end_date) + current_timestamp = get_current_timestamp() + + timestamps = get_recapture_timestamps( + current_timestamp=current_timestamp, + start_date=start_date, + end_date=end_date, + dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value, + table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value, + ) GPS_SPPO_REALOCACAO_RUN = create_flow_run.map( flow_name=unmapped(realocacao_sppo.name), From 67dffe91f786d8b8fd4a6a884359426c60185479 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 10 Oct 2023 05:00:13 +0000 Subject: [PATCH 37/43] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/rj_smtr/tasks.py | 1 + pipelines/rj_smtr/utils.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 083f38af2..8fa352642 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -1066,6 +1066,7 @@ def get_recapture_timestamps( return combined_data + ############### # # Pretreat data diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py index 16f43baf2..ecfc4fb24 100644 --- a/pipelines/rj_smtr/utils.py +++ b/pipelines/rj_smtr/utils.py @@ -543,6 +543,7 @@ def query_logs_func( return True, results["timestamp_captura"].to_list(), results["erro"].to_list() return False, [], [] + def generate_execute_schedules( # pylint: disable=too-many-arguments,too-many-locals clock_interval: timedelta, labels: List[str], @@ -899,4 +900,3 @@ def read_raw_data(filepath: str, csv_args: dict = None) -> tuple[str, pd.DataFra log(f"[CATCHED] Task failed with error: \n{error}", level="error") return error, data - From d535a88c0e81f4ff59a6bbd76b7741ec3c69bb44 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Tue, 10 Oct 2023 19:47:00 -0300 Subject: [PATCH 38/43] adiciona arredondamento de filtro de datahora --- pipelines/rj_smtr/tasks.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 8fa352642..b05ab84e3 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -993,6 +993,11 @@ def get_recapture_timestamps( second=0, microsecond=0 ) + # Round down to the nearest interval_minutes minutes + datetime_filter = datetime_filter.replace( + minute=(datetime_filter.minute // interval_minutes) * interval_minutes + ) + if datetime_filter > current_timestamp: flag_break = True From e178231738f97c125d686460e77aa6d2e87083b6 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Tue, 10 Oct 2023 19:47:56 -0300 Subject: [PATCH 39/43] DRY query_logs_func --- pipelines/rj_smtr/utils.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py index ecfc4fb24..050cf97e9 100644 --- a/pipelines/rj_smtr/utils.py +++ b/pipelines/rj_smtr/utils.py @@ -458,6 +458,8 @@ def query_logs_func( second=0, microsecond=0 ) + datetime_filter = datetime_filter.strftime("%Y-%m-%d %H:%M:%S") + query = f""" WITH t AS ( @@ -466,12 +468,12 @@ def query_logs_func( FROM UNNEST( GENERATE_TIMESTAMP_ARRAY( - TIMESTAMP_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', INTERVAL 1 day), - TIMESTAMP('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'), + TIMESTAMP_SUB('{datetime_filter}', INTERVAL 1 day), + TIMESTAMP('{datetime_filter}'), INTERVAL {interval_minutes} minute) ) AS timestamp_array WHERE - timestamp_array < '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' ), + timestamp_array < '{datetime_filter}' ), logs_table AS ( SELECT SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura), @@ -489,12 +491,12 @@ def query_logs_func( FROM logs_table WHERE - DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', + DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter}', INTERVAL 1 day)) - AND DATE('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}') + AND DATE('{datetime_filter}') AND timestamp_captura BETWEEN - DATETIME_SUB('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', INTERVAL 1 day) - AND '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}' + DATETIME_SUB('{datetime_filter}', INTERVAL 1 day) + AND '{datetime_filter}' ORDER BY timestamp_captura ) SELECT From 2dd01580c34bf38c82527148f2b2371eeef63e38 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Tue, 10 Oct 2023 22:05:22 -0300 Subject: [PATCH 40/43] atualiza filtro data --- pipelines/rj_smtr/tasks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index b05ab84e3..6928ff841 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -989,7 +989,7 @@ def get_recapture_timestamps( dates = dates.strftime("%Y-%m-%d").to_list() for date in dates: - datetime_filter = datetime.now(tz=timezone(constants.TIMEZONE.value)).replace( + datetime_filter = datetime.fromisoformat(f"{date} 23:59:00").replace( second=0, microsecond=0 ) @@ -998,6 +998,8 @@ def get_recapture_timestamps( minute=(datetime_filter.minute // interval_minutes) * interval_minutes ) + datetime_filter = datetime_filter.strftime("%Y-%m-%d %H:%M:%S") + if datetime_filter > current_timestamp: flag_break = True From b9e5a903e711dd85b4f2ceebbe0b038191b2654f Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Tue, 10 Oct 2023 22:22:29 -0300 Subject: [PATCH 41/43] =?UTF-8?q?corrige=20convers=C3=A3o=20datahora?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/rj_smtr/tasks.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 6928ff841..f9ec2ab86 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -998,8 +998,6 @@ def get_recapture_timestamps( minute=(datetime_filter.minute // interval_minutes) * interval_minutes ) - datetime_filter = datetime_filter.strftime("%Y-%m-%d %H:%M:%S") - if datetime_filter > current_timestamp: flag_break = True From 2cbd647106996abf1b011f532986964b2dbcf2e8 Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Tue, 10 Oct 2023 22:47:22 -0300 Subject: [PATCH 42/43] atualiza code_owners --- pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 834c84882..abb8e9ff6 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -156,7 +156,7 @@ REALOCACAO_SPPO_RECAPTURA_NAME = "SMTR: GPS SPPO - Realocação (recaptura)" with Flow( REALOCACAO_SPPO_RECAPTURA_NAME, - code_owners=["caio", "fernanda", "boris", "rodrigo"], + code_owners=["rodrigo"], # "caio", "fernanda", "boris"], ) as realocacao_sppo_recaptura: start_date = Parameter("start_date", default="") end_date = Parameter("end_date", default="") From ff669bb6e8619d8067123afffc560427ba598b9e Mon Sep 17 00:00:00 2001 From: eng-rodrigocunha Date: Tue, 10 Oct 2023 22:47:47 -0300 Subject: [PATCH 43/43] localiza filtro datahora --- pipelines/rj_smtr/tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index f9ec2ab86..87a4ea7f3 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -998,6 +998,8 @@ def get_recapture_timestamps( minute=(datetime_filter.minute // interval_minutes) * interval_minutes ) + datetime_filter = timezone(constants.TIMEZONE.value).localize(datetime_filter) + if datetime_filter > current_timestamp: flag_break = True