diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py
index 1b11e5dd0..abb8e9ff6 100644
--- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py
+++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py
@@ -4,6 +4,7 @@
 """
 
-from prefect import Parameter, case
+from prefect import Parameter, case, unmapped
+from prefect.tasks.control_flow import merge
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
 from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
@@ -41,7 +42,10 @@
     set_last_run_timestamp,
     upload_logs_to_bq,
     bq_upload,
+    check_param,
+    get_recapture_timestamps,
 )
+
 from pipelines.rj_smtr.br_rj_riodejaneiro_onibus_gps.tasks import (
     pre_treatment_br_rj_riodejaneiro_onibus_gps,
     create_api_url_onibus_gps,
@@ -54,8 +58,11 @@
     every_minute,
     every_10_minutes,
 )
+
 from pipelines.utils.execute_dbt_model.tasks import run_dbt_model
 
+from pipelines.rj_smtr.utils import get_project_name
+
 # Flows #
 
 with Flow(
@@ -76,9 +83,20 @@
         "table_id", default=constants.GPS_SPPO_REALOCACAO_TREATED_TABLE_ID.value
     )
     rebuild = Parameter("rebuild", False)
+    timestamp_param = Parameter("timestamp", None)
+    recapture = Parameter("recapture", False)
+    previous_error = Parameter("previous_error", None)
 
     # SETUP
-    timestamp = get_current_timestamp()
+    timestamp_cond = check_param(timestamp_param)
+
+    with case(timestamp_cond, True):
+        timestamp_get = get_current_timestamp()
+
+    with case(timestamp_cond, False):
+        timestamp_def = get_current_timestamp(timestamp_param)
+
+    timestamp = merge(timestamp_get, timestamp_def)
 
     rename_flow_run = rename_current_flow_run_now_time(
         prefix="GPS SPPO - Realocação: ", now_time=timestamp
@@ -124,6 +142,8 @@
         parent_table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value,
         error=error,
         timestamp=timestamp,
+        previous_error=previous_error,
+        recapture=recapture,
     )
 
 realocacao_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
@@ -133,6 +153,48 @@
 )
 realocacao_sppo.schedule = every_10_minutes
 
+REALOCACAO_SPPO_RECAPTURA_NAME = "SMTR: GPS SPPO - Realocação (recaptura)"
+with Flow(
+    REALOCACAO_SPPO_RECAPTURA_NAME,
+    code_owners=["rodrigo"],  # "caio", "fernanda", "boris"],
+) as realocacao_sppo_recaptura:
+    start_date = Parameter("start_date", default="")
+    end_date = Parameter("end_date", default="")
+
+    LABELS = get_current_flow_labels()
+    MODE = get_current_flow_mode(LABELS)
+    PROJECT_NAME = get_project_name(MODE)
+
+    current_timestamp = get_current_timestamp()
+
+    timestamps = get_recapture_timestamps(
+        current_timestamp=current_timestamp,
+        start_date=start_date,
+        end_date=end_date,
+        dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
+        table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value,
+    )
+
+    GPS_SPPO_REALOCACAO_RUN = create_flow_run.map(
+        flow_name=unmapped(realocacao_sppo.name),
+        project_name=unmapped(PROJECT_NAME),
+        run_name=unmapped(realocacao_sppo.name),
+        parameters=timestamps,
+    )
+
+    wait_for_flow_run.map(
+        GPS_SPPO_REALOCACAO_RUN,
+        stream_states=unmapped(True),
+        stream_logs=unmapped(True),
+        raise_final_state=unmapped(True),
+    )
+
+realocacao_sppo_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+realocacao_sppo_recaptura.run_config = KubernetesRun(
+    image=emd_constants.DOCKER_IMAGE.value,
+    labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
+)
+
 
 with Flow(
     "SMTR: GPS SPPO - Materialização",
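The `check_param`/`case`/`merge` combination above is a default-if-absent pattern: scheduled runs pass no `timestamp` parameter and capture "now", while recapture runs inject a specific minute. A minimal, self-contained sketch of the same pattern (hypothetical task and flow names; Prefect 1.x API):

    from datetime import datetime

    from prefect import Flow, Parameter, case, task
    from prefect.tasks.control_flow import merge

    @task
    def check_param(param) -> bool:
        # True when the flow run did not supply the parameter
        return param is None

    @task
    def current_timestamp() -> str:
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @task
    def keep(value: str) -> str:
        return value

    with Flow("timestamp-default-sketch") as sketch_flow:
        timestamp_param = Parameter("timestamp", default=None)
        is_missing = check_param(timestamp_param)
        with case(is_missing, True):
            generated = current_timestamp()
        with case(is_missing, False):
            provided = keep(timestamp_param)
        # merge forwards the result of whichever branch was not skipped
        timestamp = merge(generated, provided)

Both branches are built into the flow graph; at run time exactly one executes and `merge` resolves to its result.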
diff --git a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py
index d40a1d3e1..6b888abf1 100644
--- a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py
+++ b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py
@@ -32,6 +32,7 @@
     get_run_dates,
     get_join_dict,
     get_previous_date,
+    check_param,
     # get_local_dbt_client,
     # set_last_run_timestamp,
 )
@@ -47,8 +48,6 @@
 from pipelines.rj_smtr.schedules import every_day_hour_five, every_day_hour_seven
 from pipelines.utils.execute_dbt_model.tasks import run_dbt_model
 
-from pipelines.rj_smtr.projeto_subsidio_sppo.tasks import check_param
-
 # Flows #
 
 with Flow(
diff --git a/pipelines/rj_smtr/projeto_subsidio_sppo/tasks.py b/pipelines/rj_smtr/projeto_subsidio_sppo/tasks.py
index cfc685e97..dd91da1d4 100644
--- a/pipelines/rj_smtr/projeto_subsidio_sppo/tasks.py
+++ b/pipelines/rj_smtr/projeto_subsidio_sppo/tasks.py
@@ -2,13 +2,3 @@
 """
 Tasks for projeto_subsidio_sppo
 """
-
-from prefect import task
-
-
-@task
-def check_param(param: str) -> bool:
-    """
-    Check if param is None
-    """
-    return param is None
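With the project-local task deleted, `check_param` now lives in the shared `pipelines/rj_smtr/tasks.py` (next diff), where the GPS and subsídio flows both import it. The next file also teaches `get_current_timestamp` to accept the plain `"%Y-%m-%d %H:%M:%S"` strings that the recapture flow passes as run parameters. In isolation, the coercion looks like this (a sketch with a hypothetical standalone name; in the PR the logic sits inside the `get_current_timestamp` task and reads the timezone from `constants.TIMEZONE`):

    from datetime import datetime

    from pytz import timezone

    def coerce_timestamp(timestamp=None, tz: str = "America/Sao_Paulo") -> datetime:
        """Accept None, a naive "%Y-%m-%d %H:%M:%S" string, or a datetime."""
        if isinstance(timestamp, str):
            # localize() resolves the UTC offset correctly, unlike replace(tzinfo=...)
            timestamp = timezone(tz).localize(
                datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
            )
        if not timestamp:
            timestamp = datetime.now(tz=timezone(tz))
        # truncate to the minute, matching the capture schedule
        return timestamp.replace(second=0, microsecond=0)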
diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py
index f7d687dea..87a4ea7f3 100644
--- a/pipelines/rj_smtr/tasks.py
+++ b/pipelines/rj_smtr/tasks.py
@@ -26,6 +26,7 @@
     bq_project,
     get_table_min_max_value,
     get_last_run_timestamp,
+    query_logs_func,
     log_critical,
     data_info_str,
     dict_contains_keys,
@@ -160,6 +161,10 @@ def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datetime:
         timestamp = datetime.fromisoformat(timestamp)
     if not timestamp:
         timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))
+    if isinstance(timestamp, str):
+        timestamp = timezone(constants.TIMEZONE.value).localize(
+            datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
+        )
     if truncate_minute:
         return timestamp.replace(second=0, microsecond=0)
     return timestamp
@@ -272,6 +277,7 @@ def query_logs(
     table_id: str,
     datetime_filter=None,
     max_recaptures: int = 60,
+    interval_minutes: int = 1,
 ):
     """
     Queries capture logs to check for errors
 
     Args:
         dataset_id (str): dataset_id on BigQuery
         table_id (str): table_id on BigQuery
         datetime_filter (pendulum.datetime.DateTime, optional): filter passed to query.
             This task will query the logs table for the last 1 day before datetime_filter
+        max_recaptures (int, optional): maximum number of recaptures to be done
+        interval_minutes (int, optional): interval in minutes between each recapture
 
     Returns:
-        list: containing timestamps for which the capture failed
+        tuple: errors (bool),
+            timestamps (list of pendulum.datetime.DateTime),
+            previous_errors (list of previous errors)
     """
-    if not datetime_filter:
-        datetime_filter = pendulum.now(constants.TIMEZONE.value).replace(
-            second=0, microsecond=0
-        )
-    elif isinstance(datetime_filter, str):
-        datetime_filter = datetime.fromisoformat(datetime_filter).replace(
-            second=0, microsecond=0
-        )
-
-    query = f"""
-    with t as (
-    select
-        datetime(timestamp_array) as timestamp_array
-    from
-        unnest(GENERATE_TIMESTAMP_ARRAY(
-            timestamp_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', interval 1 day),
-            timestamp('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'),
-            interval 1 minute)
-        ) as timestamp_array
-    where timestamp_array < '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'
-    ),
-    logs as (
-        select
-            *,
-            timestamp_trunc(timestamp_captura, minute) as timestamp_array
-        from
-            rj-smtr.{dataset_id}.{table_id}_logs
-        where
-            data between
-                date(datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}',
-                interval 1 day))
-                and date('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}')
-            and
-            timestamp_captura between
-                datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', interval 1 day)
-                and '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'
-        order by timestamp_captura
+    return query_logs_func(
+        dataset_id=dataset_id,
+        table_id=table_id,
+        datetime_filter=datetime_filter,
+        max_recaptures=max_recaptures,
+        interval_minutes=interval_minutes,
     )
-    select
-        case
-            when logs.timestamp_captura is not null then logs.timestamp_captura
-            else t.timestamp_array
-        end as timestamp_captura,
-        logs.erro
-    from
-        t
-    left join
-        logs
-    on
-        logs.timestamp_array = t.timestamp_array
-    where
-        logs.sucesso is not True
-    order by
-        timestamp_captura
-    """
-    log(f"Run query to check logs:\n{query}")
-    results = bd.read_sql(query=query, billing_project_id=bq_project())
-    if len(results) > 0:
-        results["timestamp_captura"] = (
-            pd.to_datetime(results["timestamp_captura"])
-            .dt.tz_localize(constants.TIMEZONE.value)
-            .to_list()
-        )
-        log(f"Recapture data for the following {len(results)} timestamps:\n{results}")
-        if len(results) > max_recaptures:
-            message = f"""
-            [SPPO - Recaptures]
-            Encontradas {len(results)} timestamps para serem recapturadas.
-            Essa run processará as seguintes:
-            #####
-            {results[:max_recaptures]}
-            #####
-            Sobraram as seguintes para serem recapturadas na próxima run:
-            #####
-            {results[max_recaptures:]}
-            #####
-            """
-            log_critical(message)
-            results = results[:max_recaptures]
-        return True, results["timestamp_captura"].to_list(), results["erro"].to_list()
-    return False, [], []
 
 
 @task
@@ -1007,6 +943,137 @@ def get_previous_date(days):
     return now.to_date_string()
 
 
+@task
+def check_param(param: str) -> bool:
+    """
+    Check if param is None
+    """
+    return param is None
+
+
+@task
+def get_recapture_timestamps(
+    current_timestamp: datetime,
+    start_date: str,
+    end_date: str,
+    dataset_id: str,
+    table_id: str,
+    interval_minutes: int = 10,
+) -> List:
+    """Get the timestamps with no captured file in a given date range, so they can be recaptured.
+
+    Args:
+        current_timestamp (datetime): Current timestamp
+        start_date (str): Start date in format YYYY-MM-DD
+        end_date (str): End date in format YYYY-MM-DD
+        dataset_id (str): Dataset id
+        table_id (str): Table id
+        interval_minutes (int, optional): Interval in minutes to recapture. Defaults to 10.
+
+    Returns:
+        List: List of dicts containing the timestamps to recapture
+    """
+
+    log(
+        f"Getting timestamps to recapture between {start_date} and {end_date}",
+        level="info",
+    )
+
+    errors = []
+    timestamps = []
+    previous_errors = []
+    flag_break = False
+
+    dates = pd.date_range(start=start_date, end=end_date)
+
+    dates = dates.strftime("%Y-%m-%d").to_list()
+
+    for date in dates:
+        datetime_filter = datetime.fromisoformat(f"{date} 23:59:00").replace(
+            second=0, microsecond=0
+        )
+
+        # Round down to the nearest interval_minutes minutes
+        datetime_filter = datetime_filter.replace(
+            minute=(datetime_filter.minute // interval_minutes) * interval_minutes
+        )
+
+        datetime_filter = timezone(constants.TIMEZONE.value).localize(datetime_filter)
+
+        if datetime_filter > current_timestamp:
+            flag_break = True
+
+            # Round down to the nearest interval_minutes minutes
+            current_timestamp = current_timestamp.replace(
+                second=0,
+                microsecond=0,
+                minute=(
+                    (current_timestamp.minute // interval_minutes) * interval_minutes
+                ),
+            )
+
+            datetime_filter = current_timestamp
+            log(
+                f"""Datetime filter is greater than current timestamp,
+                using current timestamp instead ({datetime_filter})""",
+                level="warning",
+            )
+
+        errors_temp, timestamps_temp, previous_errors_temp = query_logs_func(
+            dataset_id=dataset_id,
+            table_id=table_id,
+            datetime_filter=datetime_filter,
+            max_recaptures=2**63 - 1,
+            interval_minutes=interval_minutes,
+        )
+
+        if timestamps_temp == []:
+            log(
+                f"""From {date}, there are no recapture timestamps""",
+                level="info",
+            )
+            continue
+
+        errors = errors + ([errors_temp] * len(timestamps_temp))
+        timestamps = timestamps + timestamps_temp
+        previous_errors = previous_errors + previous_errors_temp
+
+        if flag_break:
+            log(
+                "Breaking loop because datetime filter is greater than current timestamp",
+                level="warning",
+            )
+            break
+
+    timestamps = [timestamp.strftime("%Y-%m-%d %H:%M:%S") for timestamp in timestamps]
+
+    log(
+        f"""From {start_date} to {end_date}, there are {len(timestamps)} recapture timestamps: \n
+        {timestamps}""",
+        level="info",
+    )
+
+    combined_data = []
+
+    for error, timestamp, previous_error in zip(errors, timestamps, previous_errors):
+        data = {
+            "error": error,
+            "timestamp": timestamp,
+            "previous_error": previous_error,
+            "recapture": True,
+        }
+
+        combined_data.append(data)
+
+    log(
+        f"""Combined data: \n
+        {combined_data}""",
+        level="info",
+    )
+
+    return combined_data
+
+
 ###############
 #
 # Pretreat data
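Before querying, `get_recapture_timestamps` floors both the per-date `23:59` filter and the current timestamp to the capture interval, so the timestamp array generated by `query_logs_func` lands exactly on the minutes at which captures actually run. The rounding is integer division on the minute field; a standalone sketch of just that step:

    from datetime import datetime

    def floor_to_interval(ts: datetime, interval_minutes: int) -> datetime:
        """Round a timestamp down to the nearest interval_minutes boundary."""
        return ts.replace(
            minute=(ts.minute // interval_minutes) * interval_minutes,
            second=0,
            microsecond=0,
        )

    # 23:59 floors to 23:50 on the 10-minute realocação schedule
    assert floor_to_interval(datetime(2023, 5, 1, 23, 59), 10) == datetime(2023, 5, 1, 23, 50)
    # A timestamp already on the boundary is unchanged
    assert floor_to_interval(datetime(2023, 5, 1, 14, 30), 10) == datetime(2023, 5, 1, 14, 30)

With `max_recaptures=2**63 - 1` the cap is effectively disabled for these backfills: a full day at 10-minute intervals yields at most 144 candidate timestamps per date.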
diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py
index f9b98afab..050cf97e9 100644
--- a/pipelines/rj_smtr/utils.py
+++ b/pipelines/rj_smtr/utils.py
@@ -18,6 +18,10 @@
 import basedosdados as bd
 from basedosdados import Table
 import pandas as pd
+
+import pendulum
+from datetime import datetime
+
 from google.cloud.storage.blob import Blob
 
@@ -403,6 +407,145 @@ def data_info_str(data: pd.DataFrame):
     return buffer.getvalue()
 
 
+def get_project_name(mode: str):
+    """
+    Get project name based on mode
+
+    Args:
+        mode (str): flow run mode ("prod" or "staging")
+
+    Returns:
+        str: project name
+    """
+
+    if mode == "prod":
+        return constants.PREFECT_DEFAULT_PROJECT.value
+    else:
+        return "staging"
+
+
+def query_logs_func(
+    dataset_id: str,
+    table_id: str,
+    datetime_filter=None,
+    max_recaptures: int = 60,
+    interval_minutes: int = 1,
+):
+    """
+    Queries capture logs to check for errors
+
+    Args:
+        dataset_id (str): dataset_id on BigQuery
+        table_id (str): table_id on BigQuery
+        datetime_filter (pendulum.datetime.DateTime, optional):
+            filter passed to query. This function will query the logs table
+            for the last 1 day before datetime_filter
+        max_recaptures (int, optional): maximum number of recaptures to be done
+        interval_minutes (int, optional): interval in minutes between each recapture
+
+    Returns:
+        tuple: errors (bool),
+            timestamps (list of pendulum.datetime.DateTime),
+            previous_errors (list of previous errors)
+    """
+
+    if not datetime_filter:
+        datetime_filter = pendulum.now(constants.TIMEZONE.value).replace(
+            second=0, microsecond=0
+        )
+    elif isinstance(datetime_filter, str):
+        datetime_filter = datetime.fromisoformat(datetime_filter).replace(
+            second=0, microsecond=0
+        )
+
+    datetime_filter = datetime_filter.strftime("%Y-%m-%d %H:%M:%S")
+
+    query = f"""
+    WITH
+      t AS (
+      SELECT
+        DATETIME(timestamp_array) AS timestamp_array
+      FROM
+        UNNEST(
+            GENERATE_TIMESTAMP_ARRAY(
+                TIMESTAMP_SUB('{datetime_filter}', INTERVAL 1 day),
+                TIMESTAMP('{datetime_filter}'),
+                INTERVAL {interval_minutes} minute) )
+            AS timestamp_array
+      WHERE
+        timestamp_array < '{datetime_filter}' ),
+      logs_table AS (
+      SELECT
+        SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura),
+            "America/Sao_Paulo") AS DATETIME) timestamp_captura,
+        SAFE_CAST(sucesso AS BOOLEAN) sucesso,
+        SAFE_CAST(erro AS STRING) erro,
+        SAFE_CAST(DATA AS DATE) DATA
+      FROM
+        rj-smtr-staging.{dataset_id}_staging.{table_id}_logs AS t
+      ),
+      logs AS (
+      SELECT
+        *,
+        TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array
+      FROM
+        logs_table
+      WHERE
+        DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter}',
+            INTERVAL 1 day))
+        AND DATE('{datetime_filter}')
+        AND timestamp_captura BETWEEN
+            DATETIME_SUB('{datetime_filter}', INTERVAL 1 day)
+        AND '{datetime_filter}'
+      ORDER BY
+        timestamp_captura )
+    SELECT
+      CASE
+        WHEN logs.timestamp_captura IS NOT NULL THEN logs.timestamp_captura
+      ELSE
+        t.timestamp_array
+      END
+        AS timestamp_captura,
+      logs.erro
+    FROM
+      t
+    LEFT JOIN
+      logs
+    ON
+      logs.timestamp_array = t.timestamp_array
+    WHERE
+      logs.sucesso IS NOT TRUE
+    ORDER BY
+      timestamp_captura
+    """
+    log(f"Run query to check logs:\n{query}")
+    results = bd.read_sql(query=query, billing_project_id=bq_project())
+    if len(results) > 0:
+        results["timestamp_captura"] = (
+            pd.to_datetime(results["timestamp_captura"])
+            .dt.tz_localize(constants.TIMEZONE.value)
+            .to_list()
+        )
+        log(f"Recapture data for the following {len(results)} timestamps:\n{results}")
+        if len(results) > max_recaptures:
+            message = f"""
+            [SPPO - Recaptures]
+            Encontradas {len(results)} timestamps para serem recapturadas.
+            Essa run processará as seguintes:
+            #####
+            {results[:max_recaptures]}
+            #####
+            Sobraram as seguintes para serem recapturadas na próxima run:
+            #####
+            {results[max_recaptures:]}
+            #####
+            """
+            log_critical(message)
+            results = results[:max_recaptures]
+        return True, results["timestamp_captura"].to_list(), results["erro"].to_list()
+    return False, [], []
+
+
 def generate_execute_schedules(  # pylint: disable=too-many-arguments,too-many-locals
     clock_interval: timedelta,
     labels: List[str],