From fe0b71998316c7011c2f5fb0283d562896d3aacc Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 12 May 2023 15:00:25 -0300 Subject: [PATCH 01/32] feat: add precipitacao_cemaden --- .../precipitacao_cemaden/__init__.py | 7 + .../precipitacao_cemaden/constants.py | 165 +++++++++++++ .../precipitacao_cemaden/flows.py | 214 +++++++++++++++++ .../precipitacao_cemaden/schedules.py | 31 +++ .../precipitacao_cemaden/tasks.py | 219 ++++++++++++++++++ .../precipitacao_cemaden/utils.py | 55 +++++ 6 files changed, 691 insertions(+) create mode 100644 pipelines/rj_cor/meteorologia/precipitacao_cemaden/__init__.py create mode 100644 pipelines/rj_cor/meteorologia/precipitacao_cemaden/constants.py create mode 100644 pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py create mode 100644 pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py create mode 100644 pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py create mode 100644 pipelines/rj_cor/meteorologia/precipitacao_cemaden/utils.py diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/__init__.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/__init__.py new file mode 100644 index 000000000..0d9921707 --- /dev/null +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/__init__.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +""" +Prefect flows for precipitacao_cemaden project +""" +############################################################################### +# Automatically managed, please do not touch +############################################################################### diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/constants.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/constants.py new file mode 100644 index 000000000..4ab17e018 --- /dev/null +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/constants.py @@ -0,0 +1,165 @@ +# -*- coding: utf-8 -*- +# flake8: noqa: E501 +""" +Constant values for the rj_cor.meteorologia.precipitacao_cemaden project +""" + +from enum import Enum + + +class constants(Enum): # pylint: disable=c0103 + """ + Constant values for the precipitacao_cemaden project + """ + + RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS = { + "redis_data_key": "data_chuva_passado_cemaden", + "redis_update_key": "data_update_chuva_passado_cemaden", + "query_data": """ + WITH + last_update_date AS ( + SELECT + CAST(MAX(data_particao) AS DATETIME) AS last_update + FROM `rj-cor.clima_pluviometro.taxa_precipitacao_cemaden` + WHERE data_particao >= DATE_SUB(CURRENT_DATETIME('America/Sao_Paulo'), INTERVAL 2 DAY) + ), + cemaden AS ( -- seleciona as últimas 2h de medição antes da última atualização + SELECT + id_estacao, + acumulado_chuva_15_min, + CURRENT_DATE('America/Sao_Paulo') as data, + data_particao, + DATETIME(CONCAT(data_particao," ", horario)) AS data_update, + FROM `rj-cor.clima_pluviometro.taxa_precipitacao_cemaden` + INNER JOIN last_update_date lup ON 1=1 + WHERE data_particao >= DATE_SUB(CURRENT_DATE('America/Sao_Paulo'), INTERVAL 2 DAY) + AND CAST(CONCAT(data_particao, " ", horario) AS DATETIME) >= DATE_SUB(lup.last_update, INTERVAL 2 HOUR) + ), + + last_measurements AS (-- soma a quantidade chuva das últimas 2h + SELECT + a.id_estacao, + "cemaden" AS sistema, + MAX(a.data_update) AS data_update, + SUM(a.acumulado_chuva_15_min) AS acumulado_chuva_15_min, + FROM cemaden a + GROUP BY a.id_estacao, sistema + ), + + h3_chuvas AS ( -- calcula qnt de chuva para cada h3 + SELECT + h3.*, + lm.id_estacao, + lm.acumulado_chuva_15_min, + 
lm.acumulado_chuva_15_min/power(h3.dist,5) AS p1_15min, + 1/power(h3.dist,5) AS inv_dist + FROM ( + WITH centroid_h3 AS ( + SELECT + *, + ST_CENTROID(geometry) AS geom + FROM `rj-cor.dados_mestres.h3_grid_res8` + ), + + estacoes_pluviometricas AS ( + SELECT + id_estacao AS id, + estacao, + "cemaden" AS sistema, + ST_GEOGPOINT(CAST(longitude AS FLOAT64), + CAST(latitude AS FLOAT64)) AS geom + FROM `rj-cor.clima_pluviometro.estacoes_cemaden` + ), + + estacoes_mais_proximas AS ( -- calcula distância das estações para cada centróide do h3 + SELECT AS VALUE s + FROM ( + SELECT + ARRAY_AGG( + STRUCT( + a.id, b.id, b.estacao, + ST_DISTANCE(a.geom, b.geom), + b.sistema + ) + ORDER BY ST_DISTANCE(a.geom, b.geom) + ) AS ar + FROM (SELECT id, geom FROM centroid_h3) a + CROSS JOIN( + SELECT id, estacao, sistema, geom + FROM estacoes_pluviometricas + WHERE geom is not null + ) b + WHERE a.id <> b.id + GROUP BY a.id + ) ab + CROSS JOIN UNNEST(ab.ar) s + ) + + SELECT + *, + row_number() OVER (PARTITION BY id_h3 ORDER BY dist) AS ranking + FROM estacoes_mais_proximas + ORDER BY id_h3, ranking) h3 + LEFT JOIN last_measurements lm + ON lm.id_estacao=h3.id_estacao AND lm.sistema=h3.sistema + ), + + h3_media AS ( -- calcula média de chuva para as 3 estações mais próximas + SELECT + id_h3, + CAST(sum(p1_15min)/sum(inv_dist) AS DECIMAL) AS chuva_15min, + STRING_AGG(estacao ORDER BY estacao) estacoes + FROM h3_chuvas + -- WHERE ranking < 4 + GROUP BY id_h3 + ), + + final_table AS ( + SELECT + h3_media.id_h3, + h3_media.estacoes, + nome AS bairro, + cast(round(h3_media.chuva_15min,2) AS decimal) AS chuva_15min, + FROM h3_media + LEFT JOIN `rj-cor.dados_mestres.h3_grid_res8` h3_grid + ON h3_grid.id=h3_media.id_h3 + INNER JOIN `rj-cor.dados_mestres.bairro` + ON ST_CONTAINS(`rj-cor.dados_mestres.bairro`.geometry, ST_CENTROID(h3_grid.geometry)) + ) + + SELECT + final_table.id_h3, + bairro, + chuva_15min, + estacoes, + CASE + WHEN chuva_15min> 0 AND chuva_15min<= 10 THEN 'chuva fraca' + WHEN chuva_15min> 10 AND chuva_15min<= 50 THEN 'chuva moderada' + WHEN chuva_15min> 50 AND chuva_15min<= 100 THEN 'chuva forte' + WHEN chuva_15min> 100 THEN 'chuva muito forte' + ELSE 'sem chuva' + END AS status, + CASE + WHEN chuva_15min> 0 AND chuva_15min<= 10 THEN '#DAECFB'--'#00CCFF' + WHEN chuva_15min> 1 AND chuva_15min<= 50 THEN '#A9CBE8'--'#BFA230' + WHEN chuva_15min> 50 AND chuva_15min<= 100 THEN '#77A9D5'--'#E0701F' + WHEN chuva_15min> 100 THEN '#125999'--'#FF0000' + ELSE '#ffffff' + END AS color + FROM final_table + """, + "query_update": """ + SELECT + MAX( + DATETIME( + CONCAT(data_particao," ", horario) + ) + ) AS last_update + FROM `rj-cor.clima_pluviometro.taxa_precipitacao_cemaden` + WHERE data_particao> DATE_SUB(CURRENT_DATE('America/Sao_Paulo'), INTERVAL 2 DAY) + """, + } diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py new file mode 100644 index 000000000..32793984b --- /dev/null +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py @@ -0,0 +1,214 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0103 +""" +Flows for precipitacao_cemaden. 
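+
+The dashboard query in constants.py sums each station's last 2h of rain and
+spreads it over the H3 grid with inverse-distance weights (1/dist^5). A rough
+standalone sketch of that interpolation, assuming a hypothetical dataframe
+with columns "dist" and "rain" for the stations around one H3 cell:
+
+    import pandas as pd
+
+    def idw_rain(cell: pd.DataFrame, power: int = 5) -> float:
+        # weight = 1/dist^power; weighted mean of the station readings
+        weights = 1.0 / cell["dist"] ** power
+        return float((weights * cell["rain"]).sum() / weights.sum())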
+""" +from datetime import timedelta + +from prefect import case, Parameter +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from prefect.tasks.prefect import create_flow_run, wait_for_flow_run + +from pipelines.constants import constants +from pipelines.utils.constants import constants as utils_constants +from pipelines.utils.custom import wait_for_flow_run_with_timeout +from pipelines.rj_cor.meteorologia.precipitacao_cemaden.constants import ( + constants as cemaden_constants, +) +from pipelines.rj_cor.meteorologia.precipitacao_cemaden.tasks import ( + check_to_run_dbt, + tratar_dados, + salvar_dados, + save_last_dbt_update, +) +from pipelines.rj_cor.meteorologia.precipitacao_cemaden.schedules import ( + minute_schedule, +) +from pipelines.rj_escritorio.rain_dashboard.constants import ( + constants as rain_dashboard_constants, +) +from pipelines.utils.decorators import Flow +from pipelines.utils.dump_db.constants import constants as dump_db_constants +from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants +from pipelines.utils.tasks import ( + create_table_and_upload_to_gcs, + get_current_flow_labels, +) + +wait_for_flow_run_with_2min_timeout = wait_for_flow_run_with_timeout( + timeout=timedelta(minutes=2) +) + +with Flow( + name="COR: Meteorologia - Precipitacao CEMADEN", + code_owners=[ + "richardg867", + ], + # skip_if_running=True, +) as cor_meteorologia_precipitacao_cemaden: + + DATASET_ID = "clima_pluviometro" + TABLE_ID = "taxa_precipitacao_cemaden" + DUMP_MODE = "append" + + # Materialization parameters + MATERIALIZE_AFTER_DUMP = Parameter( + "materialize_after_dump", default=False, required=False + ) + MATERIALIZE_TO_DATARIO = Parameter( + "materialize_to_datario", default=False, required=False + ) + MATERIALIZATION_MODE = Parameter("mode", default="dev", required=False) + TRIGGER_RAIN_DASHBOARD_UPDATE = Parameter( + "trigger_rain_dashboard_update", default=False, required=False + ) + + # Dump to GCS after? 
Should only dump to GCS if materializing to datario + DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False) + + MAXIMUM_BYTES_PROCESSED = Parameter( + "maximum_bytes_processed", + required=False, + default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, + ) + + dados, empty_data = tratar_dados( + dataset_id=DATASET_ID, + table_id=TABLE_ID, + mode=MATERIALIZATION_MODE, + ) + + with case(empty_data, False): + path = salvar_dados(dados=dados) + # Create table in BigQuery + UPLOAD_TABLE = create_table_and_upload_to_gcs( + data_path=path, + dataset_id=DATASET_ID, + table_id=TABLE_ID, + dump_mode=DUMP_MODE, + wait=path, + ) + + run_dbt = check_to_run_dbt( + dataset_id=DATASET_ID, + table_id=TABLE_ID, + mode=MATERIALIZATION_MODE, + ) + run_dbt.set_upstream(UPLOAD_TABLE) + + with case(run_dbt, True): + # Trigger DBT flow run + with case(MATERIALIZE_AFTER_DUMP, True): + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": DATASET_ID, + "table_id": TABLE_ID, + "mode": MATERIALIZATION_MODE, + "materialize_to_datario": MATERIALIZE_TO_DATARIO, + }, + labels=current_flow_labels, + run_name=f"Materialize {DATASET_ID}.{TABLE_ID}", + ) + + current_flow_labels.set_upstream(run_dbt) + materialization_flow.set_upstream(current_flow_labels) + + wait_for_materialization = wait_for_flow_run_with_2min_timeout( + flow_run_id=materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + + last_dbt_update = save_last_dbt_update( + dataset_id=DATASET_ID, + table_id=TABLE_ID, + mode=MATERIALIZATION_MODE, + wait=wait_for_materialization, + ) + + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(TRIGGER_RAIN_DASHBOARD_UPDATE, True): + # Trigger rain dashboard update flow run + rain_dashboard_update_flow = create_flow_run( + flow_name=rain_dashboard_constants.RAIN_DASHBOARD_FLOW_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters=rain_dashboard_constants.RAIN_DASHBOARD_FLOW_SCHEDULE_PARAMETERS.value, # noqa + labels=[ + "rj-escritorio-dev", + ], + run_name="Update rain dashboard data (triggered by precipitacao_cemaden flow)", # noqa + ) + rain_dashboard_update_flow.set_upstream(wait_for_materialization) + + wait_for_rain_dashboard_update = wait_for_flow_run( + flow_run_id=rain_dashboard_update_flow, + stream_states=True, + stream_logs=True, + raise_final_state=False, + ) + + # Trigger rain dashboard update last 2h flow run + rain_dashboard_last_2h_update_flow = create_flow_run( + flow_name=rain_dashboard_constants.RAIN_DASHBOARD_FLOW_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters=cemaden_constants.RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS.value, # noqa + labels=[ + "rj-escritorio-dev", + ], + run_name="Update rain dashboard data (triggered by precipitacao_cemaden last 2h flow)", # noqa + ) + rain_dashboard_last_2h_update_flow.set_upstream( + wait_for_materialization + ) + + wait_for_rain_dashboard_last_2h_update = wait_for_flow_run( + flow_run_id=rain_dashboard_last_2h_update_flow, + stream_states=True, + stream_logs=True, + raise_final_state=False, + ) + + with case(DUMP_TO_GCS, True): + # Trigger Dump to 
GCS flow run with project id as datario + dump_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": DATASET_ID, + "table_id": TABLE_ID, + "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {DATASET_ID}.{TABLE_ID}", + ) + dump_to_gcs_flow.set_upstream(wait_for_materialization) + + wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout( + flow_run_id=dump_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + +# para rodar na cloud +cor_meteorologia_precipitacao_cemaden.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +cor_meteorologia_precipitacao_cemaden.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + # labels=[constants.RJ_COR_AGENT_LABEL.value], + labels=[constants.RJ_IPLANRIO_AGENT_LABEL.value], +) +cor_meteorologia_precipitacao_cemaden.schedule = minute_schedule diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py new file mode 100644 index 000000000..3a17890f0 --- /dev/null +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0103 +""" +Schedules for precipitacao_cemaden +Rodar a cada 1 minuto +""" +from datetime import timedelta, datetime +from prefect.schedules import Schedule +from prefect.schedules.clocks import IntervalClock +from pipelines.constants import constants + +minute_schedule = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(minutes=1), + start_date=datetime(2023, 1, 1, 0, 0, 30), + labels=[ + # constants.RJ_COR_AGENT_LABEL.value, + constants.RJ_IPLANRIO_AGENT_LABEL.value, + ], + parameter_defaults={ + # "trigger_rain_dashboard_update": True, + # "materialize_after_dump": True, + "materialize_after_dump": False, + # "mode": "prod", + "materialize_to_datario": False, + "dump_to_gcs": False, + }, + ), + ] +) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py new file mode 100644 index 000000000..2101e7330 --- /dev/null +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py @@ -0,0 +1,219 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0103 +""" +Tasks for precipitacao_cemaden +""" +from datetime import timedelta +from pathlib import Path +from typing import Union, Tuple + +import numpy as np +import pandas as pd +import pendulum +from prefect import task + +# from prefect import context + +from pipelines.constants import constants +from pipelines.rj_cor.meteorologia.precipitacao_cemaden.utils import ( + parse_date_columns, +) +from pipelines.utils.utils import ( + build_redis_key, + compare_dates_between_tables_redis, + log, + to_partitions, + save_str_on_redis, + save_updated_rows_on_redis, +) + + +@task( + nout=2, + max_retries=constants.TASK_MAX_RETRIES.value, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +def tratar_dados( + dataset_id: str, table_id: str, mode: str = "dev" +) -> Tuple[pd.DataFrame, bool]: + """ + Renomeia colunas e filtra dados com a hora e minuto do timestamp + de execução mais próximo à este + """ + + url = "http://sjc.salvar.cemaden.gov.br/resources/graficos/interativo/getJson2.php?uf=RJ" + dados = pd.read_json(url) + + drop_cols = [ + "uf", + "codibge", + "cidade", + "tipoestacao", + "status", 
+    ]
+    rename_cols = {
+        "idestacao": "id_estacao",
+        "nomeestacao": "nome_estacao",
+        "ultimovalor": "instantaneo_chuva",
+        "datahoraUltimovalor": "data_medicao_utc",
+        "acc1hr": "acumulado_chuva_1_h",
+        "acc3hr": "acumulado_chuva_3_h",
+        "acc6hr": "acumulado_chuva_6_h",
+        "acc12hr": "acumulado_chuva_12_h",
+        "acc24hr": "acumulado_chuva_24_h",
+        "acc48hr": "acumulado_chuva_48_h",
+        "acc72hr": "acumulado_chuva_72_h",
+        "acc96hr": "acumulado_chuva_96_h",
+    }
+
+    dados = (
+        dados[(dados["codibge"] == 3304557) & (dados["tipoestacao"] == 1)]
+        .drop(drop_cols, axis=1)
+        .rename(rename_cols, axis=1)
+    )
+    log(f"\n[DEBUG]: df.head() {dados.head()}")
+
+    # Converte a coluna de data/hora para datetime (a API retorna em UTC)
+    dados["data_medicao_utc"] = pd.to_datetime(dados["data_medicao_utc"], dayfirst=True)
+
+    see_cols = ["data_medicao_utc", "id_estacao", "acumulado_chuva_1_h"]
+    log(f"DEBUG: data utc {dados[see_cols]}")
+
+    date_format = "%Y-%m-%d %H:%M:%S"
+    dados["data_medicao"] = dados["data_medicao_utc"].dt.strftime(date_format)
+
+    log(f"DEBUG: df dtypes {dados.dtypes}")
+    see_cols = ["data_medicao", "id_estacao", "acumulado_chuva_1_h"]
+    log(f"DEBUG: data {dados[see_cols]}")
+
+    # Alterando valores '-' e np.nan para NULL
+    dados.replace(["-", np.nan], [None], inplace=True)
+
+    # Altera valores negativos para None
+    float_cols = [
+        "acumulado_chuva_1_h",
+        "acumulado_chuva_3_h",
+        "acumulado_chuva_6_h",
+        "acumulado_chuva_12_h",
+        "acumulado_chuva_24_h",
+        "acumulado_chuva_48_h",
+        "acumulado_chuva_72_h",
+        "acumulado_chuva_96_h",
+    ]
+    dados[float_cols] = np.where(dados[float_cols] < 0, None, dados[float_cols])
+
+    # Elimina linhas duplicadas de id_estacao e data_medicao, mantendo a de menor valor nas colunas float
+    dados.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True)
+    dados = dados.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first")
+
+    log(f"tipo do id_estacao antes do astype: {type(dados.id_estacao.unique()[0])}")
+    dados["id_estacao"] = dados["id_estacao"].astype(str)
+
+    dados = save_updated_rows_on_redis(
+        dados,
+        dataset_id,
+        table_id,
+        unique_id="id_estacao",
+        date_column="data_medicao",
+        date_format=date_format,
+        mode=mode,
+    )
+
+    # If df is empty stop flow on flows.py
+    empty_data = dados.shape[0] == 0
+    log(f"[DEBUG]: dataframe is empty: {empty_data}")
+
+    # Save max date on redis to compare this with last dbt run
+    if not empty_data:
+        max_date = str(dados["data_medicao"].max())
+        redis_key = build_redis_key(dataset_id, table_id, name="last_update", mode=mode)
+        log(f"[DEBUG]: dataframe is not empty key: {redis_key} {max_date}")
+        save_str_on_redis(redis_key, "date", max_date)
+
+    # Fixar ordem das colunas
+    dados = dados[
+        [
+            "data_medicao",
+            "id_estacao",
+            "nome_estacao",
+            "instantaneo_chuva",
+            "acumulado_chuva_1_h",
+            "acumulado_chuva_3_h",
+            "acumulado_chuva_6_h",
+            "acumulado_chuva_12_h",
+            "acumulado_chuva_24_h",
+            "acumulado_chuva_48_h",
+            "acumulado_chuva_72_h",
+            "acumulado_chuva_96_h",
+        ]
+    ]
+
+    return dados, empty_data
+
+
+@task
+def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:
+    """
+    Salvar dados tratados em csv para conseguir subir pro GCP
+    """
+
+    prepath = Path("/tmp/precipitacao_cemaden/")
+    prepath.mkdir(parents=True, exist_ok=True)
+
+    partition_column = "data_medicao"
+    dataframe, partitions = parse_date_columns(dados, partition_column)
+    current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")
+
+    # Cria partições a partir da data
+    to_partitions(
+        data=dataframe,
+        partition_columns=partitions,
+        savepath=prepath,
+        data_type="csv",
+        suffix=current_time,
+    )
+    log(f"[DEBUG] Files saved on {prepath}")
+    return prepath
+
+
+@task
+def save_last_dbt_update(
+    dataset_id: str,
+    table_id: str,
+    mode: str = "dev",
+    wait=None,  # pylint: disable=unused-argument
+) -> None:
+    """
+    Save on dbt last timestamp where it was updated
+    """
+    now = pendulum.now("America/Sao_Paulo").to_datetime_string()
+    redis_key = build_redis_key(dataset_id, table_id, name="dbt_last_update", mode=mode)
+    log(f">>>>> debug saving actual date on dbt redis {redis_key} {now}")
+    save_str_on_redis(redis_key, "date", now)
+
+
+@task(skip_on_upstream_skip=False)
+def check_to_run_dbt(
+    dataset_id: str,
+    table_id: str,
+    mode: str = "dev",
+) -> bool:
+    """
+    It will run even if its upstream tasks skip.
+    """
+
+    key_table_1 = build_redis_key(
+        dataset_id, table_id, name="dbt_last_update", mode=mode
+    )
+    key_table_2 = build_redis_key(dataset_id, table_id, name="last_update", mode=mode)
+
+    format_date_table_1 = "YYYY-MM-DD HH:mm:SS"
+    format_date_table_2 = "YYYY-MM-DD HH:mm:SS"
+
+    # Returns true if date saved on table_2 (cemaden) is bigger than
+    # the date saved on table_1 (dbt).
+    run_dbt = compare_dates_between_tables_redis(
+        key_table_1, format_date_table_1, key_table_2, format_date_table_2
+    )
+    log(f">>>> debug data cemaden > data dbt: {run_dbt}")
+    return run_dbt
diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/utils.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/utils.py
new file mode 100644
index 000000000..7b2af3764
--- /dev/null
+++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/utils.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+"""
+Utils for precipitacao_cemaden
+"""
+from typing import List, Tuple
+
+import numpy as np
+import pandas as pd
+
+
+def parse_date_columns(
+    dataframe: pd.DataFrame, partition_date_column: str
+) -> Tuple[pd.DataFrame, List[str]]:
+    """
+    Parses the date columns to the partition format. Reformatado para manter o formato utilizado
+    quando os dados foram salvos pela primeira vez (ano, mes, dia).
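+
+    Example (sketch): for partition_date_column="data_medicao" holding
+    "2023-05-12 15:00:00", the function returns the dataframe with the new
+    columns ano="2023", mes="5" and dia="12" (no zero padding, since the
+    values go through astype(int)) plus the list ["ano", "mes", "dia"]
+    expected by to_partitions.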
+ """ + ano_col = "ano" + mes_col = "mes" + data_col = "dia" + cols = [ano_col, mes_col, data_col] + for col in cols: + if col in dataframe.columns: + raise ValueError(f"Column {col} already exists, please review your model.") + + dataframe[partition_date_column] = dataframe[partition_date_column].astype(str) + dataframe[data_col] = pd.to_datetime( + dataframe[partition_date_column], errors="coerce" + ) + + dataframe[ano_col] = ( + dataframe[data_col] + .dt.year.fillna(-1) + .astype(int) + .astype(str) + .replace("-1", np.nan) + ) + + dataframe[mes_col] = ( + dataframe[data_col] + .dt.month.fillna(-1) + .astype(int) + .astype(str) + .replace("-1", np.nan) + ) + + dataframe[data_col] = ( + dataframe[data_col] + .dt.day.fillna(-1) + .astype(int) + .astype(str) + .replace("-1", np.nan) + ) + + return dataframe, [ano_col, mes_col, data_col] From 3685fc63cacba6614f73795733d8fe0eecdfb09e Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 12 May 2023 15:10:37 -0300 Subject: [PATCH 02/32] fix: remover nome de estacao Cemaden --- pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py index 2101e7330..002e1ecd5 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py @@ -48,12 +48,12 @@ def tratar_dados( "uf", "codibge", "cidade", + "nomeestacao", "tipoestacao", "status", ] rename_cols = { "idestacao": "id_estacao", - "nomeestacao": "nome_estacao", "ultimovalor": "instantaneo_chuva", "datahoraUltimovalor": "data_medicao_utc", "acc1hr": "acumulado_chuva_1_h", @@ -135,7 +135,6 @@ def tratar_dados( [ "data_medicao", "id_estacao", - "nome_estacao", "instantaneo_chuva", "acumulado_chuva_1_h", "acumulado_chuva_3_h", From f2810bac8137d7206a96ab4f6c142d876a6c272b Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 12 May 2023 15:11:05 -0300 Subject: [PATCH 03/32] fix: corrigir espaco de tempo na query Cemaden --- .../precipitacao_cemaden/constants.py | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/constants.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/constants.py index 4ab17e018..72c72861a 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/constants.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/constants.py @@ -26,7 +26,7 @@ class constants(Enum): # pylint: disable=c0103 cemaden AS ( -- seleciona as últimas 2h de medição antes da última atualização SELECT id_estacao, - acumulado_chuva_15_min, + acumulado_chuva_1_h, CURRENT_DATE('America/Sao_Paulo') as data, data_particao, DATETIME(CONCAT(data_particao," ", horario)) AS data_update, @@ -41,7 +41,7 @@ class constants(Enum): # pylint: disable=c0103 a.id_estacao, "cemaden" AS sistema, MAX(a.data_update) AS data_update, - SUM(a.acumulado_chuva_15_min) AS acumulado_chuva_15_min, + SUM(a.acumulado_chuva_1_h) AS acumulado_chuva_1_h, FROM cemaden a GROUP BY a.id_estacao, sistema ), @@ -50,8 +50,8 @@ class constants(Enum): # pylint: disable=c0103 SELECT h3.*, lm.id_estacao, - lm.acumulado_chuva_15_min, - lm.acumulado_chuva_15_min/power(h3.dist,5) AS p1_15min, + lm.acumulado_chuva_1_h, + lm.acumulado_chuva_1_h/power(h3.dist,5) AS p1_1h, 1/power(h3.dist,5) AS inv_dist FROM ( WITH centroid_h3 AS ( @@ -111,7 +111,7 @@ class constants(Enum): # pylint: disable=c0103 
h3_media AS ( -- calcula média de chuva para as 3 estações mais próximas SELECT id_h3, - CAST(sum(p1_15min)/sum(inv_dist) AS DECIMAL) AS chuva_15min, + CAST(sum(p1_1h)/sum(inv_dist) AS DECIMAL) AS chuva_1h, STRING_AGG(estacao ORDER BY estacao) estacoes FROM h3_chuvas -- WHERE ranking < 4 @@ -123,7 +123,7 @@ class constants(Enum): # pylint: disable=c0103 h3_media.id_h3, h3_media.estacoes, nome AS bairro, - cast(round(h3_media.chuva_15min,2) AS decimal) AS chuva_15min, + cast(round(h3_media.chuva_1h,2) AS decimal) AS chuva_1h, FROM h3_media LEFT JOIN `rj-cor.dados_mestres.h3_grid_res8` h3_grid ON h3_grid.id=h3_media.id_h3 @@ -134,20 +134,20 @@ class constants(Enum): # pylint: disable=c0103 SELECT final_table.id_h3, bairro, - chuva_15min, + chuva_1h, estacoes, CASE - WHEN chuva_15min> 0 AND chuva_15min<= 10 THEN 'chuva fraca' - WHEN chuva_15min> 10 AND chuva_15min<= 50 THEN 'chuva moderada' - WHEN chuva_15min> 50 AND chuva_15min<= 100 THEN 'chuva forte' - WHEN chuva_15min> 100 THEN 'chuva muito forte' + WHEN chuva_1h> 0 AND chuva_1h<= 10 THEN 'chuva fraca' + WHEN chuva_1h> 10 AND chuva_1h<= 50 THEN 'chuva moderada' + WHEN chuva_1h> 50 AND chuva_1h<= 100 THEN 'chuva forte' + WHEN chuva_1h> 100 THEN 'chuva muito forte' ELSE 'sem chuva' END AS status, CASE - WHEN chuva_15min> 0 AND chuva_15min<= 10 THEN '#DAECFB'--'#00CCFF' - WHEN chuva_15min> 1 AND chuva_15min<= 50 THEN '#A9CBE8'--'#BFA230' - WHEN chuva_15min> 50 AND chuva_15min<= 100 THEN '#77A9D5'--'#E0701F' - WHEN chuva_15min> 100 THEN '#125999'--'#FF0000' + WHEN chuva_1h> 0 AND chuva_1h<= 10 THEN '#DAECFB'--'#00CCFF' + WHEN chuva_1h> 1 AND chuva_1h<= 50 THEN '#A9CBE8'--'#BFA230' + WHEN chuva_1h> 50 AND chuva_1h<= 100 THEN '#77A9D5'--'#E0701F' + WHEN chuva_1h> 100 THEN '#125999'--'#FF0000' ELSE '#ffffff' END AS color FROM final_table From 722856f008fc4c9b742393b7683f34e61a6f7432 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 12 May 2023 15:27:00 -0300 Subject: [PATCH 04/32] fix: registrar flow Cemaden --- pipelines/rj_cor/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipelines/rj_cor/__init__.py b/pipelines/rj_cor/__init__.py index 535876ff4..00c06849e 100644 --- a/pipelines/rj_cor/__init__.py +++ b/pipelines/rj_cor/__init__.py @@ -5,6 +5,7 @@ from pipelines.rj_cor.bot_semaforo.flows import * from pipelines.rj_cor.meteorologia.meteorologia_inmet.flows import * from pipelines.rj_cor.meteorologia.precipitacao_alertario.flows import * +from pipelines.rj_cor.meteorologia.precipitacao_cemaden.flows import * from pipelines.rj_cor.meteorologia.satelite.flows import * from pipelines.rj_cor.meteorologia.precipitacao_websirene.flows import * from pipelines.rj_cor.meteorologia.radar.precipitacao.flows import * From c6aa1c37520b026eb17b2d3091c7399540e8c570 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 12 May 2023 15:43:09 -0300 Subject: [PATCH 05/32] =?UTF-8?q?fix:=20corrigir=20erro=20em=20limpeza=20d?= =?UTF-8?q?e=20dados=20n=C3=A3o-float=20do=20Cemaden?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py index 002e1ecd5..e6a7dd2bf 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py @@ -87,7 +87,7 @@ def tratar_dados( log(f"DEBUG: data 
{dados[see_cols]}") # Alterando valores '-' e np.nan para NULL - dados.replace(["-", np.nan], [None], inplace=True) + dados.replace(["-", np.nan], [None, None], inplace=True) # Altera valores negativos para None float_cols = [ From c0af02e19531c16a95a86a2c0fff84dcc247c406 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 12 May 2023 18:11:41 -0300 Subject: [PATCH 06/32] feat: add meteorologia_redemet --- pipelines/rj_cor/__init__.py | 1 + .../meteorologia_redemet/__init__.py | 7 + .../meteorologia_redemet/flows.py | 145 +++++++++++++ .../meteorologia_redemet/schedules.py | 22 ++ .../meteorologia_redemet/tasks.py | 203 ++++++++++++++++++ 5 files changed, 378 insertions(+) create mode 100644 pipelines/rj_cor/meteorologia/meteorologia_redemet/__init__.py create mode 100644 pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py create mode 100644 pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py create mode 100644 pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py diff --git a/pipelines/rj_cor/__init__.py b/pipelines/rj_cor/__init__.py index 00c06849e..34ffc0e76 100644 --- a/pipelines/rj_cor/__init__.py +++ b/pipelines/rj_cor/__init__.py @@ -4,6 +4,7 @@ """ from pipelines.rj_cor.bot_semaforo.flows import * from pipelines.rj_cor.meteorologia.meteorologia_inmet.flows import * +from pipelines.rj_cor.meteorologia.meteorologia_redemet.flows import * from pipelines.rj_cor.meteorologia.precipitacao_alertario.flows import * from pipelines.rj_cor.meteorologia.precipitacao_cemaden.flows import * from pipelines.rj_cor.meteorologia.satelite.flows import * diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/__init__.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/__init__.py new file mode 100644 index 000000000..1515dbb06 --- /dev/null +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/__init__.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +""" +Prefect flows for meteorologia_redemet project +""" +############################################################################### +# Automatically managed, please do not touch +############################################################################### diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py new file mode 100644 index 000000000..342e4adc9 --- /dev/null +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0103, E1120 +""" +Flows for meteorologia_redemet +""" +from datetime import timedelta + +from prefect import case, Parameter +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from prefect.tasks.prefect import create_flow_run, wait_for_flow_run + +from pipelines.constants import constants +from pipelines.utils.constants import constants as utils_constants +from pipelines.rj_cor.meteorologia.meteorologia_redemet.tasks import ( + get_dates, + # slice_data, + download, + tratar_dados, + salvar_dados, +) +from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import hour_schedule +from pipelines.utils.decorators import Flow +from pipelines.utils.dump_db.constants import constants as dump_db_constants +from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants +from pipelines.utils.tasks import ( + create_table_and_upload_to_gcs, + get_current_flow_labels, +) + + +with Flow( + name="COR: Meteorologia - Meteorologia REDEMET", + code_owners=[ + "richardg867", + ], +) as 
cor_meteorologia_meteorologia_redemet: + + DATASET_ID = "clima_estacao_meteorologica" + TABLE_ID = "meteorologia_redemet" + DUMP_MODE = "append" + + # data_inicio e data_fim devem ser strings no formato "YYYY-MM-DD" + data_inicio = Parameter("data_inicio", default="", required=False) + data_fim = Parameter("data_fim", default="", required=False) + + # Materialization parameters + MATERIALIZE_AFTER_DUMP = Parameter( + "materialize_after_dump", default=False, required=False + ) + MATERIALIZE_TO_DATARIO = Parameter( + "materialize_to_datario", default=False, required=False + ) + MATERIALIZATION_MODE = Parameter("mode", default="dev", required=False) + + # Dump to GCS after? Should only dump to GCS if materializing to datario + DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False) + + MAXIMUM_BYTES_PROCESSED = Parameter( + "maximum_bytes_processed", + required=False, + default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, + ) + + data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim) + # data = slice_data(current_time=CURRENT_TIME) + dados = download(data_inicio_, data_fim_) + dados = tratar_dados(dados, backfill) + PATH = salvar_dados(dados=dados) + + # Create table in BigQuery + UPLOAD_TABLE = create_table_and_upload_to_gcs( + data_path=PATH, + dataset_id=DATASET_ID, + table_id=TABLE_ID, + dump_mode=DUMP_MODE, + wait=PATH, + ) + + # Trigger DBT flow run + with case(MATERIALIZE_AFTER_DUMP, True): + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": DATASET_ID, + "table_id": TABLE_ID, + "mode": MATERIALIZATION_MODE, + "materialize_to_datario": MATERIALIZE_TO_DATARIO, + }, + labels=current_flow_labels, + run_name=f"Materialize {DATASET_ID}.{TABLE_ID}", + ) + + materialization_flow.set_upstream(UPLOAD_TABLE) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(DUMP_TO_GCS, True): + # Trigger Dump to GCS flow run with project id as datario + dump_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": DATASET_ID, + "table_id": TABLE_ID, + "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {DATASET_ID}.{TABLE_ID}", + ) + dump_to_gcs_flow.set_upstream(wait_for_materialization) + + wait_for_dump_to_gcs = wait_for_flow_run( + dump_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + + +# para rodar na cloud +cor_meteorologia_meteorologia_redemet.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +cor_meteorologia_meteorologia_redemet.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + # labels=[constants.RJ_COR_AGENT_LABEL.value], + labels=[constants.RJ_IPLANRIO_AGENT_LABEL.value], +) +cor_meteorologia_meteorologia_redemet.schedule = hour_schedule diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py 
b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py
new file mode 100644
index 000000000..be0420606
--- /dev/null
+++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+"""
+Schedules for meteorologia_redemet
+Rodar a cada 1 hora
+"""
+from datetime import timedelta, datetime
+from prefect.schedules import Schedule
+from prefect.schedules.clocks import IntervalClock
+from pipelines.constants import constants
+
+hour_schedule = Schedule(
+    clocks=[
+        IntervalClock(
+            interval=timedelta(hours=1),
+            start_date=datetime(2023, 1, 1, 0, 12, 0),
+            labels=[
+                # constants.RJ_COR_AGENT_LABEL.value,
+                constants.RJ_IPLANRIO_AGENT_LABEL.value,
+            ],
+        ),
+    ]
+)
diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
new file mode 100644
index 000000000..1ff90316d
--- /dev/null
+++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
@@ -0,0 +1,203 @@
+# -*- coding: utf-8 -*-
+"""
+Tasks for meteorologia_redemet
+"""
+from datetime import datetime, timedelta
+import json
+from pathlib import Path
+from typing import Tuple, Union
+
+import pandas as pd
+import pendulum
+from prefect import task
+import requests
+
+from pipelines.constants import constants
+from pipelines.utils.utils import get_vault_secret, log, to_partitions
+from pipelines.rj_cor.meteorologia.precipitacao_alertario.utils import (
+    parse_date_columns,
+)
+
+
+@task(nout=3)
+def get_dates(data_inicio: str, data_fim: str) -> Tuple[str, str, int]:
+    """
+    Task para obter o dia de início, o de fim e a flag de backfill.
+    Se nenhuma data foi passada, a data_inicio corresponde a ontem,
+    a data_fim a hoje e não estamos fazendo backfill.
+    Caso contrário, retorna as datas informadas nos parâmetros do flow.
+    """
+    # a API sempre retorna o dado em UTC
+    log(f"data de inicio e fim antes do if {data_inicio} {data_fim}")
+    if data_inicio == "":
+        data_fim = pendulum.now("UTC").format("YYYY-MM-DD")
+        data_inicio = pendulum.yesterday("UTC").format("YYYY-MM-DD")
+        backfill = 0
+    else:
+        backfill = 1
+    log(f"data de inicio e fim depois do if {data_inicio} {data_fim}")
+
+    return data_inicio, data_fim, backfill
+
+
+@task()
+def slice_data(current_time: str) -> str:
+    """
+    Retorna a data (YYYY-MM-DD) do timestamp de execução
+    """
+    if not isinstance(current_time, str):
+        current_time = current_time.to_datetime_string()
+
+    data = current_time[:10]
+    return data
+
+
+@task(
+    max_retries=constants.TASK_MAX_RETRIES.value,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
+def download(data_inicio: str, data_fim: str) -> pd.DataFrame:
+    """
+    Faz as requisições no intervalo de datas especificado e retorna os dados
+    """
+
+    # Lista com as estações da cidade do Rio de Janeiro
+    estacoes_unicas = [
+        "SBAF",
+        "SBGL",
+        "SBJR",
+        "SBRJ",
+        "SBSC",
+    ]
+
+    dicionario = get_vault_secret("redemet-token")
+    token = dicionario["data"]["token"]
+
+    # Converte datas em int para cálculo de faixas.
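+    # Nota: iterar o intervalo como inteiros YYYYMMDD gera combinações
+    # inválidas na virada de mês (ex.: 20230532); presume-se que a API as
+    # rejeite, caso em que o tratamento de erro abaixo apenas loga e segue.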
+ data_inicio_int = int(data_inicio.replace("-", "")) + data_fim_int = int(data_fim.replace("-", "")) + + raw = [] + for id_estacao in estacoes_unicas: + base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={token}" + for data in range(data_inicio_int, data_fim_int + 1): + for hora in range(24): + url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}" + res = requests.get(url) + if res.status_code != 200: + log(f"Problema no id: {id_estacao}, {res.status_code}, {url}") + continue + res_data = json.loads(res.text) + if res_data["status"] is not True: + log(f"Problema no id: {id_estacao}, {res_data['message']}, {url}") + continue + if "data" not in res_data["data"]: + # Datas no futuro retornam apenas a informação da estação, + # sem os campos de meteorologia, inclusive o campo "data". + # Não requisitar as horas seguintes. + break + raw.append(res_data) + + # Extrai objetos de dados + raw = [res_data["data"] for res_data in raw] + + # converte para dados + dados = pd.DataFrame(raw) + + return dados + + +@task +def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: + """ + Renomeia colunas e filtra dados com a hora do timestamp de execução + """ + + drop_cols = [ + "nome", + "cidade", + "lon", + "lat", + "localizacao", + "tempoImagem", + ] + # Checa se todas estão no df + drop_cols = [c for c in drop_cols if c in dados.columns] + + # Remove colunas que já temos os dados em outras tabelas + dados = dados.drop(drop_cols, axis=1) + + # Adequando nome das variáveis + rename_cols = { + "localidade": "id_estacao", + "ur": "umidade", + } + + dados = dados.rename(columns=rename_cols) + + # Converte horário de UTC para America/Sao Paulo + formato = "DD/MM/YYYY HH:mm(z)" + dados["data"] = dados["data"].apply( + lambda x: pendulum.from_format(x, formato) + .in_tz("America/Sao_Paulo") + .format(formato) + ) + + # Ordenamento de variáveis + chaves_primarias = ["id_estacao", "data"] + demais_cols = [c for c in dados.columns if c not in chaves_primarias] + + dados = dados[chaves_primarias + demais_cols] + + # Converte variáveis que deveriam ser int para int + dados["temperatura"] = dados["temperatura"].apply(lambda x: int(x[:-2])) + dados["umidade"] = dados["umidade"].apply(lambda x: int(x[:-1])) + + dados["data"] = pd.to_datetime(dados.data, format="%d/%m/%Y %H:%M(%Z)") + + # Pegar o dia no nosso timezone como partição + br_timezone = pendulum.now("America/Sao_Paulo").format("YYYY-MM-DD") + + # Define colunas que serão salvas + dados = dados[ + [ + "id_estacao", + "data", + "temperatura", + "umidade", + "condicoes_tempo", + "ceu", + "teto", + "visibilidade", + ] + ] + + if not backfill: + # Seleciona apenas dados daquele dia (devido à UTC) + dados = dados[dados["data"] == br_timezone] + + print(">>>> max hora ", dados[~dados.temperatura.isna()].horario.max()) + return dados + + +@task +def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: + """ + Salvar dados em csv + """ + + prepath = Path("/tmp/meteorologia_redemet/") + prepath.mkdir(parents=True, exist_ok=True) + + partition_column = "data" + dataframe, partitions = parse_date_columns(dados, partition_column) + + # Cria partições a partir da data + to_partitions( + data=dataframe, + partition_columns=partitions, + savepath=prepath, + data_type="csv", + ) + log(f"[DEBUG] Files saved on {prepath}") + return prepath From 502793138395d07950ab8b31a6147cc8f71c11d1 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 12 May 2023 21:39:25 -0300 Subject: [PATCH 07/32] fix: corrigir erro de log REDEMET 
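
O print antigo referenciava a coluna "horario", que não existe no dataframe
da REDEMET (a coluna correta é "data"); a troca por log() também faz a
mensagem passar pelo logger padrão dos flows.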
--- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 1ff90316d..b8d31b6a6 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -176,7 +176,7 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: # Seleciona apenas dados daquele dia (devido à UTC) dados = dados[dados["data"] == br_timezone] - print(">>>> max hora ", dados[~dados.temperatura.isna()].horario.max()) + log(f">>>> max hora {dados[~dados.temperatura.isna()].data.max()}") return dados From 8040d0ac3eaf0193ad77a30c01e217a264821d30 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 19 May 2023 14:13:30 -0300 Subject: [PATCH 08/32] fix: tentativa de diagnosticar problemas com pipeline REDEMET --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index b8d31b6a6..163dccb7d 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -83,6 +83,7 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: for data in range(data_inicio_int, data_fim_int + 1): for hora in range(24): url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}" + log(f"Carregando localidade={id_estacao}&datahora={data:06}{hora:02}") res = requests.get(url) if res.status_code != 200: log(f"Problema no id: {id_estacao}, {res.status_code}, {url}") @@ -95,6 +96,7 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: # Datas no futuro retornam apenas a informação da estação, # sem os campos de meteorologia, inclusive o campo "data". # Não requisitar as horas seguintes. + log("Dados nao retornados") break raw.append(res_data) From 1d8482a9f8cc6711222357bddcccff660731a603 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 19 May 2023 14:56:37 -0300 Subject: [PATCH 09/32] fix: nao interromper pipeline REDEMET em caso de falta de dados para a hora --- .../rj_cor/meteorologia/meteorologia_redemet/tasks.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 163dccb7d..45aaae896 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -83,7 +83,6 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: for data in range(data_inicio_int, data_fim_int + 1): for hora in range(24): url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}" - log(f"Carregando localidade={id_estacao}&datahora={data:06}{hora:02}") res = requests.get(url) if res.status_code != 200: log(f"Problema no id: {id_estacao}, {res.status_code}, {url}") @@ -92,12 +91,6 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: if res_data["status"] is not True: log(f"Problema no id: {id_estacao}, {res_data['message']}, {url}") continue - if "data" not in res_data["data"]: - # Datas no futuro retornam apenas a informação da estação, - # sem os campos de meteorologia, inclusive o campo "data". - # Não requisitar as horas seguintes. 
- log("Dados nao retornados") - break raw.append(res_data) # Extrai objetos de dados From e7679f7db5cd3f65596007deaa99de9100677606 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 26 May 2023 10:29:52 -0300 Subject: [PATCH 10/32] chore: tentativa de diagnosticar mais problemas REDEMET --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 45aaae896..b6244b2fb 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -98,6 +98,7 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: # converte para dados dados = pd.DataFrame(raw) + log(f"Dados base:\n{dados}") return dados @@ -167,6 +168,8 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: ] ] + log(f"Dados antes do filtro dia:\n{dados}") + if not backfill: # Seleciona apenas dados daquele dia (devido à UTC) dados = dados[dados["data"] == br_timezone] From c6f7028edbc695e5a8b79079224f72126b61f018 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 26 May 2023 14:07:35 -0300 Subject: [PATCH 11/32] fix: mais correcoes no REDEMET --- .../meteorologia/meteorologia_redemet/tasks.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index b6244b2fb..27836d8b1 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -91,6 +91,9 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: if res_data["status"] is not True: log(f"Problema no id: {id_estacao}, {res_data['message']}, {url}") continue + elif "data" not in res_data["data"]: + # Sem dados para esse horario + continue raw.append(res_data) # Extrai objetos de dados @@ -98,7 +101,6 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: # converte para dados dados = pd.DataFrame(raw) - log(f"Dados base:\n{dados}") return dados @@ -146,8 +148,12 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: dados = dados[chaves_primarias + demais_cols] # Converte variáveis que deveriam ser int para int - dados["temperatura"] = dados["temperatura"].apply(lambda x: int(x[:-2])) - dados["umidade"] = dados["umidade"].apply(lambda x: int(x[:-1])) + dados["temperatura"] = dados["temperatura"].apply( + lambda x: None if x[:-2] == "NIL" else int(x[:-2]) + ) + dados["umidade"] = dados["umidade"].apply( + lambda x: None if "%" not in x else int(x[:-1]) + ) dados["data"] = pd.to_datetime(dados.data, format="%d/%m/%Y %H:%M(%Z)") From 3b903113fce478b9b0efae2ac4fcc79ecc7958c2 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Wed, 14 Jun 2023 17:24:29 -0300 Subject: [PATCH 12/32] chore: Mais uma tentativa de diagnosticar o REDEMET --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 27836d8b1..a3dba595b 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -174,12 +174,15 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> 
pd.DataFrame: ] ] - log(f"Dados antes do filtro dia:\n{dados}") + log(f"Dados antes do filtro dia:\n{dados[['id_estacao', 'data']]}") if not backfill: # Seleciona apenas dados daquele dia (devido à UTC) dados = dados[dados["data"] == br_timezone] + log(f"Dados depois do filtro dia:\n{dados[['id_estacao', 'data']]}") + + log(f">>>> min hora {dados[~dados.temperatura.isna()].data.min()}") log(f">>>> max hora {dados[~dados.temperatura.isna()].data.max()}") return dados From 60f586d175775eab21b60281a5257316e85a5800 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Fri, 16 Jun 2023 14:22:49 -0300 Subject: [PATCH 13/32] fix: Corrigir filtro de datas REDEMET --- .../rj_cor/meteorologia/meteorologia_redemet/tasks.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index a3dba595b..e438b1979 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -174,13 +174,12 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: ] ] - log(f"Dados antes do filtro dia:\n{dados[['id_estacao', 'data']]}") + # log(f"Dados antes do filtro dia:\n{dados[['id_estacao', 'data']]}") + log(dados.to_csv()) if not backfill: # Seleciona apenas dados daquele dia (devido à UTC) - dados = dados[dados["data"] == br_timezone] - - log(f"Dados depois do filtro dia:\n{dados[['id_estacao', 'data']]}") + dados = dados[dados["data"].dt.date.astype(str) == br_timezone] log(f">>>> min hora {dados[~dados.temperatura.isna()].data.min()}") log(f">>>> max hora {dados[~dados.temperatura.isna()].data.max()}") From 61ae82e6b3c61a41f70f5e16571109ec496324ee Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Thu, 22 Jun 2023 13:32:59 -0300 Subject: [PATCH 14/32] fix: Remover dados duplicados REDEMET --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index e438b1979..ed7d0bbd4 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -174,8 +174,10 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: ] ] - # log(f"Dados antes do filtro dia:\n{dados[['id_estacao', 'data']]}") - log(dados.to_csv()) + # Remover dados duplicados + dados = dados.drop_duplicates(subset=["id_estacao", "data"]) + + log(f"Dados antes do filtro dia:\n{dados[['id_estacao', 'data']]}") if not backfill: # Seleciona apenas dados daquele dia (devido à UTC) From 0844e64753550189b3edbf531e469fd049561630 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Tue, 4 Jul 2023 14:34:06 -0300 Subject: [PATCH 15/32] chore: Corrigidas warnings no codigo REDEMET --- .../rj_cor/meteorologia/meteorologia_redemet/tasks.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index ed7d0bbd4..e9a1d326a 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -2,7 +2,7 @@ """ Tasks for meteorologia_redemet """ -from datetime import datetime, timedelta +from datetime import timedelta import json from pathlib import Path 
from typing import Tuple, Union @@ -71,7 +71,6 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: ] dicionario = get_vault_secret("redemet-token") - token = dicionario["data"]["token"] # Converte datas em int para cálculo de faixas. data_inicio_int = int(data_inicio.replace("-", "")) @@ -79,7 +78,7 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: raw = [] for id_estacao in estacoes_unicas: - base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={token}" + base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={dicionario['data']['token']}" # noqa for data in range(data_inicio_int, data_fim_int + 1): for hora in range(24): url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}" @@ -91,7 +90,7 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: if res_data["status"] is not True: log(f"Problema no id: {id_estacao}, {res_data['message']}, {url}") continue - elif "data" not in res_data["data"]: + if "data" not in res_data["data"]: # Sem dados para esse horario continue raw.append(res_data) From 06b9fd0ff82f5db10ee58548b269c9bec6c0bfd7 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Tue, 4 Jul 2023 15:19:09 -0300 Subject: [PATCH 16/32] fix: Migrar flows Cemaden e REDEMET para agente do COR --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 3 +-- .../rj_cor/meteorologia/meteorologia_redemet/schedules.py | 3 +-- pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py | 3 +-- .../rj_cor/meteorologia/precipitacao_cemaden/schedules.py | 3 +-- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 342e4adc9..1fd089666 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -139,7 +139,6 @@ cor_meteorologia_meteorologia_redemet.storage = GCS(constants.GCS_FLOWS_BUCKET.value) cor_meteorologia_meteorologia_redemet.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value, - # labels=[constants.RJ_COR_AGENT_LABEL.value], - labels=[constants.RJ_IPLANRIO_AGENT_LABEL.value], + labels=[constants.RJ_COR_AGENT_LABEL.value], ) cor_meteorologia_meteorologia_redemet.schedule = hour_schedule diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py index be0420606..e180adc28 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py @@ -14,8 +14,7 @@ interval=timedelta(hours=1), start_date=datetime(2023, 1, 1, 0, 12, 0), labels=[ - # constants.RJ_COR_AGENT_LABEL.value, - constants.RJ_IPLANRIO_AGENT_LABEL.value, + constants.RJ_COR_AGENT_LABEL.value, ], ), ] diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py index 32793984b..66a33eee5 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py @@ -208,7 +208,6 @@ cor_meteorologia_precipitacao_cemaden.storage = GCS(constants.GCS_FLOWS_BUCKET.value) cor_meteorologia_precipitacao_cemaden.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value, - # labels=[constants.RJ_COR_AGENT_LABEL.value], - labels=[constants.RJ_IPLANRIO_AGENT_LABEL.value], + labels=[constants.RJ_COR_AGENT_LABEL.value], ) 
From d3bfe7f2a7b38f69b7b65eb1733787a4c7e06c78 Mon Sep 17 00:00:00 2001
From: patriciacatandi
Date: Thu, 24 Aug 2023 15:58:34 -0300
Subject: [PATCH 18/32] melhorias no flow

---
 .../meteorologia_redemet/flows.py     |   2 +-
 .../precipitacao_cemaden/flows.py     | 204 ++++++++----------
 .../precipitacao_cemaden/schedules.py |   2 +-
 .../precipitacao_cemaden/tasks.py     |  87 +-------
 4 files changed, 97 insertions(+), 198 deletions(-)

diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py
index 1fd089666..fb731877f 100644
--- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py
+++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py
@@ -33,9 +33,9 @@
     name="COR: Meteorologia - Meteorologia REDEMET",
     code_owners=[
         "richardg867",
+        "paty",
     ],
 ) as cor_meteorologia_meteorologia_redemet:
-
     DATASET_ID = "clima_estacao_meteorologica"
     TABLE_ID = "meteorologia_redemet"
     DUMP_MODE = "append"

diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py
index 66a33eee5..e8e573e56 100644
--- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py
+++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py
@@ -17,10 +17,8 @@
     constants as cemaden_constants,
 )
 from pipelines.rj_cor.meteorologia.precipitacao_cemaden.tasks import (
-    check_to_run_dbt,
     tratar_dados,
     salvar_dados,
-    save_last_dbt_update,
 )
 from pipelines.rj_cor.meteorologia.precipitacao_cemaden.schedules import (
     minute_schedule,
@@ -44,10 +42,10 @@
     name="COR: Meteorologia - Precipitacao CEMADEN",
     code_owners=[
         "richardg867",
+        "paty",
     ],
     # skip_if_running=True,
 ) as cor_meteorologia_precipitacao_cemaden:
-
     DATASET_ID = "clima_pluviometro"
     TABLE_ID = "taxa_precipitacao_cemaden"
     DUMP_MODE = "append"
@@ -73,136 +71,112 @@
         default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
     )

-    dados, empty_data = tratar_dados(
-        dataset_id=DATASET_ID,
-        table_id=TABLE_ID,
-        mode=MATERIALIZATION_MODE,
-    )
-
-    with case(empty_data, False):
-        path = salvar_dados(dados=dados)
-
-        # Create table in BigQuery
-        UPLOAD_TABLE = create_table_and_upload_to_gcs(
-            data_path=path,
-            dataset_id=DATASET_ID,
-            table_id=TABLE_ID,
-            dump_mode=DUMP_MODE,
-            wait=path,
-        )
-
-        run_dbt = check_to_run_dbt(
-            dataset_id=DATASET_ID,
-            table_id=TABLE_ID,
-            mode=MATERIALIZATION_MODE,
-        )
-        run_dbt.set_upstream(UPLOAD_TABLE)
-
-        with case(run_dbt, True):
-            # Trigger DBT flow run
-            with case(MATERIALIZE_AFTER_DUMP, True):
-                current_flow_labels = get_current_flow_labels()
-                materialization_flow = create_flow_run(
-                    flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-                    project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                    parameters={
-                        "dataset_id": DATASET_ID,
-                        "table_id": TABLE_ID,
-                        "mode": MATERIALIZATION_MODE,
-                        "materialize_to_datario": MATERIALIZE_TO_DATARIO,
-                    },
-                    labels=current_flow_labels,
-                    run_name=f"Materialize {DATASET_ID}.{TABLE_ID}",
-                )
-
-                current_flow_labels.set_upstream(run_dbt)
-                materialization_flow.set_upstream(current_flow_labels)
-
-                wait_for_materialization = wait_for_flow_run_with_2min_timeout(
-                    flow_run_id=materialization_flow,
-                    stream_states=True,
-                    stream_logs=True,
-                    raise_final_state=True,
-                )
-                last_dbt_update = save_last_dbt_update(
-                    dataset_id=DATASET_ID,
-                    table_id=TABLE_ID,
-                    mode=MATERIALIZATION_MODE,
-                    wait=wait_for_materialization,
-                )
-
-                wait_for_materialization.max_retries = (
-                    dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-                )
-                wait_for_materialization.retry_delay = timedelta(
-                    seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-                )
-
-                with case(TRIGGER_RAIN_DASHBOARD_UPDATE, True):
-                    # Trigger rain dashboard update flow run
-                    rain_dashboard_update_flow = create_flow_run(
-                        flow_name=rain_dashboard_constants.RAIN_DASHBOARD_FLOW_NAME.value,
-                        project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                        parameters=rain_dashboard_constants.RAIN_DASHBOARD_FLOW_SCHEDULE_PARAMETERS.value,  # noqa
-                        labels=[
-                            "rj-escritorio-dev",
-                        ],
-                        run_name="Update rain dashboard data (triggered by precipitacao_cemaden flow)",  # noqa
-                    )
-                    rain_dashboard_update_flow.set_upstream(wait_for_materialization)
-
-                    wait_for_rain_dashboard_update = wait_for_flow_run(
-                        flow_run_id=rain_dashboard_update_flow,
-                        stream_states=True,
-                        stream_logs=True,
-                        raise_final_state=False,
-                    )
-
-                    # Trigger rain dashboard update last 2h flow run
-                    rain_dashboard_last_2h_update_flow = create_flow_run(
-                        flow_name=rain_dashboard_constants.RAIN_DASHBOARD_FLOW_NAME.value,
-                        project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                        parameters=cemaden_constants.RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS.value,  # noqa
-                        labels=[
-                            "rj-escritorio-dev",
-                        ],
-                        run_name="Update rain dashboard data (triggered by precipitacao_cemaden last 2h flow)",  # noqa
-                    )
-                    rain_dashboard_last_2h_update_flow.set_upstream(
-                        wait_for_materialization
-                    )
-
-                    wait_for_rain_dashboard_last_2h_update = wait_for_flow_run(
-                        flow_run_id=rain_dashboard_last_2h_update_flow,
-                        stream_states=True,
-                        stream_logs=True,
-                        raise_final_state=False,
-                    )
-
-                with case(DUMP_TO_GCS, True):
-                    # Trigger Dump to GCS flow run with project id as datario
-                    dump_to_gcs_flow = create_flow_run(
-                        flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
-                        project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-                        parameters={
-                            "project_id": "datario",
-                            "dataset_id": DATASET_ID,
-                            "table_id": TABLE_ID,
-                            "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED,
-                        },
-                        labels=[
-                            "datario",
-                        ],
-                        run_name=f"Dump to GCS {DATASET_ID}.{TABLE_ID}",
-                    )
-                    dump_to_gcs_flow.set_upstream(wait_for_materialization)
-
-                    wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout(
-                        flow_run_id=dump_to_gcs_flow,
-                        stream_states=True,
-                        stream_logs=True,
-                        raise_final_state=True,
-                    )
+    dados = tratar_dados()
+    path = salvar_dados(dados=dados)
+
+    # Create table in BigQuery
+    UPLOAD_TABLE = create_table_and_upload_to_gcs(
+        data_path=path,
+        dataset_id=DATASET_ID,
+        table_id=TABLE_ID,
+        dump_mode=DUMP_MODE,
+        wait=path,
+    )
+
+    # Trigger DBT flow run
+    with case(MATERIALIZE_AFTER_DUMP, True):
+        current_flow_labels = get_current_flow_labels()
+        materialization_flow = create_flow_run(
+            flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+            project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+            parameters={
+                "dataset_id": DATASET_ID,
+                "table_id": TABLE_ID,
+                "mode": MATERIALIZATION_MODE,
+                "materialize_to_datario": MATERIALIZE_TO_DATARIO,
+            },
+            labels=current_flow_labels,
+            run_name=f"Materialize {DATASET_ID}.{TABLE_ID}",
+        )
+
+        materialization_flow.set_upstream(current_flow_labels)
+
+        wait_for_materialization = wait_for_flow_run_with_2min_timeout(
+            flow_run_id=materialization_flow,
+            stream_states=True,
+            stream_logs=True,
+            raise_final_state=True,
+        )
+        wait_for_materialization.max_retries = (
+            dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+        )
+        wait_for_materialization.retry_delay = timedelta(
+            seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+        )
+
+        with case(TRIGGER_RAIN_DASHBOARD_UPDATE, True):
+            # Trigger rain dashboard update flow run
+            rain_dashboard_update_flow = create_flow_run(
+                flow_name=rain_dashboard_constants.RAIN_DASHBOARD_FLOW_NAME.value,
+                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                parameters=rain_dashboard_constants.RAIN_DASHBOARD_FLOW_SCHEDULE_PARAMETERS.value,  # noqa
+                labels=[
+                    "rj-escritorio-dev",
+                ],
+                run_name="Update rain dashboard data (triggered by precipitacao_cemaden flow)",  # noqa
+            )
+            rain_dashboard_update_flow.set_upstream(wait_for_materialization)
+
+            wait_for_rain_dashboard_update = wait_for_flow_run(
+                flow_run_id=rain_dashboard_update_flow,
+                stream_states=True,
+                stream_logs=True,
+                raise_final_state=False,
+            )
+
+            # Trigger rain dashboard update last 2h flow run
+            rain_dashboard_last_2h_update_flow = create_flow_run(
+                flow_name=rain_dashboard_constants.RAIN_DASHBOARD_FLOW_NAME.value,
+                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                parameters=cemaden_constants.RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS.value,  # noqa
+                labels=[
+                    "rj-escritorio-dev",
+                ],
+                run_name="Update rain dashboard data (triggered by precipitacao_cemaden last 2h flow)",  # noqa
+            )
+            rain_dashboard_last_2h_update_flow.set_upstream(wait_for_materialization)
+
+            wait_for_rain_dashboard_last_2h_update = wait_for_flow_run(
+                flow_run_id=rain_dashboard_last_2h_update_flow,
+                stream_states=True,
+                stream_logs=True,
+                raise_final_state=False,
+            )
+
+        with case(DUMP_TO_GCS, True):
+            # Trigger Dump to GCS flow run with project id as datario
+            dump_to_gcs_flow = create_flow_run(
+                flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
+                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                parameters={
+                    "project_id": "datario",
+                    "dataset_id": DATASET_ID,
+                    "table_id": TABLE_ID,
+                    "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED,
+                },
+                labels=[
+                    "datario",
+                ],
+                run_name=f"Dump to GCS {DATASET_ID}.{TABLE_ID}",
+            )
+            dump_to_gcs_flow.set_upstream(wait_for_materialization)
+
+            wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout(
+                flow_run_id=dump_to_gcs_flow,
+                stream_states=True,
+                stream_logs=True,
+                raise_final_state=True,
+            )

 # para rodar na cloud
 cor_meteorologia_precipitacao_cemaden.storage = GCS(constants.GCS_FLOWS_BUCKET.value)

diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py
index 1292bc049..1f2a0f1a2 100644
--- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py
+++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py
@@ -12,7 +12,7 @@
 minute_schedule = Schedule(
     clocks=[
         IntervalClock(
-            interval=timedelta(minutes=1),
+            interval=timedelta(minutes=5),
             start_date=datetime(2023, 1, 1, 0, 0, 30),
             labels=[
                 constants.RJ_COR_AGENT_LABEL.value,
             ],

diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py
index e6a7dd2bf..1e83f12a5 100644
--- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py
+++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py
@@ -9,7 +9,6 @@

 import numpy as np
 import pandas as pd
-import pendulum
 from prefect import task

 # from prefect import context
@@ -19,12 +18,8 @@
     parse_date_columns,
 )
 from pipelines.utils.utils import (
-    build_redis_key,
-    compare_dates_between_tables_redis,
     log,
     to_partitions,
-    save_str_on_redis,
-    save_updated_rows_on_redis,
 )

@@ -33,9 +28,7 @@
     max_retries=constants.TASK_MAX_RETRIES.value,
     retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
 )
-def tratar_dados(
-    dataset_id: str, table_id: str, mode: str = "dev"
-) -> Tuple[pd.DataFrame, bool]:
+def tratar_dados() -> Tuple[pd.DataFrame, bool]:
     """
     Renomeia colunas e filtra dados com a hora e minuto do timestamp
     de execução mais próximo à este
@@ -54,7 +47,7 @@ def tratar_dados() -> Tuple[pd.DataFrame, bool]:
     ]
     rename_cols = {
         "idestacao": "id_estacao",
-        "ultimovalor": "instantaneo_chuva",
+        "ultimovalor": "acumulado_chuva_10_min",
         "datahoraUltimovalor": "data_medicao_utc",
         "acc1hr": "acumulado_chuva_1_h",
         "acc3hr": "acumulado_chuva_3_h",
@@ -91,6 +84,7 @@ def tratar_dados() -> Tuple[pd.DataFrame, bool]:

     # Altera valores negativos para None
     float_cols = [
+        "acumulado_chuva_10_min",
         "acumulado_chuva_1_h",
         "acumulado_chuva_3_h",
         "acumulado_chuva_6_h",
@@ -106,36 +100,12 @@ def tratar_dados() -> Tuple[pd.DataFrame, bool]:
     dados.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True)
     dados.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first")

-    log(f"uniquesss df >>>, {type(dados.id_estacao.unique()[0])}")
-    dados["id_estacao"] = dados["id_estacao"].astype(str)
-
-    dados = save_updated_rows_on_redis(
-        dados,
-        dataset_id,
-        table_id,
-        unique_id="id_estacao",
-        date_column="data_medicao",
-        date_format=date_format,
-        mode=mode,
-    )
-
-    # If df is empty stop flow on flows.py
-    empty_data = dados.shape[0] == 0
-    log(f"[DEBUG]: dataframe is empty: {empty_data}")
-
-    # Save max date on redis to compare this with last dbt run
-    if not empty_data:
-        max_date = str(dados["data_medicao"].max())
-        redis_key = build_redis_key(dataset_id, table_id, name="last_update", mode=mode)
-        log(f"[DEBUG]: dataframe is not empty key: {redis_key} {max_date}")
-        save_str_on_redis(redis_key, "date", max_date)
-
     # Fixar ordem das colunas
     dados = dados[
         [
-            "data_medicao",
             "id_estacao",
-            "instantaneo_chuva",
+            "data_medicao",
+            "acumulado_chuva_10_min",
             "acumulado_chuva_1_h",
             "acumulado_chuva_3_h",
             "acumulado_chuva_6_h",
@@ -147,7 +117,7 @@ def tratar_dados() -> Tuple[pd.DataFrame, bool]:
         ]
     ]

-    return dados, empty_data
+    return dados


 @task
@@ -161,7 +131,6 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:

     partition_column = "data_medicao"
     dataframe, partitions = parse_date_columns(dados, partition_column)
-    current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")

     # Cria partições a partir da data
     to_partitions(
@@ -169,50 +138,6 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:
         dataframe,
         partition_columns=partitions,
         savepath=prepath,
         data_type="csv",
-        suffix=current_time,
     )
     log(f"[DEBUG] Files saved on {prepath}")
     return prepath
-
-
-@task
-def save_last_dbt_update(
-    dataset_id: str,
-    table_id: str,
-    mode: str = "dev",
-    wait=None,  # pylint: disable=unused-argument
-) -> None:
-    """
-    Save on dbt last timestamp where it was updated
-    """
-    now = pendulum.now("America/Sao_Paulo").to_datetime_string()
-    redis_key = build_redis_key(dataset_id, table_id, name="dbt_last_update", mode=mode)
-    log(f">>>>> debug saving actual date on dbt redis {redis_key} {now}")
-    save_str_on_redis(redis_key, "date", now)
-
-
-@task(skip_on_upstream_skip=False)
-def check_to_run_dbt(
-    dataset_id: str,
-    table_id: str,
-    mode: str = "dev",
-) -> bool:
-    """
-    It will run even if its upstream tasks skip.
-    """
-
-    key_table_1 = build_redis_key(
-        dataset_id, table_id, name="dbt_last_update", mode=mode
-    )
-    key_table_2 = build_redis_key(dataset_id, table_id, name="last_update", mode=mode)
-
-    format_date_table_1 = "YYYY-MM-DD HH:mm:SS"
-    format_date_table_2 = "YYYY-MM-DD HH:mm:SS"
-
-    # Returns true if date saved on table_2 (cemaden) is bigger than
-    # the date saved on table_1 (dbt).
-    run_dbt = compare_dates_between_tables_redis(
-        key_table_1, format_date_table_1, key_table_2, format_date_table_2
-    )
-    log(f">>>> debug data cemaden > data dbt: {run_dbt}")
-    return run_dbt
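Note on PATCH 18: with the Redis gate (check_to_run_dbt / save_last_dbt_update) removed, the flow now always fans out to the DBT materialization sub-flow. The orchestration primitive it relies on is Prefect 1.x's create_flow_run / wait_for_flow_run pair. A self-contained sketch of that parent/child pattern follows; the flow and project names ("flow-pai", "flow-filho", "meu-projeto") are placeholders, not names from this repository:

    # -*- coding: utf-8 -*-
    # Illustrative only; not part of the patch series.
    from prefect import Flow, Parameter, case
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("flow-pai") as flow:
        materializar = Parameter("materializar", default=True)
        with case(materializar, True):
            # Registers a run of the child flow and returns its run id.
            filho = create_flow_run(
                flow_name="flow-filho",
                project_name="meu-projeto",
                parameters={"dataset_id": "exemplo"},
                run_name="materialize exemplo",
            )
            # Blocks until the child finishes; raise_final_state=True makes
            # the parent task fail if the child run fails, mirroring the
            # materialization branch of the flow above.
            espera = wait_for_flow_run(
                filho,
                stream_states=True,
                stream_logs=True,
                raise_final_state=True,
            )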
From b7064de40f44bece83156d8a80ab3dda94cd10db Mon Sep 17 00:00:00 2001
From: patriciacatandi
Date: Tue, 29 Aug 2023 15:58:58 -0300
Subject: [PATCH 19/32] =?UTF-8?q?salvando=20=C3=BAltimos=20dados=20no=20re?=
 =?UTF-8?q?dis?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../precipitacao_alertario/schedules.py |  4 +-
 .../precipitacao_cemaden/flows.py       |  6 ++-
 .../precipitacao_cemaden/schedules.py   |  2 +-
 .../precipitacao_cemaden/tasks.py       | 41 ++++++++++++++++++-
 4 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/pipelines/rj_cor/meteorologia/precipitacao_alertario/schedules.py b/pipelines/rj_cor/meteorologia/precipitacao_alertario/schedules.py
index c296d3922..84d3768e2 100644
--- a/pipelines/rj_cor/meteorologia/precipitacao_alertario/schedules.py
+++ b/pipelines/rj_cor/meteorologia/precipitacao_alertario/schedules.py
@@ -12,8 +12,8 @@
 minute_schedule = Schedule(
     clocks=[
         IntervalClock(
-            interval=timedelta(minutes=1),
-            start_date=datetime(2021, 1, 1, 0, 0, 30),
+            interval=timedelta(minutes=5),
+            start_date=datetime(2021, 1, 1, 0, 1, 0),
             labels=[
                 constants.RJ_COR_AGENT_LABEL.value,
             ],

diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py
index e8e573e56..d4f8f0810 100644
--- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py
+++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py
@@ -71,7 +71,11 @@
         default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
     )

-    dados = tratar_dados()
+    dados = tratar_dados(
+        dataset_id=DATASET_ID,
+        table_id=TABLE_ID,
+        mode=MATERIALIZATION_MODE,
+    )
     path = salvar_dados(dados=dados)

     # Create table in BigQuery

diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py
index 1f2a0f1a2..f3e2c8a77 100644
--- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py
+++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py
@@ -13,7 +13,7 @@
     clocks=[
         IntervalClock(
             interval=timedelta(minutes=5),
-            start_date=datetime(2023, 1, 1, 0, 0, 30),
+            start_date=datetime(2023, 1, 1, 0, 1, 0),
             labels=[
                 constants.RJ_COR_AGENT_LABEL.value,
             ],

diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py
index 1e83f12a5..686692d7c 100644
--- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py
+++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py
@@ -9,7 +9,10 @@

 import numpy as np
 import pandas as pd
+import pendulum
 from prefect import task
+from prefect.engine.signals import ENDRUN
+from prefect.engine.state import Skipped

 # from prefect import context
@@ -20,6 +23,7 @@
 from pipelines.utils.utils import (
     log,
     to_partitions,
+    save_updated_rows_on_redis,
 )

@@ -28,7 +32,9 @@
     max_retries=constants.TASK_MAX_RETRIES.value,
     retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
 )
-def tratar_dados() -> Tuple[pd.DataFrame, bool]:
+def tratar_dados(
+    dataset_id: str, table_id: str, mode: str = "dev"
+) -> Tuple[pd.DataFrame, bool]:
     """
     Renomeia colunas e filtra dados com a hora e minuto do timestamp
     de execução mais próximo à este
@@ -80,7 +86,7 @@ def tratar_dados(
     log(f"DEBUG: data {dados[see_cols]}")

     # Alterando valores '-' e np.nan para NULL
-    dados.replace(["-", np.nan], [None, None], inplace=True)
+    dados.replace(["-", np.nan], [0, None], inplace=True)

     # Altera valores negativos para None
     float_cols = [
@@ -100,6 +106,35 @@ def tratar_dados(
     dados.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True)
     dados.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first")

+    # Ajustando dados da meia-noite que vem sem o horário
+    for index, row in dados.iterrows():
+        try:
+            date = pd.to_datetime(row["data_medicao"], format="%Y-%m-%d %H:%M:%S")
+        except ValueError:
+            date = pd.to_datetime(row["data_medicao"]) + pd.DateOffset(hours=0)
+
+        dados.at[index, "data_medicao"] = date.strftime("%Y-%m-%d %H:%M:%S")
+
+    log(f"Dataframe before comparing with last data saved on redis {dados.head()}")
+
+    dados = save_updated_rows_on_redis(
+        dados,
+        dataset_id,
+        table_id,
+        unique_id="id_estacao",
+        date_column="data_medicao",
+        date_format=date_format,
+        mode=mode,
+    )
+
+    log(f"Dataframe after comparing with last data saved on redis {dados.head()}")
+
+    # If df is empty stop flow
+    if dados.shape[0] == 0:
+        skip_text = "No new data available on API"
+        log(skip_text)
+        raise ENDRUN(state=Skipped(skip_text))
+
     # Fixar ordem das colunas
     dados = dados[
         [
@@ -131,6 +166,7 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:
     partition_column = "data_medicao"
     dataframe, partitions = parse_date_columns(dados, partition_column)
+    current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")

     # Cria partições a partir da data
     to_partitions(
@@ -138,6 +174,7 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:
         dataframe,
         partition_columns=partitions,
         savepath=prepath,
         data_type="csv",
+        suffix=current_time,
     )
     log(f"[DEBUG] Files saved on {prepath}")
     return prepath
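Note on PATCH 19: the early-exit idiom introduced above, raise ENDRUN(state=Skipped(...)), replaces the old empty_data flag that had to be plumbed through the flow. A condensed, self-contained sketch of the pattern; the toy "novo" boolean column stands in for the Redis comparison performed by save_updated_rows_on_redis:

    # -*- coding: utf-8 -*-
    # Illustrative only; not part of the patch series.
    import pandas as pd
    from prefect import task
    from prefect.engine.signals import ENDRUN
    from prefect.engine.state import Skipped


    @task
    def filtrar_novos(dados: pd.DataFrame) -> pd.DataFrame:
        # Stand-in for the Redis comparison: keep only rows flagged as new.
        novos = dados[dados["novo"]]
        if novos.shape[0] == 0:
            # Skipped (not Failed) ends the task and, by default, its
            # downstream tasks without turning the scheduled run red.
            raise ENDRUN(state=Skipped("No new data available on API"))
        return novos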
From 860699400ef6c4f5e43ec721b56b496c2281b7f2 Mon Sep 17 00:00:00 2001
From: patriciacatandi
Date: Thu, 31 Aug 2023 12:05:11 -0300
Subject: [PATCH 20/32] ajustando quando nunca foi salvo dado no redis

---
 pipelines/utils/utils.py | 36 +++++++++++++++++++++---------------
 1 file changed, 21 insertions(+), 15 deletions(-)

diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py
index efc21c133..73474d4a7 100644
--- a/pipelines/utils/utils.py
+++ b/pipelines/utils/utils.py
@@ -999,18 +999,27 @@ def save_updated_rows_on_redis(  # pylint: disable=R0914
     # Access all data saved on redis with this key
     last_updates = redis_client.hgetall(key)

-    # Convert data in dictionary in format with unique_id in key and last updated time as value
-    # Example > {"12": "2022-06-06 14:45:00"}
-    last_updates = {
-        k.decode("utf-8"): v.decode("utf-8") for k, v in last_updates.items()
-    }
+    if len(last_updates) == 0:
+        last_updates = pd.DataFrame(dataframe[unique_id].unique(), columns=[unique_id])
+        last_updates["last_update"] = "1900-01-01 00:00:00"
+        log(f"Redis key: {key}\nCreating Redis fake values:\n {last_updates}")
+    else:
+        # Convert data in dictionary in format with unique_id in key and last updated time as value
+        # Example > {"12": "2022-06-06 14:45:00"}
+        last_updates = {
+            k.decode("utf-8"): v.decode("utf-8") for k, v in last_updates.items()
+        }

-    # Convert dictionary to dataframe
-    last_updates = pd.DataFrame(
-        last_updates.items(), columns=[unique_id, "last_update"]
-    )
+        # Convert dictionary to dataframe
+        last_updates = pd.DataFrame(
+            last_updates.items(), columns=[unique_id, "last_update"]
+        )
+
+        log(f"Redis key: {key}\nRedis actual values:\n {last_updates}")

-    log(f"Redis key: {key}\nRedis actual values:\n {last_updates}")
+    # Garante that both are string
+    dataframe[unique_id] = dataframe[unique_id].astype(str)
+    last_updates[unique_id] = last_updates[unique_id].astype(str)

     # dataframe and last_updates need to have the same index, in our case unique_id
     missing_in_dfr = [
@@ -1026,11 +1035,8 @@ def save_updated_rows_on_redis(  # pylint: disable=R0914

     # If unique_id doesn't exists on updates we create a fake date for this station on updates
     if len(missing_in_updates) > 0:
-        for i in missing_in_updates:
-            last_updates = last_updates.append(
-                {unique_id: i, "last_update": "1900-01-01 00:00:00"},
-                ignore_index=True,
-            )
+        for i, _id in enumerate(missing_in_updates):
+            last_updates.loc[-i] = [_id, "1900-01-01 00:00:00"]

     # If unique_id doesn't exists on dataframe we remove this stations from last_updates
     if len(missing_in_dfr) > 0:
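Note on PATCH 20: the branch added above matters because redis hgetall returns an empty dict when the key was never written, and bytes keys/values otherwise; the old code assumed the non-empty case. A condensed sketch of both paths, with a hardcoded raw dict standing in for the Redis client:

    # -*- coding: utf-8 -*-
    # Illustrative only; not part of the patch series.
    import pandas as pd

    raw = {}  # e.g. redis_client.hgetall(key) -> {b"12": b"2022-06-06 14:45:00"}
    estacoes = ["12", "34"]

    if len(raw) == 0:
        # Key never written: seed a sentinel date so every station is "stale".
        last_updates = pd.DataFrame(estacoes, columns=["id_estacao"])
        last_updates["last_update"] = "1900-01-01 00:00:00"
    else:
        # Key exists: decode bytes before building the comparison dataframe.
        decoded = {k.decode("utf-8"): v.decode("utf-8") for k, v in raw.items()}
        last_updates = pd.DataFrame(
            decoded.items(), columns=["id_estacao", "last_update"]
        )

    print(last_updates)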
From 129f9ac7bbb29e3bc642a7951cf5adca11fe89c8 Mon Sep 17 00:00:00 2001
From: patriciacatandi
Date: Tue, 10 Oct 2023 16:35:38 -0300
Subject: [PATCH 21/32] =?UTF-8?q?remo=C3=A7=C3=A3o=20do=20slice=5Fdata?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../meteorologia_redemet/flows.py |  1 -
 .../meteorologia_redemet/tasks.py | 21 +------------------
 2 files changed, 1 insertion(+), 21 deletions(-)

diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py
index fb731877f..95cf5f029 100644
--- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py
+++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py
@@ -14,7 +14,6 @@
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.rj_cor.meteorologia.meteorologia_redemet.tasks import (
     get_dates,
-    # slice_data,
     download,
     tratar_dados,
     salvar_dados,

diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
index 3aebecd6b..7c6c56c88 100644
--- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
+++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
@@ -40,18 +40,6 @@ def get_dates(data_inicio: str, data_fim: str) -> Tuple[str, str]:
     return data_inicio, data_fim, backfill


-@task()
-def slice_data(current_time: str) -> str:
-    """
-    Retorna a data e hora do timestamp de execução
-    """
-    if not isinstance(current_time, str):
-        current_time = current_time.to_datetime_string()
-
-    data = current_time[:10]
-    return data
-
-
 @task(
     max_retries=constants.TASK_MAX_RETRIES.value,
     retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
@@ -110,14 +98,7 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame:
     Renomeia colunas e filtra dados com a hora do timestamp de execução
     """

-    drop_cols = [
-        "nome",
-        "cidade",
-        "lon",
-        "lat",
-        "localizacao",
-        "tempoImagem",
-    ]
+    drop_cols = ["nome", "cidade", "lon", "lat", "localizacao", "tempoImagem", "metar"]
     # Checa se todas estão no df
     drop_cols = [c for c in drop_cols if c in dados.columns]

From 91f67e4b5fccb6a5c791f9bdf666781fec7335cd Mon Sep 17 00:00:00 2001
From: Karina Pereira Passos
Date: Mon, 16 Oct 2023 16:53:23 -0300
Subject: [PATCH 22/32] testando

---
 pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
index 7c6c56c88..e06285f14 100644
--- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
+++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
@@ -18,6 +18,7 @@
     parse_date_columns,
 )

+print("hi")

 @task(nout=3)
 def get_dates(data_inicio: str, data_fim: str) -> Tuple[str, str]:
- """ + """ # a API sempre retorna o dado em UTC log(f"data de inicio e fim antes do if {data_inicio} {data_fim}") if data_inicio == "": From 77b8a967dcb26e2cd58d57d6aded9675092b89c1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 17 Oct 2023 16:51:06 +0000 Subject: [PATCH 24/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 184ecdc00..7c6c56c88 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -18,6 +18,7 @@ parse_date_columns, ) + @task(nout=3) def get_dates(data_inicio: str, data_fim: str) -> Tuple[str, str]: """ @@ -25,7 +26,7 @@ def get_dates(data_inicio: str, data_fim: str) -> Tuple[str, str]: Se nenhuma data foi passada a data_inicio corresponde a ontem e data_fim a hoje e não estamos fazendo backfill. Caso contrário, retorna as datas inputadas mos parâmetros do flow. - """ + """ # a API sempre retorna o dado em UTC log(f"data de inicio e fim antes do if {data_inicio} {data_fim}") if data_inicio == "": From bab07fa39d5b3c7a7ea9465950c4a2443ab6aa7b Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Thu, 19 Oct 2023 13:30:15 +0000 Subject: [PATCH 25/32] =?UTF-8?q?Capitalizando=20os=20dados=20da=20coluna?= =?UTF-8?q?=20c=C3=A9u?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rj_cor/meteorologia/meteorologia_redemet/tasks.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 7c6c56c88..83822590e 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -13,8 +13,10 @@ import requests from pipelines.constants import constants -from pipelines.utils.utils import get_vault_secret, log, to_partitions -from pipelines.rj_cor.meteorologia.precipitacao_alertario.utils import ( +from pipelines.utils.utils import ( + get_vault_secret, + log, + to_partitions, parse_date_columns, ) @@ -169,6 +171,9 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: # Remover fuso horário dados["data"] = dados["data"].dt.strftime("%Y-%m-%d %H:%M:%S") + # Capitalizar os dados da coluna céu + dados["ceu"] = dados["ceu"].capitalize() + return dados From 77310fca04dc5a9837c051b0d8b32fee73606799 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Thu, 19 Oct 2023 13:33:04 +0000 Subject: [PATCH 26/32] =?UTF-8?q?Capitalizando=20os=20dados=20da=20coluna?= =?UTF-8?q?=20c=C3=A9u?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rj_cor/meteorologia/meteorologia_redemet/tasks.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 83822590e..0af0bd172 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -13,12 +13,7 @@ import requests from pipelines.constants import 
constants -from pipelines.utils.utils import ( - get_vault_secret, - log, - to_partitions, - parse_date_columns, -) +from pipelines.utils.utils import get_vault_secret, log, to_partitions, parse_date_columns @task(nout=3) From 055056de055f7db431573041eaf09ae231687101 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 19 Oct 2023 13:34:14 +0000 Subject: [PATCH 27/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../rj_cor/meteorologia/meteorologia_redemet/tasks.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 0af0bd172..83822590e 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -13,7 +13,12 @@ import requests from pipelines.constants import constants -from pipelines.utils.utils import get_vault_secret, log, to_partitions, parse_date_columns +from pipelines.utils.utils import ( + get_vault_secret, + log, + to_partitions, + parse_date_columns, +) @task(nout=3) From 6e555082b57fb57b865855929270e9297ca22cf8 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Thu, 19 Oct 2023 14:02:52 +0000 Subject: [PATCH 28/32] =?UTF-8?q?Capitalizando=20os=20dados=20da=20coluna?= =?UTF-8?q?=20c=C3=A9u?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rj_cor/meteorologia/meteorologia_redemet/tasks.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 0af0bd172..bd67f5dce 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -13,7 +13,12 @@ import requests from pipelines.constants import constants -from pipelines.utils.utils import get_vault_secret, log, to_partitions, parse_date_columns +from pipelines.utils.utils import ( + get_vault_secret, + log, + to_partitions, + parse_date_columns, +) @task(nout=3) @@ -167,7 +172,7 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: dados["data"] = dados["data"].dt.strftime("%Y-%m-%d %H:%M:%S") # Capitalizar os dados da coluna céu - dados["ceu"] = dados["ceu"].capitalize() + dados["ceu"] = dados["ceu"].str.capitalize() return dados From abd6d28396ee11863fc41298bfbc8dd255ed95d6 Mon Sep 17 00:00:00 2001 From: Patricia Bongiovanni Catandi <62657143+patriciacatandi@users.noreply.github.com> Date: Thu, 19 Oct 2023 16:01:58 -0300 Subject: [PATCH 29/32] Changing column name tasks.py --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index bd67f5dce..d3be95a54 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -170,7 +170,8 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: # Remover fuso horário dados["data"] = dados["data"].dt.strftime("%Y-%m-%d %H:%M:%S") - + dados.rename(columns={"data": "data_medicao"}, inplace=True) + # Capitalizar os dados da coluna céu dados["ceu"] = 
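Note on PATCH 28: this is the classic Series string-accessor pitfall. dados["ceu"].capitalize(), as committed in PATCH 25, raises AttributeError because capitalize is a str method, not a Series method; element-wise string operations go through the .str accessor. A two-line illustration:

    # -*- coding: utf-8 -*-
    # Illustrative only; not part of the patch series.
    import pandas as pd

    ceu = pd.Series(["céu claro", "NUBLADO"])
    # ceu.capitalize() would raise:
    # AttributeError: 'Series' object has no attribute 'capitalize'
    print(ceu.str.capitalize().tolist())  # ['Céu claro', 'Nublado']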
dados["ceu"].str.capitalize() From df202119b11326faf98f8945f4c7dc8fa0ad347e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 19 Oct 2023 19:02:13 +0000 Subject: [PATCH 30/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index d3be95a54..0ddcf3724 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -171,7 +171,7 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: # Remover fuso horário dados["data"] = dados["data"].dt.strftime("%Y-%m-%d %H:%M:%S") dados.rename(columns={"data": "data_medicao"}, inplace=True) - + # Capitalizar os dados da coluna céu dados["ceu"] = dados["ceu"].str.capitalize() From c760b109f895025c98fe484a59c1b05469b61aac Mon Sep 17 00:00:00 2001 From: Patricia Bongiovanni Catandi <62657143+patriciacatandi@users.noreply.github.com> Date: Thu, 19 Oct 2023 19:21:49 -0300 Subject: [PATCH 31/32] bugfix --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 0ddcf3724..caa89d80d 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -184,10 +184,10 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: Salvar dados em csv """ - prepath = Path("/tmp/meteorologia_redemet/") + prepath = Path("/tmp/meteorologia_redemeta prepath.mkdir(parents=True, exist_ok=True) - partition_column = "data" + partition_column = "data_medicao" dataframe, partitions = parse_date_columns(dados, partition_column) # Cria partições a partir da data From fa51bc3ce524c7b3a70e88c2bfcb31c45cc29b32 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 20 Oct 2023 15:54:35 +0000 Subject: [PATCH 32/32] bugfix --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index caa89d80d..90f6dcbc2 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -184,7 +184,7 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: Salvar dados em csv """ - prepath = Path("/tmp/meteorologia_redemeta + prepath = Path("/tmp/meteorologia_redemet/") prepath.mkdir(parents=True, exist_ok=True) partition_column = "data_medicao"