From a0a2501e015094c404511a50545b8c62048693ac Mon Sep 17 00:00:00 2001 From: Jona Date: Fri, 22 Sep 2023 23:41:51 -0400 Subject: [PATCH] Create helper class for AIS tasks --- .../cloud_function_ais_analysis/main.py | 146 ++----------- .../requirements.txt | 9 +- .../cloud_function_ais_analysis/utils/ais.py | 201 ++++++++++++++++++ .../cloud_function_ais_analysis/utils/misc.py | 43 ---- .../utils/trajectory.py | 102 --------- requirements.txt | 7 +- 6 files changed, 228 insertions(+), 280 deletions(-) create mode 100644 cerulean_cloud/cloud_function_ais_analysis/utils/ais.py delete mode 100644 cerulean_cloud/cloud_function_ais_analysis/utils/misc.py delete mode 100644 cerulean_cloud/cloud_function_ais_analysis/utils/trajectory.py diff --git a/cerulean_cloud/cloud_function_ais_analysis/main.py b/cerulean_cloud/cloud_function_ais_analysis/main.py index 05f92c91..35df60e4 100644 --- a/cerulean_cloud/cloud_function_ais_analysis/main.py +++ b/cerulean_cloud/cloud_function_ais_analysis/main.py @@ -4,36 +4,16 @@ import asyncio import json import os -from datetime import datetime, timedelta +from datetime import datetime -import pandas_gbq - -# import shapely.geometry as sh # https://docs.aws.amazon.com/lambda/latest/dg/python-package.html from google.cloud import tasks_v2 from google.protobuf import timestamp_pb2 +from cerulean_cloud.cloud_function_ais_analysis.utils.ais import AISConstructor from cerulean_cloud.cloud_function_ais_analysis.utils.associate import ( associate_ais_to_slicks, slicks_to_curves, ) -from cerulean_cloud.cloud_function_ais_analysis.utils.constants import ( - AIS_BUFFER, - BUF_VEC, - D_FORMAT, - HOURS_AFTER, - HOURS_BEFORE, - NUM_TIMESTEPS, - T_FORMAT, - WEIGHT_VEC, -) -from cerulean_cloud.cloud_function_ais_analysis.utils.misc import ( - build_time_vec, - get_utm_zone, -) -from cerulean_cloud.cloud_function_ais_analysis.utils.trajectory import ( - ais_points_to_trajectories, - buffer_trajectories, -) from cerulean_cloud.database_client import 
DatabaseClient, get_engine # mypy: ignore-errors @@ -75,7 +55,6 @@ async def handle_aaa_request(request): Notes: - The function gets scene information from the request. - It uses the `DatabaseClient` for database operations. - - It calls `get_ais` and `automatic_ais_analysis` for AIS data retrieval and analysis. """ request_json = request.get_json() if not request_json.get("dry_run"): @@ -88,120 +67,35 @@ async def handle_aaa_request(request): await db_client.get_slicks_without_sources_from_scene_id(scene_id) ) if len(slicks_without_sources) > 0: - ais_traj, ais_buffered, ais_weighted, utm_zone = get_ais(s1) - if ais_traj: + ais_constructor = AISConstructor(s1) + ais_constructor.retrieve_ais() + if ais_constructor.ais_df: + ais_constructor.build_trajectories() + ais_constructor.buffer_trajectories() for slick in slicks_without_sources: - automatic_ais_analysis( - slick, ais_traj, ais_buffered, ais_weighted, utm_zone - ) + automatic_ais_analysis(ais_constructor, slick) return "Success!" -async def get_ais( - s1, hours_before=HOURS_BEFORE, hours_after=HOURS_AFTER, ais_buffer=AIS_BUFFER -): - """ - Asynchronously fetches and processes AIS data. - - Args: - s1 (Scene Object): The scene object for which AIS data is needed. - hours_before (int): The number of hours before the scene time to consider for AIS data. - hours_after (int): The number of hours after the scene time to consider for AIS data. - ais_buffer (float): The buffer distance around the scene geometry. - - Returns: - tuple: A tuple containing ais_traj, ais_buffered, ais_weighted, and utm_zone. - - Notes: - - AIS data is downloaded and then transformed into trajectories. - - The function also buffers and weighs the AIS trajectories. 
- """ - grd_buff = s1.geometry.buffer(ais_buffer) - ais = download_ais(s1.start_time, hours_before, hours_after, grd_buff) - time_vec = build_time_vec(s1.start_time, hours_before, hours_after, NUM_TIMESTEPS) - utm_zone = get_utm_zone(ais) - ais_traj = ais_points_to_trajectories(ais, time_vec) - ais_buffered, ais_weighted = buffer_trajectories(ais_traj, BUF_VEC, WEIGHT_VEC) - - return ais_traj, ais_buffered, ais_weighted, utm_zone - - -def download_ais( - t_stamp, - hours_before, - hours_after, - poly, -): - """ - Downloads AIS data using a SQL query on BigQuery. - - Args: - t_stamp (datetime): The timestamp for which AIS data is needed. - hours_before (int): The number of hours before the timestamp to consider for AIS data. - hours_after (int): The number of hours after the timestamp to consider for AIS data. - poly (str): The polygon geometry in WKT format to filter AIS data spatially. - - Returns: - DataFrame: A Pandas DataFrame containing the downloaded AIS data. - - Notes: - - The function uses Google's BigQuery Python client `pandas_gbq` to execute the SQL query. - - Make sure that the BigQuery project ID is set in the environment variable "BQ_PROJECT_ID". 
+def automatic_ais_analysis(ais_constructor, slick): """ - sql = f""" - SELECT * FROM( - SELECT - seg.ssvid as ssvid, - seg.timestamp as timestamp, - seg.lon as lon, - seg.lat as lat, - seg.course as course, - seg.speed_knots as speed_knots, - ves.ais_identity.shipname_mostcommon.value as shipname, - ves.ais_identity.shiptype[SAFE_OFFSET(0)].value as shiptype, - ves.best.best_flag as flag, - ves.best.best_vessel_class as best_shiptype - FROM - `world-fishing-827.gfw_research.pipe_v20201001` as seg - LEFT JOIN - `world-fishing-827.gfw_research.vi_ssvid_v20230801` as ves - ON seg.ssvid = ves.ssvid - - WHERE - seg._PARTITIONTIME between '{datetime.strftime(t_stamp-timedelta(hours=hours_before), D_FORMAT)}' AND '{datetime.strftime(t_stamp+timedelta(hours=hours_after), D_FORMAT)}' - AND seg.timestamp between '{datetime.strftime(t_stamp-timedelta(hours=hours_before), T_FORMAT)}' AND '{datetime.strftime(t_stamp+timedelta(hours=hours_after), T_FORMAT)}' - AND ST_COVEREDBY(ST_GEOGPOINT(seg.lon, seg.lat), ST_GeogFromText('{poly}')) - ORDER BY - seg.timestamp DESC - ) - ORDER BY - ssvid, timestamp - """ - return pandas_gbq.read_gbq(sql, project_id=os.getenv("BQ_PROJECT_ID")) + Perform automatic analysis to associate AIS trajectories with slicks. - -def automatic_ais_analysis(slick, ais_traj, ais_buffered, ais_weighted, utm_zone): - """ - Performs automatic AIS analysis for a given slick and AIS data. - - Args: - slick (GeoDataFrame): The oil slick geometry. - ais_traj (GeoDataFrame): The AIS trajectories. - ais_buffered (GeoDataFrame): The buffered AIS trajectories. - ais_weighted (GeoDataFrame): The weighted AIS trajectories. - utm_zone (str): The UTM zone for coordinate transformation. + Parameters: + ais_constructor (AISTrajectoryAnalysis): An instance of the AISTrajectoryAnalysis class. + slick (GeoDataFrame): A GeoDataFrame containing the slick geometries. Returns: - DataFrame: A Pandas DataFrame containing the AIS analysis results sorted by slick index and score. 
- - Notes: - - The function performs geometry transformations and data association. - - It uses the helper functions `slicks_to_curves` and `associate_ais_to_slicks`. + GroupBy object: The AIS-slick associations sorted and grouped by slick index. """ - slicks = slick.to_crs(utm_zone) + slicks = slick.to_crs(ais_constructor.ais_df.estimate_utm_crs()) slicks_clean, slicks_curves = slicks_to_curves(slicks) slick_ais = associate_ais_to_slicks( - ais_traj, ais_buffered, ais_weighted, slicks_clean, slicks_curves + ais_constructor.ais_trajectories, + ais_constructor.ais_buffered, + ais_constructor.ais_weighted, + slicks_clean, + slicks_curves, ) results = slick_ais.sort_values( ["slick_index", "slick_size", "total_score"], ascending=[True, False, False] diff --git a/cerulean_cloud/cloud_function_ais_analysis/requirements.txt b/cerulean_cloud/cloud_function_ais_analysis/requirements.txt index 0679862a..bc8b3093 100644 --- a/cerulean_cloud/cloud_function_ais_analysis/requirements.txt +++ b/cerulean_cloud/cloud_function_ais_analysis/requirements.txt @@ -1,17 +1,10 @@ asyncio==3.4.3 -json==Package(s) not found -os==Package(s) not found pandas_gbq==0.17.6 google-cloud-tasks==2.9.1 -google-cloud-core==2.3.1 -google-auth==1.35.0 sqlalchemy==1.4.32 -datetime==Package(s) not found -math==Package(s) not found centerline==1.0.1 geopandas==0.12.2 movingpandas==0.15rc1 numpy==1.21.5 scipy==1.8.0 -shapely==2.0.1 -earthengine-api==0.1.342 \ No newline at end of file +shapely==2.0.1 \ No newline at end of file diff --git a/cerulean_cloud/cloud_function_ais_analysis/utils/ais.py b/cerulean_cloud/cloud_function_ais_analysis/utils/ais.py new file mode 100644 index 00000000..868fecb5 --- /dev/null +++ b/cerulean_cloud/cloud_function_ais_analysis/utils/ais.py @@ -0,0 +1,201 @@ +""" +Utilities and helper functions for the AIS class +""" +import os +from datetime import datetime, timedelta + +import geopandas as gpd +import movingpandas as mpd +import pandas as pd +import pandas_gbq +import 
class AISConstructor:
    """
    Build AIS (Automatic Identification System) context for a Sentinel-1 scene.

    Typical usage: construct with a scene, then call ``retrieve_ais()``,
    ``build_trajectories()``, and ``buffer_trajectories()`` in that order;
    each step populates one of the placeholder attributes below.

    Attributes:
        s1 (object): Scene object exposing ``start_time`` and ``geometry``.
        hours_before (int): Hours before the scene start time to consider.
        hours_after (int): Hours after the scene start time to consider.
        ais_buffer (float): Buffer radius applied around the scene geometry.
        num_timesteps (int): Number of timesteps for interpolation/extrapolation.
        buf_vec (list): Buffer radii applied to successive trajectory points.
        weight_vec (list): Weights for successive trajectory points.
        poly (object): Buffered scene geometry; spatial filter for the AIS query.
        start_time (datetime): Start of the AIS search window.
        end_time (datetime): End of the AIS search window.
        time_vec (pd.DatetimeIndex): Timestamps interpolated/extrapolated to.
        ais_df (GeoDataFrame): AIS points; None until retrieve_ais() runs.
        ais_trajectories (mpd.TrajectoryCollection): None until build_trajectories() runs.
        ais_buffered (GeoDataFrame): Conic buffer polygons; None until buffer_trajectories() runs.
        ais_weighted (list): Per-trajectory GeoDataFrames of weighted hulls; None until buffer_trajectories() runs.
    """

    def __init__(
        self,
        s1,
        hours_before=HOURS_BEFORE,
        hours_after=HOURS_AFTER,
        ais_buffer=AIS_BUFFER,
        num_timesteps=NUM_TIMESTEPS,
        buf_vec=BUF_VEC,
        weight_vec=WEIGHT_VEC,
    ):
        """
        Initialize an AISConstructor.

        Parameters:
            s1 (object): Scene object with ``start_time`` and ``geometry``.
            hours_before (int, optional): Hours before scene start to include. Defaults to HOURS_BEFORE.
            hours_after (int, optional): Hours after scene start to include. Defaults to HOURS_AFTER.
            ais_buffer (float, optional): Buffer radius for the scene geometry. Defaults to AIS_BUFFER.
            num_timesteps (int, optional): Number of interpolation timesteps. Defaults to NUM_TIMESTEPS.
            buf_vec (list, optional): Buffer radii for trajectory points. Defaults to BUF_VEC.
            weight_vec (list, optional): Weights for trajectory points. Defaults to WEIGHT_VEC.
        """
        # Default values
        self.s1 = s1
        self.hours_before = hours_before
        self.hours_after = hours_after
        self.ais_buffer = ais_buffer
        self.num_timesteps = num_timesteps
        self.buf_vec = buf_vec
        self.weight_vec = weight_vec

        # Calculated values
        self.poly = self.s1.geometry.buffer(self.ais_buffer)
        self.start_time = self.s1.start_time - timedelta(hours=self.hours_before)
        self.end_time = self.s1.start_time + timedelta(hours=self.hours_after)
        self.time_vec = pd.date_range(
            start=self.start_time, end=self.end_time, periods=self.num_timesteps
        )

        # Placeholder values, populated by retrieve/build/buffer methods below
        self.ais_df = None
        self.ais_trajectories = None
        self.ais_buffered = None
        self.ais_weighted = None

    def retrieve_ais(self):
        """
        Retrieve AIS data from the Google BigQuery database into ``ais_df``.

        Builds a SQL query constrained by the [start_time, end_time] window and
        the buffered scene polygon, then stores the result as a GeoDataFrame
        with point geometries built from the lon/lat columns.

        Requires the BQ_PROJECT_ID environment variable to be set.
        """
        sql = f"""
            SELECT
                seg.ssvid as ssvid,
                seg.timestamp as timestamp,
                seg.lon as lon,
                seg.lat as lat,
                seg.course as course,
                seg.speed_knots as speed_knots,
                ves.ais_identity.shipname_mostcommon.value as shipname,
                ves.ais_identity.shiptype[SAFE_OFFSET(0)].value as shiptype,
                ves.best.best_flag as flag,
                ves.best.best_vessel_class as best_shiptype
            FROM
                `world-fishing-827.gfw_research.pipe_v20201001` as seg
            LEFT JOIN
                `world-fishing-827.gfw_research.vi_ssvid_v20230801` as ves
                ON seg.ssvid = ves.ssvid
            WHERE
                seg._PARTITIONTIME between '{datetime.strftime(self.start_time, D_FORMAT)}' AND '{datetime.strftime(self.end_time, D_FORMAT)}'
                AND seg.timestamp between '{datetime.strftime(self.start_time, T_FORMAT)}' AND '{datetime.strftime(self.end_time, T_FORMAT)}'
                AND ST_COVEREDBY(ST_GEOGPOINT(seg.lon, seg.lat), ST_GeogFromText('{self.poly}'))
            """
        df = pandas_gbq.read_gbq(sql, project_id=os.getenv("BQ_PROJECT_ID"))
        # read_gbq returns a plain DataFrame; attach point geometries and a CRS
        # so downstream use (mpd.Trajectory, .crs, .estimate_utm_crs()) works.
        # AIS lon/lat from GFW are geographic coordinates, hence EPSG:4326.
        self.ais_df = gpd.GeoDataFrame(
            df,
            geometry=gpd.points_from_xy(df["lon"], df["lat"]),
            crs="EPSG:4326",
        )

    def build_trajectories(self):
        """
        Build per-vessel trajectories from the retrieved AIS points.

        Groups ``ais_df`` by ssvid (ship identifier) and interpolates or
        extrapolates each vessel's position at every timestamp in ``time_vec``.
        Stores the result in ``ais_trajectories``.
        Must be called after retrieve_ais().
        """
        ais_trajectories = list()
        for ssvid, group in self.ais_df.groupby("ssvid"):
            # XXX Single-point vessels are currently ignored, but arguably
            # should NOT be -- a lone ping is still evidence of presence.
            if len(group) > 1:
                # build a trajectory from the raw pings
                traj = mpd.Trajectory(df=group, traj_id=ssvid, t="timestamp")

                # interpolate/extrapolate onto the shared time vector
                times = list()
                positions = list()
                for t in self.time_vec:
                    positions.append(traj.interpolate_position_at(t))
                    times.append(t)

                # re-wrap as a trajectory sampled exactly on time_vec
                interpolated_traj = mpd.Trajectory(
                    df=gpd.GeoDataFrame(
                        {"timestamp": times, "geometry": positions},
                        crs=self.ais_df.crs,
                    ),
                    traj_id=ssvid,
                    t="timestamp",
                )
                ais_trajectories.append(interpolated_traj)

        self.ais_trajectories = mpd.TrajectoryCollection(ais_trajectories)

    def buffer_trajectories(self):
        """
        Build conic buffers and weighted hulls around each trajectory.

        For each trajectory, points (newest first) are buffered with the radii
        in ``buf_vec``, consecutive circle pairs are joined into convex hulls,
        and each hull is weighted by the inverse of its index so that recent
        motion weighs more. Stores ``ais_buffered`` (one dissolved polygon per
        trajectory) and ``ais_weighted`` (one GeoDataFrame of weighted hulls
        per trajectory). Must be called after build_trajectories().
        """
        ais_buf = list()
        ais_weighted = list()
        # BUGFIX: iterate the trajectory collection, not the raw DataFrame --
        # iterating a DataFrame yields column-name strings, which have no
        # to_point_gdf(); the pre-refactor trajectory.py iterated trajectories.
        for traj in self.ais_trajectories:
            # grab points, newest first, so buf_vec[0] applies to the latest fix
            points = traj.to_point_gdf()
            points = points.sort_values(by="timestamp", ascending=False)

            # create buffered circles at points
            # NOTE(review): assumes len(points) >= len(buf_vec) -- holds when
            # trajectories are sampled on time_vec and len(buf_vec) matches
            # num_timesteps; confirm against constants.py.
            ps = [
                points.iloc[idx].geometry.buffer(radius)
                for idx, radius in enumerate(self.buf_vec)
            ]

            # create convex hulls from consecutive circle pairs
            convex_hulls = [
                shapely.geometry.MultiPolygon([ps[i], ps[i + 1]]).convex_hull
                for i in range(len(ps) - 1)
            ]

            # weight convex hulls; weight is the inverse of the index
            weighted = [
                {"geometry": hull, "weight": 1.0 / (cidx + 1)}
                for cidx, hull in enumerate(convex_hulls)
            ]
            ais_weighted.append(gpd.GeoDataFrame(weighted, crs=traj.crs))

            # dissolve the hulls into one conic polygon for this trajectory
            ais_buf.append(shapely.ops.unary_union(convex_hulls))

        # Use ais_df.crs rather than the leaked loop variable `traj`, which
        # would be unbound if the trajectory collection were empty.
        self.ais_buffered = gpd.GeoDataFrame(geometry=ais_buf, crs=self.ais_df.crs)
        self.ais_weighted = ais_weighted
gpd.GeoDataFrame, time_vec: pd.DatetimeIndex): - """ - Convert a set of AIS points into trajectories, grouped by ssvid - For each trajectory, interpolate the position along a specified time vector - - Args: - ais (gpd.GeoDataFrame): AIS points with an SSVID field - time_vec (pd.DatetimeIndex): Time vector to interpolate to - Returns: - mpd.TrajectoryCollection: Trajectories with interpolated positions - """ - ais_trajectories = list() - for ssvid, group in ais.groupby("ssvid"): - if len(group) > 1: # ignore single points - # build trajectory - traj = mpd.Trajectory(df=group, traj_id=ssvid, t="timestamp") - - # interpolate/extrapolate to times in time_vec - times = list() - positions = list() - for t in time_vec: - pos = traj.interpolate_position_at(t) - times.append(t) - positions.append(pos) - - # store as trajectory - interpolated_traj = mpd.Trajectory( - df=gpd.GeoDataFrame( - {"timestamp": times, "geometry": positions}, crs=ais.crs - ), - traj_id=ssvid, - t="timestamp", - ) - - ais_trajectories.append(interpolated_traj) - - return mpd.TrajectoryCollection(ais_trajectories) - - -def buffer_trajectories( - ais: mpd.TrajectoryCollection, buf_vec: np.ndarray -) -> gpd.GeoDataFrame: - """ - Build conic buffers around each trajectory - Buffer is narrowest at the start and widest at the end - Weight is highest at the start and lowest at the end - - Args: - ais (mpd.TrajectoryCollection): Trajectories to buffer - buf_vec (np.ndarray): Buffer radii, in meters - weight_vec (np.ndarray): Weights for each buffer - Returns: - gpd.GeoDataFrame: Buffer polygons for every trajectory - List[gpd.GeoDataFrame]: Corresponding weighted buffer polygons - """ - ais_buf = list() - ais_weighted = list() - for traj in ais: - # grab points - points = traj.to_point_gdf() - points = points.sort_values(by="timestamp", ascending=False) - - # create buffered circles at points - ps = list() - for idx, buffer in enumerate(buf_vec): - ps.append(points.iloc[idx].geometry.buffer(buffer)) - - # 
create convex hulls from circles - n = range(len(ps) - 1) - convex_hulls = [ - shapely.geometry.MultiPolygon([ps[i], ps[i + 1]]).convex_hull for i in n - ] - - # weight convex hulls - weighted = list() - for cidx, c in enumerate(convex_hulls): - entry = dict() - entry["geometry"] = c - entry["weight"] = 1.0 / (cidx + 1) # weight is the inverse of the index - weighted.append(entry) - weighted = gpd.GeoDataFrame(weighted, crs=traj.crs) - ais_weighted.append(weighted) - - # create polygon from hulls - poly = shapely.ops.unary_union(convex_hulls) - ais_buf.append(poly) - - ais_buf = gpd.GeoDataFrame(geometry=ais_buf, crs=traj.crs) - - return ais_buf, ais_weighted diff --git a/requirements.txt b/requirements.txt index 3addd45c..26c2668b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,4 +27,9 @@ eodag-sentinelsat==0.4.1 geopandas libpysal pygeos -pandas_gbq \ No newline at end of file +asyncio==3.4.3 +pandas_gbq==0.17.6 +sqlalchemy==1.4.32 +centerline==1.0.1 +movingpandas==0.15rc1 +scipy==1.8.0 \ No newline at end of file