diff --git a/.github/workflows/cd_staging.yaml b/.github/workflows/cd_staging.yaml index 090e88d27..fb7307a50 100644 --- a/.github/workflows/cd_staging.yaml +++ b/.github/workflows/cd_staging.yaml @@ -77,7 +77,7 @@ jobs: echo $PREFECT_AUTH_TOML | base64 --decode > $HOME/.prefect/auth.toml - name: Wait for Docker image to be available - uses: lewagon/wait-on-check-action@v1.1.2 + uses: lewagon/wait-on-check-action@v1.3.1 with: ref: ${{ github.event.pull_request.head.sha || github.sha }} check-name: 'Build Docker image' diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 95cf5f029..485c0cfa9 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -13,12 +13,18 @@ from pipelines.constants import constants from pipelines.utils.constants import constants as utils_constants from pipelines.rj_cor.meteorologia.meteorologia_redemet.tasks import ( + check_for_new_stations, get_dates, - download, - tratar_dados, - salvar_dados, + download_data, + download_stations_data, + treat_data, + treat_stations_data, + save_data, +) +from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import ( + hour_schedule, + month_schedule, ) -from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import hour_schedule from pipelines.utils.decorators import Flow from pipelines.utils.dump_db.constants import constants as dump_db_constants from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants @@ -27,21 +33,21 @@ get_current_flow_labels, ) - with Flow( name="COR: Meteorologia - Meteorologia REDEMET", code_owners=[ - "richardg867", "paty", ], ) as cor_meteorologia_meteorologia_redemet: - DATASET_ID = "clima_estacao_meteorologica" - TABLE_ID = "meteorologia_redemet" - DUMP_MODE = "append" + DUMP_MODE = Parameter("dump_mode", default="append", required=True) + DATASET_ID = Parameter( + "dataset_id", default="clima_estacao_meteorologica", required=True + ) + TABLE_ID = Parameter("table_id", default="meteorologia_redemet", required=True) - # data_inicio e data_fim devem ser strings no formato "YYYY-MM-DD" - data_inicio = Parameter("data_inicio", default="", required=False) - data_fim = Parameter("data_fim", default="", required=False) + # first_date and last_date must be strings as "YYYY-MM-DD" + first_date = Parameter("first_date", default=None, required=False) + last_date = Parameter("last_date", default=None, required=False) # Materialization parameters MATERIALIZE_AFTER_DUMP = Parameter( @@ -61,11 +67,11 @@ default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, ) - data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim) + first_date_, last_date_, backfill = get_dates(first_date, last_date) # data = slice_data(current_time=CURRENT_TIME) - dados = download(data_inicio_, data_fim_) - dados = tratar_dados(dados, backfill) - PATH = salvar_dados(dados=dados) + dataframe = download_data(first_date_, last_date_) + dataframe = treat_data(dataframe, backfill) + PATH = save_data(dataframe=dataframe) # Create table in BigQuery UPLOAD_TABLE = create_table_and_upload_to_gcs( @@ -134,10 +140,123 @@ ) -# para rodar na cloud cor_meteorologia_meteorologia_redemet.storage = GCS(constants.GCS_FLOWS_BUCKET.value) cor_meteorologia_meteorologia_redemet.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_COR_AGENT_LABEL.value], ) cor_meteorologia_meteorologia_redemet.schedule = hour_schedule + + +with Flow( + name="COR: Meteorologia REDEMET - Atualização das estações", + code_owners=[ + "karinappassos", + "paty", + ], +) as cor_meteorologia_meteorologia_redemet_estacoes: + DUMP_MODE = Parameter("dump_mode", default="overwrite", required=True) + DATASET_ID = Parameter( + "dataset_id", default="clima_estacao_meteorologica", required=True + ) + TABLE_ID = Parameter("table_id", default="estacoes_redemet", required=True) + + # Materialization parameters + MATERIALIZE_AFTER_DUMP = Parameter( + "materialize_after_dump", default=False, required=False + ) + MATERIALIZE_TO_DATARIO = Parameter( + "materialize_to_datario", default=False, required=False + ) + MATERIALIZATION_MODE = Parameter("mode", default="dev", required=False) + + # Dump to GCS after? Should only dump to GCS if materializing to datario + DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False) + + MAXIMUM_BYTES_PROCESSED = Parameter( + "maximum_bytes_processed", + required=False, + default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, + ) + + dataframe = download_stations_data() + dataframe = treat_stations_data(dataframe) + path = save_data(dataframe=dataframe, partition_column="data_atualizacao") + + # Create table in BigQuery + UPLOAD_TABLE = create_table_and_upload_to_gcs( + data_path=path, + dataset_id=DATASET_ID, + table_id=TABLE_ID, + dump_mode=DUMP_MODE, + wait=path, + ) + + # Trigger DBT flow run + with case(MATERIALIZE_AFTER_DUMP, True): + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": DATASET_ID, + "table_id": TABLE_ID, + "mode": MATERIALIZATION_MODE, + "materialize_to_datario": MATERIALIZE_TO_DATARIO, + }, + labels=current_flow_labels, + run_name=f"Materialize {DATASET_ID}.{TABLE_ID}", + ) + + materialization_flow.set_upstream(UPLOAD_TABLE) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(DUMP_TO_GCS, True): + # Trigger Dump to GCS flow run with project id as datario + dump_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": DATASET_ID, + "table_id": TABLE_ID, + "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {DATASET_ID}.{TABLE_ID}", + ) + dump_to_gcs_flow.set_upstream(wait_for_materialization) + + wait_for_dump_to_gcs = wait_for_flow_run( + dump_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + + check_for_new_stations(dataframe, wait=UPLOAD_TABLE) + +# para rodar na cloud +cor_meteorologia_meteorologia_redemet_estacoes.storage = GCS( + constants.GCS_FLOWS_BUCKET.value +) +cor_meteorologia_meteorologia_redemet_estacoes.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_COR_AGENT_LABEL.value], +) +cor_meteorologia_meteorologia_redemet_estacoes.schedule = month_schedule diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py index e180adc28..7e4b288dd 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py @@ -11,11 +11,42 @@ hour_schedule = Schedule( clocks=[ IntervalClock( - interval=timedelta(hours=1), + interval=timedelta(days=30), start_date=datetime(2023, 1, 1, 0, 12, 0), labels=[ constants.RJ_COR_AGENT_LABEL.value, ], + parameter_defaults={ + # "trigger_rain_dashboard_update": True, + "materialize_after_dump": True, + "mode": "prod", + "materialize_to_datario": True, + "dump_to_gcs": False, + "dump_mode": "append", + "dataset_id": "clima_estacao_meteorologica", + "table_id": "meteorologia_redemet", + }, + ), + ] +) + +month_schedule = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(days=30), + start_date=datetime(2023, 1, 1, 0, 12, 0), + labels=[ + constants.RJ_COR_AGENT_LABEL.value, + ], + parameter_defaults={ + "materialize_after_dump": True, + "mode": "prod", + "materialize_to_datario": True, + "dump_to_gcs": False, + # "dump_mode": "overwrite", + # "dataset_id": "clima_estacao_meteorologica", + # "table_id": "estacoes_redemet", + }, ), ] ) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 90f6dcbc2..7bb7801a8 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -6,10 +6,13 @@ import json from pathlib import Path from typing import Tuple, Union +from unidecode import unidecode import pandas as pd import pendulum from prefect import task +from prefect.engine.signals import ENDRUN +from prefect.engine.state import Failed import requests from pipelines.constants import constants @@ -22,37 +25,36 @@ @task(nout=3) -def get_dates(data_inicio: str, data_fim: str) -> Tuple[str, str]: +def get_dates(first_date: str, last_date: str) -> Tuple[str, str]: """ - Task para obter o dia de início e o de fim. - Se nenhuma data foi passada a data_inicio corresponde a ontem - e data_fim a hoje e não estamos fazendo backfill. - Caso contrário, retorna as datas inputadas mos parâmetros do flow. + Task to get first and last date. + If none date is passed on parameters or we are not doing a backfill + the first_date will be yesterday and last_date will be today. + Otherwise, this function will return date inputed on flow's parameters. """ # a API sempre retorna o dado em UTC - log(f"data de inicio e fim antes do if {data_inicio} {data_fim}") - if data_inicio == "": - data_fim = pendulum.now("UTC").format("YYYY-MM-DD") - data_inicio = pendulum.yesterday("UTC").format("YYYY-MM-DD") - backfill = 0 - else: + if first_date: backfill = 1 - log(f"data de inicio e fim dps do if {data_inicio} {data_fim}") + else: + last_date = pendulum.now("UTC").format("YYYY-MM-DD") + first_date = pendulum.yesterday("UTC").format("YYYY-MM-DD") + backfill = 0 + log(f"Selected first_date as: {first_date} and last_date as: {last_date}") - return data_inicio, data_fim, backfill + return first_date, last_date, backfill @task( max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def download(data_inicio: str, data_fim: str) -> pd.DataFrame: +def download_data(first_date: str, last_date: str) -> pd.DataFrame: """ - Faz o request na data especificada e retorna dados + Request data from especified date range """ - # Lista com as estações da cidade do Rio de Janeiro - estacoes_unicas = [ + # Stations inside Rio de Janeiro city + rj_stations = [ "SBAF", "SBGL", "SBJR", @@ -60,90 +62,87 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: "SBSC", ] - dicionario = get_vault_secret("redemet-token") + redemet_token = get_vault_secret("redemet-token") + redemet_token = redemet_token["data"]["token"] # Converte datas em int para cálculo de faixas. - data_inicio_int = int(data_inicio.replace("-", "")) - data_fim_int = int(data_fim.replace("-", "")) + first_date_int = int(first_date.replace("-", "")) + last_date_int = int(last_date.replace("-", "")) raw = [] - for id_estacao in estacoes_unicas: - base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={dicionario['data']['token']}" # noqa - for data in range(data_inicio_int, data_fim_int + 1): + for id_estacao in rj_stations: + base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={redemet_token}" # noqa + for data in range(first_date_int, last_date_int + 1): for hora in range(24): url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}" res = requests.get(url) if res.status_code != 200: - log(f"Problema no id: {id_estacao}, {res.status_code}, {url}") + log(f"Problema no id: {id_estacao}, {res.status_code}") continue res_data = json.loads(res.text) if res_data["status"] is not True: - log(f"Problema no id: {id_estacao}, {res_data['message']}, {url}") + log(f"Problema no id: {id_estacao}, {res_data['message']}") continue if "data" not in res_data["data"]: - # Sem dados para esse horario + # Sem dataframe para esse horario continue raw.append(res_data) - # Extrai objetos de dados + # Extrai objetos de dataframe raw = [res_data["data"] for res_data in raw] - # converte para dados - dados = pd.DataFrame(raw) + # converte para dataframe + dataframe = pd.DataFrame(raw) - return dados + return dataframe @task -def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: +def treat_data(dataframe: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: """ - Renomeia colunas e filtra dados com a hora do timestamp de execução + Rename cols, convert timestamp, filter data for the actual date """ drop_cols = ["nome", "cidade", "lon", "lat", "localizacao", "tempoImagem", "metar"] - # Checa se todas estão no df - drop_cols = [c for c in drop_cols if c in dados.columns] + # Check if all columns are on the dataframe + drop_cols = [c for c in drop_cols if c in dataframe.columns] - # Remove colunas que já temos os dados em outras tabelas - dados = dados.drop(drop_cols, axis=1) + # Remove columns that are already in another table + dataframe = dataframe.drop(drop_cols, axis=1) - # Adequando nome das variáveis rename_cols = { "localidade": "id_estacao", "ur": "umidade", } - dados = dados.rename(columns=rename_cols) + dataframe = dataframe.rename(columns=rename_cols) - # Converte horário de UTC para America/Sao Paulo + # Convert UTC time to America/Sao Paulo formato = "DD/MM/YYYY HH:mm(z)" - dados["data"] = dados["data"].apply( + dataframe["data"] = dataframe["data"].apply( lambda x: pendulum.from_format(x, formato) .in_tz("America/Sao_Paulo") .format(formato) ) - # Ordenamento de variáveis - chaves_primarias = ["id_estacao", "data"] - demais_cols = [c for c in dados.columns if c not in chaves_primarias] + # Order variables + primary_keys = ["id_estacao", "data"] + other_cols = [c for c in dataframe.columns if c not in primary_keys] - dados = dados[chaves_primarias + demais_cols] + dataframe = dataframe[primary_keys + other_cols] - # Converte variáveis que deveriam ser int para int - dados["temperatura"] = dados["temperatura"].apply( + # Clean data + dataframe["temperatura"] = dataframe["temperatura"].apply( lambda x: None if x[:-2] == "NIL" else int(x[:-2]) ) - dados["umidade"] = dados["umidade"].apply( + dataframe["umidade"] = dataframe["umidade"].apply( lambda x: None if "%" not in x else int(x[:-1]) ) - dados["data"] = pd.to_datetime(dados.data, format="%d/%m/%Y %H:%M(%Z)") - - # Pegar o dia no nosso timezone como partição - br_timezone = pendulum.now("America/Sao_Paulo").format("YYYY-MM-DD") + dataframe["data"] = pd.to_datetime(dataframe.data, format="%d/%m/%Y %H:%M(%Z)") # Define colunas que serão salvas - dados = dados[ + dataframe = dataframe[ [ "id_estacao", "data", @@ -156,39 +155,40 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: ] ] - # Remover dados duplicados - dados = dados.drop_duplicates(subset=["id_estacao", "data"]) + dataframe = dataframe.drop_duplicates(subset=["id_estacao", "data"]) - log(f"Dados antes do filtro dia:\n{dados[['id_estacao', 'data']]}") + log(f"Dados antes do filtro dia:\n{dataframe[['id_estacao', 'data']]}") if not backfill: - # Seleciona apenas dados daquele dia (devido à UTC) - dados = dados[dados["data"].dt.date.astype(str) == br_timezone] + # Select our date + br_timezone = pendulum.now("America/Sao_Paulo").format("YYYY-MM-DD") + + # Select only data from that date + dataframe = dataframe[dataframe["data"].dt.date.astype(str) == br_timezone] - log(f">>>> min hora {dados[~dados.temperatura.isna()].data.min()}") - log(f">>>> max hora {dados[~dados.temperatura.isna()].data.max()}") + log(f">>>> min hora {dataframe[~dataframe.temperatura.isna()].data.min()}") + log(f">>>> max hora {dataframe[~dataframe.temperatura.isna()].data.max()}") - # Remover fuso horário - dados["data"] = dados["data"].dt.strftime("%Y-%m-%d %H:%M:%S") - dados.rename(columns={"data": "data_medicao"}, inplace=True) + dataframe["data"] = dataframe["data"].dt.strftime("%Y-%m-%d %H:%M:%S") + dataframe.rename(columns={"data": "data_medicao"}, inplace=True) - # Capitalizar os dados da coluna céu - dados["ceu"] = dados["ceu"].str.capitalize() + dataframe["ceu"] = dataframe["ceu"].str.capitalize() - return dados + return dataframe @task -def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: +def save_data( + dataframe: pd.DataFrame, partition_column: str = "data_medicao" +) -> Union[str, Path]: """ - Salvar dados em csv + Salve dataframe as a csv file """ prepath = Path("/tmp/meteorologia_redemet/") prepath.mkdir(parents=True, exist_ok=True) - partition_column = "data_medicao" - dataframe, partitions = parse_date_columns(dados, partition_column) + dataframe, partitions = parse_date_columns(dataframe, partition_column) # Cria partições a partir da data to_partitions( @@ -199,3 +199,88 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: ) log(f"[DEBUG] Files saved on {prepath}") return prepath + + +@task +def download_stations_data() -> pd.DataFrame: + """ + Download station information + """ + + redemet_token = get_vault_secret("redemet-token") + redemet_token = redemet_token["data"]["token"] + base_url = ( + f"https://api-redemet.decea.mil.br/aerodromos/?api_key={redemet_token}" # noqa + ) + url = f"{base_url}&pais=Brasil" + res = requests.get(url) + if res.status_code != 200: + print(f"Problem on request: {res.status_code}") + + res_data = json.loads(res.text) + log(f"API Return: {res_data}") + + dataframe = pd.DataFrame(res_data["data"]) + log(f"Stations dataframe: {dataframe.head()}") + + return dataframe + + +@task +def treat_stations_data(dataframe: pd.DataFrame) -> pd.DataFrame: + """ + Treat station data + """ + rename_cols = { + "lat_dec": "latitude", + "lon_dec": "longitude", + "nome": "estacao", + "altitude_metros": "altitude", + "cod": "id_estacao", + } + dataframe = dataframe.rename(rename_cols, axis=1) + + dataframe = dataframe[dataframe.cidade.str.contains("Rio de Janeiro")] + + dataframe["estacao"] = dataframe["estacao"].apply(unidecode) + dataframe["data_atualizacao"] = pendulum.now(tz="America/Sao_Paulo").format( + "YYYY-MM-DD" + ) + + keep_cols = [ + "id_estacao", + "estacao", + "latitude", + "longitude", + "altitude", + "data_atualizacao", + ] + return dataframe[keep_cols] + + +@task +def check_for_new_stations( + dataframe: pd.DataFrame, + wait=None, # pylint: disable=unused-argument +): + """ + Check if the updated stations are the same as before. + If not, consider flow as failed and call attention to + change treat_data task. + """ + + stations_before = [ + "SBAF", + "SBGL", + "SBJR", + "SBRJ", + "SBSC", + ] + new_stations = [ + i for i in dataframe.id_estacao.unique() if i not in stations_before + ] + if len(new_stations) != 0: + message = f"New station identified. You need to update REDEMET\ + flow and add station(s) {new_stations}" + log(message) + raise ENDRUN(state=Failed(message)) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py index bd7dbb9d0..3f5feff92 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py @@ -118,17 +118,21 @@ def tratar_dados( mode=mode, ) - # Ajustando dados da meia-noite que vem sem o horário - for index, row in dados.iterrows(): - try: - date = pd.to_datetime(row["data_medicao"], format="%Y-%m-%d %H:%M:%S") - except ValueError: - date = pd.to_datetime(row["data_medicao"]) + pd.DateOffset(hours=0) - - dados.at[index, "data_medicao"] = date.strftime("%Y-%m-%d %H:%M:%S") - - # dados["data_medicao"] = dados["data_medicao"].dt.strftime("%Y-%m-%d %H:%M:%S") - log(f"Dataframe after comparing with last data saved on redis {dados.head()}") + # # Ajustando dados da meia-noite que vem sem o horário + # for index, row in dados.iterrows(): + # try: + # date = pd.to_datetime(row["data_medicao"], format="%Y-%m-%d %H:%M:%S") + # except ValueError: + # date = pd.to_datetime(row["data_medicao"]) + pd.DateOffset(hours=0) + + # dados.at[index, "data_medicao"] = date.strftime("%Y-%m-%d %H:%M:%S") + + see_cols = ["id_estacao", "data_medicao", "last_update"] + log( + f"Dataframe after comparing with last data saved on redis {dados[see_cols].head()}" + ) + dados["data_medicao"] = dados["data_medicao"].dt.strftime("%Y-%m-%d %H:%M:%S") + log(f"Dataframe after converting to string {dados[see_cols].head()}") if dados.shape[0] > 0: log(f"Dataframe after comparing with last data saved on redis {dados.iloc[0]}") @@ -143,7 +147,7 @@ def tratar_dados( save_str_on_redis(redis_key, "date", max_date) else: # If df is empty stop flow on flows.py - log(f"Dataframe is empty. Skipping update flow for datetime {date}.") + log("Dataframe is empty. Skipping update flow.") # Fixar ordem das colunas dados = dados[ diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py index d4f8f0810..d4631c61d 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py @@ -17,8 +17,10 @@ constants as cemaden_constants, ) from pipelines.rj_cor.meteorologia.precipitacao_cemaden.tasks import ( - tratar_dados, - salvar_dados, + check_for_new_stations, + download_data, + treat_data, + save_data, ) from pipelines.rj_cor.meteorologia.precipitacao_cemaden.schedules import ( minute_schedule, @@ -41,15 +43,13 @@ with Flow( name="COR: Meteorologia - Precipitacao CEMADEN", code_owners=[ - "richardg867", "paty", ], # skip_if_running=True, ) as cor_meteorologia_precipitacao_cemaden: - DATASET_ID = "clima_pluviometro" - TABLE_ID = "taxa_precipitacao_cemaden" - DUMP_MODE = "append" - + DUMP_MODE = Parameter("dump_mode", default="append", required=True) + DATASET_ID = Parameter("dataset_id", default="clima_pluviometro", required=True) + TABLE_ID = Parameter("table_id", default="taxa_precipitacao_cemaden", required=True) # Materialization parameters MATERIALIZE_AFTER_DUMP = Parameter( "materialize_after_dump", default=False, required=False @@ -71,12 +71,14 @@ default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, ) - dados = tratar_dados( + dataframe = download_data() + dataframe = treat_data( + dataframe=dataframe, dataset_id=DATASET_ID, table_id=TABLE_ID, mode=MATERIALIZATION_MODE, ) - path = salvar_dados(dados=dados) + path = save_data(dataframe=dataframe) # Create table in BigQuery UPLOAD_TABLE = create_table_and_upload_to_gcs( @@ -182,6 +184,8 @@ raise_final_state=True, ) + check_for_new_stations(dataframe, wait=UPLOAD_TABLE) + # para rodar na cloud cor_meteorologia_precipitacao_cemaden.storage = GCS(constants.GCS_FLOWS_BUCKET.value) cor_meteorologia_precipitacao_cemaden.run_config = KubernetesRun( diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py index f3e2c8a77..6b56ffe92 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py @@ -19,11 +19,13 @@ ], parameter_defaults={ # "trigger_rain_dashboard_update": True, - # "materialize_after_dump": True, - "materialize_after_dump": False, - # "mode": "prod", - "materialize_to_datario": False, + "materialize_after_dump": True, + "mode": "prod", + "materialize_to_datario": True, "dump_to_gcs": False, + "dump_mode": "append", + "dataset_id": "clima_pluviometro", + "table_id": "taxa_precipitacao_cemaden", }, ), ] diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py index b49c1afb0..e0e1835b3 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py @@ -12,10 +12,7 @@ import pendulum from prefect import task from prefect.engine.signals import ENDRUN -from prefect.engine.state import Skipped - -# from prefect import context - +from prefect.engine.state import Skipped, Failed from pipelines.constants import constants from pipelines.utils.utils import ( log, @@ -30,16 +27,27 @@ max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def tratar_dados( - dataset_id: str, table_id: str, mode: str = "dev" -) -> Tuple[pd.DataFrame, bool]: +def download_data() -> pd.DataFrame: """ - Renomeia colunas e filtra dados com a hora e minuto do timestamp - de execução mais próximo à este + Download data from API """ url = "http://sjc.salvar.cemaden.gov.br/resources/graficos/interativo/getJson2.php?uf=RJ" - dados = pd.read_json(url) + dataframe = pd.read_json(url) + return dataframe + + +@task( + nout=2, + max_retries=constants.TASK_MAX_RETRIES.value, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +def treat_data( + dataframe: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "dev" +) -> Tuple[pd.DataFrame, bool]: + """ + Rename cols and filter data using hour and minute from the nearest current timestamp + """ drop_cols = [ "uf", @@ -63,34 +71,34 @@ def tratar_dados( "acc96hr": "acumulado_chuva_96_h", } - dados = ( - dados[(dados["codibge"] == 3304557) & (dados["tipoestacao"] == 1)] + dataframe = ( + dataframe[(dataframe["codibge"] == 3304557) & (dataframe["tipoestacao"] == 1)] .drop(drop_cols, axis=1) .rename(rename_cols, axis=1) ) - log(f"\n[DEBUG]: df.head() {dados.head()}") + log(f"\n[DEBUG]: df.head() {dataframe.head()}") - # Converte de UTC para horário São Paulo - dados["data_medicao_utc"] = pd.to_datetime( - dados["data_medicao_utc"], dayfirst=True + # Convert from UTC to São Paulo timezone + dataframe["data_medicao_utc"] = pd.to_datetime( + dataframe["data_medicao_utc"], dayfirst=True ) + pd.DateOffset(hours=0) - dados["data_medicao"] = ( - dados["data_medicao_utc"] + dataframe["data_medicao"] = ( + dataframe["data_medicao_utc"] .dt.tz_localize("UTC") .dt.tz_convert("America/Sao_Paulo") ) see_cols = ["data_medicao_utc", "data_medicao", "id_estacao", "acumulado_chuva_1_h"] - log(f"DEBUG: data utc -> GMT-3 {dados[see_cols]}") + log(f"DEBUG: data utc -> GMT-3 {dataframe[see_cols]}") date_format = "%Y-%m-%d %H:%M:%S" - dados["data_medicao"] = dados["data_medicao"].dt.strftime(date_format) + dataframe["data_medicao"] = dataframe["data_medicao"].dt.strftime(date_format) - log(f"DEBUG: data {dados[see_cols]}") + log(f"DEBUG: data {dataframe[see_cols]}") - # Alterando valores '-' e np.nan para NULL - dados.replace(["-", np.nan], [0, None], inplace=True) + # Change values '-' and np.nan to NULL + dataframe.replace(["-", np.nan], [0, None], inplace=True) - # Altera valores negativos para None + # Change negative values to None float_cols = [ "acumulado_chuva_10_min", "acumulado_chuva_1_h", @@ -102,16 +110,18 @@ def tratar_dados( "acumulado_chuva_72_h", "acumulado_chuva_96_h", ] - dados[float_cols] = np.where(dados[float_cols] < 0, None, dados[float_cols]) + dataframe[float_cols] = np.where( + dataframe[float_cols] < 0, None, dataframe[float_cols] + ) - # Elimina linhas em que o id_estacao é igual mantendo a de menor valor nas colunas float - dados.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True) - dados.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first") + # Eliminate where the id_estacao is the same keeping the smallest one + dataframe.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True) + dataframe.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first") - log(f"Dataframe before comparing with last data saved on redis {dados.head()}") + log(f"Dataframe before comparing with last data saved on redis {dataframe.head()}") - dados = save_updated_rows_on_redis( - dados, + dataframe = save_updated_rows_on_redis( + dataframe, dataset_id, table_id, unique_id="id_estacao", @@ -120,16 +130,16 @@ def tratar_dados( mode=mode, ) - log(f"Dataframe after comparing with last data saved on redis {dados.head()}") + log(f"Dataframe after comparing with last data saved on redis {dataframe.head()}") # If df is empty stop flow - if dados.shape[0] == 0: + if dataframe.shape[0] == 0: skip_text = "No new data available on API" log(skip_text) raise ENDRUN(state=Skipped(skip_text)) - # Fixar ordem das colunas - dados = dados[ + # Fix columns order + dataframe = dataframe[ [ "id_estacao", "data_medicao", @@ -145,23 +155,22 @@ def tratar_dados( ] ] - return dados + return dataframe @task -def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: +def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: """ - Salvar dados tratados em csv para conseguir subir pro GCP + Save data on a csv file to be uploaded to GCP """ prepath = Path("/tmp/precipitacao_cemaden/") prepath.mkdir(parents=True, exist_ok=True) partition_column = "data_medicao" - dataframe, partitions = parse_date_columns(dados, partition_column) + dataframe, partitions = parse_date_columns(dataframe, partition_column) current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M") - # Cria partições a partir da data to_partitions( data=dataframe, partition_columns=partitions, @@ -171,3 +180,53 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: ) log(f"[DEBUG] Files saved on {prepath}") return prepath + + +@task +def check_for_new_stations( + dataframe: pd.DataFrame, + wait=None, # pylint: disable=unused-argument +) -> None: + """ + Check if the updated stations are the same as before. + If not, consider flow as failed and call attention to + add this new station on estacoes_cemaden. + I can't automatically update this new station, because + I couldn't find a url that gives me the lat and lon for + all the stations. + """ + + stations_before = [ + "3043", + "3044", + "3045", + "3114", + "3215", + "7593", + "7594", + "7595", + "7596", + "7597", + "7599", + "7600", + "7601", + "7602", + "7603", + "7606", + "7609", + "7610", + "7611", + "7612", + "7613", + "7614", + "7615", + ] + new_stations = [ + i for i in dataframe.id_estacao.unique() if str(i) not in stations_before + ] + if len(new_stations) != 0: + message = f"New station identified. You need to update CEMADEN\ + estacoes_cemaden adding station(s) {new_stations}: \ + {dataframe[dataframe.id_estacao.isin(new_stations)]} " + log(message) + raise ENDRUN(state=Failed(message))