diff --git a/database-cleanup/database_cleanup/app.py b/database-cleanup/database_cleanup/app.py index 1df7597..5f2e69d 100644 --- a/database-cleanup/database_cleanup/app.py +++ b/database-cleanup/database_cleanup/app.py @@ -50,14 +50,19 @@ def _profile(msg: str): _log.debug(f"Done in {t1 - t0:.3f}s") -def _get_forecasts(session: Session, max_date: dt.datetime, limit: int) -> list[uuid.UUID]: +def _get_forecasts( + session: Session, + max_date: dt.datetime, + limit: int, + site_uuids: Optional[list[uuid.UUID]] = None, +) -> list[uuid.UUID]: """Get the `limit` older forecasts that are before `max_date`.""" - stmt = ( - sa.select(ForecastSQL.forecast_uuid) - .where(ForecastSQL.timestamp_utc < max_date) - .order_by(ForecastSQL.timestamp_utc) - .limit(limit) - ) + stmt = sa.select(ForecastSQL.forecast_uuid).where(ForecastSQL.timestamp_utc < max_date) + + if site_uuids is not None: + stmt = stmt.where(ForecastSQL.site_uuid.in_(site_uuids)) + + stmt = stmt.order_by(ForecastSQL.timestamp_utc).limit(limit) return session.scalars(stmt).all() @@ -144,48 +149,52 @@ def main( num_forecast_deleted = 0 - i = -1 - while True: - i += 1 - with Session.begin() as session: - forecast_uuids = _get_forecasts( - session, - max_date=date, - limit=batch_size, - ) - - if len(forecast_uuids) == 0: - _log.info(f"Done deleting forecasts made before {date}") - _log.info( - f"A total of {num_forecast_deleted} (and corresponding values) " - f"were deleted from the database." - ) - _log.info("Exiting.") - return + # 1. save forecasts and delete them, 2. 
delete all other values + for save_forecasts in [True, False]: + _log.info(f"{'Saving and' if save_forecasts else 'Deleting'} forecasts and values") - if (save_dir is not None) and do_delete: - save_forecast_and_values( - session=session, - forecast_uuids=forecast_uuids, - directory=save_dir, - index=i, - site_uuids=site_uuids_all_sites, - ) + i = -1 + forecast_uuids = [1] + while len(forecast_uuids) > 0: + i += 1 - if do_delete: - # Not that it is important to run this in a transaction for atomicity. + with Session.begin() as session: - _delete_forecasts_and_values(session, forecast_uuids) - else: - print(f"Would delete data from {len(forecast_uuids)} forecasts in a first batch.") - # Stop here because otherwise we would loop infinitely. - return + forecast_uuids = _get_forecasts( + session, + max_date=date, + limit=batch_size, + site_uuids=site_uuids_all_sites if save_forecasts else None, + ) + + if (save_dir is not None) and do_delete and save_forecasts: + save_forecast_and_values( + session=session, + forecast_uuids=forecast_uuids, + directory=save_dir, + index=i, + ) + + if do_delete: + # Note that it is important to run this in a transaction for atomicity. + with Session.begin() as session: + _delete_forecasts_and_values(session, forecast_uuids) + else: + print(f"Would delete data from {len(forecast_uuids)} forecasts in a first batch.") + # Stop here because otherwise we would loop infinitely. + return - num_forecast_deleted += len(forecast_uuids) + num_forecast_deleted += len(forecast_uuids) - if sleep: - _log.debug(f"Sleeping for {sleep} seconds") - time.sleep(sleep) + if sleep: + _log.debug(f"Sleeping for {sleep} seconds") + time.sleep(sleep) + + _log.info(f"Done deleting forecasts made before {date}") + _log.info( + f"A total of {num_forecast_deleted} (and corresponding values) " + f"were deleted from the database." 
+ ) + _log.info("Exiting.") def format_date(date) -> dt.datetime: diff --git a/database-cleanup/database_cleanup/save.py b/database-cleanup/database_cleanup/save.py index d1f6f01..a654499 100644 --- a/database-cleanup/database_cleanup/save.py +++ b/database-cleanup/database_cleanup/save.py @@ -2,11 +2,9 @@ import os import uuid import fsspec -from typing import Optional from pvsite_datamodel.sqlmodels import ForecastSQL, ForecastValueSQL, SiteGroupSQL from sqlalchemy.orm import Session -import sqlalchemy as sa import pandas as pd @@ -54,7 +52,6 @@ def save_forecast_and_values( forecast_uuids: list[uuid.UUID], directory: str, index: int = 0, - site_uuids: Optional[list[uuid.UUID]] = None, ): """ Save forecast and forecast values to csv @@ -63,7 +60,6 @@ def save_forecast_and_values( :param directory: the directory where they should be saved :param index: the index of the file, we delete the forecasts in batches, so there will be several files to save - :param site_uuids: list of site uuids to save, if its None, then we ignore this """ _log.info(f"Saving data to {directory}") @@ -72,15 +68,6 @@ def save_forecast_and_values( if not fs.exists(directory): fs.mkdir(directory) - if site_uuids is not None: - stmt = ( - sa.select(ForecastSQL.forecast_uuid) - .filter(ForecastSQL.site_uuid.in_(site_uuids)) - .filter(ForecastSQL.forecast_uuid.in_(forecast_uuids)) - ) - - forecast_uuids = session.scalars(stmt).all() - # loop over both forecast and forecast_values tables for table in ["forecast", "forecast_value"]: model = ForecastSQL if table == "forecast" else ForecastValueSQL diff --git a/forecast-inference/forecast_inference/app.py b/forecast-inference/forecast_inference/app.py index e971275..9e0cf9f 100644 --- a/forecast-inference/forecast_inference/app.py +++ b/forecast-inference/forecast_inference/app.py @@ -23,8 +23,18 @@ from forecast_inference.utils.imports import import_from_module from forecast_inference.utils.profiling import profile +logging.basicConfig( + 
level=getattr(logging, os.getenv("LOGLEVEL", "INFO")), + format="[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s", +) _log = logging.getLogger(__name__) +# Get rid of the verbose logs +logging.getLogger('sqlalchemy').setLevel(logging.ERROR) +logging.getLogger('aiobotocore').setLevel(logging.ERROR) +logging.getLogger('aiobotocore').setLevel(logging.ERROR) + + version = importlib.metadata.version("forecast_inference") sentry_sdk.init(