Skip to content

Commit

Permalink
teste atualização estações
Browse files Browse the repository at this point in the history
  • Loading branch information
KarinaPassos committed Oct 27, 2023
1 parent 6e55508 commit e3d068c
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 0 deletions.
49 changes: 49 additions & 0 deletions pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,55 @@
)


# Código não finalizado; flow de atualização das estações
with Flow(
name="COR: Meteorologia - Atualização das estações",
code_owners=[
"karinappassos",
"paty",
],
) as cor_meteorologia_meteorologia_redemet:
DATASET_ID = "clima_estacao_meteorologica"
TABLE_ID = "meteorologia_redemet"
DUMP_MODE = "append"

# data_inicio e data_fim devem ser strings no formato "YYYY-MM-DD"
data_inicio = Parameter("data_inicio", default="", required=False)
data_fim = Parameter("data_fim", default="", required=False)

# Materialization parameters
MATERIALIZE_AFTER_DUMP = Parameter(
"materialize_after_dump", default=False, required=False
)
MATERIALIZE_TO_DATARIO = Parameter(
"materialize_to_datario", default=False, required=False
)
MATERIALIZATION_MODE = Parameter("mode", default="dev", required=False)

# Dump to GCS after? Should only dump to GCS if materializing to datario
DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False)

MAXIMUM_BYTES_PROCESSED = Parameter(
"maximum_bytes_processed",
required=False,
default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
)

data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim)
# data = slice_data(current_time=CURRENT_TIME)
dados = tratar_dados_estacao(data_inicio_, data_fim_)
PATH = salvar_dados(dados=dados)

# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
data_path=PATH,
dataset_id=DATASET_ID,
table_id=TABLE_ID,
dump_mode=DUMP_MODE,
wait=PATH,
)


# para rodar na cloud
cor_meteorologia_meteorologia_redemet.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_meteorologia_redemet.run_config = KubernetesRun(
Expand Down
66 changes: 66 additions & 0 deletions pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,69 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:
)
log(f"[DEBUG] Files saved on {prepath}")
return prepath


@task
def tratar_dados_estacao(data_inicio: str, data_fim: str) -> pd.DataFrame:
# Lista com as estações da cidade do Rio de Janeiro
estacoes_unicas = [
"SBAF",
"SBGL",
"SBJR",
"SBRJ",
"SBSC",
]

dicionario = get_vault_secret("redemet-token")

# Converte datas em int para cálculo de faixas.
data_inicio_int = int(data_inicio.replace("-", ""))
data_fim_int = int(data_fim.replace("-", ""))

raw = []
for id_estacao in estacoes_unicas:
base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={dicionario['data']['token']}" # noqa
for data in range(data_inicio_int, data_fim_int + 1):
for hora in range(24):
url = f"{base_url}&localidade={id_estacao}&datahora={data:06}{hora:02}"
res = requests.get(url)
if res.status_code != 200:
log(f"Problema no id: {id_estacao}, {res.status_code}, {url}")
continue
res_data = json.loads(res.text)
if res_data["status"] is not True:
log(f"Problema no id: {id_estacao}, {res_data['message']}, {url}")
continue
if "data" not in res_data["data"]:
# Sem dados para esse horario
continue
raw.append(res_data)

# Função para converter longitude de graus, minutos, segundos para decimal
res_data["latitude"] = res_data["lat"].apply(converter_lat_lon)
res_data["longitude"] = res_data["lon"].apply(converter_lat_lon)
return dados



def converter_lat_lon(longitude_str):
longitude_str = longitude_str.replace("º", "/").replace("''", "/").replace("'", "/")

# Divida a string com base nos espaços em branco
partes = longitude_str.split("/")
# print(partes)

# Extraia os graus, minutos e segundos da lista de partes
graus = int(partes[0])
minutos = int(partes[1])
segundos = float(partes[2])

# Calcule o valor decimal
decimal = graus + (minutos / 60) + (segundos / 3600)

# Verifique se a direção é Oeste (W) e faça o valor negativo
# Verifique se a direção é Norte (N) e retorne o valor decimal
if ("W" in partes[3].upper()) | ("S" in partes[3].upper()):
decimal = -decimal

return decimal

0 comments on commit e3d068c

Please sign in to comment.