Add Cemaden and REDEMET meteorology flows #449

Merged
86 commits merged on Oct 24, 2023
fe0b719
feat: add precipitacao_cemaden
richardg867 May 12, 2023
3685fc6
fix: remove Cemaden station name
richardg867 May 12, 2023
f2810ba
fix: fix time range in the Cemaden query
richardg867 May 12, 2023
722856f
fix: register the Cemaden flow
richardg867 May 12, 2023
c6aa1c3
fix: fix error when cleaning non-float Cemaden data
richardg867 May 12, 2023
c0af02e
feat: add meteorologia_redemet
richardg867 May 12, 2023
5027931
fix: fix REDEMET logging error
richardg867 May 13, 2023
8040d0a
fix: attempt to diagnose REDEMET pipeline issues
richardg867 May 19, 2023
f640429
Merge branch 'master' into staging/cor-richard
mergify[bot] May 19, 2023
1d8482a
fix: do not stop the REDEMET pipeline when data is missing for …
richardg867 May 19, 2023
e830d3b
Merge branch 'master' into staging/cor-richard
mergify[bot] May 22, 2023
9c2204c
Merge branch 'master' into staging/cor-richard
mergify[bot] May 25, 2023
2a8267c
Merge branch 'staging/cor-richard' of ssh://github.com/prefeitura-rio…
richardg867 May 26, 2023
e7679f7
chore: attempt to diagnose further REDEMET issues
richardg867 May 26, 2023
c6f7028
fix: more REDEMET fixes
richardg867 May 26, 2023
d34f98e
Merge branch 'master' into staging/cor-richard
mergify[bot] May 30, 2023
5fd1df4
Merge branch 'master' into staging/cor-richard
mergify[bot] May 31, 2023
e156a3f
Merge branch 'master' into staging/cor-richard
mergify[bot] Jun 5, 2023
d513fe4
Merge branch 'master' into staging/cor-richard
mergify[bot] Jun 6, 2023
b4da7e9
Merge branch 'master' into staging/cor-richard
mergify[bot] Jun 6, 2023
e45ff1a
Merge branch 'master' into staging/cor-richard
mergify[bot] Jun 6, 2023
dda6d23
Merge branch 'master' into staging/cor-richard
mergify[bot] Jun 13, 2023
3b90311
chore: another attempt to diagnose REDEMET
richardg867 Jun 14, 2023
97573dc
Merge branch 'master' into staging/cor-richard
mergify[bot] Jun 15, 2023
60f586d
fix: fix the REDEMET date filter
richardg867 Jun 16, 2023
9383cf0
Merge branch 'master' into staging/cor-richard
mergify[bot] Jun 17, 2023
61ae82e
fix: remove duplicate REDEMET data
richardg867 Jun 22, 2023
1cf6ea3
Merge branch 'master' into staging/cor-richard
mergify[bot] Jun 22, 2023
329e106
Merge branch 'master' into staging/cor-richard
mergify[bot] Jun 23, 2023
c6591f2
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 3, 2023
0844e64
chore: fix warnings in the REDEMET code
richardg867 Jul 4, 2023
06b9fd0
fix: migrate the Cemaden and REDEMET flows to the COR agent
richardg867 Jul 4, 2023
1d1e74f
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 6, 2023
32996ad
fix: remove the REDEMET timezone to help DBT
richardg867 Jul 6, 2023
801f312
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 7, 2023
f1b74dd
Merge branch 'staging/cor-richard' of ssh://github.com/prefeitura-rio…
richardg867 Jul 7, 2023
db37ad5
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 7, 2023
7a3cc61
Merge branch 'master' into staging/cor-richard
gabriel-milan Jul 11, 2023
797bdab
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 11, 2023
816be6d
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 12, 2023
2fed5eb
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 12, 2023
5c4572b
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 12, 2023
ac86fa8
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 12, 2023
12be7f5
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 17, 2023
7c04484
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 25, 2023
ef1bbd0
Merge branch 'master' into staging/cor-richard
mergify[bot] Jul 25, 2023
c646acf
Merge branch 'master' into staging/cor-richard
mergify[bot] Aug 10, 2023
d582b54
Merge branch 'master' into staging/cor-richard
mergify[bot] Aug 14, 2023
345c64b
Merge branch 'master' into staging/cor-richard
mergify[bot] Aug 16, 2023
e81c81f
Merge branch 'master' into staging/cor-richard
mergify[bot] Aug 16, 2023
7bf1f7e
Merge branch 'master' into staging/cor-richard
mergify[bot] Aug 16, 2023
3fc3559
Merge branch 'master' into staging/cor-richard
mergify[bot] Aug 18, 2023
b01fbed
Merge branch 'master' into staging/cor-richard
patriciacatandi Aug 24, 2023
d3bfe7f
flow improvements
patriciacatandi Aug 24, 2023
7962d3f
Merge branch 'master' into staging/cor-richard
mergify[bot] Aug 25, 2023
b7064de
save latest data to Redis
patriciacatandi Aug 29, 2023
1637b7b
Merge branch 'staging/cor-richard' of github.com:prefeitura-rio/pipel…
patriciacatandi Aug 29, 2023
630a7a7
Merge branch 'master' into staging/cor-richard
mergify[bot] Aug 30, 2023
8606994
handle the case where no data was ever saved to Redis
patriciacatandi Aug 31, 2023
85ab0f5
Merge branch 'staging/cor-richard' of github.com:prefeitura-rio/pipel…
patriciacatandi Aug 31, 2023
0641994
Merge branch 'master' into staging/cor-richard
mergify[bot] Sep 4, 2023
e28972d
Merge branch 'master' into staging/cor-richard
mergify[bot] Sep 18, 2023
d165f9b
Merge branch 'master' into staging/cor-richard
mergify[bot] Sep 19, 2023
b7360b5
Merge branch 'master' into staging/cor-richard
patriciacatandi Oct 10, 2023
129f9ac
remove slice_data
patriciacatandi Oct 10, 2023
6da7e97
Merge branch 'master' into staging/cor-richard
mergify[bot] Oct 10, 2023
91f67e4
testing
KarinaPassos Oct 16, 2023
1263f3b
testing
KarinaPassos Oct 17, 2023
77b8a96
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 17, 2023
bab07fa
Capitalize the data in the céu column
KarinaPassos Oct 19, 2023
77310fc
Capitalize the data in the céu column
KarinaPassos Oct 19, 2023
055056d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 19, 2023
6e55508
Capitalize the data in the céu column
KarinaPassos Oct 19, 2023
abd6d28
Changing column name tasks.py
patriciacatandi Oct 19, 2023
df20211
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 19, 2023
6e9c847
Merge branch 'master' into staging/cor-richard
mergify[bot] Oct 19, 2023
c760b10
bugfix
patriciacatandi Oct 19, 2023
e957730
Merge branch 'master' into staging/cor-richard
mergify[bot] Oct 20, 2023
928f729
Merge branch 'master' into staging/cor-richard
mergify[bot] Oct 20, 2023
fa51bc3
bugfix
Oct 20, 2023
edd6293
Merge branch 'staging/cor-richard' of https://github.com/prefeitura-r…
KarinaPassos Oct 20, 2023
a333148
Merge branch 'master' into staging/cor-richard
mergify[bot] Oct 23, 2023
857d7a6
Merge branch 'master' into staging/cor-richard
mergify[bot] Oct 23, 2023
c39f371
Merge branch 'master' into staging/cor-richard
mergify[bot] Oct 23, 2023
66ef586
Merge branch 'master' into staging/cor-richard
mergify[bot] Oct 24, 2023
84768c3
Merge branch 'staging/cor-richard' of https://github.com/prefeitura-r…
KarinaPassos Oct 24, 2023
2 changes: 2 additions & 0 deletions pipelines/rj_cor/__init__.py
@@ -4,7 +4,9 @@
"""
from pipelines.rj_cor.bot_semaforo.flows import *
from pipelines.rj_cor.meteorologia.meteorologia_inmet.flows import *
from pipelines.rj_cor.meteorologia.meteorologia_redemet.flows import *
from pipelines.rj_cor.meteorologia.precipitacao_alertario.flows import *
from pipelines.rj_cor.meteorologia.precipitacao_cemaden.flows import *
from pipelines.rj_cor.meteorologia.satelite.flows import *
from pipelines.rj_cor.meteorologia.precipitacao_websirene.flows import *
from pipelines.rj_cor.meteorologia.radar.precipitacao.flows import *
Expand Down
7 changes: 7 additions & 0 deletions pipelines/rj_cor/meteorologia/meteorologia_redemet/__init__.py
@@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
"""
Prefect flows for meteorologia_redemet project
"""
###############################################################################
# Automatically managed, please do not touch
###############################################################################
143 changes: 143 additions & 0 deletions pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py
@@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103, E1120
"""
Flows for meteorologia_redemet
"""
from datetime import timedelta

from prefect import case, Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

from pipelines.constants import constants
from pipelines.utils.constants import constants as utils_constants
from pipelines.rj_cor.meteorologia.meteorologia_redemet.tasks import (
get_dates,
download,
tratar_dados,
salvar_dados,
)
from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import hour_schedule
from pipelines.utils.decorators import Flow
from pipelines.utils.dump_db.constants import constants as dump_db_constants
from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
get_current_flow_labels,
)


with Flow(
name="COR: Meteorologia - Meteorologia REDEMET",
code_owners=[
"richardg867",
"paty",
],
) as cor_meteorologia_meteorologia_redemet:
DATASET_ID = "clima_estacao_meteorologica"
TABLE_ID = "meteorologia_redemet"
DUMP_MODE = "append"

    # data_inicio and data_fim must be strings in "YYYY-MM-DD" format
data_inicio = Parameter("data_inicio", default="", required=False)
data_fim = Parameter("data_fim", default="", required=False)

# Materialization parameters
MATERIALIZE_AFTER_DUMP = Parameter(
"materialize_after_dump", default=False, required=False
)
MATERIALIZE_TO_DATARIO = Parameter(
"materialize_to_datario", default=False, required=False
)
MATERIALIZATION_MODE = Parameter("mode", default="dev", required=False)

# Dump to GCS after? Should only dump to GCS if materializing to datario
DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False)

MAXIMUM_BYTES_PROCESSED = Parameter(
"maximum_bytes_processed",
required=False,
default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
)

data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim)
# data = slice_data(current_time=CURRENT_TIME)
dados = download(data_inicio_, data_fim_)
dados = tratar_dados(dados, backfill)
PATH = salvar_dados(dados=dados)

# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
data_path=PATH,
dataset_id=DATASET_ID,
table_id=TABLE_ID,
dump_mode=DUMP_MODE,
wait=PATH,
)

# Trigger DBT flow run
with case(MATERIALIZE_AFTER_DUMP, True):
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": DATASET_ID,
"table_id": TABLE_ID,
"mode": MATERIALIZATION_MODE,
"materialize_to_datario": MATERIALIZE_TO_DATARIO,
},
labels=current_flow_labels,
run_name=f"Materialize {DATASET_ID}.{TABLE_ID}",
)

materialization_flow.set_upstream(UPLOAD_TABLE)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(DUMP_TO_GCS, True):
# Trigger Dump to GCS flow run with project id as datario
dump_to_gcs_flow = create_flow_run(
flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"project_id": "datario",
"dataset_id": DATASET_ID,
"table_id": TABLE_ID,
"maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED,
},
labels=[
"datario",
],
run_name=f"Dump to GCS {DATASET_ID}.{TABLE_ID}",
)
dump_to_gcs_flow.set_upstream(wait_for_materialization)

wait_for_dump_to_gcs = wait_for_flow_run(
dump_to_gcs_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)


# to run in the cloud
cor_meteorologia_meteorologia_redemet.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_meteorologia_redemet.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_COR_AGENT_LABEL.value],
)
cor_meteorologia_meteorologia_redemet.schedule = hour_schedule
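
Since data_inicio and data_fim default to empty strings, a scheduled run covers yesterday through today, while passing both dates turns the run into a backfill. A minimal sketch of triggering a manual backfill against a Prefect 1.x backend — the flow ID and the dates below are illustrative placeholders, not values from this repository:

from prefect import Client

client = Client()
client.create_flow_run(
    flow_id="<registered-flow-id>",  # hypothetical placeholder
    parameters={"data_inicio": "2023-10-01", "data_fim": "2023-10-07"},
    run_name="backfill meteorologia_redemet",
)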
21 changes: 21 additions & 0 deletions pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
"""
Schedules for meteorologia_redemet
Runs every hour
"""
from datetime import timedelta, datetime
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from pipelines.constants import constants

hour_schedule = Schedule(
clocks=[
IntervalClock(
interval=timedelta(hours=1),
start_date=datetime(2023, 1, 1, 0, 12, 0),
labels=[
constants.RJ_COR_AGENT_LABEL.value,
],
),
]
)
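
The start_date offset of 12 minutes means the clock fires at minute 12 of every hour rather than on the hour. A small sketch, assuming Prefect 1.x, that prints the next few fire times:

from datetime import datetime, timedelta

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

schedule = Schedule(
    clocks=[
        IntervalClock(
            interval=timedelta(hours=1),
            start_date=datetime(2023, 1, 1, 0, 12, 0),
        )
    ]
)
# Each upcoming run lands at minute 12 of the hour
print(schedule.next(3))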
201 changes: 201 additions & 0 deletions pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
@@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
"""
Tasks for meteorologia_redemet
"""
from datetime import timedelta
import json
from pathlib import Path
from typing import Tuple, Union

import pandas as pd
import pendulum
from prefect import task
import requests

from pipelines.constants import constants
from pipelines.utils.utils import (
get_vault_secret,
log,
to_partitions,
parse_date_columns,
)


@task(nout=3)
def get_dates(data_inicio: str, data_fim: str) -> Tuple[str, str, bool]:
    """
    Task that returns the start and end dates.
    If no date was passed, data_inicio is yesterday, data_fim is today,
    and we are not doing a backfill. Otherwise, the dates given in the
    flow parameters are returned.
    """
    # the API always returns data in UTC
    log(f"Start and end dates before defaulting: {data_inicio} {data_fim}")
    if data_inicio == "":
        data_fim = pendulum.now("UTC").format("YYYY-MM-DD")
        data_inicio = pendulum.yesterday("UTC").format("YYYY-MM-DD")
        backfill = False
    else:
        backfill = True
    log(f"Start and end dates after defaulting: {data_inicio} {data_fim}")

    return data_inicio, data_fim, backfill


@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def download(data_inicio: str, data_fim: str) -> pd.DataFrame:
"""
Faz o request na data especificada e retorna dados
"""

# Lista com as estações da cidade do Rio de Janeiro
estacoes_unicas = [
"SBAF",
"SBGL",
"SBJR",
"SBRJ",
"SBSC",
]

dicionario = get_vault_secret("redemet-token")

    # Build the list of days from real calendar dates: an integer YYYYMMDD
    # range would generate invalid dates at month boundaries (e.g. 20230532)
    periodo = pendulum.period(
        pendulum.from_format(data_inicio, "YYYY-MM-DD"),
        pendulum.from_format(data_fim, "YYYY-MM-DD"),
    )
    dias = [dia.format("YYYYMMDD") for dia in periodo.range("days")]

    raw = []
    for id_estacao in estacoes_unicas:
        base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={dicionario['data']['token']}"  # noqa
        for data in dias:
            for hora in range(24):
                url = f"{base_url}&localidade={id_estacao}&datahora={data}{hora:02}"
                res = requests.get(url, timeout=120)
                if res.status_code != 200:
                    log(f"Problem with station {id_estacao}: {res.status_code}, {url}")
                    continue
                res_data = json.loads(res.text)
                if res_data["status"] is not True:
                    log(f"Problem with station {id_estacao}: {res_data['message']}, {url}")
                    continue
                if "data" not in res_data["data"]:
                    # No data for this hour
                    continue
                raw.append(res_data)

    # Extract the data payloads
    raw = [res_data["data"] for res_data in raw]

    # Convert to a DataFrame
    dados = pd.DataFrame(raw)

    return dados


@task
def tratar_dados(dados: pd.DataFrame, backfill: bool = False) -> pd.DataFrame:
    """
    Renames columns and filters data based on the execution timestamp
    """

drop_cols = ["nome", "cidade", "lon", "lat", "localizacao", "tempoImagem", "metar"]
    # Check that all of them are in the df
drop_cols = [c for c in drop_cols if c in dados.columns]

    # Drop columns whose data we already have in other tables
dados = dados.drop(drop_cols, axis=1)

    # Standardize variable names
rename_cols = {
"localidade": "id_estacao",
"ur": "umidade",
}

dados = dados.rename(columns=rename_cols)

    # Convert timestamps from UTC to America/Sao_Paulo
formato = "DD/MM/YYYY HH:mm(z)"
dados["data"] = dados["data"].apply(
lambda x: pendulum.from_format(x, formato)
.in_tz("America/Sao_Paulo")
.format(formato)
)

    # Column ordering
chaves_primarias = ["id_estacao", "data"]
demais_cols = [c for c in dados.columns if c not in chaves_primarias]

dados = dados[chaves_primarias + demais_cols]

    # Convert variables that should be int to int
dados["temperatura"] = dados["temperatura"].apply(
lambda x: None if x[:-2] == "NIL" else int(x[:-2])
)
dados["umidade"] = dados["umidade"].apply(
lambda x: None if "%" not in x else int(x[:-1])
)

dados["data"] = pd.to_datetime(dados.data, format="%d/%m/%Y %H:%M(%Z)")

    # Use the current day in our timezone as the partition
br_timezone = pendulum.now("America/Sao_Paulo").format("YYYY-MM-DD")

    # Columns that will be saved
dados = dados[
[
"id_estacao",
"data",
"temperatura",
"umidade",
"condicoes_tempo",
"ceu",
"teto",
"visibilidade",
]
]

    # Remove duplicate rows
dados = dados.drop_duplicates(subset=["id_estacao", "data"])

log(f"Dados antes do filtro dia:\n{dados[['id_estacao', 'data']]}")

if not backfill:
        # Keep only data from the current local day (the API works in UTC)
dados = dados[dados["data"].dt.date.astype(str) == br_timezone]

log(f">>>> min hora {dados[~dados.temperatura.isna()].data.min()}")
log(f">>>> max hora {dados[~dados.temperatura.isna()].data.max()}")

    # Drop the timezone (helps DBT downstream)
dados["data"] = dados["data"].dt.strftime("%Y-%m-%d %H:%M:%S")
dados.rename(columns={"data": "data_medicao"}, inplace=True)

    # Capitalize the values in the ceu column
dados["ceu"] = dados["ceu"].str.capitalize()

return dados


@task
def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:
"""
Salvar dados em csv
"""

prepath = Path("/tmp/meteorologia_redemet/")
prepath.mkdir(parents=True, exist_ok=True)

partition_column = "data_medicao"
dataframe, partitions = parse_date_columns(dados, partition_column)

    # Create partitions from the date
to_partitions(
data=dataframe,
partition_columns=partitions,
savepath=prepath,
data_type="csv",
)
log(f"[DEBUG] Files saved on {prepath}")
return prepath
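
For clarity on the day filter in tratar_dados: the API works in UTC, so a non-backfill run deduplicates on the primary keys and then keeps only rows whose date matches the current America/Sao_Paulo day. A self-contained sketch with invented toy rows (column names follow tasks.py):

import pandas as pd
import pendulum

dados = pd.DataFrame(
    {
        "id_estacao": ["SBRJ", "SBRJ", "SBGL"],
        "data": pd.to_datetime(
            ["2023-10-23 23:00", "2023-10-24 00:00", "2023-10-24 00:00"]
        ),
    }
)
# Deduplicate on the primary keys, as the task does
dados = dados.drop_duplicates(subset=["id_estacao", "data"])
# Keep only rows from the current local day
hoje = pendulum.now("America/Sao_Paulo").format("YYYY-MM-DD")
dados = dados[dados["data"].dt.date.astype(str) == hoje]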
@@ -12,8 +12,8 @@
 minute_schedule = Schedule(
     clocks=[
         IntervalClock(
-            interval=timedelta(minutes=1),
-            start_date=datetime(2021, 1, 1, 0, 0, 30),
+            interval=timedelta(minutes=5),
+            start_date=datetime(2021, 1, 1, 0, 1, 0),
             labels=[
                 constants.RJ_COR_AGENT_LABEL.value,
             ],
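
For scale: at a one-minute interval this clock fired 1,440 times a day; at five minutes it fires 288 times, roughly a fivefold reduction in scheduler and agent load.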