From 9a6e8b03d7ef40f905630a37729a24157464bdf1 Mon Sep 17 00:00:00 2001 From: "herve.le-bars" Date: Sun, 24 Mar 2024 21:42:57 +0100 Subject: [PATCH] feat: migrate compute_port_geometry Signed-off-by: herve.le-bars --- .env.template | 4 + src/bloom/config.py | 5 +- .../infra/repositories/repository_port.py | 7 +- .../tasks/compute_port_geometry_buffer.py | 129 ------------------ src/tasks/data/load_port_data.py | 2 +- .../dimensions/load_dim_port_from_csv.py | 57 ++++---- .../compute_port_geometry_buffer.py | 129 ++++++++++++++++++ 7 files changed, 168 insertions(+), 165 deletions(-) delete mode 100644 src/bloom/tasks/compute_port_geometry_buffer.py create mode 100644 src/tasks/transformations/compute_port_geometry_buffer.py diff --git a/.env.template b/.env.template index 56bbcd57..8422d0a0 100644 --- a/.env.template +++ b/.env.template @@ -19,10 +19,14 @@ LOGGING_LEVEL=INFO AMP_DATA_CSV_PATH= PORT_DATA_CSV_PATH= +PORT_POLYGON_DATA_CSV_PATH= VESSEL_DATA_CSV_PATH= VESSEL_POSITIONS_DATA_CSV_PATH= +#PORT_RADIUS_M=3000 +#PORT_RESOLUTION=10 + ############################################################################################### # DOCKER SPECIFIC VARIABLES diff --git a/src/bloom/config.py b/src/bloom/config.py index 41c30894..340e4e81 100644 --- a/src/bloom/config.py +++ b/src/bloom/config.py @@ -51,7 +51,10 @@ class Settings(BaseSettings): ) amp_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/zones_subset.csv") - port_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports_rad3000_res10.csv") + port_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports.csv") + port_polygon_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports_rad3000_res10.csv") + port_radius_m:int = 3000 # Radius in meters + port_resolution:int = 10 # Number of points in the resulting polygon vessel_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/chalutiers_pelagiques.csv") diff --git a/src/bloom/infra/repositories/repository_port.py b/src/bloom/infra/repositories/repository_port.py index 68f143a4..b55bf85a 100644 --- a/src/bloom/infra/repositories/repository_port.py +++ b/src/bloom/infra/repositories/repository_port.py @@ -40,8 +40,11 @@ def get_empty_geometry_buffer_ports(self, session: Session) -> list[Port]: return [PortRepository.map_to_domain(entity) for entity in q] def get_ports_updated_created_after(self, session: Session, created_updated_after: datetime) -> list[Port]: - stmt = select(sql_model.Port).where(or_(sql_model.Port.created_at >= created_updated_after, - sql_model.Port.updated_at >= created_updated_after)) + if created_updated_after is None: + stmt = select(sql_model.Port) + else: + stmt = select(sql_model.Port).where(or_(sql_model.Port.created_at >= created_updated_after, + sql_model.Port.updated_at >= created_updated_after)) q = session.execute(stmt).scalars() if not q: return [] diff --git a/src/bloom/tasks/compute_port_geometry_buffer.py b/src/bloom/tasks/compute_port_geometry_buffer.py deleted file mode 100644 index 186caa86..00000000 --- a/src/bloom/tasks/compute_port_geometry_buffer.py +++ /dev/null @@ -1,129 +0,0 @@ -from time import perf_counter - -import geopandas as gpd -import pandas as pd -import pyproj -import shapely -from bloom.config import settings -from bloom.container import UseCases -from bloom.logger import logger -from scipy.spatial import Voronoi -from shapely.geometry import LineString, Polygon -from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository -from datetime import datetime, timezone - -radius_m = 3000 # Radius in meters -resolution = 10 # Number of points in the resulting polygon - - -# Function to create geodesic buffer around a point -def geodesic_point_buffer(lat: float, lon: float, radius_m: int, resolution: int) -> Polygon: - """ - Input - lat: latitude of the center point - lon: longitude of the center point - radius_m: radius of the buffer in meters - resolution: number of points in the resulting polygon - """ - geod = pyproj.Geod(ellps="WGS84") # Define the ellipsoid - # Create a circle in geodesic coordinates - angles = range(0, 360, 360 // resolution) - circle_points = [] - for angle in angles: - # Calculate the point on the circle for this angle - lon2, lat2, _ = geod.fwd(lon, lat, angle, radius_m) - circle_points.append((lon2, lat2)) - # Create a polygon from these points - return Polygon(circle_points) - - -def assign_voronoi_buffer(ports: gpd.GeoDataFrame) -> gpd.GeoDataFrame: - """Computes a buffer around each port such as buffers do not overlap each other - - :param gpd.GeoDataFrame ports: fields "id", "latitude", "longitude", "geometry_point" - from the table "dim_ports" - :return gpd.GeoDataFrame: same as input but field "geometry_point" is replaced - by "geometry_buffer" - """ - # Convert to CRS 6933 to write distances as meters (instead of degrees) - ports.to_crs(6933, inplace=True) - - # Create an 8km buffer around each port - # FIXME: maybe put the buffer distance as a global parameter - ports["buffer"] = ports["geometry_point"].apply(lambda p: shapely.buffer(p, 8000)) - - # Convert points back to CRS 4326 (lat/lon) --> buffers are still expressed in 6933 - ports.to_crs(settings.srid, inplace=True) - - # Convert buffers to 4326 - ports = gpd.GeoDataFrame(ports, geometry="buffer", crs=6933).to_crs(settings.srid) - - # Create Voronoi polygons, i.e. match each port to the area which is closest to this port - vor = Voronoi(list(zip(ports.longitude, ports.latitude))) - lines = [] - for line in vor.ridge_vertices: - if -1 not in line: - lines.append(LineString(vor.vertices[line])) - else: - lines.append(LineString()) - vor_polys = [poly for poly in shapely.ops.polygonize(lines)] - - # Match each port to its Voronoi polygon - vor_gdf = gpd.GeoDataFrame(geometry=gpd.GeoSeries(vor_polys), crs=4326) - vor_gdf["voronoi_poly"] = vor_gdf["geometry"].copy() - ports = gpd.GeoDataFrame(ports, geometry="geometry_point", crs=4326) - vor_ports = ports.sjoin(vor_gdf, how="left").drop(columns=["index_right"]) - - # Corner case: ports at extremes latitude and longitude have no Voroni polygon - # (15 / >5800) cases --> duplicate regular buffer instead - vor_ports.loc[vor_ports.voronoi_poly.isna(), "voronoi_poly"] = vor_ports.loc[ - vor_ports.voronoi_poly.isna(), "buffer" - ] - - # Get intersection between fixed-size buffer and Voronoi polygon to get final buffer - vor_ports["geometry_buffer"] = vor_ports.apply( - lambda row: shapely.intersection(row["buffer"], row["voronoi_poly"]), - axis=1 - ) - vor_ports["buffer_voronoi"] = vor_ports["geometry_buffer"].copy() - vor_ports = gpd.GeoDataFrame(vor_ports, geometry="geometry_buffer", crs=4326) - vor_ports = vor_ports[["id", "latitude", "longitude", "geometry_buffer"]].copy() - - return vor_ports - - -def run() -> None: - use_cases = UseCases() - port_repository = use_cases.port_repository() - db = use_cases.db() - items = [] - with db.session() as session: - point_in_time = TaskExecutionRepository.get_point_in_time(session, "compute_port_geometry_buffer") - logger.info(f"Point in time={point_in_time}") - now = datetime.now(timezone.utc) - ports = port_repository.get_ports_updated_created_after(session, point_in_time) - if ports: - df = pd.DataFrame( - [[p.id, p.geometry_point, p.latitude, p.longitude] for p in ports], - columns=["id", "geometry_point", "latitude", "longitude"], - ) - gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid) - - # Apply the buffer function to create buffers - gdf = assign_voronoi_buffer(gdf) - - for row in gdf.itertuples(): - items.append({"id": row.id, "geometry_buffer": row.geometry_buffer}) - port_repository.batch_update_geometry_buffer(session, items) - TaskExecutionRepository.set_point_in_time(session, "compute_port_geometry_buffer", now) - session.commit() - logger.info(f"{len(items)} buffer de ports mis à jour") - - -if __name__ == "__main__": - time_start = perf_counter() - logger.info("DEBUT - Calcul des buffer de ports") - run() - time_end = perf_counter() - duration = time_end - time_start - logger.info(f"FIN - Calcul des buffer de ports en {duration:.2f}s") diff --git a/src/tasks/data/load_port_data.py b/src/tasks/data/load_port_data.py index 6d2e9a92..cb3f47d6 100644 --- a/src/tasks/data/load_port_data.py +++ b/src/tasks/data/load_port_data.py @@ -15,7 +15,7 @@ def run(self,*args,**kwargs): engine = create_engine(settings.db_url, echo=False) df = pd.read_csv( - settings.port_data_csv_path, + settings.port_polygon_data_csv_path, sep=";", ) diff --git a/src/tasks/dimensions/load_dim_port_from_csv.py b/src/tasks/dimensions/load_dim_port_from_csv.py index 4a84c1ad..992add6c 100644 --- a/src/tasks/dimensions/load_dim_port_from_csv.py +++ b/src/tasks/dimensions/load_dim_port_from_csv.py @@ -1,6 +1,5 @@ from tasks.base import BaseTask from bloom.config import settings -import logging from pathlib import Path from time import perf_counter @@ -15,12 +14,10 @@ from pydantic import ValidationError from shapely import wkt -logging.basicConfig() -logging.getLogger("bloom.tasks").setLevel(settings.logging_level) class LoadDimPortFromCsv(BaseTask): - def map_to_domain(row) -> Port: + def map_to_domain(self,row) -> Port: iso_code = pycountry.countries.get(name=row["country"]) iso_code = iso_code.alpha_3 if iso_code is not None else "XXX" @@ -33,38 +30,34 @@ def map_to_domain(row) -> Port: longitude=float(row["longitude"]), geometry_point=row["geometry_point"], ) - def run(self): - - -def run(csv_file_name: str) -> None: - use_cases = UseCases() - port_repository = use_cases.port_repository() - db = use_cases.db() - - ports = [] - total = 0 - try: - df = pd.read_csv(csv_file_name, sep=";") - df["geometry_point"] = df["geometry_point"].apply(wkt.loads) - gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid) - ports = gdf.apply(map_to_domain, axis=1) - with db.session() as session: - ports = port_repository.batch_create_port(session, list(ports)) - session.commit() - total = len(ports) - except ValidationError as e: - logger.error("Erreur de validation des données de port") - logger.error(e.errors()) - except DBException as e: - logger.error("Erreur d'insertion en base") - logger.info(f"{total} ports(s) créés") + def run(self,*args,**kwargs): + use_cases = UseCases() + port_repository = use_cases.port_repository() + db = use_cases.db() + + ports = [] + total = 0 + try: + df = pd.read_csv(settings.port_data_csv_path, sep=";") + df["geometry_point"] = df["geometry_point"].apply(wkt.loads) + gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid) + ports = gdf.apply(self.map_to_domain, axis=1) + with db.session() as session: + ports = port_repository.batch_create_port(session, list(ports)) + session.commit() + total = len(ports) + except ValidationError as e: + logger.error("Erreur de validation des données de port") + logger.error(e.errors()) + except DBException as e: + logger.error("Erreur d'insertion en base") + logger.info(f"{total} ports(s) créés") if __name__ == "__main__": time_start = perf_counter() - file_name = Path(settings.data_folder).joinpath("./ports.csv") - logger.info(f"DEBUT - Chargement des données de ports depuis le fichier {file_name}") - run(file_name) + logger.info(f"DEBUT - Chargement des données de ports depuis le fichier {settings.port_data_csv_path}") + LoadDimPortFromCsv().start() time_end = perf_counter() duration = time_end - time_start logger.info(f"FIN - Chargement des données de ports en {duration:.2f}s") diff --git a/src/tasks/transformations/compute_port_geometry_buffer.py b/src/tasks/transformations/compute_port_geometry_buffer.py new file mode 100644 index 00000000..02647ce3 --- /dev/null +++ b/src/tasks/transformations/compute_port_geometry_buffer.py @@ -0,0 +1,129 @@ +from time import perf_counter +from tasks.base import BaseTask +import geopandas as gpd +import pandas as pd +import pyproj +import shapely +from bloom.config import settings +from bloom.container import UseCases +from bloom.logger import logger +from scipy.spatial import Voronoi +from shapely.geometry import LineString, Polygon +from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository +from datetime import datetime, timezone + + + +class ComputePort_GeometryBuffer(BaseTask): + radius_m = settings.port_radius_m # Radius in meters + resolution = settings.port_resolution # Number of points in the resulting polygon + + # Function to create geodesic buffer around a point + def geodesic_point_buffer(lat: float, lon: float, radius_m: int, resolution: int) -> Polygon: + """ + Input + lat: latitude of the center point + lon: longitude of the center point + radius_m: radius of the buffer in meters + resolution: number of points in the resulting polygon + """ + geod = pyproj.Geod(ellps="WGS84") # Define the ellipsoid + # Create a circle in geodesic coordinates + angles = range(0, 360, 360 // resolution) + circle_points = [] + for angle in angles: + # Calculate the point on the circle for this angle + lon2, lat2, _ = geod.fwd(lon, lat, angle, radius_m) + circle_points.append((lon2, lat2)) + # Create a polygon from these points + return Polygon(circle_points) + + def assign_voronoi_buffer(self,ports: gpd.GeoDataFrame) -> gpd.GeoDataFrame: + """Computes a buffer around each port such as buffers do not overlap each other + + :param gpd.GeoDataFrame ports: fields "id", "latitude", "longitude", "geometry_point" + from the table "dim_ports" + :return gpd.GeoDataFrame: same as input but field "geometry_point" is replaced + by "geometry_buffer" + """ + # Convert to CRS 6933 to write distances as meters (instead of degrees) + ports.to_crs(6933, inplace=True) + + # Create an 8km buffer around each port + # FIXME: maybe put the buffer distance as a global parameter + ports["buffer"] = ports["geometry_point"].apply(lambda p: shapely.buffer(p, 8000)) + + # Convert points back to CRS 4326 (lat/lon) --> buffers are still expressed in 6933 + ports.to_crs(settings.srid, inplace=True) + + # Convert buffers to 4326 + ports = gpd.GeoDataFrame(ports, geometry="buffer", crs=6933).to_crs(settings.srid) + + # Create Voronoi polygons, i.e. match each port to the area which is closest to this port + vor = Voronoi(list(zip(ports.longitude, ports.latitude))) + lines = [] + for line in vor.ridge_vertices: + if -1 not in line: + lines.append(LineString(vor.vertices[line])) + else: + lines.append(LineString()) + vor_polys = [poly for poly in shapely.ops.polygonize(lines)] + + # Match each port to its Voronoi polygon + vor_gdf = gpd.GeoDataFrame(geometry=gpd.GeoSeries(vor_polys), crs=4326) + vor_gdf["voronoi_poly"] = vor_gdf["geometry"].copy() + ports = gpd.GeoDataFrame(ports, geometry="geometry_point", crs=4326) + vor_ports = ports.sjoin(vor_gdf, how="left").drop(columns=["index_right"]) + + # Corner case: ports at extremes latitude and longitude have no Voroni polygon + # (15 / >5800) cases --> duplicate regular buffer instead + vor_ports.loc[vor_ports.voronoi_poly.isna(), "voronoi_poly"] = vor_ports.loc[ + vor_ports.voronoi_poly.isna(), "buffer" + ] + + # Get intersection between fixed-size buffer and Voronoi polygon to get final buffer + vor_ports["geometry_buffer"] = vor_ports.apply( + lambda row: shapely.intersection(row["buffer"], row["voronoi_poly"]), + axis=1 + ) + vor_ports["buffer_voronoi"] = vor_ports["geometry_buffer"].copy() + vor_ports = gpd.GeoDataFrame(vor_ports, geometry="geometry_buffer", crs=4326) + vor_ports = vor_ports[["id", "latitude", "longitude", "geometry_buffer"]].copy() + + return vor_ports + + + def run(self,*args,**kwargs)-> None: + use_cases = UseCases() + port_repository = use_cases.port_repository() + db = use_cases.db() + items = [] + with db.session() as session: + point_in_time = TaskExecutionRepository.get_point_in_time(session, "compute_port_geometry_buffer") + logger.info(f"Point in time={point_in_time}") + now = datetime.now(timezone.utc) + ports = port_repository.get_ports_updated_created_after(session, point_in_time) + if ports: + df = pd.DataFrame( + [[p.id, p.geometry_point, p.latitude, p.longitude] for p in ports], + columns=["id", "geometry_point", "latitude", "longitude"], + ) + gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid) + + # Apply the buffer function to create buffers + gdf = self.assign_voronoi_buffer(gdf) + + for row in gdf.itertuples(): + items.append({"id": row.id, "geometry_buffer": row.geometry_buffer}) + port_repository.batch_update_geometry_buffer(session, items) + TaskExecutionRepository.set_point_in_time(session, "compute_port_geometry_buffer", now) + session.commit() + logger.info(f"{len(items)} buffer de ports mis à jour") + +if __name__ == "__main__": + time_start = perf_counter() + logger.info("DEBUT - Calcul des buffer de ports") + ComputePort_GeometryBuffer().start() + time_end = perf_counter() + duration = time_end - time_start + logger.info(f"FIN - Calcul des buffer de ports en {duration:.2f}s")