From eb0b9d68d032457a2764f5deb6f044708227c3de Mon Sep 17 00:00:00 2001 From: Jona Date: Fri, 22 Sep 2023 20:42:39 -0400 Subject: [PATCH] Identify files needed to add new cloud function --- .../cloud_function_ais_analysis/__init__.py | 1 + .../cloud_function_ais_analysis/main.py | 272 ++++++++++++++++++ .../requirements.txt | 17 ++ .../utils/__init__.py | 1 + .../utils/associate.py | 233 +++++++++++++++ .../utils/constants.py | 24 ++ .../cloud_function_ais_analysis/utils/misc.py | 43 +++ .../utils/scoring.py | 118 ++++++++ .../utils/trajectory.py | 102 +++++++ .../cloud_run_orchestrator/handler.py | 3 + cerulean_cloud/database_client.py | 42 +++ stack/cloud_function_ais_analysis.py | 107 +++++++ stack/stack_config/Pulumi.production.yaml | 5 +- stack/stack_config/Pulumi.staging.yaml | 1 + stack/stack_config/Pulumi.test.yaml | 1 + 15 files changed, 968 insertions(+), 2 deletions(-) create mode 100644 cerulean_cloud/cloud_function_ais_analysis/__init__.py create mode 100644 cerulean_cloud/cloud_function_ais_analysis/main.py create mode 100644 cerulean_cloud/cloud_function_ais_analysis/requirements.txt create mode 100644 cerulean_cloud/cloud_function_ais_analysis/utils/__init__.py create mode 100644 cerulean_cloud/cloud_function_ais_analysis/utils/associate.py create mode 100644 cerulean_cloud/cloud_function_ais_analysis/utils/constants.py create mode 100644 cerulean_cloud/cloud_function_ais_analysis/utils/misc.py create mode 100644 cerulean_cloud/cloud_function_ais_analysis/utils/scoring.py create mode 100644 cerulean_cloud/cloud_function_ais_analysis/utils/trajectory.py create mode 100644 stack/cloud_function_ais_analysis.py diff --git a/cerulean_cloud/cloud_function_ais_analysis/__init__.py b/cerulean_cloud/cloud_function_ais_analysis/__init__.py new file mode 100644 index 00000000..095c6321 --- /dev/null +++ b/cerulean_cloud/cloud_function_ais_analysis/__init__.py @@ -0,0 +1 @@ +"""module for cloud function AIS analysis""" diff --git a/cerulean_cloud/cloud_function_ais_analysis/main.py b/cerulean_cloud/cloud_function_ais_analysis/main.py new file mode 100644 index 00000000..05f92c91 --- /dev/null +++ b/cerulean_cloud/cloud_function_ais_analysis/main.py @@ -0,0 +1,272 @@ +"""cloud function AIS analysis handler +""" + +import asyncio +import json +import os +from datetime import datetime, timedelta + +import pandas_gbq + +# import shapely.geometry as sh # https://docs.aws.amazon.com/lambda/latest/dg/python-package.html +from google.cloud import tasks_v2 +from google.protobuf import timestamp_pb2 + +from cerulean_cloud.cloud_function_ais_analysis.utils.associate import ( + associate_ais_to_slicks, + slicks_to_curves, +) +from cerulean_cloud.cloud_function_ais_analysis.utils.constants import ( + AIS_BUFFER, + BUF_VEC, + D_FORMAT, + HOURS_AFTER, + HOURS_BEFORE, + NUM_TIMESTEPS, + T_FORMAT, + WEIGHT_VEC, +) +from cerulean_cloud.cloud_function_ais_analysis.utils.misc import ( + build_time_vec, + get_utm_zone, +) +from cerulean_cloud.cloud_function_ais_analysis.utils.trajectory import ( + ais_points_to_trajectories, + buffer_trajectories, +) +from cerulean_cloud.database_client import DatabaseClient, get_engine + +# mypy: ignore-errors + + +def main(request): + """ + Entry point for handling HTTP requests. + + Args: + request (flask.Request): The incoming HTTP request object. + + Returns: + Any: The response object or any value that can be converted to a + Response object using Flask's `make_response`. 
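+
+    Example (hypothetical request body; field names inferred from handle_aaa_request below):
+        {"scene_id": "S1A_IW_GRDH_1SDV_...", "dry_run": false}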
+ + Notes: + - This function sets up an asyncio event loop and delegates the actual + request handling to `handle_aaa_request`. + - It's important to set up a new event loop if the function is running + in a context where the default event loop is not available (e.g., in some WSGI servers). + """ + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + res = loop.run_until_complete(handle_aaa_request(request)) + return res + + +async def handle_aaa_request(request): + """ + Asynchronously handles the main logic of Automatic AIS Analysis (AAA) for a given scene. + + Args: + request (flask.Request): The incoming HTTP request object containing scene information. + + Returns: + str: A string "Success!" upon successful completion. + + Notes: + - The function gets scene information from the request. + - It uses the `DatabaseClient` for database operations. + - It calls `get_ais` and `automatic_ais_analysis` for AIS data retrieval and analysis. + """ + request_json = request.get_json() + if not request_json.get("dry_run"): + scene_id = request_json.get("scene_id") + db_engine = get_engine(db_url=os.getenv("DB_URL")) + async with DatabaseClient(db_engine) as db_client: + async with db_client.session.begin(): + s1 = await db_client.get_scene_from_id(scene_id) + slicks_without_sources = ( + await db_client.get_slicks_without_sources_from_scene_id(scene_id) + ) + if len(slicks_without_sources) > 0: + ais_traj, ais_buffered, ais_weighted, utm_zone = get_ais(s1) + if ais_traj: + for slick in slicks_without_sources: + automatic_ais_analysis( + slick, ais_traj, ais_buffered, ais_weighted, utm_zone + ) + return "Success!" + + +async def get_ais( + s1, hours_before=HOURS_BEFORE, hours_after=HOURS_AFTER, ais_buffer=AIS_BUFFER +): + """ + Asynchronously fetches and processes AIS data. + + Args: + s1 (Scene Object): The scene object for which AIS data is needed. + hours_before (int): The number of hours before the scene time to consider for AIS data. + hours_after (int): The number of hours after the scene time to consider for AIS data. + ais_buffer (float): The buffer distance around the scene geometry. + + Returns: + tuple: A tuple containing ais_traj, ais_buffered, ais_weighted, and utm_zone. + + Notes: + - AIS data is downloaded and then transformed into trajectories. + - The function also buffers and weighs the AIS trajectories. + """ + grd_buff = s1.geometry.buffer(ais_buffer) + ais = download_ais(s1.start_time, hours_before, hours_after, grd_buff) + time_vec = build_time_vec(s1.start_time, hours_before, hours_after, NUM_TIMESTEPS) + utm_zone = get_utm_zone(ais) + ais_traj = ais_points_to_trajectories(ais, time_vec) + ais_buffered, ais_weighted = buffer_trajectories(ais_traj, BUF_VEC, WEIGHT_VEC) + + return ais_traj, ais_buffered, ais_weighted, utm_zone + + +def download_ais( + t_stamp, + hours_before, + hours_after, + poly, +): + """ + Downloads AIS data using a SQL query on BigQuery. + + Args: + t_stamp (datetime): The timestamp for which AIS data is needed. + hours_before (int): The number of hours before the timestamp to consider for AIS data. + hours_after (int): The number of hours after the timestamp to consider for AIS data. + poly (str): The polygon geometry in WKT format to filter AIS data spatially. + + Returns: + DataFrame: A Pandas DataFrame containing the downloaded AIS data. + + Notes: + - The function uses Google's BigQuery Python client `pandas_gbq` to execute the SQL query. + - Make sure that the BigQuery project ID is set in the environment variable "BQ_PROJECT_ID". 
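+
+    Example (illustrative sketch only; `scene` and `aoi_wkt` are placeholder names):
+        >>> aoi_wkt = scene.geometry.buffer(AIS_BUFFER).wkt
+        >>> ais_df = download_ais(scene.start_time, HOURS_BEFORE, HOURS_AFTER, aoi_wkt)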
+ """ + sql = f""" + SELECT * FROM( + SELECT + seg.ssvid as ssvid, + seg.timestamp as timestamp, + seg.lon as lon, + seg.lat as lat, + seg.course as course, + seg.speed_knots as speed_knots, + ves.ais_identity.shipname_mostcommon.value as shipname, + ves.ais_identity.shiptype[SAFE_OFFSET(0)].value as shiptype, + ves.best.best_flag as flag, + ves.best.best_vessel_class as best_shiptype + FROM + `world-fishing-827.gfw_research.pipe_v20201001` as seg + LEFT JOIN + `world-fishing-827.gfw_research.vi_ssvid_v20230801` as ves + ON seg.ssvid = ves.ssvid + + WHERE + seg._PARTITIONTIME between '{datetime.strftime(t_stamp-timedelta(hours=hours_before), D_FORMAT)}' AND '{datetime.strftime(t_stamp+timedelta(hours=hours_after), D_FORMAT)}' + AND seg.timestamp between '{datetime.strftime(t_stamp-timedelta(hours=hours_before), T_FORMAT)}' AND '{datetime.strftime(t_stamp+timedelta(hours=hours_after), T_FORMAT)}' + AND ST_COVEREDBY(ST_GEOGPOINT(seg.lon, seg.lat), ST_GeogFromText('{poly}')) + ORDER BY + seg.timestamp DESC + ) + ORDER BY + ssvid, timestamp + """ + return pandas_gbq.read_gbq(sql, project_id=os.getenv("BQ_PROJECT_ID")) + + +def automatic_ais_analysis(slick, ais_traj, ais_buffered, ais_weighted, utm_zone): + """ + Performs automatic AIS analysis for a given slick and AIS data. + + Args: + slick (GeoDataFrame): The oil slick geometry. + ais_traj (GeoDataFrame): The AIS trajectories. + ais_buffered (GeoDataFrame): The buffered AIS trajectories. + ais_weighted (GeoDataFrame): The weighted AIS trajectories. + utm_zone (str): The UTM zone for coordinate transformation. + + Returns: + DataFrame: A Pandas DataFrame containing the AIS analysis results sorted by slick index and score. + + Notes: + - The function performs geometry transformations and data association. + - It uses the helper functions `slicks_to_curves` and `associate_ais_to_slicks`. + """ + slicks = slick.to_crs(utm_zone) + slicks_clean, slicks_curves = slicks_to_curves(slicks) + slick_ais = associate_ais_to_slicks( + ais_traj, ais_buffered, ais_weighted, slicks_clean, slicks_curves + ) + results = slick_ais.sort_values( + ["slick_index", "slick_size", "total_score"], ascending=[True, False, False] + ).groupby("slick_index") + return results + + +def add_to_aaa_queue(scene_id): + """ + Adds a new task to Google Cloud Tasks for automatic AIS analysis. + + Args: + scene_id (str): The ID of the scene for which AIS analysis is needed. + + Returns: + google.cloud.tasks_v2.types.Task: The created Task object. + + Notes: + - The function uses Google Cloud Tasks API to schedule the AIS analysis. + - Multiple retries are scheduled with different delays. + """ + # Create a client. + client = tasks_v2.CloudTasksClient() + + project = os.getenv("GCP_PROJECT") + location = os.getenv("GCP_LOCATION") + queue = os.getenv("QUEUE") + url = os.getenv("FUNCTION_URL") + dry_run = bool(os.getenv("IS_DRY_RUN")) + + # Construct the fully qualified queue name. + parent = client.queue_path(project, location, queue) + + # Construct the request body. + payload = {"sceneid": scene_id, "dry_run": dry_run} + + task = { + "http_request": { # Specify the type of request. + "http_method": tasks_v2.HttpMethod.POST, + "url": url, # The url path that the task will be sent to. 
+ }, + "headers": { + "Content-type": "application/json", + "Authorization": f"Bearer {os.getenv('API_KEY')}", + }, + "body": json.dumps(payload).encode(), + } + + # Number of days that the Automatic AIS Analysis should be run after + # Each entry is another retry + ais_delays = [0, 3, 7] # TODO Magic number >>> Where should this live? + for delay in ais_delays: + d = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta( + days=delay + ) + + # Create Timestamp protobuf. + timestamp = timestamp_pb2.Timestamp() + timestamp.FromDatetime(d) + + # Add the timestamp to the tasks. + task["schedule_time"] = timestamp + + # Use the client to build and send the task. + response = client.create_task(request={"parent": parent, "task": task}) + + print("Created task {}".format(response.name)) + return response diff --git a/cerulean_cloud/cloud_function_ais_analysis/requirements.txt b/cerulean_cloud/cloud_function_ais_analysis/requirements.txt new file mode 100644 index 00000000..0679862a --- /dev/null +++ b/cerulean_cloud/cloud_function_ais_analysis/requirements.txt @@ -0,0 +1,17 @@ +asyncio==3.4.3 +json==Package(s) not found +os==Package(s) not found +pandas_gbq==0.17.6 +google-cloud-tasks==2.9.1 +google-cloud-core==2.3.1 +google-auth==1.35.0 +sqlalchemy==1.4.32 +datetime==Package(s) not found +math==Package(s) not found +centerline==1.0.1 +geopandas==0.12.2 +movingpandas==0.15rc1 +numpy==1.21.5 +scipy==1.8.0 +shapely==2.0.1 +earthengine-api==0.1.342 \ No newline at end of file diff --git a/cerulean_cloud/cloud_function_ais_analysis/utils/__init__.py b/cerulean_cloud/cloud_function_ais_analysis/utils/__init__.py new file mode 100644 index 00000000..9f9577a0 --- /dev/null +++ b/cerulean_cloud/cloud_function_ais_analysis/utils/__init__.py @@ -0,0 +1 @@ +"""module for AIS utilities""" diff --git a/cerulean_cloud/cloud_function_ais_analysis/utils/associate.py b/cerulean_cloud/cloud_function_ais_analysis/utils/associate.py new file mode 100644 index 00000000..164117b7 --- /dev/null +++ b/cerulean_cloud/cloud_function_ais_analysis/utils/associate.py @@ -0,0 +1,233 @@ +""" +Utilities and helper functions for the oil slick Leaflet map +""" + +from typing import List + +import centerline.geometry +import geopandas as gpd +import movingpandas as mpd +import numpy as np +import scipy.interpolate +import scipy.spatial.distance +import shapely.geometry +import shapely.ops + +from cerulean_cloud.cloud_function_ais_analysis.utils.scoring import ( + compute_frechet_distance, + compute_overlap_score, + compute_temporal_score, + compute_total_score, +) + + +def associate_ais_to_slicks( + ais: mpd.TrajectoryCollection, + buffered: gpd.GeoDataFrame, + weighted: List[gpd.GeoDataFrame], + slick: gpd.GeoDataFrame, + curves: gpd.GeoDataFrame, +): + """ + Measure association by computing multiple metrics between AIS trajectories and slicks + + Inputs: + ais: TrajectoryCollection of AIS trajectories + buffered: GeoDataFrame of buffered AIS trajectories + weighted: list of GeoDataFrames weighted AIS trajectories + slick: GeoDataFrame of slick detections + curves: GeoDataFrame of slick curves + Returns: + GeoDataFrame of slick associations + """ + # only consider trajectories that intersect slick detections + ais_filt = list() + weighted_filt = list() + buffered_filt = list() + for idx, t in enumerate(ais): + w = weighted[idx] + b = buffered.iloc[idx] + + # spatially join the weighted trajectory to the slick + b_gdf = gpd.GeoDataFrame(index=[0], geometry=[b.geometry], crs=slick.crs) + matches = gpd.sjoin(b_gdf, 
slick, how="inner", predicate="intersects") + if matches.empty: + continue + else: + ais_filt.append(t) + weighted_filt.append(w) + buffered_filt.append(b.geometry) + + associations = list() + if not weighted_filt: # no associations found + for idx in range(len(slick)): + # return a geodataframe that has the same format as the usual case + entry = dict() + entry["geometry"] = slick.iloc[idx].geometry + entry["slick_index"] = idx + entry["slick_size"] = None + entry["temporal_score"] = None + entry["overlap_score"] = None + entry["frechet_dist"] = None + entry["total_score"] = None + entry["traj_id"] = None + associations.append(entry) + + associations = gpd.GeoDataFrame(associations, crs=slick.crs) + return associations + + # create trajectory collection from filtered trajectories + ais_filt = mpd.TrajectoryCollection(ais_filt) + + # iterate over each slick + for idx in range(len(slick)): + s = slick.iloc[idx] + c = curves.iloc[idx] + + # iterate over filtered trajectories + for t, w, b in zip(ais_filt, weighted_filt, buffered_filt): + # compute temporal score + temporal_score = compute_temporal_score(w, s.geometry) + + # compute overlap score + overlap_score = compute_overlap_score(b, s.geometry) + + # compute frechet distance between trajectory and slick curve + frechet_dist = compute_frechet_distance(t, c.geometry) + + # compute total score from these three metrics + total_score = compute_total_score( + temporal_score, overlap_score, frechet_dist + ) + + entry = dict() + entry["geometry"] = s.geometry + entry["slick_index"] = idx + entry["slick_size"] = s.geometry.area + entry["temporal_score"] = temporal_score + entry["overlap_score"] = overlap_score + entry["frechet_dist"] = frechet_dist + entry["total_score"] = total_score + entry["traj_id"] = t.id + + associations.append(entry) + + associations = gpd.GeoDataFrame(associations, crs=slick.crs) + + return associations + + +def slicks_to_curves( + slicks: gpd.GeoDataFrame, + buf_size: int = 2000, + interp_dist: int = 200, + smoothing_factor: float = 1e9, +): + """ + From a set of oil slick detections, estimate curves that go through the detections + This process transforms a set of slick detections into LineStrings for each detection + + Inputs: + slicks: GeoDataFrame of slick detections + buf_size: buffer size for cleaning up slick detections + interp_dist: interpolation distance for centerline + smoothing_factor: smoothing factor for smoothing centerline + Returns: + GeoDataFrame of slick curves + """ + # clean up the slick detections by dilation followed by erosion + # this process can merge some polygons but not others, depending on proximity + slicks_clean = slicks.copy() + slicks_clean.geometry = slicks_clean.geometry.buffer(buf_size).buffer(-buf_size) + + # split slicks into individual polygons + slicks_clean = slicks_clean.explode(ignore_index=True, index_parts=False) + + # find a centerline through detections + slick_curves = list() + for idx, row in slicks_clean.iterrows(): + # create centerline -> MultiLineString + try: + cl = centerline.geometry.Centerline( + row.geometry, interpolation_distance=interp_dist + ) + except: # noqa # unclear what exception was originally thrown here. 
+ # sometimes the voronoi polygonization fails + # in this case, just fit a a simple line from the start to the end + exterior_coords = row.geometry.exterior.coords + start_point = exterior_coords[0] + end_point = exterior_coords[-1] + curve = shapely.geometry.LineString([start_point, end_point]) + slick_curves.append(curve) + continue + + # grab coordinates from centerline + x = list() + y = list() + if type(cl.geometry) == shapely.geometry.MultiLineString: + # iterate through each linestring + for geom in cl.geometry.geoms: + x.extend(geom.coords.xy[0]) + y.extend(geom.coords.xy[1]) + else: + x.extend(cl.geometry.coords.xy[0]) + y.extend(cl.geometry.coords.xy[1]) + + # sort coordinates in both X and Y directions + coords = [(xc, yc) for xc, yc in zip(x, y)] + coords_sort_x = sorted(coords, key=lambda c: c[0]) + coords_sort_y = sorted(coords, key=lambda c: c[1]) + + # remove coordinate duplicates, preserving sorted order + coords_seen_x = set() + coords_unique_x = list() + for c in coords_sort_x: + if c not in coords_seen_x: + coords_unique_x.append(c) + coords_seen_x.add(c) + + coords_seen_y = set() + coords_unique_y = list() + for c in coords_sort_y: + if c not in coords_seen_y: + coords_unique_y.append(c) + coords_seen_y.add(c) + + # grab x and y coordinates for spline fit + x_fit_sort_x = [c[0] for c in coords_unique_x] + x_fit_sort_y = [c[0] for c in coords_unique_y] + y_fit_sort_x = [c[1] for c in coords_unique_x] + y_fit_sort_y = [c[1] for c in coords_unique_y] + + # fit a B-spline to the centerline + tck_sort_x, fp_sort_x, _, _ = scipy.interpolate.splrep( + x_fit_sort_x, y_fit_sort_x, k=3, s=smoothing_factor, full_output=True + ) + tck_sort_y, fp_sort_y, _, _ = scipy.interpolate.splrep( + y_fit_sort_y, x_fit_sort_y, k=3, s=smoothing_factor, full_output=True + ) + + # choose the spline that has the lowest fit error + if fp_sort_x <= fp_sort_y: + tck = tck_sort_x + x_fit = x_fit_sort_x + y_fit = y_fit_sort_x + + num_points = max(round((x_fit[-1] - x_fit[0]) / 100), 5) + x_new = np.linspace(x_fit[0], x_fit[-1], 10) + y_new = scipy.interpolate.BSpline(*tck)(x_new) + else: + tck = tck_sort_y + x_fit = x_fit_sort_y + y_fit = y_fit_sort_y + + num_points = max(round((y_fit[-1] - y_fit[0]) / 100), 5) + y_new = np.linspace(y_fit[0], y_fit[-1], num_points) + x_new = scipy.interpolate.BSpline(*tck)(y_new) + + # store as LineString + curve = shapely.geometry.LineString(zip(x_new, y_new)) + slick_curves.append(curve) + + slick_curves = gpd.GeoDataFrame(geometry=slick_curves, crs=slicks_clean.crs) + return slicks_clean, slick_curves diff --git a/cerulean_cloud/cloud_function_ais_analysis/utils/constants.py b/cerulean_cloud/cloud_function_ais_analysis/utils/constants.py new file mode 100644 index 00000000..8ac4627e --- /dev/null +++ b/cerulean_cloud/cloud_function_ais_analysis/utils/constants.py @@ -0,0 +1,24 @@ +"""Constants for AIS analysis handler +""" +import numpy as np + +# temporal parameters for AIS trajectory estimation +HOURS_BEFORE = 12 +HOURS_AFTER = 2 +TOTAL_TIME = HOURS_BEFORE + HOURS_AFTER +NUM_TIMESTEPS = TOTAL_TIME * 10 + +# buffering parameters for AIS trajectories +AIS_BUFFER = 5000 # buffer around GRD envelope to capture AIS +SPREAD_RATE = 2000 # meters/hour perpendicular to vessel track +BUF_START = 500 +BUF_END = BUF_START + SPREAD_RATE * TOTAL_TIME +BUF_VEC = np.linspace(BUF_START, BUF_END, NUM_TIMESTEPS) + +# weighting parameters for AIS trajectories +WEIGHT_START = 1.0 +WEIGHT_END = 0.1 +WEIGHT_VEC = np.logspace(WEIGHT_START, WEIGHT_END, NUM_TIMESTEPS) / 10.0 + +D_FORMAT 
= "%Y-%m-%d" +T_FORMAT = "%Y-%m-%d %H:%M:%S" diff --git a/cerulean_cloud/cloud_function_ais_analysis/utils/misc.py b/cerulean_cloud/cloud_function_ais_analysis/utils/misc.py new file mode 100644 index 00000000..572cbf42 --- /dev/null +++ b/cerulean_cloud/cloud_function_ais_analysis/utils/misc.py @@ -0,0 +1,43 @@ +""" +Miscellaneous collection of helper functions +""" + +import datetime + +import geopandas as gpd +import pandas as pd +import pyproj + + +def build_time_vec( + collect_time: datetime.datetime, + hours_before: int, + hours_after: int, + num_timesteps: int, +) -> pd.DatetimeIndex: + """ + Build a vector of times, starting at a given time and going back in time + + Inputs: + collect_time: datetime object for the time of collection + hours_before: number of hours before the collection time to start the vector + hours_after: number of hours after the collection time to end the vector + num_timesteps: number of timesteps in the vector + Returns: + DatetimeIndex of times + """ + start_time = collect_time - datetime.timedelta(hours=hours_before) + end_time = collect_time + datetime.timedelta(hours=hours_after) + return pd.date_range(start=start_time, end=end_time, periods=num_timesteps) + + +def get_utm_zone(gdf: gpd.GeoDataFrame) -> pyproj.crs.crs.CRS: + """ + Get the UTM zone for a GeoDataFrame + + Inputs: + gdf: GeoDataFrame to get the UTM zone for + Returns: + UTM zone as a pyproj CRS object + """ + return gdf.estimate_utm_crs() diff --git a/cerulean_cloud/cloud_function_ais_analysis/utils/scoring.py b/cerulean_cloud/cloud_function_ais_analysis/utils/scoring.py new file mode 100644 index 00000000..cfa7abcf --- /dev/null +++ b/cerulean_cloud/cloud_function_ais_analysis/utils/scoring.py @@ -0,0 +1,118 @@ +""" +Utilities for calculating scoring metrics between AIS trajectories and oil slick detections +""" + +import geopandas as gpd +import movingpandas as mpd +import shapely.geometry +import shapely.ops +from shapely import frechet_distance + + +def compute_frechet_distance(traj: mpd.Trajectory, curve: shapely.geometry.LineString): + """ + Compute the frechet distance between an AIS trajectory and an oil slick curve + + Args: + traj (mpd.Trajectory): AIS trajectory + curve (shapely.geometry.LineString): oil slick curve + + Returns: + float: frechet distance between traj and curve + """ + # get the trajectory coordinates as points in descending time order from collect + traj_gdf = traj.to_point_gdf().sort_values(by="timestamp", ascending=False) + + # take the points and put them in a linestring + traj_line = shapely.geometry.LineString(traj_gdf.geometry) + + # get the first and last points of the slick curve + first_point = shapely.geometry.Point(curve.coords[0]) + last_point = shapely.geometry.Point(curve.coords[-1]) + + # compute the distance from these points to the start of the trajectory + first_dist = first_point.distance(shapely.geometry.Point(traj_line.coords[0])) + last_dist = last_point.distance(shapely.geometry.Point(traj_line.coords[0])) + + if last_dist < first_dist: + # change input orientation by reversing the slick curve + curve = shapely.geometry.LineString(list(curve.coords)[::-1]) + + # for every point in the curve, find the closest trajectory point and store it off + traj_points = list() + for curve_point in curve.coords: + # compute the distance between this point and every point in the trajectory + these_distances = list() + for traj_point in traj_line.coords: + dist = shapely.geometry.Point(curve_point).distance( + shapely.geometry.Point(traj_point) + ) + 
these_distances.append(dist) + + closest_distance = min(these_distances) + closest_idx = these_distances.index(closest_distance) + traj_points.append(shapely.geometry.Point(traj_line.coords[closest_idx])) + + # compute the frechet distance between the sampled trajectory curve and the slick curve + traj_line_clip = shapely.geometry.LineString(traj_points) + dist = frechet_distance(traj_line_clip, curve) + + return dist + + +def compute_temporal_score( + weighted_traj: gpd.GeoDataFrame, slick: shapely.geometry.Polygon +): + """ + Compute the temporal score between a weighted AIS trajectory and an oil slick + + Args: + weighted_traj (gpd.GeoDataFrame): weighted AIS trajectory, with convex hull geometries and associated weights + slick (shapely.geometry.Polygon): oil slick polygon + Returns: + float: temporal score between weighted_traj and slick + """ + # spatially join the weighted convex hulls to the slick geometry + s_gdf = gpd.GeoDataFrame(index=[0], geometry=[slick], crs=weighted_traj.crs) + matches = gpd.sjoin(weighted_traj, s_gdf, how="inner", predicate="intersects") + + temporal_score = 0.0 + if ~matches.empty: + # take the sum of the weights of the matched convex hulls + temporal_score = matches["weight"].sum() + + return temporal_score + + +def compute_overlap_score( + buffered_traj: shapely.geometry.Polygon, slick: shapely.geometry.Polygon +): + """ + Compute the amount of overlap between a buffered AIS trajectory and an oil slick + + Args: + buffered_traj (shapely.geometry.Polygon): buffered AIS trajectory created by a convex hull operation + slick (shapely.geometry.Polygon): oil slick polygon + Returns: + float: overlap score between buffered_traj and slick + """ + overlap_score = slick.intersection(buffered_traj).area / slick.area + return overlap_score + + +def compute_total_score( + temporal_score: float, overlap_score: float, frechet_dist: float +): + """ + Compute the total score by combining the temporal score, overlap score, and frechet distance + The final weights were determined by a coarse grid search + + Args: + temporal_score (float): temporal score between a weighted AIS trajectory and an oil slick + overlap_score (float): overlap score between a buffered AIS trajectory and an oil slick + frechet_dist (float): frechet distance between an AIS trajectory and an oil slick curve + Returns: + float: total weighted score between a weighted AIS trajectory and an oil slick + """ + total_score = 0.8 * temporal_score + 1.4 * overlap_score + 5000.0 / frechet_dist + return total_score diff --git a/cerulean_cloud/cloud_function_ais_analysis/utils/trajectory.py b/cerulean_cloud/cloud_function_ais_analysis/utils/trajectory.py new file mode 100644 index 00000000..b96158c8 --- /dev/null +++ b/cerulean_cloud/cloud_function_ais_analysis/utils/trajectory.py @@ -0,0 +1,102 @@ +""" +Utilities for working with AIS trajectories +""" + +import geopandas as gpd +import movingpandas as mpd +import numpy as np +import pandas as pd +import shapely.geometry +import shapely.ops + + +def ais_points_to_trajectories(ais: gpd.GeoDataFrame, time_vec: pd.DatetimeIndex): + """ + Convert a set of AIS points into trajectories, grouped by ssvid + For each trajectory, interpolate the position along a specified time vector + + Args: + ais (gpd.GeoDataFrame): AIS points with an SSVID field + time_vec (pd.DatetimeIndex): Time vector to interpolate to + Returns: + mpd.TrajectoryCollection: Trajectories with interpolated positions + """ + ais_trajectories = list() + for ssvid, group in ais.groupby("ssvid"): + if 
len(group) > 1: # ignore single points + # build trajectory + traj = mpd.Trajectory(df=group, traj_id=ssvid, t="timestamp") + + # interpolate/extrapolate to times in time_vec + times = list() + positions = list() + for t in time_vec: + pos = traj.interpolate_position_at(t) + times.append(t) + positions.append(pos) + + # store as trajectory + interpolated_traj = mpd.Trajectory( + df=gpd.GeoDataFrame( + {"timestamp": times, "geometry": positions}, crs=ais.crs + ), + traj_id=ssvid, + t="timestamp", + ) + + ais_trajectories.append(interpolated_traj) + + return mpd.TrajectoryCollection(ais_trajectories) + + +def buffer_trajectories( + ais: mpd.TrajectoryCollection, buf_vec: np.ndarray +) -> gpd.GeoDataFrame: + """ + Build conic buffers around each trajectory + Buffer is narrowest at the start and widest at the end + Weight is highest at the start and lowest at the end + + Args: + ais (mpd.TrajectoryCollection): Trajectories to buffer + buf_vec (np.ndarray): Buffer radii, in meters + weight_vec (np.ndarray): Weights for each buffer + Returns: + gpd.GeoDataFrame: Buffer polygons for every trajectory + List[gpd.GeoDataFrame]: Corresponding weighted buffer polygons + """ + ais_buf = list() + ais_weighted = list() + for traj in ais: + # grab points + points = traj.to_point_gdf() + points = points.sort_values(by="timestamp", ascending=False) + + # create buffered circles at points + ps = list() + for idx, buffer in enumerate(buf_vec): + ps.append(points.iloc[idx].geometry.buffer(buffer)) + + # create convex hulls from circles + n = range(len(ps) - 1) + convex_hulls = [ + shapely.geometry.MultiPolygon([ps[i], ps[i + 1]]).convex_hull for i in n + ] + + # weight convex hulls + weighted = list() + for cidx, c in enumerate(convex_hulls): + entry = dict() + entry["geometry"] = c + entry["weight"] = 1.0 / (cidx + 1) # weight is the inverse of the index + weighted.append(entry) + weighted = gpd.GeoDataFrame(weighted, crs=traj.crs) + ais_weighted.append(weighted) + + # create polygon from hulls + poly = shapely.ops.unary_union(convex_hulls) + ais_buf.append(poly) + + ais_buf = gpd.GeoDataFrame(geometry=ais_buf, crs=traj.crs) + + return ais_buf, ais_weighted diff --git a/cerulean_cloud/cloud_run_orchestrator/handler.py b/cerulean_cloud/cloud_run_orchestrator/handler.py index 6f1d36c7..c89fce88 100644 --- a/cerulean_cloud/cloud_run_orchestrator/handler.py +++ b/cerulean_cloud/cloud_run_orchestrator/handler.py @@ -27,6 +27,7 @@ from rasterio.merge import merge from cerulean_cloud.auth import api_key_auth +from cerulean_cloud.cloud_function_ais_analysis.main import add_to_aaa_queue from cerulean_cloud.cloud_run_offset_tiles.schema import ( InferenceResult, InferenceResultStack, @@ -448,6 +449,8 @@ async def _orchestrate( feat.get("properties").get("machine_confidence"), ) print(f"Added slick {slick}") + if merged_inferences.get("features"): + add_to_aaa_queue(sentinel1_grd.scene_id) end_time = datetime.now() print(f"End time: {end_time}") diff --git a/cerulean_cloud/database_client.py b/cerulean_cloud/database_client.py index 5ce2c9f7..0c9fa3f0 100644 --- a/cerulean_cloud/database_client.py +++ b/cerulean_cloud/database_client.py @@ -165,3 +165,45 @@ async def add_slick( machine_confidence=machine_confidence, ) return slick + + async def get_slicks_without_sources_from_scene_id(self, scene_id, active=True): + """ + Asynchronously queries the database to fetch slicks without associated sources for a given scene ID. + + Args: + scene_id (str): The ID of the scene for which slicks are needed. 
+ active (bool): Flag to filter slicks based on their active status. Default is True. + + Returns: + list: A list of Slick objects that do not have associated sources and belong to the specified scene. + + Notes: + - The function uses SQLAlchemy for database queries. + - It joins multiple tables: `db.Slick`, `db.SlickToSource`, `db.OrchestratorRun`, and `db.Sentinel1Grd`. + - The query uses an outer join to filter out slicks that have associated sources. + """ + return ( + self.session.query(db.Slick) + .outerjoin(db.SlickToSource, db.Slick.id == db.SlickToSource.slick) + .filter(db.SlickToSource.slick is None) + .join(db.OrchestratorRun) + .join(db.Sentinel1Grd) + .filter(db.Sentinel1Grd.id == scene_id, db.Slick.active == active) + .all() + ) + + async def get_scene_from_id(self, scene_id): + """ + Asynchronously fetches a scene object from the database based on its ID. + + Args: + scene_id (str): The ID of the scene to fetch. + + Returns: + Scene Object: The fetched Sentinel1Grd scene object, or `None` if not found. + + Notes: + - The function delegates the actual database fetch operation to a generic `get()` function. + - It looks for a scene in the `db.Sentinel1Grd` table. + """ + return get(self.session, db.Sentinel1Grd, False, scene_id=scene_id) diff --git a/stack/cloud_function_ais_analysis.py b/stack/cloud_function_ais_analysis.py new file mode 100644 index 00000000..95e2a529 --- /dev/null +++ b/stack/cloud_function_ais_analysis.py @@ -0,0 +1,107 @@ +"""cloud function to find slick culprits from AIS tracks""" +import os +import time + +import database +import pulumi +from pulumi_gcp import cloudfunctions, cloudtasks, projects, serviceaccount, storage +from utils import construct_name + +stack = pulumi.get_stack() +# We will store the source code to the Cloud Function in a Google Cloud Storage bucket. +bucket = storage.Bucket( + construct_name("bucket-cloud-function"), + location="EU", + labels={"pulumi": "true", "environment": pulumi.get_stack()}, +) + +# Create the Queue for tasks +queue = cloudtasks.Queue( + construct_name("queue-cloud-tasks-ais-analysis"), + location=pulumi.Config("gcp").require("region"), + rate_limits=cloudtasks.QueueRateLimitsArgs( + max_concurrent_dispatches=1, + max_dispatches_per_second=1, + ), + retry_config=cloudtasks.QueueRetryConfigArgs( + max_attempts=1, + max_backoff="300s", + max_doublings=1, + max_retry_duration="4s", + min_backoff="60s", + ), + stackdriver_logging_config=cloudtasks.QueueStackdriverLoggingConfigArgs( + sampling_ratio=0.9, + ), +) + +function_name = construct_name("cloud-function-ais-analysis") +config_values = { + "DB_URL": database.sql_instance_url, + "GCP_PROJECT": pulumi.Config("gcp").require("project"), + "GCP_LOCATION": pulumi.Config("gcp").require("region"), + "QUEUE": queue.name, + "FUNCTION_NAME": function_name, + "API_KEY": pulumi.Config("cerulean-cloud").require("apikey"), + "IS_DRY_RUN": pulumi.Config("cerulean-cloud").require("dryrun_ais"), + "BQ_PROJECT_ID": "world-fishing-827", +} + +# The Cloud Function source code itself needs to be zipped up into an +# archive, which we create using the pulumi.AssetArchive primitive. +PATH_TO_SOURCE_CODE = "../cerulean_cloud/cloud_function_ais_analysis" +assets = {} +for file in os.listdir(PATH_TO_SOURCE_CODE): + location = os.path.join(PATH_TO_SOURCE_CODE, file) + asset = pulumi.FileAsset(path=location) + assets[file] = asset + +archive = pulumi.AssetArchive(assets=assets) + +# Create the single Cloud Storage object, which contains all of the function's +# source code. 
("main.py" and "requirements.txt".) +source_archive_object = storage.BucketObject( + construct_name("source-cloud-function-ais-analysis"), + name="handler.py-%f" % time.time(), + bucket=bucket.name, + source=archive, +) + +# Assign access to cloud SQL +cloud_function_service_account = serviceaccount.Account( + construct_name("cloud-function"), + account_id=f"{stack}-cloud-function", + display_name="Service Account for cloud function.", +) +cloud_function_service_account_iam = projects.IAMMember( + construct_name("cloud-function-iam"), + project=pulumi.Config("gcp").require("project"), + role="projects/cerulean-338116/roles/cloudfunctionaisanalysisrole", + member=cloud_function_service_account.email.apply( + lambda email: f"serviceAccount:{email}" + ), +) + +fxn = cloudfunctions.Function( + function_name, + name=function_name, + entry_point="main", + environment_variables=config_values, + region=pulumi.Config("gcp").require("region"), + runtime="python38", + source_archive_bucket=bucket.name, + source_archive_object=source_archive_object.name, + trigger_http=True, + service_account_email=cloud_function_service_account.email, +) + +invoker = cloudfunctions.FunctionIamMember( + construct_name("cloud-function-ais-analysis-invoker"), + project=fxn.project, + region=fxn.region, + cloud_function=fxn.name, + role="roles/cloudfunctions.invoker", + member="allUsers", +) + +config_values["FUNCTION_URL"] = fxn.https_trigger_url diff --git a/stack/stack_config/Pulumi.production.yaml b/stack/stack_config/Pulumi.production.yaml index da26c981..d18f3efb 100644 --- a/stack/stack_config/Pulumi.production.yaml +++ b/stack/stack_config/Pulumi.production.yaml @@ -3,8 +3,9 @@ config: aws:region: eu-central-1 cerulean-cloud:apikey: secure: v1:IuQ6n+tMG3oHe0pn:0/PQL0VoNoR2uCA9We2m9acc4nX73a0ViqSiHJhNe9X1srrZI4TvmsQZcuUrtzy4EoKSbQ== - cerulean-cloud:dryrun_historical: "True" - cerulean-cloud:dryrun_relevancy: "True" + cerulean-cloud:dryrun_historical: "" # Make empty "" to turn on historical runs + cerulean-cloud:dryrun_relevancy: "" # Make empty "" to turn on recurring runs + cerulean-cloud:dryrun_ais: "" # Make empty "" to turn on AIS analysis db:db-instance: db-g1-small db:deletion-protection: "True" db:db-password: diff --git a/stack/stack_config/Pulumi.staging.yaml b/stack/stack_config/Pulumi.staging.yaml index 934233c0..18f819ef 100644 --- a/stack/stack_config/Pulumi.staging.yaml +++ b/stack/stack_config/Pulumi.staging.yaml @@ -3,6 +3,7 @@ config: aws:region: eu-central-1 cerulean-cloud:dryrun_historical: "" # Make empty "" to turn on historical runs cerulean-cloud:dryrun_relevancy: "" # Make empty "" to turn on recurring runs + cerulean-cloud:dryrun_ais: "" # Make empty "" to turn on AIS analysis cerulean-cloud:apikey: secure: v1:GEqP+nWvrQb16t/f:uhLbrS+C7yFjQunkOnBOw9eZXt8JUS5dsiDTSH5hytK5vJ3zO8DJO5bOjQNBOKn/T5AWGA== db:db-instance: db-f1-micro diff --git a/stack/stack_config/Pulumi.test.yaml b/stack/stack_config/Pulumi.test.yaml index 3e160c47..e2764b42 100644 --- a/stack/stack_config/Pulumi.test.yaml +++ b/stack/stack_config/Pulumi.test.yaml @@ -3,6 +3,7 @@ config: aws:region: eu-central-1 cerulean-cloud:dryrun_historical: "True" # Make empty "" to turn on historical runs cerulean-cloud:dryrun_relevancy: "True" # Make empty "" to turn on recurring runs + cerulean-cloud:dryrun_ais: "" # Make empty "" to turn on AIS analysis cerulean-cloud:apikey: secure: v1:l5zmQowhnE9RJZC7:tyonyHOMqQ34wlv3eBD1LPUrqnJ0jMtWAzJdkJE5rp75r/qSG9CGWVeLM176tGOZz6G4sA== db:db-instance: db-f1-micro