Skip to content

Commit

Permalink
Deploying to gh-pages from @ cb688f4 🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
patriciacatandi committed Feb 8, 2024
1 parent 6b83b18 commit 1ebb3a6
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 11 deletions.
2 changes: 0 additions & 2 deletions rj_cor/comando/eventos/tasks.html
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ <h1 class="title">Module <code>pipelines.rj_cor.comando.eventos.tasks</code></h1

categorical_cols = [
&#34;bairro&#34;,
&#34;prazo&#34;,
&#34;descricao&#34;,
&#34;gravidade&#34;,
&#34;status&#34;,
Expand Down Expand Up @@ -683,7 +682,6 @@ <h2 class="section-title" id="header-functions">Functions</h2>

categorical_cols = [
&#34;bairro&#34;,
&#34;prazo&#34;,
&#34;descricao&#34;,
&#34;gravidade&#34;,
&#34;status&#34;,
Expand Down
12 changes: 7 additions & 5 deletions rj_cor/meteorologia/precipitacao_alertario/flows.html
Original file line number Diff line number Diff line change
Expand Up @@ -189,23 +189,25 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
)

# Treat data to save on old table
dfr_pluviometric_old_api = treat_old_pluviometer(dfr_pluviometric)
dfr_pluviometric_old_api = treat_old_pluviometer(
dfr_pluviometric, wait=UPLOAD_TABLE
)

path_pluviometric_old_api = save_data_old(
dfr_pluviometric_old_api,
&#34;pluviometric_old_api&#34;,
wait=dfr_pluviometric_old_api,
)
# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
# Create table in BigQuery for old table
UPLOAD_TABLE_OLD_API = create_table_and_upload_to_gcs(
data_path=path_pluviometric_old_api,
dataset_id=DATASET_ID_PLUVIOMETRIC,
table_id=TABLE_ID_PLUVIOMETRIC_OLD_API,
dump_mode=DUMP_MODE,
wait=path_pluviometric_old_api,
)

# Trigger DBT flow run
# Trigger DBT flow run for old table
with case(MATERIALIZE_AFTER_DUMP_OLD_API, True):
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
Expand Down Expand Up @@ -317,7 +319,7 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
dfr_meteorological, &#34;meteorological&#34;, wait=empty_data_meteorological
)
# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
UPLOAD_TABLE_METEOROLOGICAL = create_table_and_upload_to_gcs(
data_path=path_meteorological,
dataset_id=DATASET_ID_METEOROLOGICAL,
table_id=TABLE_ID_METEOROLOGICAL,
Expand Down
2 changes: 1 addition & 1 deletion rj_cor/meteorologia/precipitacao_alertario/schedules.html
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
minute_schedule = Schedule(
clocks=[
IntervalClock(
interval=timedelta(minutes=3),
interval=timedelta(minutes=2),
start_date=datetime(2021, 1, 1, 0, 1, 0),
labels=[
constants.RJ_COR_AGENT_LABEL.value,
Expand Down
32 changes: 29 additions & 3 deletions rj_cor/meteorologia/precipitacao_alertario/tasks.html
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,18 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def treat_old_pluviometer(dfr: pd.DataFrame) -&gt; pd.DataFrame:
def treat_old_pluviometer(
dfr: pd.DataFrame,
wait=None, # pylint: disable=unused-argument
) -&gt; pd.DataFrame:
&#34;&#34;&#34;
Renomeia colunas no estilo do antigo flow.
&#34;&#34;&#34;

log(
f&#34;Starting treating pluviometer data to match old API.\
Pluviometer table enter as\n{dfr.iloc[0]}&#34;
)
rename_cols = {
&#34;acumulado_chuva_15min&#34;: &#34;acumulado_chuva_15_min&#34;,
&#34;acumulado_chuva_1h&#34;: &#34;acumulado_chuva_1_h&#34;,
Expand Down Expand Up @@ -347,6 +354,10 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
&#34;acumulado_chuva_96_h&#34;,
]
]
log(
f&#34;Ending treating pluviometer data to match old API.\
Pluviometer table finished as\n{dfr.iloc[0]}&#34;
)
return dfr


Expand All @@ -363,9 +374,11 @@ <h1 class="title">Module <code>pipelines.rj_cor.meteorologia.precipitacao_alerta
prepath = Path(f&#34;/tmp/precipitacao_alertario/{data_name}&#34;)
prepath.mkdir(parents=True, exist_ok=True)

log(f&#34;Dataframe before partitions old api {dfr.iloc[0]}&#34;)
partition_column = &#34;data_medicao&#34;
dataframe, partitions = parse_date_columns_old_api(dfr, partition_column)
current_time = pendulum.now(&#34;America/Sao_Paulo&#34;).strftime(&#34;%Y%m%d%H%M&#34;)
log(f&#34;Dataframe after partitions old api {dataframe.iloc[0]}&#34;)

to_partitions(
data=dataframe,
Expand Down Expand Up @@ -544,9 +557,11 @@ <h2 class="section-title" id="header-functions">Functions</h2>
prepath = Path(f&#34;/tmp/precipitacao_alertario/{data_name}&#34;)
prepath.mkdir(parents=True, exist_ok=True)

log(f&#34;Dataframe before partitions old api {dfr.iloc[0]}&#34;)
partition_column = &#34;data_medicao&#34;
dataframe, partitions = parse_date_columns_old_api(dfr, partition_column)
current_time = pendulum.now(&#34;America/Sao_Paulo&#34;).strftime(&#34;%Y%m%d%H%M&#34;)
log(f&#34;Dataframe after partitions old api {dataframe.iloc[0]}&#34;)

to_partitions(
data=dataframe,
Expand Down Expand Up @@ -588,7 +603,7 @@ <h2 class="section-title" id="header-functions">Functions</h2>
</details>
</dd>
<dt id="pipelines.rj_cor.meteorologia.precipitacao_alertario.tasks.treat_old_pluviometer"><code class="name flex">
<span>def <span class="ident">treat_old_pluviometer</span></span>(<span>dfr: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame</span>
<span>def <span class="ident">treat_old_pluviometer</span></span>(<span>dfr: pandas.core.frame.DataFrame, wait=None) ‑> pandas.core.frame.DataFrame</span>
</code></dt>
<dd>
<div class="desc"><p>Renomeia colunas no estilo do antigo flow.</p></div>
Expand All @@ -601,11 +616,18 @@ <h2 class="section-title" id="header-functions">Functions</h2>
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def treat_old_pluviometer(dfr: pd.DataFrame) -&gt; pd.DataFrame:
def treat_old_pluviometer(
dfr: pd.DataFrame,
wait=None, # pylint: disable=unused-argument
) -&gt; pd.DataFrame:
&#34;&#34;&#34;
Renomeia colunas no estilo do antigo flow.
&#34;&#34;&#34;

log(
f&#34;Starting treating pluviometer data to match old API.\
Pluviometer table enter as\n{dfr.iloc[0]}&#34;
)
rename_cols = {
&#34;acumulado_chuva_15min&#34;: &#34;acumulado_chuva_15_min&#34;,
&#34;acumulado_chuva_1h&#34;: &#34;acumulado_chuva_1_h&#34;,
Expand Down Expand Up @@ -649,6 +671,10 @@ <h2 class="section-title" id="header-functions">Functions</h2>
&#34;acumulado_chuva_96_h&#34;,
]
]
log(
f&#34;Ending treating pluviometer data to match old API.\
Pluviometer table finished as\n{dfr.iloc[0]}&#34;
)
return dfr</code></pre>
</details>
</dd>
Expand Down

0 comments on commit 1ebb3a6

Please sign in to comment.