Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

teste atualização estações #545

Merged
merged 75 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
e3d068c
teste atualização estações
KarinaPassos Oct 27, 2023
cb89b2c
teste atualização estações
KarinaPassos Oct 27, 2023
1524f50
Merge branch 'master' into staging/cor-richard
mergify[bot] Oct 27, 2023
954fb74
Merge branch 'master' into staging/cor-richard
mergify[bot] Oct 27, 2023
a46fbad
testando flow
KarinaPassos Oct 30, 2023
c9b1d02
testando flow
KarinaPassos Oct 30, 2023
a94cc94
testando flow
KarinaPassos Oct 30, 2023
be004f3
testando flow
KarinaPassos Oct 31, 2023
3bde70e
testando flow
KarinaPassos Oct 31, 2023
4679df3
testando flow
KarinaPassos Oct 31, 2023
e839ecc
testando flow
KarinaPassos Oct 31, 2023
c53cd72
testando flow
KarinaPassos Oct 31, 2023
97066c9
testando flow
KarinaPassos Oct 31, 2023
29434df
testando flow
KarinaPassos Oct 31, 2023
a105b7c
testando flow
KarinaPassos Oct 31, 2023
08a514b
testando flow
KarinaPassos Oct 31, 2023
717c86e
testando flow
KarinaPassos Oct 31, 2023
8f683ac
testando flow
KarinaPassos Oct 31, 2023
48a7db3
testando flow
KarinaPassos Oct 31, 2023
8a19f6e
testando flow
KarinaPassos Nov 1, 2023
bf468ff
testando flow
KarinaPassos Nov 1, 2023
d89eb87
testando flow
KarinaPassos Nov 1, 2023
d9f6132
testando flow
KarinaPassos Nov 1, 2023
2e65fe7
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 1, 2023
5046b2b
Merge branch 'staging/cor-richard' of https://github.com/prefeitura-r…
KarinaPassos Nov 7, 2023
84cb335
testando flow
KarinaPassos Nov 7, 2023
0eb518c
testando flow
KarinaPassos Nov 7, 2023
d779cbc
testando flow
KarinaPassos Nov 7, 2023
e4f67c9
testando flow
KarinaPassos Nov 8, 2023
9f712e9
testando flow
KarinaPassos Nov 8, 2023
69b99a2
testando flow
KarinaPassos Nov 8, 2023
1309b92
testando flow
KarinaPassos Nov 8, 2023
3bfda35
testando flow
KarinaPassos Nov 8, 2023
739be2e
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 8, 2023
47adba9
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 9, 2023
9d189a5
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 9, 2023
90199f7
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 9, 2023
071096c
Merge branch 'staging/cor-richard' of https://github.com/prefeitura-r…
KarinaPassos Nov 9, 2023
e81be8e
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 9, 2023
50cf1d8
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 10, 2023
21080ba
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 13, 2023
f8f1870
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 13, 2023
155564f
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 13, 2023
65e27ea
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 13, 2023
e1db6b5
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 13, 2023
519efb6
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 14, 2023
e237322
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 14, 2023
919849b
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 15, 2023
324728d
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 15, 2023
cd2aceb
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 15, 2023
af6d73a
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 19, 2023
1ba31be
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 19, 2023
93f100f
Merge branch 'master' into staging/cor-richard
mergify[bot] Nov 24, 2023
35323cd
Merge branch 'master' into staging/cor-richard
mergify[bot] Dec 11, 2023
16139be
Merge branch 'master' into staging/cor-richard
mergify[bot] Dec 12, 2023
6be38d8
Merge branch 'master' into staging/cor-richard
mergify[bot] Dec 12, 2023
6b26127
Merge branch 'master' into staging/cor-richard
mergify[bot] Dec 12, 2023
aa3a0dd
Merge branch 'master' into staging/cor-richard
mergify[bot] Dec 13, 2023
34b899c
Merge branch 'master' into staging/cor-richard
mergify[bot] Dec 14, 2023
7db6d3b
Merge branch 'master' into staging/cor-richard
mergify[bot] Dec 14, 2023
27e5277
modifying redemet stations
patriciacatandi Dec 15, 2023
6bdb437
Merge branch 'staging/cor-richard' of github.com:prefeitura-rio/pipel…
patriciacatandi Dec 15, 2023
0437792
uptating Wait On Check Action
thiago-felipe-99 Dec 15, 2023
70fb6fb
Update schedules.py
patriciacatandi Dec 15, 2023
2ea96a5
changing scheduler
patriciacatandi Dec 15, 2023
034442d
Merge branch 'staging/cor-richard' of github.com:prefeitura-rio/pipel…
patriciacatandi Dec 15, 2023
337dd56
adding verification for changes on cemaden stations
patriciacatandi Dec 15, 2023
e424540
removing url from log
patriciacatandi Dec 15, 2023
4090543
bugfix alertario
patriciacatandi Dec 15, 2023
3968029
bugfix redemet
patriciacatandi Dec 15, 2023
1319877
bugfix cemaden and redemet
patriciacatandi Dec 15, 2023
499935a
bugfix cemaden and redemet
patriciacatandi Dec 15, 2023
d6ee28a
bugfix cemaden and redemet
patriciacatandi Dec 15, 2023
68b2b54
temporary remove failed if new stations from cemaden
patriciacatandi Dec 15, 2023
19d1af6
changing stations cemaden
patriciacatandi Dec 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 136 additions & 16 deletions pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
from pipelines.constants import constants
from pipelines.utils.constants import constants as utils_constants
from pipelines.rj_cor.meteorologia.meteorologia_redemet.tasks import (
check_for_new_stations,
get_dates,
download,
tratar_dados,
salvar_dados,
download_data,
download_stations_data,
treat_data,
treat_stations_data,
save_data,
)
from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import (
hour_schedule,
month_schedule,
)
from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import hour_schedule
from pipelines.utils.decorators import Flow
from pipelines.utils.dump_db.constants import constants as dump_db_constants
from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants
Expand All @@ -27,21 +33,22 @@
get_current_flow_labels,
)


with Flow(
name="COR: Meteorologia - Meteorologia REDEMET",
code_owners=[
"richardg867",
"paty",
],
) as cor_meteorologia_meteorologia_redemet:
DATASET_ID = "clima_estacao_meteorologica"
TABLE_ID = "meteorologia_redemet"
DUMP_MODE = "append"
DUMP_MODE = Parameter("dump_mode", default="append", required=True)
DATASET_ID = Parameter(
"dataset_id", default="clima_estacao_meteorologica", required=True
)
TABLE_ID = Parameter("table_id", default="meteorologia_redemet", required=True)

# data_inicio e data_fim devem ser strings no formato "YYYY-MM-DD"
data_inicio = Parameter("data_inicio", default="", required=False)
data_fim = Parameter("data_fim", default="", required=False)
# first_date and last_date must be strings as "YYYY-MM-DD"
first_date = Parameter("first_date", default=None, required=False)
last_date = Parameter("last_date", default=None, required=False)

# Materialization parameters
MATERIALIZE_AFTER_DUMP = Parameter(
Expand All @@ -61,11 +68,11 @@
default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
)

data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim)
first_date_, last_date_, backfill = get_dates(first_date, last_date)
# data = slice_data(current_time=CURRENT_TIME)
dados = download(data_inicio_, data_fim_)
dados = tratar_dados(dados, backfill)
PATH = salvar_dados(dados=dados)
dados = download_data(first_date_, last_date_)
dados = treat_data(dados, backfill)
PATH = save_data(dados=dados)

# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
Expand Down Expand Up @@ -134,10 +141,123 @@
)


# para rodar na cloud
cor_meteorologia_meteorologia_redemet.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_meteorologia_redemet.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_COR_AGENT_LABEL.value],
)
cor_meteorologia_meteorologia_redemet.schedule = hour_schedule


with Flow(
name="COR: Meteorologia REDEMET - Atualização das estações",
code_owners=[
"karinappassos",
"paty",
],
) as cor_meteorologia_meteorologia_redemet_estacoes:
DUMP_MODE = Parameter("dump_mode", default="overwrite", required=True)
DATASET_ID = Parameter(
"dataset_id", default="clima_estacao_meteorologica", required=True
)
TABLE_ID = Parameter("table_id", default="estacoes_redemet", required=True)

# Materialization parameters
MATERIALIZE_AFTER_DUMP = Parameter(
"materialize_after_dump", default=False, required=False
)
MATERIALIZE_TO_DATARIO = Parameter(
"materialize_to_datario", default=False, required=False
)
MATERIALIZATION_MODE = Parameter("mode", default="dev", required=False)

# Dump to GCS after? Should only dump to GCS if materializing to datario
DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False)

MAXIMUM_BYTES_PROCESSED = Parameter(
"maximum_bytes_processed",
required=False,
default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
)

dataframe = download_stations_data()
dataframe = treat_stations_data(dataframe)
path = save_data(dataframe=dataframe, partition_column="data_alteracao")

# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
data_path=path,
dataset_id=DATASET_ID,
table_id=TABLE_ID,
dump_mode=DUMP_MODE,
wait=path,
)

# Trigger DBT flow run
with case(MATERIALIZE_AFTER_DUMP, True):
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": DATASET_ID,
"table_id": TABLE_ID,
"mode": MATERIALIZATION_MODE,
"materialize_to_datario": MATERIALIZE_TO_DATARIO,
},
labels=current_flow_labels,
run_name=f"Materialize {DATASET_ID}.{TABLE_ID}",
)

materialization_flow.set_upstream(UPLOAD_TABLE)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(DUMP_TO_GCS, True):
# Trigger Dump to GCS flow run with project id as datario
dump_to_gcs_flow = create_flow_run(
flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"project_id": "datario",
"dataset_id": DATASET_ID,
"table_id": TABLE_ID,
"maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED,
},
labels=[
"datario",
],
run_name=f"Dump to GCS {DATASET_ID}.{TABLE_ID}",
)
dump_to_gcs_flow.set_upstream(wait_for_materialization)

wait_for_dump_to_gcs = wait_for_flow_run(
dump_to_gcs_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

check_for_new_stations(dataframe)

# para rodar na cloud
cor_meteorologia_meteorologia_redemet_estacoes.storage = GCS(
constants.GCS_FLOWS_BUCKET.value
)
cor_meteorologia_meteorologia_redemet_estacoes.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_COR_AGENT_LABEL.value],
)
cor_meteorologia_meteorologia_redemet_estacoes.schedule = month_schedule
31 changes: 31 additions & 0 deletions pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,37 @@
labels=[
constants.RJ_COR_AGENT_LABEL.value,
],
parameter_defaults={
# "trigger_rain_dashboard_update": True,
"materialize_after_dump": True,
"mode": "prod",
"materialize_to_datario": True,
"dump_to_gcs": False,
"dump_mode": "append",
"dataset_id": "clima_estacao_meteorologica",
"table_id": "meteorologia_redemet",
},
),
]
)

month_schedule = Schedule(
clocks=[
IntervalClock(
interval=timedelta(months=1),
start_date=datetime(2023, 1, 1, 0, 12, 0),
labels=[
constants.RJ_COR_AGENT_LABEL.value,
],
parameter_defaults={
"materialize_after_dump": True,
"mode": "prod",
"materialize_to_datario": True,
"dump_to_gcs": False,
# "dump_mode": "overwrite",
# "dataset_id": "clima_estacao_meteorologica",
# "table_id": "estacoes_redemet",
},
),
]
)
Loading
Loading