diff --git a/backend/bloom/infra/repositories/repository_spire_ais_data.py b/backend/bloom/infra/repositories/repository_spire_ais_data.py
index 696ed98d..b9217179 100644
--- a/backend/bloom/infra/repositories/repository_spire_ais_data.py
+++ b/backend/bloom/infra/repositories/repository_spire_ais_data.py
@@ -3,7 +3,7 @@
 from bloom.infra.database import sql_model
 from dependency_injector.providers import Callable
 from sqlalchemy.orm import Session
-from sqlalchemy import select, and_
+from sqlalchemy import select, and_, delete, Result
 from datetime import datetime
 
 from bloom.logger import logger
@@ -159,6 +159,83 @@ def get_all_data_by_mmsi(
         result = session.execute(stmt).scalars()
         return [SpireAisDataRepository.map_to_domain(e) for e in result]
 
+    def get_all_data_before_as_df(self, session: Session, date_before: datetime) -> pd.DataFrame:
+        """Return every row created strictly before date_before as a pandas DataFrame."""
+        stmt = select(sql_model.SpireAisData.id,
+                      sql_model.SpireAisData.spire_update_statement,
+                      sql_model.SpireAisData.vessel_ais_class,
+                      sql_model.SpireAisData.vessel_flag,
+                      sql_model.SpireAisData.vessel_name,
+                      sql_model.SpireAisData.vessel_callsign,
+                      sql_model.SpireAisData.vessel_timestamp,
+                      sql_model.SpireAisData.vessel_update_timestamp,
+                      sql_model.SpireAisData.vessel_ship_type,
+                      sql_model.SpireAisData.vessel_sub_ship_type,
+                      sql_model.SpireAisData.vessel_mmsi,
+                      sql_model.SpireAisData.vessel_imo,
+                      sql_model.SpireAisData.vessel_width,
+                      sql_model.SpireAisData.vessel_length,
+                      sql_model.SpireAisData.position_accuracy,
+                      sql_model.SpireAisData.position_collection_type,
+                      sql_model.SpireAisData.position_course,
+                      sql_model.SpireAisData.position_heading,
+                      sql_model.SpireAisData.position_latitude,
+                      sql_model.SpireAisData.position_longitude,
+                      sql_model.SpireAisData.position_maneuver,
+                      sql_model.SpireAisData.position_navigational_status,
+                      sql_model.SpireAisData.position_rot,
+                      sql_model.SpireAisData.position_speed,
+                      sql_model.SpireAisData.position_timestamp,
+                      sql_model.SpireAisData.position_update_timestamp,
+                      sql_model.SpireAisData.voyage_destination,
+                      sql_model.SpireAisData.voyage_draught,
+                      sql_model.SpireAisData.voyage_eta,
+                      sql_model.SpireAisData.voyage_timestamp,
+                      sql_model.SpireAisData.voyage_update_timestamp,
+                      sql_model.SpireAisData.created_at
+                      ).where(sql_model.SpireAisData.created_at < date_before)
+        result = session.execute(stmt)
+        return pd.DataFrame(result, columns=[
+            "id",
+            "spire_update_statement",
+            "vessel_ais_class",
+            "vessel_flag",
+            "vessel_name",
+            "vessel_callsign",
+            "vessel_timestamp",
+            "vessel_update_timestamp",
+            "vessel_ship_type",
+            "vessel_sub_ship_type",
+            "vessel_mmsi",
+            "vessel_imo",
+            "vessel_width",
+            "vessel_length",
+            "position_accuracy",
+            "position_collection_type",
+            "position_course",
+            "position_heading",
+            "position_latitude",
+            "position_longitude",
+            "position_maneuver",
+            "position_navigational_status",
+            "position_rot",
+            "position_speed",
+            "position_timestamp",
+            "position_update_timestamp",
+            "voyage_destination",
+            "voyage_draught",
+            "voyage_eta",
+            "voyage_timestamp",
+            "voyage_update_timestamp",
+            "created_at"
+        ])
+
+    def delete_rows(self, session: Session, row_ids: list[int]) -> int:
+        """Delete the rows whose id is in row_ids and return the deleted row count."""
+        stmt = delete(sql_model.SpireAisData).where(sql_model.SpireAisData.id.in_(row_ids))
+        result = session.execute(stmt)
+        return result.rowcount
+
     @staticmethod
     def map_to_orm(data: SpireAisData) -> sql_model.SpireAisData:
         return sql_model.SpireAisData(**data.__dict__)
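A note on delete_rows: `id.in_(row_ids)` expands to one bind parameter per id, and an archive run can select months of rows at once, which can push a single DELETE against database parameter limits. A minimal sketch of a batched variant, assuming the same sql_model module and Session type; the helper name delete_rows_batched and the chunk size are illustrative and not part of this diff:

    from sqlalchemy import delete
    from sqlalchemy.orm import Session
    from bloom.infra.database import sql_model

    def delete_rows_batched(session: Session, row_ids: list[int], chunk_size: int = 10_000) -> int:
        # Issue one DELETE per chunk so no single statement carries an
        # unbounded IN (...) parameter list.
        deleted = 0
        for start in range(0, len(row_ids), chunk_size):
            chunk = row_ids[start:start + chunk_size]
            result = session.execute(
                delete(sql_model.SpireAisData).where(sql_model.SpireAisData.id.in_(chunk))
            )
            deleted += result.rowcount
        return deleted
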
diff --git a/backend/bloom/tasks/archive_spire_ais_data.py b/backend/bloom/tasks/archive_spire_ais_data.py
new file mode 100644
index 00000000..88af4ce9
--- /dev/null
+++ b/backend/bloom/tasks/archive_spire_ais_data.py
@@ -0,0 +1,58 @@
+import argparse
+from time import perf_counter
+from datetime import timedelta, datetime, timezone
+from pathlib import Path
+
+from bloom.container import UseCases
+from bloom.logger import logger
+
+
+def run(window: int, output_path: str):
+    """Archive spire_ais_data rows older than `window` days to Parquet, then delete them."""
+    use_cases = UseCases()
+    spire_ais_data_repository = use_cases.spire_ais_data_repository()
+
+    db = use_cases.db()
+    with db.session() as session:
+        now = datetime.now(timezone.utc)
+        date_limit = now - timedelta(days=window)
+        logger.info(f"Archiving and deleting data older than {date_limit}")
+        df = spire_ais_data_repository.get_all_data_before_as_df(session, date_limit)
+        if len(df) > 0:
+            min_date = df["created_at"].min().strftime("%Y-%m-%dT%H:%M:%S")
+            max_date = df["created_at"].max().strftime("%Y-%m-%dT%H:%M:%S")
+            file_name = Path(output_path).joinpath(f"spire_ais_data_{min_date}_{max_date}.parquet")
+            df.to_parquet(file_name)
+            logger.info(f"{len(df)} records archived to {file_name}")
+            count = spire_ais_data_repository.delete_rows(session, list(df["id"]))
+            logger.info(f"{count} records deleted from the database")
+        else:
+            logger.info("No data to archive")
+        session.commit()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Archive the spire_ais_data table")
+    parser.add_argument(
+        "-w",
+        "--history-window",
+        type=int,
+        help="history window in days",
+        required=False,
+        default=365 // 2,  # integer division: argparse does not coerce defaults with type=int
+    )
+    parser.add_argument(
+        "-o",
+        "--output-path",
+        type=str,
+        help="output path",
+        required=False,
+        default="./",
+    )
+    args = parser.parse_args()
+    time_start = perf_counter()
+    logger.info("START - Archiving data from the spire_ais_data table")
+    run(args.history_window, args.output_path)
+    time_end = perf_counter()
+    duration = time_end - time_start
+    logger.info(f"END - Archived data from the spire_ais_data table in {duration:.2f}s")
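For reference, a run archiving everything older than the default half-year window might look like this (paths are illustrative, not part of the diff):

    python backend/bloom/tasks/archive_spire_ais_data.py --history-window 182 --output-path /var/backups/bloom

A produced archive can be spot-checked by reading the Parquet file back with pandas; a minimal sketch, with a hypothetical file name (real names embed the min/max created_at of the archived rows):

    import pandas as pd

    # File name is illustrative; actual names follow spire_ais_data_<min>_<max>.parquet.
    df = pd.read_parquet("spire_ais_data_2024-01-01T00:00:00_2024-06-30T23:59:59.parquet")
    print(len(df), df["created_at"].min(), df["created_at"].max())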