
Commit

Merge pull request #545 from prefeitura-rio/staging/cor-richard
teste atualização estações
patriciacatandi authored Dec 18, 2023
2 parents b9b61ca + 19d1af6 commit e63be59
Showing 8 changed files with 457 additions and 153 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cd_staging.yaml
@@ -77,7 +77,7 @@ jobs:
echo $PREFECT_AUTH_TOML | base64 --decode > $HOME/.prefect/auth.toml
- name: Wait for Docker image to be available
-uses: lewagon/wait-on-check-action@v1.1.2
+uses: lewagon/wait-on-check-action@v1.3.1
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
check-name: 'Build Docker image'
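For reference, after this bump the affected step in .github/workflows/cd_staging.yaml would read roughly as below. This is a sketch assembled only from the hunk above; indentation and any inputs outside the hunk are assumed.

# Sketch of the updated step (only lines visible in the hunk are shown)
- name: Wait for Docker image to be available
  uses: lewagon/wait-on-check-action@v1.3.1
  with:
    ref: ${{ github.event.pull_request.head.sha || github.sha }}
    check-name: 'Build Docker image'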
153 changes: 136 additions & 17 deletions pipelines/rj_cor/meteorologia/meteorologia_redemet/flows.py
@@ -13,12 +13,18 @@
from pipelines.constants import constants
from pipelines.utils.constants import constants as utils_constants
 from pipelines.rj_cor.meteorologia.meteorologia_redemet.tasks import (
+    check_for_new_stations,
     get_dates,
-    download,
-    tratar_dados,
-    salvar_dados,
+    download_data,
+    download_stations_data,
+    treat_data,
+    treat_stations_data,
+    save_data,
 )
+from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import (
+    hour_schedule,
+    month_schedule,
+)
-from pipelines.rj_cor.meteorologia.meteorologia_redemet.schedules import hour_schedule
from pipelines.utils.decorators import Flow
from pipelines.utils.dump_db.constants import constants as dump_db_constants
from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants
@@ -27,21 +33,21 @@
get_current_flow_labels,
)


with Flow(
name="COR: Meteorologia - Meteorologia REDEMET",
code_owners=[
"richardg867",
"paty",
],
) as cor_meteorologia_meteorologia_redemet:
-DATASET_ID = "clima_estacao_meteorologica"
-TABLE_ID = "meteorologia_redemet"
-DUMP_MODE = "append"
+DUMP_MODE = Parameter("dump_mode", default="append", required=True)
+DATASET_ID = Parameter(
+    "dataset_id", default="clima_estacao_meteorologica", required=True
+)
+TABLE_ID = Parameter("table_id", default="meteorologia_redemet", required=True)

-# data_inicio e data_fim devem ser strings no formato "YYYY-MM-DD"
-data_inicio = Parameter("data_inicio", default="", required=False)
-data_fim = Parameter("data_fim", default="", required=False)
+# first_date and last_date must be strings as "YYYY-MM-DD"
+first_date = Parameter("first_date", default=None, required=False)
+last_date = Parameter("last_date", default=None, required=False)

# Materialization parameters
MATERIALIZE_AFTER_DUMP = Parameter(
@@ -61,11 +67,11 @@
default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
)

-data_inicio_, data_fim_, backfill = get_dates(data_inicio, data_fim)
+first_date_, last_date_, backfill = get_dates(first_date, last_date)
 # data = slice_data(current_time=CURRENT_TIME)
-dados = download(data_inicio_, data_fim_)
-dados = tratar_dados(dados, backfill)
-PATH = salvar_dados(dados=dados)
+dataframe = download_data(first_date_, last_date_)
+dataframe = treat_data(dataframe, backfill)
+PATH = save_data(dataframe=dataframe)

# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
@@ -134,10 +140,123 @@
)


# para rodar na cloud
cor_meteorologia_meteorologia_redemet.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_meteorologia_redemet.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_COR_AGENT_LABEL.value],
)
cor_meteorologia_meteorologia_redemet.schedule = hour_schedule


with Flow(
name="COR: Meteorologia REDEMET - Atualização das estações",
code_owners=[
"karinappassos",
"paty",
],
) as cor_meteorologia_meteorologia_redemet_estacoes:
DUMP_MODE = Parameter("dump_mode", default="overwrite", required=True)
DATASET_ID = Parameter(
"dataset_id", default="clima_estacao_meteorologica", required=True
)
TABLE_ID = Parameter("table_id", default="estacoes_redemet", required=True)

# Materialization parameters
MATERIALIZE_AFTER_DUMP = Parameter(
"materialize_after_dump", default=False, required=False
)
MATERIALIZE_TO_DATARIO = Parameter(
"materialize_to_datario", default=False, required=False
)
MATERIALIZATION_MODE = Parameter("mode", default="dev", required=False)

# Dump to GCS after? Should only dump to GCS if materializing to datario
DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False)

MAXIMUM_BYTES_PROCESSED = Parameter(
"maximum_bytes_processed",
required=False,
default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
)

dataframe = download_stations_data()
dataframe = treat_stations_data(dataframe)
path = save_data(dataframe=dataframe, partition_column="data_atualizacao")

# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
data_path=path,
dataset_id=DATASET_ID,
table_id=TABLE_ID,
dump_mode=DUMP_MODE,
wait=path,
)

# Trigger DBT flow run
with case(MATERIALIZE_AFTER_DUMP, True):
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": DATASET_ID,
"table_id": TABLE_ID,
"mode": MATERIALIZATION_MODE,
"materialize_to_datario": MATERIALIZE_TO_DATARIO,
},
labels=current_flow_labels,
run_name=f"Materialize {DATASET_ID}.{TABLE_ID}",
)

materialization_flow.set_upstream(UPLOAD_TABLE)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(DUMP_TO_GCS, True):
# Trigger Dump to GCS flow run with project id as datario
dump_to_gcs_flow = create_flow_run(
flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"project_id": "datario",
"dataset_id": DATASET_ID,
"table_id": TABLE_ID,
"maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED,
},
labels=[
"datario",
],
run_name=f"Dump to GCS {DATASET_ID}.{TABLE_ID}",
)
dump_to_gcs_flow.set_upstream(wait_for_materialization)

wait_for_dump_to_gcs = wait_for_flow_run(
dump_to_gcs_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

check_for_new_stations(dataframe, wait=UPLOAD_TABLE)

# para rodar na cloud
cor_meteorologia_meteorologia_redemet_estacoes.storage = GCS(
constants.GCS_FLOWS_BUCKET.value
)
cor_meteorologia_meteorologia_redemet_estacoes.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_COR_AGENT_LABEL.value],
)
cor_meteorologia_meteorologia_redemet_estacoes.schedule = month_schedule
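As a rough usage sketch (assuming a local Prefect 1.x environment with this repository importable), the first_date/last_date parameters introduced above could be exercised in a manual backfill run. The dates below are illustrative only, and flow.run() here is plain Prefect behaviour, not something added by this commit.

# Hypothetical local backfill run; parameter values are examples, not defaults.
from pipelines.rj_cor.meteorologia.meteorologia_redemet.flows import (
    cor_meteorologia_meteorologia_redemet,
)

state = cor_meteorologia_meteorologia_redemet.run(
    parameters={
        "first_date": "2023-12-01",  # must be "YYYY-MM-DD", per the comment in the flow
        "last_date": "2023-12-02",
        "materialize_after_dump": False,
        "dump_to_gcs": False,
    }
)
print(state)  # final Prefect State of the local run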
33 changes: 32 additions & 1 deletion pipelines/rj_cor/meteorologia/meteorologia_redemet/schedules.py
@@ -11,11 +11,42 @@
hour_schedule = Schedule(
clocks=[
IntervalClock(
-interval=timedelta(hours=1),
+interval=timedelta(days=30),
start_date=datetime(2023, 1, 1, 0, 12, 0),
labels=[
constants.RJ_COR_AGENT_LABEL.value,
],
parameter_defaults={
# "trigger_rain_dashboard_update": True,
"materialize_after_dump": True,
"mode": "prod",
"materialize_to_datario": True,
"dump_to_gcs": False,
"dump_mode": "append",
"dataset_id": "clima_estacao_meteorologica",
"table_id": "meteorologia_redemet",
},
),
]
)

month_schedule = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=30),
start_date=datetime(2023, 1, 1, 0, 12, 0),
labels=[
constants.RJ_COR_AGENT_LABEL.value,
],
parameter_defaults={
"materialize_after_dump": True,
"mode": "prod",
"materialize_to_datario": True,
"dump_to_gcs": False,
# "dump_mode": "overwrite",
# "dataset_id": "clima_estacao_meteorologica",
# "table_id": "estacoes_redemet",
},
),
]
)
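One note on the scheduling choice above: IntervalClock(interval=timedelta(days=30)) ticks every 30 days and therefore drifts against calendar months. If a strictly monthly cadence were ever wanted, Prefect 1.x also ships a CronClock; the snippet below is only an illustrative alternative under that assumption, not part of this change.

from datetime import datetime
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# Illustrative only: fire at 00:12 on the 1st day of every month.
month_schedule_cron = Schedule(
    clocks=[
        CronClock("12 0 1 * *", start_date=datetime(2023, 1, 1)),
    ]
)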