Skip to content

Commit

Permalink
refactor, step 1 save and delete forecast, then delete the rest (#102)
Browse files Browse the repository at this point in the history
* refactor, step 1 save and delete forecast, then delete the rest

* remove return

* add log

* lint
  • Loading branch information
peterdudfield authored Nov 19, 2024
1 parent f7f7f84 commit 2a2d759
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 57 deletions.
97 changes: 53 additions & 44 deletions database-cleanup/database_cleanup/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,19 @@ def _profile(msg: str):
_log.debug(f"Done in {t1 - t0:.3f}s")


def _get_forecasts(
    session: Session,
    max_date: dt.datetime,
    limit: int,
    site_uuids: Optional[list[uuid.UUID]] = None,
) -> list[uuid.UUID]:
    """Get up to `limit` of the oldest forecasts made strictly before `max_date`.

    :param session: an open SQLAlchemy session used to run the query
    :param max_date: only forecasts with `timestamp_utc` earlier than this are returned
    :param limit: maximum number of forecast UUIDs to return
    :param site_uuids: optional list of site UUIDs to restrict the query to;
        if None, forecasts from all sites are considered
    :return: list of `forecast_uuid` values, oldest first
    """
    stmt = sa.select(ForecastSQL.forecast_uuid).where(ForecastSQL.timestamp_utc < max_date)

    # Optionally restrict to a subset of sites (used when saving before deleting).
    if site_uuids is not None:
        stmt = stmt.where(ForecastSQL.site_uuid.in_(site_uuids))

    # Oldest first, so repeated calls drain the backlog chronologically.
    stmt = stmt.order_by(ForecastSQL.timestamp_utc).limit(limit)

    return session.scalars(stmt).all()

Expand Down Expand Up @@ -144,48 +149,52 @@ def main(

num_forecast_deleted = 0

i = -1
while True:
i += 1
with Session.begin() as session:
forecast_uuids = _get_forecasts(
session,
max_date=date,
limit=batch_size,
)

if len(forecast_uuids) == 0:
_log.info(f"Done deleting forecasts made before {date}")
_log.info(
f"A total of {num_forecast_deleted} (and corresponding values) "
f"were deleted from the database."
)
_log.info("Exiting.")
return
# 1. save forecasts and delete them, 2. delete all other values
for save_forecasts in [True, False]:
_log.info(f"{'Saving and' if save_forecasts else 'Deleting'} forecasts and values")

if (save_dir is not None) and do_delete:
save_forecast_and_values(
session=session,
forecast_uuids=forecast_uuids,
directory=save_dir,
index=i,
site_uuids=site_uuids_all_sites,
)
i = -1
forecast_uuids = [1]
while len(forecast_uuids) > 0:
i += 1

if do_delete:
            # Note that it is important to run this in a transaction for atomicity.
with Session.begin() as session:
_delete_forecasts_and_values(session, forecast_uuids)
else:
print(f"Would delete data from {len(forecast_uuids)} forecasts in a first batch.")
# Stop here because otherwise we would loop infinitely.
return
forecast_uuids = _get_forecasts(
session,
max_date=date,
limit=batch_size,
site_uuids=site_uuids_all_sites if save_forecasts else None,
)

if (save_dir is not None) and do_delete and save_forecasts:
save_forecast_and_values(
session=session,
forecast_uuids=forecast_uuids,
directory=save_dir,
index=i,
)

if do_delete:
            # Note that it is important to run this in a transaction for atomicity.
with Session.begin() as session:
_delete_forecasts_and_values(session, forecast_uuids)
else:
print(f"Would delete data from {len(forecast_uuids)} forecasts in a first batch.")
# Stop here because otherwise we would loop infinitely.
return

num_forecast_deleted += len(forecast_uuids)
num_forecast_deleted += len(forecast_uuids)

if sleep:
_log.debug(f"Sleeping for {sleep} seconds")
time.sleep(sleep)
if sleep:
_log.debug(f"Sleeping for {sleep} seconds")
time.sleep(sleep)

_log.info(f"Done deleting forecasts made before {date}")
_log.info(
f"A total of {num_forecast_deleted} (and corresponding values) "
f"were deleted from the database."
)
_log.info("Exiting.")


def format_date(date) -> dt.datetime:
Expand Down
13 changes: 0 additions & 13 deletions database-cleanup/database_cleanup/save.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
import os
import uuid
import fsspec
from typing import Optional

from pvsite_datamodel.sqlmodels import ForecastSQL, ForecastValueSQL, SiteGroupSQL
from sqlalchemy.orm import Session
import sqlalchemy as sa
import pandas as pd


Expand Down Expand Up @@ -54,7 +52,6 @@ def save_forecast_and_values(
forecast_uuids: list[uuid.UUID],
directory: str,
index: int = 0,
site_uuids: Optional[list[uuid.UUID]] = None,
):
"""
Save forecast and forecast values to csv
Expand All @@ -63,7 +60,6 @@ def save_forecast_and_values(
:param directory: the directory where they should be saved
:param index: the index of the file, we delete the forecasts in batches,
so there will be several files to save
:param site_uuids: list of site uuids to save; if it's None, then we ignore this
"""
_log.info(f"Saving data to {directory}")

Expand All @@ -72,15 +68,6 @@ def save_forecast_and_values(
if not fs.exists(directory):
fs.mkdir(directory)

if site_uuids is not None:
stmt = (
sa.select(ForecastSQL.forecast_uuid)
.filter(ForecastSQL.site_uuid.in_(site_uuids))
.filter(ForecastSQL.forecast_uuid.in_(forecast_uuids))
)

forecast_uuids = session.scalars(stmt).all()

# loop over both forecast and forecast_values tables
for table in ["forecast", "forecast_value"]:
model = ForecastSQL if table == "forecast" else ForecastValueSQL
Expand Down
10 changes: 10 additions & 0 deletions forecast-inference/forecast_inference/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,18 @@
from forecast_inference.utils.imports import import_from_module
from forecast_inference.utils.profiling import profile

logging.basicConfig(
level=getattr(logging, os.getenv("LOGLEVEL", "INFO")),
format="[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s",
)
_log = logging.getLogger(__name__)

# Get rid of the verbose logs
logging.getLogger('sqlalchemy').setLevel(logging.ERROR)
logging.getLogger('aiobotocore').setLevel(logging.ERROR)
logging.getLogger('aiobotocore').setLevel(logging.ERROR)


version = importlib.metadata.version("forecast_inference")

sentry_sdk.init(
Expand Down

0 comments on commit 2a2d759

Please sign in to comment.