Skip to content

Commit

Permalink
Deploying to gh-pages from @ 3778cb9 🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
patriciacatandi committed Feb 7, 2024
1 parent a575cf3 commit 6b83b18
Show file tree
Hide file tree
Showing 7 changed files with 785 additions and 239 deletions.
37 changes: 37 additions & 0 deletions rj_cor/meteorologia/precipitacao_alertario/constants.html
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
The constants for actual values are on rain_dashboard_constants file
&#34;&#34;&#34;

DATASET_ID_PLUVIOMETRIC = &#34;clima_pluviometro&#34;
TABLE_ID_PLUVIOMETRIC = &#34;taxa_precipitacao_alertario_5min&#34;
TABLE_ID_PLUVIOMETRIC_OLD_API = &#34;taxa_precipitacao_alertario&#34;
DATASET_ID_METEOROLOGICAL = &#34;clima_estacao_meteorologica&#34;
TABLE_ID_METEOROLOGICAL = &#34;meteorologia_alertario&#34;

RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS = {
&#34;redis_data_key&#34;: &#34;data_chuva_passado_alertario&#34;,
&#34;redis_update_key&#34;: &#34;data_update_chuva_passado_alertario&#34;,
Expand Down Expand Up @@ -298,6 +304,12 @@ <h2 class="section-title" id="header-classes">Classes</h2>
The constants for actual values are on rain_dashboard_constants file
&#34;&#34;&#34;

DATASET_ID_PLUVIOMETRIC = &#34;clima_pluviometro&#34;
TABLE_ID_PLUVIOMETRIC = &#34;taxa_precipitacao_alertario_5min&#34;
TABLE_ID_PLUVIOMETRIC_OLD_API = &#34;taxa_precipitacao_alertario&#34;
DATASET_ID_METEOROLOGICAL = &#34;clima_estacao_meteorologica&#34;
TABLE_ID_METEOROLOGICAL = &#34;meteorologia_alertario&#34;

RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS = {
&#34;redis_data_key&#34;: &#34;data_chuva_passado_alertario&#34;,
&#34;redis_update_key&#34;: &#34;data_update_chuva_passado_alertario&#34;,
Expand Down Expand Up @@ -533,10 +545,30 @@ <h3>Ancestors</h3>
</ul>
<h3>Class variables</h3>
<dl>
<dt id="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.DATASET_ID_METEOROLOGICAL"><code class="name">var <span class="ident">DATASET_ID_METEOROLOGICAL</span></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt id="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.DATASET_ID_PLUVIOMETRIC"><code class="name">var <span class="ident">DATASET_ID_PLUVIOMETRIC</span></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt id="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS"><code class="name">var <span class="ident">RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS</span></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt id="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.TABLE_ID_METEOROLOGICAL"><code class="name">var <span class="ident">TABLE_ID_METEOROLOGICAL</span></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt id="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.TABLE_ID_PLUVIOMETRIC"><code class="name">var <span class="ident">TABLE_ID_PLUVIOMETRIC</span></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt id="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.TABLE_ID_PLUVIOMETRIC_OLD_API"><code class="name">var <span class="ident">TABLE_ID_PLUVIOMETRIC_OLD_API</span></code></dt>
<dd>
<div class="desc"></div>
</dd>
</dl>
</dd>
</dl>
Expand Down Expand Up @@ -604,7 +636,12 @@ <h1>Index</h1>
<li>
<h4><code><a title="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants" href="#pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants">constants</a></code></h4>
<ul class="">
<li><code><a title="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.DATASET_ID_METEOROLOGICAL" href="#pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.DATASET_ID_METEOROLOGICAL">DATASET_ID_METEOROLOGICAL</a></code></li>
<li><code><a title="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.DATASET_ID_PLUVIOMETRIC" href="#pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.DATASET_ID_PLUVIOMETRIC">DATASET_ID_PLUVIOMETRIC</a></code></li>
<li><code><a title="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS" href="#pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS">RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS</a></code></li>
<li><code><a title="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.TABLE_ID_METEOROLOGICAL" href="#pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.TABLE_ID_METEOROLOGICAL">TABLE_ID_METEOROLOGICAL</a></code></li>
<li><code><a title="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.TABLE_ID_PLUVIOMETRIC" href="#pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.TABLE_ID_PLUVIOMETRIC">TABLE_ID_PLUVIOMETRIC</a></code></li>
<li><code><a title="pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.TABLE_ID_PLUVIOMETRIC_OLD_API" href="#pipelines.rj_cor.meteorologia.precipitacao_alertario.constants.constants.TABLE_ID_PLUVIOMETRIC_OLD_API">TABLE_ID_PLUVIOMETRIC_OLD_API</a></code></li>
</ul>
</li>
</ul>
Expand Down
198 changes: 174 additions & 24 deletions rj_cor/meteorologia/precipitacao_alertario/flows.html
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
)
from pipelines.rj_cor.meteorologia.precipitacao_alertario.tasks import (
check_to_run_dbt,
tratar_dados,
salvar_dados,
download_data,
treat_old_pluviometer,
treat_pluviometer_and_meteorological_data,
save_data,
save_data_old,
save_last_dbt_update,
)
from pipelines.rj_cor.meteorologia.precipitacao_alertario.schedules import (
Expand All @@ -71,17 +74,28 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
)

with Flow(
name=&#34;COR: Meteorologia - Precipitacao ALERTARIO&#34;,
name=&#34;COR: Meteorologia - Precipitacao e Meteorologia ALERTARIO&#34;,
code_owners=[
&#34;paty&#34;,
],
# skip_if_running=True,
) as cor_meteorologia_precipitacao_alertario:
DATASET_ID = &#34;clima_pluviometro&#34;
TABLE_ID = &#34;taxa_precipitacao_alertario&#34;
DATASET_ID_PLUVIOMETRIC = alertario_constants.DATASET_ID_PLUVIOMETRIC.value
TABLE_ID_PLUVIOMETRIC = alertario_constants.TABLE_ID_PLUVIOMETRIC.value
TABLE_ID_PLUVIOMETRIC_OLD_API = (
alertario_constants.TABLE_ID_PLUVIOMETRIC_OLD_API.value
)
DATASET_ID_METEOROLOGICAL = alertario_constants.DATASET_ID_METEOROLOGICAL.value
TABLE_ID_METEOROLOGICAL = alertario_constants.TABLE_ID_METEOROLOGICAL.value
DUMP_MODE = &#34;append&#34;

# Materialization parameters
MATERIALIZE_AFTER_DUMP_OLD_API = Parameter(
&#34;materialize_after_dump_old_api&#34;, default=False, required=False
)
MATERIALIZE_TO_DATARIO_OLD_API = Parameter(
&#34;materialize_to_datario_old_api&#34;, default=False, required=False
)
MATERIALIZE_AFTER_DUMP = Parameter(
&#34;materialize_after_dump&#34;, default=False, required=False
)
Expand All @@ -102,21 +116,37 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
)

dados, empty_data = tratar_dados(
dataset_id=DATASET_ID,
table_id=TABLE_ID,
dfr_pluviometric, dfr_meteorological = download_data()
(
dfr_pluviometric,
empty_data_pluviometric,
) = treat_pluviometer_and_meteorological_data(
dfr=dfr_pluviometric,
dataset_id=DATASET_ID_PLUVIOMETRIC,
table_id=TABLE_ID_PLUVIOMETRIC,
mode=MATERIALIZATION_MODE,
)
(
dfr_meteorological,
empty_data_meteorological,
) = treat_pluviometer_and_meteorological_data(
dfr=dfr_meteorological,
dataset_id=DATASET_ID_METEOROLOGICAL,
table_id=TABLE_ID_METEOROLOGICAL,
mode=MATERIALIZATION_MODE,
)

with case(empty_data, False):
path = salvar_dados(dados=dados)
with case(empty_data_pluviometric, False):
path_pluviometric = save_data(
dfr_pluviometric, &#34;pluviometric&#34;, wait=empty_data_pluviometric
)
# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
data_path=path,
dataset_id=DATASET_ID,
table_id=TABLE_ID,
data_path=path_pluviometric,
dataset_id=DATASET_ID_PLUVIOMETRIC,
table_id=TABLE_ID_PLUVIOMETRIC,
dump_mode=DUMP_MODE,
wait=path,
wait=path_pluviometric,
)

with case(TRIGGER_RAIN_DASHBOARD_UPDATE, True):
Expand Down Expand Up @@ -158,9 +188,59 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
raise_final_state=False,
)

# Treat data to save on old table
dfr_pluviometric_old_api = treat_old_pluviometer(dfr_pluviometric)

path_pluviometric_old_api = save_data_old(
dfr_pluviometric_old_api,
&#34;pluviometric_old_api&#34;,
wait=dfr_pluviometric_old_api,
)
# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
data_path=path_pluviometric_old_api,
dataset_id=DATASET_ID_PLUVIOMETRIC,
table_id=TABLE_ID_PLUVIOMETRIC_OLD_API,
dump_mode=DUMP_MODE,
wait=path_pluviometric_old_api,
)

# Trigger DBT flow run
with case(MATERIALIZE_AFTER_DUMP_OLD_API, True):
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
&#34;dataset_id&#34;: DATASET_ID_PLUVIOMETRIC,
&#34;table_id&#34;: TABLE_ID_PLUVIOMETRIC_OLD_API,
&#34;mode&#34;: MATERIALIZATION_MODE,
&#34;materialize_to_datario&#34;: MATERIALIZE_TO_DATARIO_OLD_API,
},
labels=current_flow_labels,
run_name=f&#34;Materialize {DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC_OLD_API}&#34;,
)

materialization_flow.set_upstream(current_flow_labels)

wait_for_materialization = wait_for_flow_run_with_5min_timeout(
flow_run_id=materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

# Trigger DBT for new API
run_dbt = check_to_run_dbt(
dataset_id=DATASET_ID,
table_id=TABLE_ID,
dataset_id=DATASET_ID_PLUVIOMETRIC,
table_id=TABLE_ID_PLUVIOMETRIC,
mode=MATERIALIZATION_MODE,
)
run_dbt.set_upstream(UPLOAD_TABLE)
Expand All @@ -173,13 +253,13 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
&#34;dataset_id&#34;: DATASET_ID,
&#34;table_id&#34;: TABLE_ID,
&#34;dataset_id&#34;: DATASET_ID_PLUVIOMETRIC,
&#34;table_id&#34;: TABLE_ID_PLUVIOMETRIC,
&#34;mode&#34;: MATERIALIZATION_MODE,
&#34;materialize_to_datario&#34;: MATERIALIZE_TO_DATARIO,
},
labels=current_flow_labels,
run_name=f&#34;Materialize {DATASET_ID}.{TABLE_ID}&#34;,
run_name=f&#34;Materialize {DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC}&#34;,
)

current_flow_labels.set_upstream(run_dbt)
Expand All @@ -193,8 +273,8 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
)

last_dbt_update = save_last_dbt_update(
dataset_id=DATASET_ID,
table_id=TABLE_ID,
dataset_id=DATASET_ID_PLUVIOMETRIC,
table_id=TABLE_ID_PLUVIOMETRIC,
mode=MATERIALIZATION_MODE,
wait=wait_for_materialization,
)
Expand All @@ -213,14 +293,84 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
&#34;project_id&#34;: &#34;datario&#34;,
&#34;dataset_id&#34;: DATASET_ID,
&#34;table_id&#34;: TABLE_ID,
&#34;dataset_id&#34;: DATASET_ID_PLUVIOMETRIC,
&#34;table_id&#34;: TABLE_ID_PLUVIOMETRIC,
&#34;maximum_bytes_processed&#34;: MAXIMUM_BYTES_PROCESSED,
},
labels=[
&#34;datario&#34;,
],
run_name=f&#34;Dump to GCS {DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC}&#34;,
)
dump_to_gcs_flow.set_upstream(wait_for_materialization)

wait_for_dump_to_gcs = wait_for_flow_run_with_5min_timeout(
flow_run_id=dump_to_gcs_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

# Save and materialize meteorological data
with case(empty_data_meteorological, False):
path_meteorological = save_data(
dfr_meteorological, &#34;meteorological&#34;, wait=empty_data_meteorological
)
# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
data_path=path_meteorological,
dataset_id=DATASET_ID_METEOROLOGICAL,
table_id=TABLE_ID_METEOROLOGICAL,
dump_mode=DUMP_MODE,
wait=path_meteorological,
)

with case(MATERIALIZE_AFTER_DUMP, True):
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
&#34;dataset_id&#34;: DATASET_ID_METEOROLOGICAL,
&#34;table_id&#34;: TABLE_ID_METEOROLOGICAL,
&#34;mode&#34;: MATERIALIZATION_MODE,
&#34;materialize_to_datario&#34;: MATERIALIZE_TO_DATARIO,
},
labels=current_flow_labels,
run_name=f&#34;Materialize {DATASET_ID_METEOROLOGICAL}.{TABLE_ID_METEOROLOGICAL}&#34;,
)

materialization_flow.set_upstream(current_flow_labels)

wait_for_materialization = wait_for_flow_run_with_5min_timeout(
flow_run_id=materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(DUMP_TO_GCS, True):
# Trigger Dump to GCS flow run with project id as datario
dump_to_gcs_flow = create_flow_run(
flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
&#34;project_id&#34;: &#34;datario&#34;,
&#34;dataset_id&#34;: DATASET_ID_METEOROLOGICAL,
&#34;table_id&#34;: TABLE_ID_METEOROLOGICAL,
&#34;maximum_bytes_processed&#34;: MAXIMUM_BYTES_PROCESSED,
},
labels=[
&#34;datario&#34;,
],
run_name=f&#34;Dump to GCS {DATASET_ID}.{TABLE_ID}&#34;,
run_name=f&#34;Dump to GCS {DATASET_ID_METEOROLOGICAL}.{TABLE_ID_METEOROLOGICAL}&#34;,
)
dump_to_gcs_flow.set_upstream(wait_for_materialization)

Expand Down
8 changes: 5 additions & 3 deletions rj_cor/meteorologia/precipitacao_alertario/schedules.html
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
constants.RJ_COR_AGENT_LABEL.value,
],
parameter_defaults={
# &#34;trigger_rain_dashboard_update&#34;: True,
&#34;materialize_after_dump&#34;: True,
&#34;trigger_rain_dashboard_update&#34;: True,
&#34;materialize_after_dump_old_api&#34;: True,
&#34;materialize_to_datario_old_api&#34;: True,
&#34;materialize_after_dump&#34;: False,
&#34;materialize_to_datario&#34;: False,
&#34;mode&#34;: &#34;prod&#34;,
&#34;materialize_to_datario&#34;: True,
&#34;dump_to_gcs&#34;: False,
},
),
Expand Down
Loading

0 comments on commit 6b83b18

Please sign in to comment.