From e3d068c7833817158e919cebd23e38bfdc31a486 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Fri, 27 Oct 2023 13:22:09 +0000 Subject: [PATCH 01/42] =?UTF-8?q?teste=20atualiza=C3=A7=C3=A3o=20esta?= =?UTF-8?q?=C3=A7=C3=B5es?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../meteorologia_redemet/flows.py | 49 ++++++++++++++ .../meteorologia_redemet/tasks.py | 66 +++++++++++++++++++ 2 files changed, 115 insertions(+) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 95cf5f029..4b1cedaea 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -134,6 +134,55 @@ ) +# Código não finalizado; flow de atualização das estações +with Flow( + name="COR: Meteorologia - Atualização das estações", + code_owners=[ + "karinappassos", + "paty", + ], +) as cor_meteorologia_meteorologia_redemet: + DATASET_ID = "clima_estacao_meteorologica" + TABLE_ID = "meteorologia_redemet" + DUMP_MODE = "append" + + # data_inicio e data_fim devem ser strings no formato "YYYY-MM-DD" + data_inicio = Parameter("data_inicio", default="", required=False) + data_fim = Parameter("data_fim", default="", required=False) + + # Materialization parameters + MATERIALIZE_AFTER_DUMP = Parameter( + "materialize_after_dump", default=False, required=False + ) + MATERIALIZE_TO_DATARIO = Parameter( + "materialize_to_datario", default=False, required=False + ) + MATERIALIZATION_MODE = Parameter("mode", default="dev", required=False) + + # Dump to GCS after? Should only dump to GCS if materializing to datario + DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False) + + MAXIMUM_BYTES_PROCESSED = Parameter( + "maximum_bytes_processed", + required=False, + default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, + ) + + data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim) + # data = slice_data(current_time=CURRENT_TIME) + dados = tratar_dados_estacao(data_inicio_, data_fim_) + PATH = salvar_dados(dados=dados) + + # Create table in BigQuery + UPLOAD_TABLE = create_table_and_upload_to_gcs( + data_path=PATH, + dataset_id=DATASET_ID, + table_id=TABLE_ID, + dump_mode=DUMP_MODE, + wait=PATH, + ) + + # para rodar na cloud cor_meteorologia_meteorologia_redemet.storage = GCS(constants.GCS_FLOWS_BUCKET.value) cor_meteorologia_meteorologia_redemet.run_config = KubernetesRun( diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index bd67f5dce..289af270f 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -198,3 +198,69 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: ) log(f"[DEBUG] Files saved on {prepath}") return prepath + + +@task +def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: + # Lista com as estações da cidade do Rio de Janeiro + estacoes_unicas = [ + "SBAF", + "SBGL", + "SBJR", + "SBRJ", + "SBSC", + ] + + dicionario = get_vault_secret("redemet-token") + + # Converte datas em int para cálculo de faixas. + data_inicio_int = int(data_inicio.replace("-", "")) + data_fim_int = int(data_fim.replace("-", "")) + + raw = [] + for id_estacao in estacoes_unicas: + base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={dicionario['data']['token']}" # noqa + for data in range(data_inicio_int, data_fim_int + 1): + for hora in range(24): + url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}" + res = requests.get(url) + if res.status_code != 200: + log(f"Problema no id: {id_estacao}, {res.status_code}, {url}") + continue + res_data = json.loads(res.text) + if res_data["status"] is not True: + log(f"Problema no id: {id_estacao}, {res_data['message']}, {url}") + continue + if "data" not in res_data["data"]: + # Sem dados para esse horario + continue + raw.append(res_data) + + # Função para converter longitude de graus, minutos, segundos para decimal + res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) + res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) + return dados + + + +def converter_lat_lon(longitude_str): + longitude_str = longitude_str.replace("º", "/").replace("''", "/").replace("'", "/") + + # Divida a string com base nos espaços em branco + partes = longitude_str.split("/") + # print(partes) + + # Extraia os graus, minutos e segundos da lista de partes + graus = int(partes[0]) + minutos = int(partes[1]) + segundos = float(partes[2]) + + # Calcule o valor decimal + decimal = graus + (minutos / 60) + (segundos / 3600) + + # Verifique se a direção é Oeste (W) e faça o valor negativo + # Verifique se a direção é Norte (N) e retorne o valor decimal + if ("W" in partes[3].upper()) | ("S" in partes[3].upper()): + decimal = -decimal + + return decimal \ No newline at end of file From cb89b2c41d49e2c35034297a03492158020907d7 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Fri, 27 Oct 2023 13:32:13 +0000 Subject: [PATCH 02/42] =?UTF-8?q?teste=20atualiza=C3=A7=C3=A3o=20esta?= =?UTF-8?q?=C3=A7=C3=B5es?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../meteorologia/meteorologia_redemet/flows.py | 1 + .../meteorologia/meteorologia_redemet/tasks.py | 15 +++++++-------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 4b1cedaea..9ffd9f4d1 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -17,6 +17,7 @@ download, tratar_dados, salvar_dados, + tratar_dados_estacao, ) from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import hour_schedule from pipelines.utils.decorators import Flow diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 289af270f..3ec8835f6 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -239,17 +239,16 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: # Função para converter longitude de graus, minutos, segundos para decimal res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) - return dados - - - + return res_data + + def converter_lat_lon(longitude_str): longitude_str = longitude_str.replace("º", "/").replace("''", "/").replace("'", "/") - + # Divida a string com base nos espaços em branco partes = longitude_str.split("/") # print(partes) - + # Extraia os graus, minutos e segundos da lista de partes graus = int(partes[0]) minutos = int(partes[1]) @@ -262,5 +261,5 @@ def converter_lat_lon(longitude_str): # Verifique se a direção é Norte (N) e retorne o valor decimal if ("W" in partes[3].upper()) | ("S" in partes[3].upper()): decimal = -decimal - - return decimal \ No newline at end of file + + return decimal From a46fbadc35d42fcbe03ce62c76dd88dd3563e538 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Mon, 30 Oct 2023 17:44:46 +0000 Subject: [PATCH 03/42] testando flow --- .../meteorologia_redemet/tasks.py | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 3ec8835f6..5c8193444 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -200,6 +200,29 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: return prepath +def converter_lat_lon(longitude_str): + longitude_str = longitude_str.replace("º", "/").replace("''", "/").replace("'", "/") + + # Divida a string com base nos espaços em branco + partes = longitude_str.split("/") + # print(partes) + + # Extraia os graus, minutos e segundos da lista de partes + graus = int(partes[0]) + minutos = int(partes[1]) + segundos = float(partes[2]) + + # Calcule o valor decimal + decimal = graus + (minutos / 60) + (segundos / 3600) + + # Verifique se a direção é Oeste (W) e faça o valor negativo + # Verifique se a direção é Norte (N) e retorne o valor decimal + if ("W" in partes[3].upper()) | ("S" in partes[3].upper()): + decimal = -decimal + + return decimal + + @task def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: # Lista com as estações da cidade do Rio de Janeiro @@ -237,29 +260,6 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: raw.append(res_data) # Função para converter longitude de graus, minutos, segundos para decimal - res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) - res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) + # res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) + # res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) return res_data - - -def converter_lat_lon(longitude_str): - longitude_str = longitude_str.replace("º", "/").replace("''", "/").replace("'", "/") - - # Divida a string com base nos espaços em branco - partes = longitude_str.split("/") - # print(partes) - - # Extraia os graus, minutos e segundos da lista de partes - graus = int(partes[0]) - minutos = int(partes[1]) - segundos = float(partes[2]) - - # Calcule o valor decimal - decimal = graus + (minutos / 60) + (segundos / 3600) - - # Verifique se a direção é Oeste (W) e faça o valor negativo - # Verifique se a direção é Norte (N) e retorne o valor decimal - if ("W" in partes[3].upper()) | ("S" in partes[3].upper()): - decimal = -decimal - - return decimal From c9b1d02ef96c84eb26966ac04188f2a7f1744299 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Mon, 30 Oct 2023 18:42:50 +0000 Subject: [PATCH 04/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 9ffd9f4d1..e601e3c14 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -172,7 +172,8 @@ data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim) # data = slice_data(current_time=CURRENT_TIME) dados = tratar_dados_estacao(data_inicio_, data_fim_) - PATH = salvar_dados(dados=dados) + print(dados) + #PATH = salvar_dados(dados=dados) # Create table in BigQuery UPLOAD_TABLE = create_table_and_upload_to_gcs( From a94cc94c6fe232c2e2df8460566d58a8a898f321 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Mon, 30 Oct 2023 19:40:08 +0000 Subject: [PATCH 05/42] testando flow --- .../meteorologia/meteorologia_redemet/flows.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index e601e3c14..6101acf5e 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -176,13 +176,13 @@ #PATH = salvar_dados(dados=dados) # Create table in BigQuery - UPLOAD_TABLE = create_table_and_upload_to_gcs( - data_path=PATH, - dataset_id=DATASET_ID, - table_id=TABLE_ID, - dump_mode=DUMP_MODE, - wait=PATH, - ) + #UPLOAD_TABLE = create_table_and_upload_to_gcs( + # data_path=PATH, + # dataset_id=DATASET_ID, + # table_id=TABLE_ID, + # dump_mode=DUMP_MODE, + # wait=PATH, + #) # para rodar na cloud From be004f3b90bf584c5cf550bc662d309cb6c576af Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 12:40:19 +0000 Subject: [PATCH 06/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 6101acf5e..1a07da530 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -172,7 +172,8 @@ data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim) # data = slice_data(current_time=CURRENT_TIME) dados = tratar_dados_estacao(data_inicio_, data_fim_) - print(dados) + print("hi") + print(dados.columns) #PATH = salvar_dados(dados=dados) # Create table in BigQuery From 3bde70e17f639b5ecc417d2bfda96294768c9198 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 12:53:49 +0000 Subject: [PATCH 07/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 1a07da530..97c04f8a5 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -174,16 +174,16 @@ dados = tratar_dados_estacao(data_inicio_, data_fim_) print("hi") print(dados.columns) - #PATH = salvar_dados(dados=dados) + # PATH = salvar_dados(dados=dados) # Create table in BigQuery - #UPLOAD_TABLE = create_table_and_upload_to_gcs( + # UPLOAD_TABLE = create_table_and_upload_to_gcs( # data_path=PATH, # dataset_id=DATASET_ID, # table_id=TABLE_ID, # dump_mode=DUMP_MODE, # wait=PATH, - #) + # ) # para rodar na cloud From 4679df39b8029746853c27ab2aad56c40f28bda0 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 13:05:06 +0000 Subject: [PATCH 08/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 97c04f8a5..872907b03 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -173,7 +173,7 @@ # data = slice_data(current_time=CURRENT_TIME) dados = tratar_dados_estacao(data_inicio_, data_fim_) print("hi") - print(dados.columns) + # print(dados.columns) # PATH = salvar_dados(dados=dados) # Create table in BigQuery From e839ecc0c768885bf0d783048017882009ea57c9 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 13:35:30 +0000 Subject: [PATCH 09/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 872907b03..fed364895 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -28,6 +28,7 @@ get_current_flow_labels, ) +import logging with Flow( name="COR: Meteorologia - Meteorologia REDEMET", @@ -172,7 +173,7 @@ data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim) # data = slice_data(current_time=CURRENT_TIME) dados = tratar_dados_estacao(data_inicio_, data_fim_) - print("hi") + logging.info("hi") # print(dados.columns) # PATH = salvar_dados(dados=dados) From c53cd728a1bfea8dc1d7c61988a68289396907e6 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 13:58:09 +0000 Subject: [PATCH 10/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index fed364895..9e1d53785 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -173,6 +173,10 @@ data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim) # data = slice_data(current_time=CURRENT_TIME) dados = tratar_dados_estacao(data_inicio_, data_fim_) + + # Configure logging format and level + logging.basicConfig(level=logging.INFO, format="%(message)s") + logging.info("hi") # print(dados.columns) # PATH = salvar_dados(dados=dados) From 97066c9dbf475a3ea1d9d90b41f9926b8f2c9a16 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 14:41:31 +0000 Subject: [PATCH 11/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 9e1d53785..58352f330 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -174,10 +174,9 @@ # data = slice_data(current_time=CURRENT_TIME) dados = tratar_dados_estacao(data_inicio_, data_fim_) - # Configure logging format and level - logging.basicConfig(level=logging.INFO, format="%(message)s") + # debug + log(f"printa df {dados.head()}") - logging.info("hi") # print(dados.columns) # PATH = salvar_dados(dados=dados) From 29434dfb2a71cfb92aca3e13eb60f7d7ed6e9ca8 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 14:57:21 +0000 Subject: [PATCH 12/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 58352f330..4ac5b9ade 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -27,8 +27,7 @@ create_table_and_upload_to_gcs, get_current_flow_labels, ) - -import logging +from pipelines.utils.utils import log with Flow( name="COR: Meteorologia - Meteorologia REDEMET", From a105b7c2b12003d0bf7664b5c90cd8786730ed1b Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 15:09:35 +0000 Subject: [PATCH 13/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 4ac5b9ade..ca3611a63 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -174,7 +174,7 @@ dados = tratar_dados_estacao(data_inicio_, data_fim_) # debug - log(f"printa df {dados.head()}") + #log(f"printa df {dados.head()}") # print(dados.columns) # PATH = salvar_dados(dados=dados) From 08a514bcc5dd53d2d90d2b5006b22086e3825834 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 15:23:22 +0000 Subject: [PATCH 14/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index ca3611a63..2aa056bf6 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -174,7 +174,8 @@ dados = tratar_dados_estacao(data_inicio_, data_fim_) # debug - #log(f"printa df {dados.head()}") + # log(f"printa df {dados.head()}") + log("hi") # print(dados.columns) # PATH = salvar_dados(dados=dados) From 717c86e8ed13ff0d766e65a3d7194ee3e931c065 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 18:01:15 +0000 Subject: [PATCH 15/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 2 -- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 3 +++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 2aa056bf6..aece7fd4c 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -27,7 +27,6 @@ create_table_and_upload_to_gcs, get_current_flow_labels, ) -from pipelines.utils.utils import log with Flow( name="COR: Meteorologia - Meteorologia REDEMET", @@ -175,7 +174,6 @@ # debug # log(f"printa df {dados.head()}") - log("hi") # print(dados.columns) # PATH = salvar_dados(dados=dados) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 5c8193444..bd87863bc 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -19,6 +19,7 @@ to_partitions, parse_date_columns, ) +from pipelines.utils.utils import log @task(nout=3) @@ -259,6 +260,8 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: continue raw.append(res_data) + log("testando") + # Função para converter longitude de graus, minutos, segundos para decimal # res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) # res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) From 8f683ac52bc4ea0d8d1520b6efc4605a49f87e66 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 18:22:32 +0000 Subject: [PATCH 16/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 4 ---- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index aece7fd4c..0e7d7f081 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -172,12 +172,8 @@ # data = slice_data(current_time=CURRENT_TIME) dados = tratar_dados_estacao(data_inicio_, data_fim_) - # debug - # log(f"printa df {dados.head()}") - # print(dados.columns) # PATH = salvar_dados(dados=dados) - # Create table in BigQuery # UPLOAD_TABLE = create_table_and_upload_to_gcs( # data_path=PATH, diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index bd87863bc..71eb8d474 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -260,7 +260,7 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: continue raw.append(res_data) - log("testando") + log(f"printa df {res_data.head()}") # Função para converter longitude de graus, minutos, segundos para decimal # res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) From 48a7db338ce327be43dda31a0445a5664cb837a0 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 31 Oct 2023 19:05:29 +0000 Subject: [PATCH 17/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 71eb8d474..6b8b94056 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -260,7 +260,7 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: continue raw.append(res_data) - log(f"printa df {res_data.head()}") + log(f"printa df {res_data}") # Função para converter longitude de graus, minutos, segundos para decimal # res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) From 8a19f6e0a03ea69f7a8e54bdfdac0e8b12ea571e Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Wed, 1 Nov 2023 18:31:48 +0000 Subject: [PATCH 18/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 6b8b94056..28e5f70ea 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -6,6 +6,7 @@ import json from pathlib import Path from typing import Tuple, Union +from unidecode import unidecode import pandas as pd import pendulum @@ -262,6 +263,11 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: log(f"printa df {res_data}") + # Removendo acentos + res_data['data']['nome'] = unidecode(res_data['data']['nome']) + + log(f"printa df {res_data}") + # Função para converter longitude de graus, minutos, segundos para decimal # res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) # res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) From bf468ff09cc564eb19ad40de432dbc80f7aa66bf Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Wed, 1 Nov 2023 18:32:45 +0000 Subject: [PATCH 19/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 28e5f70ea..0902f1824 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -264,7 +264,7 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: log(f"printa df {res_data}") # Removendo acentos - res_data['data']['nome'] = unidecode(res_data['data']['nome']) + res_data["data"]["nome"] = unidecode(res_data["data"]["nome"]) log(f"printa df {res_data}") From d89eb87a1eebf71627360cb7ddd7a0145e4aa148 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Wed, 1 Nov 2023 19:44:01 +0000 Subject: [PATCH 20/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 0902f1824..72cf1ca69 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -264,7 +264,7 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: log(f"printa df {res_data}") # Removendo acentos - res_data["data"]["nome"] = unidecode(res_data["data"]["nome"]) + # res_data["data"]["nome"] = unidecode(res_data["data"]["nome"]) log(f"printa df {res_data}") From d9f6132ea95020e236321918b3ef045d24c48f61 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Wed, 1 Nov 2023 20:26:16 +0000 Subject: [PATCH 21/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 72cf1ca69..6b8b94056 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -6,7 +6,6 @@ import json from pathlib import Path from typing import Tuple, Union -from unidecode import unidecode import pandas as pd import pendulum @@ -263,11 +262,6 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: log(f"printa df {res_data}") - # Removendo acentos - # res_data["data"]["nome"] = unidecode(res_data["data"]["nome"]) - - log(f"printa df {res_data}") - # Função para converter longitude de graus, minutos, segundos para decimal # res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) # res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) From 84cb335db750b854c23b27e7c5f7ea9febb18574 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 7 Nov 2023 20:19:30 +0000 Subject: [PATCH 22/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 6b8b94056..27623dfca 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -260,7 +260,7 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: continue raw.append(res_data) - log(f"printa df {res_data}") + log(f"printa dados {res_data}") # Função para converter longitude de graus, minutos, segundos para decimal # res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) From 0eb518c71d83923ba84afccb51c0d127fffcf57d Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 7 Nov 2023 20:55:15 +0000 Subject: [PATCH 23/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 27623dfca..ce3c41b99 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -263,6 +263,6 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: log(f"printa dados {res_data}") # Função para converter longitude de graus, minutos, segundos para decimal - # res_data["latitude"] = res_data["lat"].apply(converter_lat_lon) - # res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) + res_data["latitude"] = res_data[data]["lat"].apply(converter_lat_lon) + res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) return res_data From d779cbce6ab9818f1a317e0a16ff30a8bb0d8385 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Tue, 7 Nov 2023 21:41:44 +0000 Subject: [PATCH 24/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index ce3c41b99..e7638d554 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -260,9 +260,9 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: continue raw.append(res_data) - log(f"printa dados {res_data}") - # Função para converter longitude de graus, minutos, segundos para decimal res_data["latitude"] = res_data[data]["lat"].apply(converter_lat_lon) res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) + + log(f"printa dados {res_data}") return res_data From e4f67c97cfdf5721fb8667b545d7c7d38b944c7b Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Wed, 8 Nov 2023 12:46:36 +0000 Subject: [PATCH 25/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index e7638d554..4512d7c14 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -261,8 +261,8 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: raw.append(res_data) # Função para converter longitude de graus, minutos, segundos para decimal - res_data["latitude"] = res_data[data]["lat"].apply(converter_lat_lon) - res_data["longitude"] = res_data["lon"].apply(converter_lat_lon) + # res_data['latitude'] = res_data['data']['lat'].apply(converter_lat_lon) + # res_data['longitude'] = res_data['data']['lon'].apply(converter_lat_lon) log(f"printa dados {res_data}") return res_data From 9f712e980b2d5e01c0ecec542e7d10ba05b3cc4c Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Wed, 8 Nov 2023 13:35:14 +0000 Subject: [PATCH 26/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 4512d7c14..b7b7b99b7 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -261,8 +261,8 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: raw.append(res_data) # Função para converter longitude de graus, minutos, segundos para decimal - # res_data['latitude'] = res_data['data']['lat'].apply(converter_lat_lon) - # res_data['longitude'] = res_data['data']['lon'].apply(converter_lat_lon) + res_data["latitude"] = res_data["data"]["lat"].apply(converter_lat_lon) + res_data["longitude"] = res_data["data"]["lon"].apply(converter_lat_lon) log(f"printa dados {res_data}") return res_data From 69b99a27226f571c5481cc08ba81a7eb45436c6f Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Wed, 8 Nov 2023 17:07:14 +0000 Subject: [PATCH 27/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index b7b7b99b7..19e27f550 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -261,8 +261,8 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: raw.append(res_data) # Função para converter longitude de graus, minutos, segundos para decimal - res_data["latitude"] = res_data["data"]["lat"].apply(converter_lat_lon) - res_data["longitude"] = res_data["data"]["lon"].apply(converter_lat_lon) + res_data["latitude"] = converter_lat_lon(res_data["data"]["lat"]) + res_data["longitude"] = converter_lat_lon(res_data["data"]["lon"]) log(f"printa dados {res_data}") return res_data From 1309b92fa25849d638afb875f72a38c2c768010a Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Wed, 8 Nov 2023 18:01:45 +0000 Subject: [PATCH 28/42] testando flow --- .../rj_cor/meteorologia/meteorologia_redemet/tasks.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 19e27f550..2aea47c2a 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -21,6 +21,8 @@ ) from pipelines.utils.utils import log +# import unidecode + @task(nout=3) def get_dates(data_inicio: str, data_fim: str) -> Tuple[str, str]: @@ -261,8 +263,10 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: raw.append(res_data) # Função para converter longitude de graus, minutos, segundos para decimal - res_data["latitude"] = converter_lat_lon(res_data["data"]["lat"]) - res_data["longitude"] = converter_lat_lon(res_data["data"]["lon"]) + res_data["data"]["lat"] = converter_lat_lon(res_data["data"]["lat"]) + res_data["data"]["lon"] = converter_lat_lon(res_data["data"]["lon"]) + + # res_data["data"]["nome"] = unidecode.unidecode(res_data["data"]["nome"]) log(f"printa dados {res_data}") return res_data From 3bfda35fe649de0990a5120e37e517b14f507819 Mon Sep 17 00:00:00 2001 From: Karina Passos Date: Wed, 8 Nov 2023 18:34:56 +0000 Subject: [PATCH 29/42] testando flow --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 2aea47c2a..f198ae121 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -21,7 +21,7 @@ ) from pipelines.utils.utils import log -# import unidecode +import unidecode @task(nout=3) @@ -265,8 +265,7 @@ def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: # Função para converter longitude de graus, minutos, segundos para decimal res_data["data"]["lat"] = converter_lat_lon(res_data["data"]["lat"]) res_data["data"]["lon"] = converter_lat_lon(res_data["data"]["lon"]) - - # res_data["data"]["nome"] = unidecode.unidecode(res_data["data"]["nome"]) + res_data["data"]["nome"] = unidecode.unidecode(res_data["data"]["nome"]) log(f"printa dados {res_data}") return res_data From 27e5277b84f283750edad2c0d2a94c62fedfcd2f Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Dec 2023 12:27:37 -0300 Subject: [PATCH 30/42] modifying redemet stations --- .../meteorologia_redemet/flows.py | 150 ++++++++--- .../meteorologia_redemet/schedules.py | 31 +++ .../meteorologia_redemet/tasks.py | 249 +++++++++--------- .../precipitacao_cemaden/flows.py | 15 +- .../precipitacao_cemaden/schedules.py | 8 +- .../precipitacao_cemaden/tasks.py | 71 +++-- 6 files changed, 314 insertions(+), 210 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 0e7d7f081..b14905868 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -13,13 +13,18 @@ from pipelines.constants import constants from pipelines.utils.constants import constants as utils_constants from pipelines.rj_cor.meteorologia.meteorologia_redemet.tasks import ( + check_for_new_stations, get_dates, - download, - tratar_dados, - salvar_dados, - tratar_dados_estacao, + download_data, + download_stations_data, + treat_data, + treat_stations_data, + save_data, +) +from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import ( + hour_schedule, + month_schedule, ) -from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import hour_schedule from pipelines.utils.decorators import Flow from pipelines.utils.dump_db.constants import constants as dump_db_constants from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants @@ -35,13 +40,15 @@ "paty", ], ) as cor_meteorologia_meteorologia_redemet: - DATASET_ID = "clima_estacao_meteorologica" - TABLE_ID = "meteorologia_redemet" - DUMP_MODE = "append" + DUMP_MODE = Parameter("dump_mode", default="append", required=True) + DATASET_ID = Parameter( + "dataset_id", default="clima_estacao_meteorologica", required=True + ) + TABLE_ID = Parameter("table_id", default="meteorologia_redemet", required=True) - # data_inicio e data_fim devem ser strings no formato "YYYY-MM-DD" - data_inicio = Parameter("data_inicio", default="", required=False) - data_fim = Parameter("data_fim", default="", required=False) + # first_date and last_date must be strings as "YYYY-MM-DD" + first_date = Parameter("first_date", default=None, required=False) + last_date = Parameter("last_date", default=None, required=False) # Materialization parameters MATERIALIZE_AFTER_DUMP = Parameter( @@ -61,11 +68,11 @@ default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, ) - data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim) + first_date_, last_date_, backfill = get_dates(first_date, last_date) # data = slice_data(current_time=CURRENT_TIME) - dados = download(data_inicio_, data_fim_) - dados = tratar_dados(dados, backfill) - PATH = salvar_dados(dados=dados) + dados = download_data(first_date_, last_date_) + dados = treat_data(dados, backfill) + PATH = save_data(dados=dados) # Create table in BigQuery UPLOAD_TABLE = create_table_and_upload_to_gcs( @@ -134,21 +141,26 @@ ) -# Código não finalizado; flow de atualização das estações +cor_meteorologia_meteorologia_redemet.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +cor_meteorologia_meteorologia_redemet.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_COR_AGENT_LABEL.value], +) +cor_meteorologia_meteorologia_redemet.schedule = hour_schedule + + with Flow( - name="COR: Meteorologia - Atualização das estações", + name="COR: Meteorologia REDEMET - Atualização das estações", code_owners=[ "karinappassos", "paty", ], -) as cor_meteorologia_meteorologia_redemet: - DATASET_ID = "clima_estacao_meteorologica" - TABLE_ID = "meteorologia_redemet" - DUMP_MODE = "append" - - # data_inicio e data_fim devem ser strings no formato "YYYY-MM-DD" - data_inicio = Parameter("data_inicio", default="", required=False) - data_fim = Parameter("data_fim", default="", required=False) +) as cor_meteorologia_meteorologia_redemet_estacoes: + DUMP_MODE = Parameter("dump_mode", default="overwrite", required=True) + DATASET_ID = Parameter( + "dataset_id", default="clima_estacao_meteorologica", required=True + ) + TABLE_ID = Parameter("table_id", default="estacoes_redemet", required=True) # Materialization parameters MATERIALIZE_AFTER_DUMP = Parameter( @@ -168,26 +180,84 @@ default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, ) - data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim) - # data = slice_data(current_time=CURRENT_TIME) - dados = tratar_dados_estacao(data_inicio_, data_fim_) + dataframe = download_stations_data() + dataframe = treat_stations_data(dataframe) + path = save_data(dataframe=dataframe, partition_column="data_alteracao") - # print(dados.columns) - # PATH = salvar_dados(dados=dados) # Create table in BigQuery - # UPLOAD_TABLE = create_table_and_upload_to_gcs( - # data_path=PATH, - # dataset_id=DATASET_ID, - # table_id=TABLE_ID, - # dump_mode=DUMP_MODE, - # wait=PATH, - # ) + UPLOAD_TABLE = create_table_and_upload_to_gcs( + data_path=path, + dataset_id=DATASET_ID, + table_id=TABLE_ID, + dump_mode=DUMP_MODE, + wait=path, + ) + + # Trigger DBT flow run + with case(MATERIALIZE_AFTER_DUMP, True): + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": DATASET_ID, + "table_id": TABLE_ID, + "mode": MATERIALIZATION_MODE, + "materialize_to_datario": MATERIALIZE_TO_DATARIO, + }, + labels=current_flow_labels, + run_name=f"Materialize {DATASET_ID}.{TABLE_ID}", + ) + + materialization_flow.set_upstream(UPLOAD_TABLE) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(DUMP_TO_GCS, True): + # Trigger Dump to GCS flow run with project id as datario + dump_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": DATASET_ID, + "table_id": TABLE_ID, + "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {DATASET_ID}.{TABLE_ID}", + ) + dump_to_gcs_flow.set_upstream(wait_for_materialization) + + wait_for_dump_to_gcs = wait_for_flow_run( + dump_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + check_for_new_stations(dataframe) # para rodar na cloud -cor_meteorologia_meteorologia_redemet.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -cor_meteorologia_meteorologia_redemet.run_config = KubernetesRun( +cor_meteorologia_meteorologia_redemet_estacoes.storage = GCS( + constants.GCS_FLOWS_BUCKET.value +) +cor_meteorologia_meteorologia_redemet_estacoes.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_COR_AGENT_LABEL.value], ) -cor_meteorologia_meteorologia_redemet.schedule = hour_schedule +cor_meteorologia_meteorologia_redemet_estacoes.schedule = month_schedule diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py index e180adc28..5ff59fad4 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py @@ -16,6 +16,37 @@ labels=[ constants.RJ_COR_AGENT_LABEL.value, ], + parameter_defaults={ + # "trigger_rain_dashboard_update": True, + "materialize_after_dump": True, + "mode": "prod", + "materialize_to_datario": True, + "dump_to_gcs": False, + "dump_mode": "append", + "dataset_id": "clima_estacao_meteorologica", + "table_id": "meteorologia_redemet", + }, + ), + ] +) + +month_schedule = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(months=1), + start_date=datetime(2023, 1, 1, 0, 12, 0), + labels=[ + constants.RJ_COR_AGENT_LABEL.value, + ], + parameter_defaults={ + "materialize_after_dump": True, + "mode": "prod", + "materialize_to_datario": True, + "dump_to_gcs": False, + # "dump_mode": "overwrite", + # "dataset_id": "clima_estacao_meteorologica", + # "table_id": "estacoes_redemet", + }, ), ] ) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index b2a72702f..4aaff6a4d 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -6,10 +6,13 @@ import json from pathlib import Path from typing import Tuple, Union +from unidecode import unidecode import pandas as pd import pendulum from prefect import task +from prefect.engine.signals import ENDRUN +from prefect.engine.state import Failed import requests from pipelines.constants import constants @@ -19,43 +22,39 @@ to_partitions, parse_date_columns, ) -from pipelines.utils.utils import log - -import unidecode @task(nout=3) -def get_dates(data_inicio: str, data_fim: str) -> Tuple[str, str]: +def get_dates(first_date: str, last_date: str) -> Tuple[str, str]: """ - Task para obter o dia de início e o de fim. - Se nenhuma data foi passada a data_inicio corresponde a ontem - e data_fim a hoje e não estamos fazendo backfill. - Caso contrário, retorna as datas inputadas mos parâmetros do flow. + Task to get first and last date. + If none date is passed on parameters or we are not doing a backfill + the first_date will be yesterday and last_date will be today. + Otherwise, this function will return date inputed on flow's parameters. """ # a API sempre retorna o dado em UTC - log(f"data de inicio e fim antes do if {data_inicio} {data_fim}") - if data_inicio == "": - data_fim = pendulum.now("UTC").format("YYYY-MM-DD") - data_inicio = pendulum.yesterday("UTC").format("YYYY-MM-DD") - backfill = 0 - else: + if first_date: backfill = 1 - log(f"data de inicio e fim dps do if {data_inicio} {data_fim}") + else: + last_date = pendulum.now("UTC").format("YYYY-MM-DD") + first_date = pendulum.yesterday("UTC").format("YYYY-MM-DD") + backfill = 0 + log(f"Selected first_date as: {first_date} and last_date as: {last_date}") - return data_inicio, data_fim, backfill + return first_date, last_date, backfill @task( max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def download(data_inicio: str, data_fim: str) -> pd.DataFrame: +def download_data(first_date: str, last_date: str) -> pd.DataFrame: """ - Faz o request na data especificada e retorna dados + Request data from especified date range """ - # Lista com as estações da cidade do Rio de Janeiro - estacoes_unicas = [ + # Stations inside Rio de Janeiro city + rj_stations = [ "SBAF", "SBGL", "SBJR", @@ -63,16 +62,16 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: "SBSC", ] - dicionario = get_vault_secret("redemet-token") + redemet_token = get_vault_secret("redemet-token") # Converte datas em int para cálculo de faixas. - data_inicio_int = int(data_inicio.replace("-", "")) - data_fim_int = int(data_fim.replace("-", "")) + first_date_int = int(first_date.replace("-", "")) + last_date_int = int(last_date.replace("-", "")) raw = [] - for id_estacao in estacoes_unicas: - base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={dicionario['data']['token']}" # noqa - for data in range(data_inicio_int, data_fim_int + 1): + for id_estacao in rj_stations: + base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={redemet_token['data']['token']}" # noqa + for data in range(first_date_int, last_date_int + 1): for hora in range(24): url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}" res = requests.get(url) @@ -84,69 +83,65 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame: log(f"Problema no id: {id_estacao}, {res_data['message']}, {url}") continue if "data" not in res_data["data"]: - # Sem dados para esse horario + # Sem dataframe para esse horario continue raw.append(res_data) - # Extrai objetos de dados + # Extrai objetos de dataframe raw = [res_data["data"] for res_data in raw] - # converte para dados - dados = pd.DataFrame(raw) + # converte para dataframe + dataframe = pd.DataFrame(raw) - return dados + return dataframe @task -def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: +def treat_data(dataframe: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: """ - Renomeia colunas e filtra dados com a hora do timestamp de execução + Rename cols, convert timestamp, filter data for the actual date """ drop_cols = ["nome", "cidade", "lon", "lat", "localizacao", "tempoImagem", "metar"] - # Checa se todas estão no df - drop_cols = [c for c in drop_cols if c in dados.columns] + # Check if all columns are on the dataframe + drop_cols = [c for c in drop_cols if c in dataframe.columns] - # Remove colunas que já temos os dados em outras tabelas - dados = dados.drop(drop_cols, axis=1) + # Remove columns that are already in another table + dataframe = dataframe.drop(drop_cols, axis=1) - # Adequando nome das variáveis rename_cols = { "localidade": "id_estacao", "ur": "umidade", } - dados = dados.rename(columns=rename_cols) + dataframe = dataframe.rename(columns=rename_cols) - # Converte horário de UTC para America/Sao Paulo + # Convert UTC time to America/Sao Paulo formato = "DD/MM/YYYY HH:mm(z)" - dados["data"] = dados["data"].apply( + dataframe["data"] = dataframe["data"].apply( lambda x: pendulum.from_format(x, formato) .in_tz("America/Sao_Paulo") .format(formato) ) - # Ordenamento de variáveis - chaves_primarias = ["id_estacao", "data"] - demais_cols = [c for c in dados.columns if c not in chaves_primarias] + # Order variables + primary_keys = ["id_estacao", "data"] + other_cols = [c for c in dataframe.columns if c not in primary_keys] - dados = dados[chaves_primarias + demais_cols] + dataframe = dataframe[primary_keys + other_cols] - # Converte variáveis que deveriam ser int para int - dados["temperatura"] = dados["temperatura"].apply( + # Clean data + dataframe["temperatura"] = dataframe["temperatura"].apply( lambda x: None if x[:-2] == "NIL" else int(x[:-2]) ) - dados["umidade"] = dados["umidade"].apply( + dataframe["umidade"] = dataframe["umidade"].apply( lambda x: None if "%" not in x else int(x[:-1]) ) - dados["data"] = pd.to_datetime(dados.data, format="%d/%m/%Y %H:%M(%Z)") - - # Pegar o dia no nosso timezone como partição - br_timezone = pendulum.now("America/Sao_Paulo").format("YYYY-MM-DD") + dataframe["data"] = pd.to_datetime(dataframe.data, format="%d/%m/%Y %H:%M(%Z)") # Define colunas que serão salvas - dados = dados[ + dataframe = dataframe[ [ "id_estacao", "data", @@ -159,39 +154,40 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame: ] ] - # Remover dados duplicados - dados = dados.drop_duplicates(subset=["id_estacao", "data"]) + dataframe = dataframe.drop_duplicates(subset=["id_estacao", "data"]) - log(f"Dados antes do filtro dia:\n{dados[['id_estacao', 'data']]}") + log(f"Dados antes do filtro dia:\n{dataframe[['id_estacao', 'data']]}") if not backfill: - # Seleciona apenas dados daquele dia (devido à UTC) - dados = dados[dados["data"].dt.date.astype(str) == br_timezone] + # Select our date + br_timezone = pendulum.now("America/Sao_Paulo").format("YYYY-MM-DD") - log(f">>>> min hora {dados[~dados.temperatura.isna()].data.min()}") - log(f">>>> max hora {dados[~dados.temperatura.isna()].data.max()}") + # Select only data from that date + dataframe = dataframe[dataframe["data"].dt.date.astype(str) == br_timezone] - # Remover fuso horário - dados["data"] = dados["data"].dt.strftime("%Y-%m-%d %H:%M:%S") - dados.rename(columns={"data": "data_medicao"}, inplace=True) + log(f">>>> min hora {dataframe[~dataframe.temperatura.isna()].data.min()}") + log(f">>>> max hora {dataframe[~dataframe.temperatura.isna()].data.max()}") - # Capitalizar os dados da coluna céu - dados["ceu"] = dados["ceu"].str.capitalize() + dataframe["data"] = dataframe["data"].dt.strftime("%Y-%m-%d %H:%M:%S") + dataframe.rename(columns={"data": "data_medicao"}, inplace=True) - return dados + dataframe["ceu"] = dataframe["ceu"].str.capitalize() + + return dataframe @task -def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: +def save_data( + dataframe: pd.DataFrame, partition_column: str = "data_medicao" +) -> Union[str, Path]: """ - Salvar dados em csv + Salve dataframe as a csv file """ prepath = Path("/tmp/meteorologia_redemet/") prepath.mkdir(parents=True, exist_ok=True) - partition_column = "data_medicao" - dataframe, partitions = parse_date_columns(dados, partition_column) + dataframe, partitions = parse_date_columns(dataframe, partition_column) # Cria partições a partir da data to_partitions( @@ -204,69 +200,78 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: return prepath -def converter_lat_lon(longitude_str): - longitude_str = longitude_str.replace("º", "/").replace("''", "/").replace("'", "/") +@task +def download_stations_data() -> pd.DataFrame: + """ + Download station information + """ + + redemet_token = get_vault_secret("redemet-token") + base_url = ( + f"https://api-redemet.decea.mil.br/aerodromos/?api_key={redemet_token}" # noqa + ) + url = f"{base_url}&pais=Brasil" + res = requests.get(url) + if res.status_code != 200: + print(f"Problem on request: {res.status_code}, {url}") + res_data = json.loads(res.text) + + dataframe = pd.DataFrame(res_data["data"]) + return dataframe - # Divida a string com base nos espaços em branco - partes = longitude_str.split("/") - # print(partes) - # Extraia os graus, minutos e segundos da lista de partes - graus = int(partes[0]) - minutos = int(partes[1]) - segundos = float(partes[2]) +@task +def treat_stations_data(dataframe: pd.DataFrame) -> pd.DataFrame: + """ + Treat station data + """ + rename_cols = { + "lat_dec": "latitude", + "lon_dec": "longitude", + "nome": "estacao", + "altitude_metros": "altitude", + "cod": "id_estacao", + } + dataframe = dataframe.rename(rename_cols, axis=1) - # Calcule o valor decimal - decimal = graus + (minutos / 60) + (segundos / 3600) + dataframe = dataframe[dataframe.cidade.str.contains("Rio de Janeiro")] - # Verifique se a direção é Oeste (W) e faça o valor negativo - # Verifique se a direção é Norte (N) e retorne o valor decimal - if ("W" in partes[3].upper()) | ("S" in partes[3].upper()): - decimal = -decimal + dataframe["estacao"] = dataframe["estacao"].apply(unidecode) + dataframe["data_atualizacao"] = pendulum.now(tz="America/Sao_Paulo").format( + "YYYY-MM-DD" + ) - return decimal + keep_cols = [ + "id_estacao", + "estacao", + "latitude", + "longitude", + "altitude", + "data_atualizacao", + ] + return dataframe[keep_cols] @task -def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame: - # Lista com as estações da cidade do Rio de Janeiro - estacoes_unicas = [ +def check_for_new_stations(dataframe: pd.DataFrame): + """ + Check if the updated stations are the same as before. + If not, consider flow as failed and call attention to + change treat_data task. + """ + + stations_before = [ "SBAF", "SBGL", "SBJR", "SBRJ", "SBSC", ] - - dicionario = get_vault_secret("redemet-token") - - # Converte datas em int para cálculo de faixas. - data_inicio_int = int(data_inicio.replace("-", "")) - data_fim_int = int(data_fim.replace("-", "")) - - raw = [] - for id_estacao in estacoes_unicas: - base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={dicionario['data']['token']}" # noqa - for data in range(data_inicio_int, data_fim_int + 1): - for hora in range(24): - url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}" - res = requests.get(url) - if res.status_code != 200: - log(f"Problema no id: {id_estacao}, {res.status_code}, {url}") - continue - res_data = json.loads(res.text) - if res_data["status"] is not True: - log(f"Problema no id: {id_estacao}, {res_data['message']}, {url}") - continue - if "data" not in res_data["data"]: - # Sem dados para esse horario - continue - raw.append(res_data) - - # Função para converter longitude de graus, minutos, segundos para decimal - res_data["data"]["lat"] = converter_lat_lon(res_data["data"]["lat"]) - res_data["data"]["lon"] = converter_lat_lon(res_data["data"]["lon"]) - res_data["data"]["nome"] = unidecode.unidecode(res_data["data"]["nome"]) - - log(f"printa dados {res_data}") - return res_data + new_stations = [ + i for i in dataframe.id_estacao.unique() if i not in stations_before + ] + if len(new_stations) != 0: + message = f"New station identified. You need to update REDEMET\ + flow and add station(s) {new_stations}" + log(message) + raise ENDRUN(state=Failed(message)) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py index d4f8f0810..aa0beea16 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py @@ -17,8 +17,8 @@ constants as cemaden_constants, ) from pipelines.rj_cor.meteorologia.precipitacao_cemaden.tasks import ( - tratar_dados, - salvar_dados, + treat_data, + save_data, ) from pipelines.rj_cor.meteorologia.precipitacao_cemaden.schedules import ( minute_schedule, @@ -46,10 +46,9 @@ ], # skip_if_running=True, ) as cor_meteorologia_precipitacao_cemaden: - DATASET_ID = "clima_pluviometro" - TABLE_ID = "taxa_precipitacao_cemaden" - DUMP_MODE = "append" - + DUMP_MODE = Parameter("dump_mode", default="append", required=True) + DATASET_ID = Parameter("dataset_id", default="clima_pluviometro", required=True) + TABLE_ID = Parameter("table_id", default="taxa_precipitacao_cemaden", required=True) # Materialization parameters MATERIALIZE_AFTER_DUMP = Parameter( "materialize_after_dump", default=False, required=False @@ -71,12 +70,12 @@ default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, ) - dados = tratar_dados( + dataframe = treat_data( dataset_id=DATASET_ID, table_id=TABLE_ID, mode=MATERIALIZATION_MODE, ) - path = salvar_dados(dados=dados) + path = save_data(dataframe=dataframe) # Create table in BigQuery UPLOAD_TABLE = create_table_and_upload_to_gcs( diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py index f3e2c8a77..a039baa7b 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py @@ -19,11 +19,13 @@ ], parameter_defaults={ # "trigger_rain_dashboard_update": True, - # "materialize_after_dump": True, - "materialize_after_dump": False, - # "mode": "prod", + "materialize_after_dump": True, + "mode": "prod", "materialize_to_datario": False, "dump_to_gcs": False, + "dump_mode": "append", + "dataset_id": "clima_pluviometro", + "table_id": "taxa_precipitacao_cemaden", }, ), ] diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py index b49c1afb0..128e48707 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py @@ -13,9 +13,6 @@ from prefect import task from prefect.engine.signals import ENDRUN from prefect.engine.state import Skipped - -# from prefect import context - from pipelines.constants import constants from pipelines.utils.utils import ( log, @@ -30,16 +27,15 @@ max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def tratar_dados( +def treat_data( dataset_id: str, table_id: str, mode: str = "dev" ) -> Tuple[pd.DataFrame, bool]: """ - Renomeia colunas e filtra dados com a hora e minuto do timestamp - de execução mais próximo à este + Rename cols and filter data using hour and minute from the nearest current timestamp """ url = "http://sjc.salvar.cemaden.gov.br/resources/graficos/interativo/getJson2.php?uf=RJ" - dados = pd.read_json(url) + dataframe = pd.read_json(url) drop_cols = [ "uf", @@ -63,34 +59,34 @@ def tratar_dados( "acc96hr": "acumulado_chuva_96_h", } - dados = ( - dados[(dados["codibge"] == 3304557) & (dados["tipoestacao"] == 1)] + dataframe = ( + dataframe[(dataframe["codibge"] == 3304557) & (dataframe["tipoestacao"] == 1)] .drop(drop_cols, axis=1) .rename(rename_cols, axis=1) ) - log(f"\n[DEBUG]: df.head() {dados.head()}") + log(f"\n[DEBUG]: df.head() {dataframe.head()}") - # Converte de UTC para horário São Paulo - dados["data_medicao_utc"] = pd.to_datetime( - dados["data_medicao_utc"], dayfirst=True + # Convert from UTC to São Paulo timezone + dataframe["data_medicao_utc"] = pd.to_datetime( + dataframe["data_medicao_utc"], dayfirst=True ) + pd.DateOffset(hours=0) - dados["data_medicao"] = ( - dados["data_medicao_utc"] + dataframe["data_medicao"] = ( + dataframe["data_medicao_utc"] .dt.tz_localize("UTC") .dt.tz_convert("America/Sao_Paulo") ) see_cols = ["data_medicao_utc", "data_medicao", "id_estacao", "acumulado_chuva_1_h"] - log(f"DEBUG: data utc -> GMT-3 {dados[see_cols]}") + log(f"DEBUG: data utc -> GMT-3 {dataframe[see_cols]}") date_format = "%Y-%m-%d %H:%M:%S" - dados["data_medicao"] = dados["data_medicao"].dt.strftime(date_format) + dataframe["data_medicao"] = dataframe["data_medicao"].dt.strftime(date_format) - log(f"DEBUG: data {dados[see_cols]}") + log(f"DEBUG: data {dataframe[see_cols]}") - # Alterando valores '-' e np.nan para NULL - dados.replace(["-", np.nan], [0, None], inplace=True) + # Change values '-' and np.nan to NULL + dataframe.replace(["-", np.nan], [0, None], inplace=True) - # Altera valores negativos para None + # Change negative values to None float_cols = [ "acumulado_chuva_10_min", "acumulado_chuva_1_h", @@ -102,16 +98,18 @@ def tratar_dados( "acumulado_chuva_72_h", "acumulado_chuva_96_h", ] - dados[float_cols] = np.where(dados[float_cols] < 0, None, dados[float_cols]) + dataframe[float_cols] = np.where( + dataframe[float_cols] < 0, None, dataframe[float_cols] + ) - # Elimina linhas em que o id_estacao é igual mantendo a de menor valor nas colunas float - dados.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True) - dados.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first") + # Eliminate where the id_estacao is the same keeping the smallest one + dataframe.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True) + dataframe.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first") - log(f"Dataframe before comparing with last data saved on redis {dados.head()}") + log(f"Dataframe before comparing with last data saved on redis {dataframe.head()}") - dados = save_updated_rows_on_redis( - dados, + dataframe = save_updated_rows_on_redis( + dataframe, dataset_id, table_id, unique_id="id_estacao", @@ -120,16 +118,16 @@ def tratar_dados( mode=mode, ) - log(f"Dataframe after comparing with last data saved on redis {dados.head()}") + log(f"Dataframe after comparing with last data saved on redis {dataframe.head()}") # If df is empty stop flow - if dados.shape[0] == 0: + if dataframe.shape[0] == 0: skip_text = "No new data available on API" log(skip_text) raise ENDRUN(state=Skipped(skip_text)) - # Fixar ordem das colunas - dados = dados[ + # Fix columns order + dataframe = dataframe[ [ "id_estacao", "data_medicao", @@ -145,23 +143,22 @@ def tratar_dados( ] ] - return dados + return dataframe @task -def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: +def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: """ - Salvar dados tratados em csv para conseguir subir pro GCP + Save data on a csv file to be uploaded to GCP """ prepath = Path("/tmp/precipitacao_cemaden/") prepath.mkdir(parents=True, exist_ok=True) partition_column = "data_medicao" - dataframe, partitions = parse_date_columns(dados, partition_column) + dataframe, partitions = parse_date_columns(dataframe, partition_column) current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M") - # Cria partições a partir da data to_partitions( data=dataframe, partition_columns=partitions, From 04377923c11e18e6555b6c0eb0e5bf7f5bba8ba3 Mon Sep 17 00:00:00 2001 From: Thiago Felipe Date: Fri, 15 Dec 2023 14:35:46 -0300 Subject: [PATCH 31/42] uptating Wait On Check Action --- .github/workflows/cd_staging.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cd_staging.yaml b/.github/workflows/cd_staging.yaml index 090e88d27..fb7307a50 100644 --- a/.github/workflows/cd_staging.yaml +++ b/.github/workflows/cd_staging.yaml @@ -77,7 +77,7 @@ jobs: echo $PREFECT_AUTH_TOML | base64 --decode > $HOME/.prefect/auth.toml - name: Wait for Docker image to be available - uses: lewagon/wait-on-check-action@v1.1.2 + uses: lewagon/wait-on-check-action@v1.3.1 with: ref: ${{ github.event.pull_request.head.sha || github.sha }} check-name: 'Build Docker image' From 70fb6fbe7439c900cdd7f591417d17f67e1933b8 Mon Sep 17 00:00:00 2001 From: Patricia Bongiovanni Catandi <62657143+patriciacatandi@users.noreply.github.com> Date: Fri, 15 Dec 2023 14:56:26 -0300 Subject: [PATCH 32/42] Update schedules.py --- pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py index 5ff59fad4..daa992735 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py @@ -11,7 +11,7 @@ hour_schedule = Schedule( clocks=[ IntervalClock( - interval=timedelta(hours=1), + interval=timedelta(days=30), start_date=datetime(2023, 1, 1, 0, 12, 0), labels=[ constants.RJ_COR_AGENT_LABEL.value, From 2ea96a5ea046277a65bcbd84a209d4313dcbede6 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Dec 2023 15:07:28 -0300 Subject: [PATCH 33/42] changing scheduler --- pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py index 5ff59fad4..9606a92ec 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py @@ -33,7 +33,7 @@ month_schedule = Schedule( clocks=[ IntervalClock( - interval=timedelta(months=1), + interval=timedelta(days=30), start_date=datetime(2023, 1, 1, 0, 12, 0), labels=[ constants.RJ_COR_AGENT_LABEL.value, From 337dd56ff63dcd865dea857ae2af04d036033ba2 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Dec 2023 15:30:38 -0300 Subject: [PATCH 34/42] adding verification for changes on cemaden stations --- .../precipitacao_cemaden/flows.py | 7 ++ .../precipitacao_cemaden/tasks.py | 76 +++++++++++++++++-- 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py index aa0beea16..7ab78ddd8 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py @@ -17,6 +17,8 @@ constants as cemaden_constants, ) from pipelines.rj_cor.meteorologia.precipitacao_cemaden.tasks import ( + check_for_new_stations, + download_data, treat_data, save_data, ) @@ -70,7 +72,9 @@ default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, ) + dataframe = download_data() dataframe = treat_data( + dataframe=dataframe, dataset_id=DATASET_ID, table_id=TABLE_ID, mode=MATERIALIZATION_MODE, @@ -181,6 +185,9 @@ raise_final_state=True, ) + check_for_new_stations(dataframe) + check_for_new_stations.set_upstream(UPLOAD_TABLE) + # para rodar na cloud cor_meteorologia_precipitacao_cemaden.storage = GCS(constants.GCS_FLOWS_BUCKET.value) cor_meteorologia_precipitacao_cemaden.run_config = KubernetesRun( diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py index 128e48707..27ad1c04e 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py @@ -12,7 +12,7 @@ import pendulum from prefect import task from prefect.engine.signals import ENDRUN -from prefect.engine.state import Skipped +from prefect.engine.state import Skipped, Failed from pipelines.constants import constants from pipelines.utils.utils import ( log, @@ -27,15 +27,27 @@ max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def treat_data( - dataset_id: str, table_id: str, mode: str = "dev" -) -> Tuple[pd.DataFrame, bool]: +def download_data() -> pd.DataFrame: """ - Rename cols and filter data using hour and minute from the nearest current timestamp + Download data from API """ url = "http://sjc.salvar.cemaden.gov.br/resources/graficos/interativo/getJson2.php?uf=RJ" dataframe = pd.read_json(url) + return dataframe + + +@task( + nout=2, + max_retries=constants.TASK_MAX_RETRIES.value, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +def treat_data( + dataframe: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "dev" +) -> Tuple[pd.DataFrame, bool]: + """ + Rename cols and filter data using hour and minute from the nearest current timestamp + """ drop_cols = [ "uf", @@ -168,3 +180,57 @@ def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: ) log(f"[DEBUG] Files saved on {prepath}") return prepath + + +@task +def check_for_new_stations(dataframe: pd.DataFrame): + """ + Check if the updated stations are the same as before. + If not, consider flow as failed and call attention to + add this new station on estacoes_redemet. + I can't automatically update this new station, because + I couldn't find a url that gives me the lat and lon for + all the stations. To manually update enter + http://www2.cemaden.gov.br/mapainterativo/# > + Download de Dados > Estações Pluviométricas and fill the + requested information. + """ + + stations_before = [ + "Abolicao", + "Tanque jacarepagua", + "Penha", + "Praca seca", + "Gloria", + "Est. pedra bonita", + "Jardim maravilha", + "Santa cruz", + "Realengo batan", + "Padre miguel", + "Salgueiro", + "Andarai", + "Ciep samuel wainer", + "Vargem pequena", + "Jacarepagua", + "Ciep dr. joao ramos de souza", + "Sao conrado", + "Catete", + "Pavuna", + "Vigario geral", + "Defesa civil santa cruz", + "Vicente de carvalho", + "Alto da boa vista", + "Tijuca", + "Usina", + "Higienopolis", + "Pilares", + "Ilha de paqueta", + ] + new_stations = [ + i for i in dataframe.id_estacao.unique() if i not in stations_before + ] + if len(new_stations) != 0: + message = f"New station identified. You need to update CEMADEN\ + estacoes_cemaden adding station(s) {new_stations}" + log(message) + raise ENDRUN(state=Failed(message)) From e424540df1fbbe4719956faa5d7eeae17ec8166c Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Dec 2023 15:40:29 -0300 Subject: [PATCH 35/42] removing url from log --- pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 4aaff6a4d..5b9f5017e 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -76,11 +76,11 @@ def download_data(first_date: str, last_date: str) -> pd.DataFrame: url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}" res = requests.get(url) if res.status_code != 200: - log(f"Problema no id: {id_estacao}, {res.status_code}, {url}") + log(f"Problema no id: {id_estacao}, {res.status_code}") continue res_data = json.loads(res.text) if res_data["status"] is not True: - log(f"Problema no id: {id_estacao}, {res_data['message']}, {url}") + log(f"Problema no id: {id_estacao}, {res_data['message']}") continue if "data" not in res_data["data"]: # Sem dataframe para esse horario @@ -213,7 +213,7 @@ def download_stations_data() -> pd.DataFrame: url = f"{base_url}&pais=Brasil" res = requests.get(url) if res.status_code != 200: - print(f"Problem on request: {res.status_code}, {url}") + print(f"Problem on request: {res.status_code}") res_data = json.loads(res.text) dataframe = pd.DataFrame(res_data["data"]) From 4090543853640077c8168f8bda0ea35a2893a94d Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Dec 2023 15:49:52 -0300 Subject: [PATCH 36/42] bugfix alertario --- pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py index bd7dbb9d0..ff95c3eaa 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py @@ -143,7 +143,7 @@ def tratar_dados( save_str_on_redis(redis_key, "date", max_date) else: # If df is empty stop flow on flows.py - log(f"Dataframe is empty. Skipping update flow for datetime {date}.") + log("Dataframe is empty. Skipping update flow.") # Fixar ordem das colunas dados = dados[ From 3968029eb03e567404108b9986965ba5ace4a6cd Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Dec 2023 15:52:27 -0300 Subject: [PATCH 37/42] bugfix redemet --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index b14905868..d916737b1 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -70,9 +70,9 @@ first_date_, last_date_, backfill = get_dates(first_date, last_date) # data = slice_data(current_time=CURRENT_TIME) - dados = download_data(first_date_, last_date_) - dados = treat_data(dados, backfill) - PATH = save_data(dados=dados) + dataframe = download_data(first_date_, last_date_) + dataframe = treat_data(dataframe, backfill) + PATH = save_data(dataframe=dataframe) # Create table in BigQuery UPLOAD_TABLE = create_table_and_upload_to_gcs( From 13198772e73d64b673f733d97eb6d880083c83bf Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Dec 2023 16:45:27 -0300 Subject: [PATCH 38/42] bugfix cemaden and redemet --- .../rj_cor/meteorologia/meteorologia_redemet/flows.py | 2 +- .../rj_cor/meteorologia/meteorologia_redemet/tasks.py | 9 ++++++++- .../rj_cor/meteorologia/precipitacao_cemaden/flows.py | 3 +-- .../rj_cor/meteorologia/precipitacao_cemaden/tasks.py | 5 ++++- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index d916737b1..39361a1bb 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -250,7 +250,7 @@ raise_final_state=True, ) - check_for_new_stations(dataframe) + check_for_new_stations(dataframe, wait=UPLOAD_TABLE) # para rodar na cloud cor_meteorologia_meteorologia_redemet_estacoes.storage = GCS( diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 5b9f5017e..0d55dc450 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -214,9 +214,13 @@ def download_stations_data() -> pd.DataFrame: res = requests.get(url) if res.status_code != 200: print(f"Problem on request: {res.status_code}") + res_data = json.loads(res.text) + log(f"API Return: {res_data}") dataframe = pd.DataFrame(res_data["data"]) + log(f"Stations dataframe: {dataframe.head()}") + return dataframe @@ -253,7 +257,10 @@ def treat_stations_data(dataframe: pd.DataFrame) -> pd.DataFrame: @task -def check_for_new_stations(dataframe: pd.DataFrame): +def check_for_new_stations( + dataframe: pd.DataFrame, + wait=None, # pylint: disable=unused-argument +): """ Check if the updated stations are the same as before. If not, consider flow as failed and call attention to diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py index 7ab78ddd8..7928a2d3c 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py @@ -185,8 +185,7 @@ raise_final_state=True, ) - check_for_new_stations(dataframe) - check_for_new_stations.set_upstream(UPLOAD_TABLE) + check_for_new_stations(dataframe, wait=UPLOAD_TABLE) # para rodar na cloud cor_meteorologia_precipitacao_cemaden.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py index 27ad1c04e..e53a586e9 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py @@ -183,7 +183,10 @@ def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: @task -def check_for_new_stations(dataframe: pd.DataFrame): +def check_for_new_stations( + dataframe: pd.DataFrame, + wait=None, # pylint: disable=unused-argument +) -> None: """ Check if the updated stations are the same as before. If not, consider flow as failed and call attention to From 499935a191fcb0f08844b725c71e67fc848808e3 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Dec 2023 17:48:21 -0300 Subject: [PATCH 39/42] bugfix cemaden and redemet --- pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py | 1 - pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py | 4 +++- pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py | 1 - .../rj_cor/meteorologia/precipitacao_cemaden/schedules.py | 2 +- pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py | 4 +++- 5 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 39361a1bb..409995c3a 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -36,7 +36,6 @@ with Flow( name="COR: Meteorologia - Meteorologia REDEMET", code_owners=[ - "richardg867", "paty", ], ) as cor_meteorologia_meteorologia_redemet: diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py index 0d55dc450..7bb7801a8 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py @@ -63,6 +63,7 @@ def download_data(first_date: str, last_date: str) -> pd.DataFrame: ] redemet_token = get_vault_secret("redemet-token") + redemet_token = redemet_token["data"]["token"] # Converte datas em int para cálculo de faixas. first_date_int = int(first_date.replace("-", "")) @@ -70,7 +71,7 @@ def download_data(first_date: str, last_date: str) -> pd.DataFrame: raw = [] for id_estacao in rj_stations: - base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={redemet_token['data']['token']}" # noqa + base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={redemet_token}" # noqa for data in range(first_date_int, last_date_int + 1): for hora in range(24): url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}" @@ -207,6 +208,7 @@ def download_stations_data() -> pd.DataFrame: """ redemet_token = get_vault_secret("redemet-token") + redemet_token = redemet_token["data"]["token"] base_url = ( f"https://api-redemet.decea.mil.br/aerodromos/?api_key={redemet_token}" # noqa ) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py index 7928a2d3c..d4631c61d 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/flows.py @@ -43,7 +43,6 @@ with Flow( name="COR: Meteorologia - Precipitacao CEMADEN", code_owners=[ - "richardg867", "paty", ], # skip_if_running=True, diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py index a039baa7b..6b56ffe92 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/schedules.py @@ -21,7 +21,7 @@ # "trigger_rain_dashboard_update": True, "materialize_after_dump": True, "mode": "prod", - "materialize_to_datario": False, + "materialize_to_datario": True, "dump_to_gcs": False, "dump_mode": "append", "dataset_id": "clima_pluviometro", diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py index e53a586e9..c4de942b4 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py @@ -230,7 +230,9 @@ def check_for_new_stations( "Ilha de paqueta", ] new_stations = [ - i for i in dataframe.id_estacao.unique() if i not in stations_before + i + for i in dataframe.id_estacao.unique() + if i.capitalize() not in stations_before ] if len(new_stations) != 0: message = f"New station identified. You need to update CEMADEN\ From d6ee28a1448b2484ea098abd1bcae0385959e02b Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Dec 2023 18:19:09 -0300 Subject: [PATCH 40/42] bugfix cemaden and redemet --- .../meteorologia_redemet/flows.py | 2 +- .../precipitacao_alertario/tasks.py | 26 +++++++++++-------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py index 409995c3a..485c0cfa9 100644 --- a/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py +++ b/pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py @@ -181,7 +181,7 @@ dataframe = download_stations_data() dataframe = treat_stations_data(dataframe) - path = save_data(dataframe=dataframe, partition_column="data_alteracao") + path = save_data(dataframe=dataframe, partition_column="data_atualizacao") # Create table in BigQuery UPLOAD_TABLE = create_table_and_upload_to_gcs( diff --git a/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py index ff95c3eaa..3f5feff92 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py @@ -118,17 +118,21 @@ def tratar_dados( mode=mode, ) - # Ajustando dados da meia-noite que vem sem o horário - for index, row in dados.iterrows(): - try: - date = pd.to_datetime(row["data_medicao"], format="%Y-%m-%d %H:%M:%S") - except ValueError: - date = pd.to_datetime(row["data_medicao"]) + pd.DateOffset(hours=0) - - dados.at[index, "data_medicao"] = date.strftime("%Y-%m-%d %H:%M:%S") - - # dados["data_medicao"] = dados["data_medicao"].dt.strftime("%Y-%m-%d %H:%M:%S") - log(f"Dataframe after comparing with last data saved on redis {dados.head()}") + # # Ajustando dados da meia-noite que vem sem o horário + # for index, row in dados.iterrows(): + # try: + # date = pd.to_datetime(row["data_medicao"], format="%Y-%m-%d %H:%M:%S") + # except ValueError: + # date = pd.to_datetime(row["data_medicao"]) + pd.DateOffset(hours=0) + + # dados.at[index, "data_medicao"] = date.strftime("%Y-%m-%d %H:%M:%S") + + see_cols = ["id_estacao", "data_medicao", "last_update"] + log( + f"Dataframe after comparing with last data saved on redis {dados[see_cols].head()}" + ) + dados["data_medicao"] = dados["data_medicao"].dt.strftime("%Y-%m-%d %H:%M:%S") + log(f"Dataframe after converting to string {dados[see_cols].head()}") if dados.shape[0] > 0: log(f"Dataframe after comparing with last data saved on redis {dados.iloc[0]}") From 68b2b5436972890dd07f4b17f00c61975310c9a7 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Dec 2023 19:02:08 -0300 Subject: [PATCH 41/42] temporary remove failed if new stations from cemaden --- pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py index c4de942b4..4b3f5452b 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py @@ -12,7 +12,7 @@ import pendulum from prefect import task from prefect.engine.signals import ENDRUN -from prefect.engine.state import Skipped, Failed +from prefect.engine.state import Skipped # , Failed from pipelines.constants import constants from pipelines.utils.utils import ( log, @@ -238,4 +238,4 @@ def check_for_new_stations( message = f"New station identified. You need to update CEMADEN\ estacoes_cemaden adding station(s) {new_stations}" log(message) - raise ENDRUN(state=Failed(message)) + # raise ENDRUN(state=Failed(message)) From 19d1af60200e0f9fd514295258e749f1fdf1d75d Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Sat, 16 Dec 2023 09:32:19 -0300 Subject: [PATCH 42/42] changing stations cemaden --- .../precipitacao_cemaden/tasks.py | 69 ++++++++----------- 1 file changed, 30 insertions(+), 39 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py index 4b3f5452b..e0e1835b3 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_cemaden/tasks.py @@ -12,7 +12,7 @@ import pendulum from prefect import task from prefect.engine.signals import ENDRUN -from prefect.engine.state import Skipped # , Failed +from prefect.engine.state import Skipped, Failed from pipelines.constants import constants from pipelines.utils.utils import ( log, @@ -190,52 +190,43 @@ def check_for_new_stations( """ Check if the updated stations are the same as before. If not, consider flow as failed and call attention to - add this new station on estacoes_redemet. + add this new station on estacoes_cemaden. I can't automatically update this new station, because I couldn't find a url that gives me the lat and lon for - all the stations. To manually update enter - http://www2.cemaden.gov.br/mapainterativo/# > - Download de Dados > Estações Pluviométricas and fill the - requested information. + all the stations. """ stations_before = [ - "Abolicao", - "Tanque jacarepagua", - "Penha", - "Praca seca", - "Gloria", - "Est. pedra bonita", - "Jardim maravilha", - "Santa cruz", - "Realengo batan", - "Padre miguel", - "Salgueiro", - "Andarai", - "Ciep samuel wainer", - "Vargem pequena", - "Jacarepagua", - "Ciep dr. joao ramos de souza", - "Sao conrado", - "Catete", - "Pavuna", - "Vigario geral", - "Defesa civil santa cruz", - "Vicente de carvalho", - "Alto da boa vista", - "Tijuca", - "Usina", - "Higienopolis", - "Pilares", - "Ilha de paqueta", + "3043", + "3044", + "3045", + "3114", + "3215", + "7593", + "7594", + "7595", + "7596", + "7597", + "7599", + "7600", + "7601", + "7602", + "7603", + "7606", + "7609", + "7610", + "7611", + "7612", + "7613", + "7614", + "7615", ] new_stations = [ - i - for i in dataframe.id_estacao.unique() - if i.capitalize() not in stations_before + i for i in dataframe.id_estacao.unique() if str(i) not in stations_before ] if len(new_stations) != 0: message = f"New station identified. You need to update CEMADEN\ - estacoes_cemaden adding station(s) {new_stations}" + estacoes_cemaden adding station(s) {new_stations}: \ + {dataframe[dataframe.id_estacao.isin(new_stations)]} " log(message) - # raise ENDRUN(state=Failed(message)) + raise ENDRUN(state=Failed(message))