Skip to content

Commit

Permalink
Merge branch 'master' into staging/cor-richard
Browse files Browse the repository at this point in the history
  • Loading branch information
patriciacatandi committed Oct 10, 2023
2 parents d165f9b + 5a95a5a commit b7360b5
Show file tree
Hide file tree
Showing 22 changed files with 1,927 additions and 313 deletions.
2 changes: 2 additions & 0 deletions code_owners.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pipelines:
- fernandascovino
- eng-rodrigocunha
- borismarinho
- pixuimpou
- lingsv
rj_escritorio:
owners:
- gabriel-milan
Expand Down
8 changes: 8 additions & 0 deletions pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,12 @@ class constants(Enum): # pylint: disable=c0103
"user_id": "369657115012366336",
"type": "user_nickname",
},
"rafaelpinheiro": {
"user_id": "1131538976101109772",
"type": "user_nickname",
},
"carolinagomes": {
"user_id": "620000269392019469",
"type": "user_nickname",
},
}
1 change: 1 addition & 0 deletions pipelines/rj_escritorio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from pipelines.rj_escritorio.waze.flows import *
from pipelines.rj_escritorio.geolocator.flows import *
from pipelines.rj_escritorio.inea.flows import *
from pipelines.rj_escritorio.dump_ftp_inea.flows import *
from pipelines.rj_escritorio.seconserva_buracos_refresh_data.flows import *
from pipelines.rj_escritorio.dump_url_turismo.flows import *
from pipelines.rj_escritorio.dump_policy_matrix.flows import *
Expand Down
152 changes: 152 additions & 0 deletions pipelines/rj_escritorio/dump_ftp_inea/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
"""
Dumping data from INEA FTP to BigQuery
"""
# pylint: disable=E1101,C0103,bad-continuation

from copy import deepcopy

from prefect import Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.utilities.edges import unmapped

from pipelines.constants import constants
from pipelines.rj_escritorio.dump_ftp_inea.tasks import (
get_ftp_client,
get_files_datalake,
get_files_from_ftp,
download_files,
select_files_to_download,
upload_file_to_gcs,
)
from pipelines.rj_escritorio.dump_ftp_inea.schedules import (
every_5_minutes,
every_5_minutes_mac,
every_1_day,
every_1_day_mac,
)
from pipelines.rj_cor.tasks import get_on_redis, save_on_redis
from pipelines.utils.decorators import Flow


with Flow(
"INEA: Captura FTP dados de radar (Guaratiba)", code_owners=["paty"]
) as inea_ftp_radar_flow:
bucket_name = Parameter("bucket_name", default="rj-escritorio-dev", required=False)
date = Parameter("date", default=None, required=False)
get_only_last_file = Parameter("get_only_last_file", default=True, required=False)
greater_than = Parameter("greater_than", default=None, required=False)
check_datalake_files = Parameter(
"check_datalake_files", default=True, required=False
)
prefix = Parameter(
"prefix", default="raw/meio_ambiente_clima/inea_radar_hdf5", required=False
)
mode = Parameter("mode", default="prod", required=False)
radar = Parameter("radar", default="gua", required=False)
product = Parameter("product", default="ppi", required=False)

client = get_ftp_client()

files = get_files_from_ftp(
client=client,
radar=radar,
)

redis_files = get_on_redis(
dataset_id="meio_ambiente_clima",
table_id=radar,
mode=mode,
wait=files,
)

datalake_files = get_files_datalake(
bucket_name=bucket_name,
prefix=prefix,
radar=radar,
product=product,
date=date,
greater_than=greater_than,
check_datalake_files=check_datalake_files,
mode=mode,
wait=files,
)

files_to_download = select_files_to_download(
files=files,
redis_files=redis_files,
datalake_files=datalake_files,
date=date,
greater_than=greater_than,
get_only_last_file=get_only_last_file,
)

files_to_upload = download_files(
client=client, files=files_to_download, radar=radar
)

upload_files = upload_file_to_gcs.map(
file_to_upload=files_to_upload,
bucket_name=unmapped(bucket_name),
prefix=unmapped(prefix),
mode=unmapped(mode),
radar=unmapped(radar),
product=unmapped(product),
)

save_on_redis(
dataset_id="meio_ambiente_clima",
table_id=radar,
mode=mode,
files=files_to_upload,
keep_last=14400, # last 30 days files
wait=upload_files,
)

inea_ftp_radar_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
inea_ftp_radar_flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_ESCRITORIO_DEV_AGENT_LABEL.value],
)
inea_ftp_radar_flow.schedule = every_5_minutes

inea_ftp_radar_flow_mac = deepcopy(inea_ftp_radar_flow)
inea_ftp_radar_flow_mac.name = "INEA: Captura FTP dados de radar (Macaé)"
inea_ftp_radar_flow_mac.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
inea_ftp_radar_flow_mac.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_ESCRITORIO_DEV_AGENT_LABEL.value],
)
inea_ftp_radar_flow_mac.schedule = every_5_minutes_mac

inea_ftp_radar_flow_fill_missing = deepcopy(inea_ftp_radar_flow)
inea_ftp_radar_flow_fill_missing.name = (
"INEA: Captura FTP dados de radar (Guaratiba): preenchimento de arquivos faltantes"
)
inea_ftp_radar_flow_fill_missing.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
inea_ftp_radar_flow_fill_missing.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_ESCRITORIO_DEV_AGENT_LABEL.value],
)
inea_ftp_radar_flow_fill_missing.schedule = every_1_day

inea_ftp_radar_flow_fill_missing_mac = deepcopy(inea_ftp_radar_flow)
inea_ftp_radar_flow_fill_missing_mac.name = (
"INEA: Captura FTP dados de radar (Macaé): preenchimento de arquivos faltantes"
)
inea_ftp_radar_flow_fill_missing_mac.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
inea_ftp_radar_flow_fill_missing_mac.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_ESCRITORIO_DEV_AGENT_LABEL.value],
)
inea_ftp_radar_flow_fill_missing_mac.schedule = every_1_day_mac

inea_ftp_backfill_radar_flow = deepcopy(inea_ftp_radar_flow)
inea_ftp_backfill_radar_flow.name = "INEA: Captura dados de radar (backfill)"
inea_ftp_backfill_radar_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
inea_ftp_backfill_radar_flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_ESCRITORIO_DEV_AGENT_LABEL.value],
)
inea_ftp_backfill_radar_flow.schedule = None
92 changes: 92 additions & 0 deletions pipelines/rj_escritorio/dump_ftp_inea/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
"""
Schedules for the INEA flows.
"""

from datetime import timedelta, datetime

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
import pytz

from pipelines.constants import constants

every_5_minutes = Schedule(
clocks=[
IntervalClock(
interval=timedelta(minutes=5),
start_date=datetime(2021, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[constants.INEA_AGENT_LABEL.value],
parameter_defaults={
"bucket_name": "rj-escritorio-dev",
"convert_params": "-k=ODIM2.1 -M=All",
"mode": "prod",
"output_format": "HDF5",
"prefix": "raw/meio_ambiente_clima/inea_radar_hdf5",
"product": "ppi",
"radar": "gua",
"vols_remote_directory": "/var/opt/edge/vols",
},
)
]
)
every_5_minutes_mac = Schedule(
clocks=[
IntervalClock(
interval=timedelta(minutes=5),
start_date=datetime(2021, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[constants.INEA_AGENT_LABEL.value],
parameter_defaults={
"bucket_name": "rj-escritorio-dev",
"convert_params": "-k=ODIM2.1 -M=All",
"mode": "prod",
"output_format": "HDF5",
"prefix": "raw/meio_ambiente_clima/inea_radar_hdf5",
"product": "ppi",
"radar": "mac",
"vols_remote_directory": "/var/opt/edge/vols",
},
)
]
)
every_1_day = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2021, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[constants.INEA_AGENT_LABEL.value],
parameter_defaults={
"bucket_name": "rj-escritorio-dev",
"convert_params": "-k=ODIM2.1 -M=All",
"mode": "prod",
"output_format": "HDF5",
"prefix": "raw/meio_ambiente_clima/inea_radar_hdf5",
"product": "ppi",
"radar": "gua",
"get_only_last_file": False,
"vols_remote_directory": "/var/opt/edge/vols",
},
)
]
)
every_1_day_mac = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2021, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[constants.INEA_AGENT_LABEL.value],
parameter_defaults={
"bucket_name": "rj-escritorio-dev",
"convert_params": "-k=ODIM2.1 -M=All",
"mode": "prod",
"output_format": "HDF5",
"prefix": "raw/meio_ambiente_clima/inea_radar_hdf5",
"product": "ppi",
"radar": "mac",
"get_only_last_file": False,
"vols_remote_directory": "/var/opt/edge/vols",
},
)
]
)
Loading

0 comments on commit b7360b5

Please sign in to comment.