Create helper class for AIS tasks
jonaraphael committed Sep 23, 2023
1 parent a56563f commit a0a2501
Showing 6 changed files with 228 additions and 280 deletions.
146 changes: 20 additions & 126 deletions cerulean_cloud/cloud_function_ais_analysis/main.py
@@ -4,36 +4,16 @@
 import asyncio
 import json
 import os
-from datetime import datetime, timedelta
+from datetime import datetime
 
-import pandas_gbq
-
-# import shapely.geometry as sh # https://docs.aws.amazon.com/lambda/latest/dg/python-package.html
 from google.cloud import tasks_v2
 from google.protobuf import timestamp_pb2
 
+from cerulean_cloud.cloud_function_ais_analysis.utils.ais import AISConstructor
 from cerulean_cloud.cloud_function_ais_analysis.utils.associate import (
     associate_ais_to_slicks,
     slicks_to_curves,
 )
-from cerulean_cloud.cloud_function_ais_analysis.utils.constants import (
-    AIS_BUFFER,
-    BUF_VEC,
-    D_FORMAT,
-    HOURS_AFTER,
-    HOURS_BEFORE,
-    NUM_TIMESTEPS,
-    T_FORMAT,
-    WEIGHT_VEC,
-)
-from cerulean_cloud.cloud_function_ais_analysis.utils.misc import (
-    build_time_vec,
-    get_utm_zone,
-)
-from cerulean_cloud.cloud_function_ais_analysis.utils.trajectory import (
-    ais_points_to_trajectories,
-    buffer_trajectories,
-)
 from cerulean_cloud.database_client import DatabaseClient, get_engine
 
 # mypy: ignore-errors
@@ -75,7 +55,6 @@ async def handle_aaa_request(request):
     Notes:
         - The function gets scene information from the request.
         - It uses the `DatabaseClient` for database operations.
-        - It calls `get_ais` and `automatic_ais_analysis` for AIS data retrieval and analysis.
     """
     request_json = request.get_json()
     if not request_json.get("dry_run"):
@@ -88,120 +67,35 @@ async def handle_aaa_request(request):
                 await db_client.get_slicks_without_sources_from_scene_id(scene_id)
             )
             if len(slicks_without_sources) > 0:
-                ais_traj, ais_buffered, ais_weighted, utm_zone = get_ais(s1)
-                if ais_traj:
+                ais_constructor = AISConstructor(s1)
+                ais_constructor.retrieve_ais()
+                if ais_constructor.ais_df:
+                    ais_constructor.build_trajectories()
+                    ais_constructor.buffer_trajectories()
                     for slick in slicks_without_sources:
-                        automatic_ais_analysis(
-                            slick, ais_traj, ais_buffered, ais_weighted, utm_zone
-                        )
+                        automatic_ais_analysis(ais_constructor, slick)
     return "Success!"
 
 
-async def get_ais(
-    s1, hours_before=HOURS_BEFORE, hours_after=HOURS_AFTER, ais_buffer=AIS_BUFFER
-):
-    """
-    Asynchronously fetches and processes AIS data.
-    Args:
-        s1 (Scene Object): The scene object for which AIS data is needed.
-        hours_before (int): The number of hours before the scene time to consider for AIS data.
-        hours_after (int): The number of hours after the scene time to consider for AIS data.
-        ais_buffer (float): The buffer distance around the scene geometry.
-    Returns:
-        tuple: A tuple containing ais_traj, ais_buffered, ais_weighted, and utm_zone.
-    Notes:
-        - AIS data is downloaded and then transformed into trajectories.
-        - The function also buffers and weighs the AIS trajectories.
-    """
-    grd_buff = s1.geometry.buffer(ais_buffer)
-    ais = download_ais(s1.start_time, hours_before, hours_after, grd_buff)
-    time_vec = build_time_vec(s1.start_time, hours_before, hours_after, NUM_TIMESTEPS)
-    utm_zone = get_utm_zone(ais)
-    ais_traj = ais_points_to_trajectories(ais, time_vec)
-    ais_buffered, ais_weighted = buffer_trajectories(ais_traj, BUF_VEC, WEIGHT_VEC)
-
-    return ais_traj, ais_buffered, ais_weighted, utm_zone
-
-
-def download_ais(
-    t_stamp,
-    hours_before,
-    hours_after,
-    poly,
-):
-    """
-    Downloads AIS data using a SQL query on BigQuery.
-    Args:
-        t_stamp (datetime): The timestamp for which AIS data is needed.
-        hours_before (int): The number of hours before the timestamp to consider for AIS data.
-        hours_after (int): The number of hours after the timestamp to consider for AIS data.
-        poly (str): The polygon geometry in WKT format to filter AIS data spatially.
-    Returns:
-        DataFrame: A Pandas DataFrame containing the downloaded AIS data.
-    Notes:
-        - The function uses Google's BigQuery Python client `pandas_gbq` to execute the SQL query.
-        - Make sure that the BigQuery project ID is set in the environment variable "BQ_PROJECT_ID".
-    """
-    sql = f"""
-    SELECT * FROM(
-        SELECT
-        seg.ssvid as ssvid,
-        seg.timestamp as timestamp,
-        seg.lon as lon,
-        seg.lat as lat,
-        seg.course as course,
-        seg.speed_knots as speed_knots,
-        ves.ais_identity.shipname_mostcommon.value as shipname,
-        ves.ais_identity.shiptype[SAFE_OFFSET(0)].value as shiptype,
-        ves.best.best_flag as flag,
-        ves.best.best_vessel_class as best_shiptype
-        FROM
-        `world-fishing-827.gfw_research.pipe_v20201001` as seg
-        LEFT JOIN
-        `world-fishing-827.gfw_research.vi_ssvid_v20230801` as ves
-        ON seg.ssvid = ves.ssvid
-        WHERE
-        seg._PARTITIONTIME between '{datetime.strftime(t_stamp-timedelta(hours=hours_before), D_FORMAT)}' AND '{datetime.strftime(t_stamp+timedelta(hours=hours_after), D_FORMAT)}'
-        AND seg.timestamp between '{datetime.strftime(t_stamp-timedelta(hours=hours_before), T_FORMAT)}' AND '{datetime.strftime(t_stamp+timedelta(hours=hours_after), T_FORMAT)}'
-        AND ST_COVEREDBY(ST_GEOGPOINT(seg.lon, seg.lat), ST_GeogFromText('{poly}'))
-        ORDER BY
-        seg.timestamp DESC
-    )
-    ORDER BY
-    ssvid, timestamp
-    """
-    return pandas_gbq.read_gbq(sql, project_id=os.getenv("BQ_PROJECT_ID"))
-
-
-def automatic_ais_analysis(slick, ais_traj, ais_buffered, ais_weighted, utm_zone):
+def automatic_ais_analysis(ais_constructor, slick):
     """
-    Performs automatic AIS analysis for a given slick and AIS data.
-    Args:
-        slick (GeoDataFrame): The oil slick geometry.
-        ais_traj (GeoDataFrame): The AIS trajectories.
-        ais_buffered (GeoDataFrame): The buffered AIS trajectories.
-        ais_weighted (GeoDataFrame): The weighted AIS trajectories.
-        utm_zone (str): The UTM zone for coordinate transformation.
+    Perform automatic analysis to associate AIS trajectories with slicks.
+
+    Parameters:
+        ais_constructor (AISTrajectoryAnalysis): An instance of the AISTrajectoryAnalysis class.
+        slick (GeoDataFrame): A GeoDataFrame containing the slick geometries.
     Returns:
-        DataFrame: A Pandas DataFrame containing the AIS analysis results sorted by slick index and score.
-    Notes:
-        - The function performs geometry transformations and data association.
-        - It uses the helper functions `slicks_to_curves` and `associate_ais_to_slicks`.
+        GroupBy object: The AIS-slick associations sorted and grouped by slick index.
     """
-    slicks = slick.to_crs(utm_zone)
+    slicks = slick.to_crs(ais_constructor.ais_df.estimate_utm_crs())
     slicks_clean, slicks_curves = slicks_to_curves(slicks)
     slick_ais = associate_ais_to_slicks(
-        ais_traj, ais_buffered, ais_weighted, slicks_clean, slicks_curves
+        ais_constructor.ais_trajectories,
+        ais_constructor.ais_buffered,
+        ais_constructor.ais_weighted,
+        slicks_clean,
+        slicks_curves,
     )
     results = slick_ais.sort_values(
         ["slick_index", "slick_size", "total_score"], ascending=[True, False, False]
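The new helper module itself (cerulean_cloud/cloud_function_ais_analysis/utils/ais.py) is presumably among the four changed files not rendered above, so its contents are not shown here. Inferred only from the call sites in main.py and from the code this commit deletes, the class likely folds the old get_ais/download_ais steps into something like the sketch below. The attribute and method names come from the diff; the constructor signature, the defaults, the GeoDataFrame conversion, and the abbreviated query are assumptions, not the committed implementation.

import os

import geopandas as gpd
import pandas_gbq

from cerulean_cloud.cloud_function_ais_analysis.utils import trajectory
from cerulean_cloud.cloud_function_ais_analysis.utils.constants import (
    AIS_BUFFER,
    BUF_VEC,
    HOURS_AFTER,
    HOURS_BEFORE,
    NUM_TIMESTEPS,
    WEIGHT_VEC,
)
from cerulean_cloud.cloud_function_ais_analysis.utils.misc import build_time_vec


class AISConstructor:
    """Hypothetical reconstruction of the helper, based on how main.py uses it."""

    def __init__(self, s1, hours_before=HOURS_BEFORE, hours_after=HOURS_AFTER):
        self.s1 = s1
        self.hours_before = hours_before
        self.hours_after = hours_after
        self.poly = s1.geometry.buffer(AIS_BUFFER)  # was grd_buff in the deleted get_ais()
        self.ais_df = None
        self.ais_trajectories = None
        self.ais_buffered = None
        self.ais_weighted = None

    def retrieve_ais(self):
        # Same GFW BigQuery pull as the deleted download_ais(); the query is
        # abbreviated here, see the removed SQL above for the real filters.
        sql = "SELECT ... FROM `world-fishing-827.gfw_research.pipe_v20201001` ..."
        df = pandas_gbq.read_gbq(sql, project_id=os.getenv("BQ_PROJECT_ID"))
        # main.py calls ais_df.estimate_utm_crs(), so the raw rows are presumably
        # promoted to a GeoDataFrame using the lon/lat columns from the query.
        self.ais_df = gpd.GeoDataFrame(
            df, geometry=gpd.points_from_xy(df.lon, df.lat), crs="EPSG:4326"
        )

    def build_trajectories(self):
        # Mirrors the deleted get_ais(): build a time vector, then trajectories.
        time_vec = build_time_vec(
            self.s1.start_time, self.hours_before, self.hours_after, NUM_TIMESTEPS
        )
        self.ais_trajectories = trajectory.ais_points_to_trajectories(
            self.ais_df, time_vec
        )

    def buffer_trajectories(self):
        # Produces the buffered and weighted trajectories consumed by
        # automatic_ais_analysis() above.
        self.ais_buffered, self.ais_weighted = trajectory.buffer_trajectories(
            self.ais_trajectories, BUF_VEC, WEIGHT_VEC
        )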
9 changes: 1 addition & 8 deletions cerulean_cloud/cloud_function_ais_analysis/requirements.txt
@@ -1,17 +1,10 @@
-asyncio==3.4.3
-json==Package(s) not found
-os==Package(s) not found
-pandas_gbq==0.17.6
 google-cloud-tasks==2.9.1
 google-cloud-core==2.3.1
 google-auth==1.35.0
 sqlalchemy==1.4.32
-datetime==Package(s) not found
-math==Package(s) not found
 centerline==1.0.1
 geopandas==0.12.2
 movingpandas==0.15rc1
 numpy==1.21.5
 scipy==1.8.0
-shapely==2.0.1
-earthengine-api==0.1.342
+shapely==2.0.1
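For orientation, the updated handler could be exercised locally with a sketch like the following. Only the dry_run key appears in the diff; the request stub, the scene_id key, and its placeholder value are assumptions, and a real run would still need database and BigQuery credentials.

import asyncio
from unittest.mock import Mock

from cerulean_cloud.cloud_function_ais_analysis.main import handle_aaa_request

# Stubbed HTTP request object; only get_json() is used by the handler above.
request = Mock()
request.get_json.return_value = {
    "scene_id": "PLACEHOLDER_SENTINEL1_SCENE_ID",  # assumed key and value
    "dry_run": False,
}

asyncio.run(handle_aaa_request(request))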
