Separate AAA Queuer to own file to simplify cross imports
jonaraphael committed Sep 25, 2023
1 parent 3269dc4 commit bc6d3d7
Showing 4 changed files with 78 additions and 72 deletions.
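Per the commit message, the motivation is import hygiene: the orchestrator's handler.py previously had to import add_to_aaa_queue from the cloud function's main.py, which also pulls in the database client and the AIS association utilities. With the queuer in its own module, the orchestrator depends only on the task-queuing code, backed by the google-cloud-tasks and protobuf entries added to its requirements.txt. A minimal sketch of the resulting import change (the full diff follows below):

    # Before: importing from main.py dragged in DatabaseClient and the AIS utils.
    # from cerulean_cloud.cloud_function_ais_analysis.main import add_to_aaa_queue

    # After: queuer.py depends only on google-cloud-tasks and protobuf.
    from cerulean_cloud.cloud_function_ais_analysis.queuer import add_to_aaa_queue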
70 changes: 0 additions & 70 deletions cerulean_cloud/cloud_function_ais_analysis/main.py
@@ -2,20 +2,13 @@
"""

import asyncio
import json
import os
from datetime import datetime

from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

from cerulean_cloud.database_client import DatabaseClient, get_engine

from .utils.ais import AISConstructor
from .utils.associate import associate_ais_to_slicks, slicks_to_curves

# mypy: ignore-errors


def main(request):
"""
@@ -99,66 +92,3 @@ def automatic_ais_analysis(ais_constructor, slick):
["slick_index", "slick_size", "total_score"], ascending=[True, False, False]
).groupby("slick_index")
return results


def add_to_aaa_queue(scene_id):
"""
Adds a new task to Google Cloud Tasks for automatic AIS analysis.
Args:
scene_id (str): The ID of the scene for which AIS analysis is needed.
Returns:
google.cloud.tasks_v2.types.Task: The created Task object.
Notes:
- The function uses Google Cloud Tasks API to schedule the AIS analysis.
- Multiple retries are scheduled with different delays.
"""
# Create a client.
client = tasks_v2.CloudTasksClient()

project = os.getenv("GCP_PROJECT")
location = os.getenv("GCP_LOCATION")
queue = os.getenv("QUEUE")
url = os.getenv("FUNCTION_URL")
dry_run = bool(os.getenv("IS_DRY_RUN"))

# Construct the fully qualified queue name.
parent = client.queue_path(project, location, queue)

# Construct the request body.
payload = {"sceneid": scene_id, "dry_run": dry_run}

task = {
"http_request": { # Specify the type of request.
"http_method": tasks_v2.HttpMethod.POST,
"url": url, # The url path that the task will be sent to.
},
"headers": {
"Content-type": "application/json",
"Authorization": f"Bearer {os.getenv('API_KEY')}",
},
"body": json.dumps(payload).encode(),
}

# Number of days that the Automatic AIS Analysis should be run after
# Each entry is another retry
ais_delays = [0, 3, 7] # TODO Magic number >>> Where should this live?
for delay in ais_delays:
d = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(
days=delay
)

# Create Timestamp protobuf.
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(d)

# Add the timestamp to the tasks.
task["schedule_time"] = timestamp

# Use the client to build and send the task.
response = client.create_task(request={"parent": parent, "task": task})

print("Created task {}".format(response.name))
return response
74 changes: 74 additions & 0 deletions cerulean_cloud/cloud_function_ais_analysis/queuer.py
@@ -0,0 +1,74 @@
"""
Code for handling queue requests for Automatic AIS Analysis
"""
import json
import os
import datetime  # module import: the code below uses datetime.datetime, .timezone, and .timedelta

from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

# mypy: ignore-errors


def add_to_aaa_queue(scene_id):
"""
Adds a new task to Google Cloud Tasks for automatic AIS analysis.
Args:
scene_id (str): The ID of the scene for which AIS analysis is needed.
Returns:
google.cloud.tasks_v2.types.Task: The created Task object.
Notes:
- The function uses Google Cloud Tasks API to schedule the AIS analysis.
- Multiple retries are scheduled with different delays.
"""
# Create a client.
client = tasks_v2.CloudTasksClient()

project = os.getenv("GCP_PROJECT")
location = os.getenv("GCP_LOCATION")
queue = os.getenv("QUEUE")
url = os.getenv("FUNCTION_URL")
dry_run = bool(os.getenv("IS_DRY_RUN"))

# Construct the fully qualified queue name.
parent = client.queue_path(project, location, queue)

# Construct the request body.
payload = {"sceneid": scene_id, "dry_run": dry_run}

    task = {
        "http_request": {  # Specify the type of request.
            "http_method": tasks_v2.HttpMethod.POST,
            "url": url,  # The url path that the task will be sent to.
            # Headers and body belong inside http_request, not on the Task itself.
            "headers": {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {os.getenv('API_KEY')}",
            },
            "body": json.dumps(payload).encode(),
        },
    }

    # Delays (in days) after which the Automatic AIS Analysis should run;
    # each entry schedules another retry.
    ais_delays = [0, 3, 7]  # TODO Magic number >>> Where should this live?
for delay in ais_delays:
d = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(
days=delay
)

# Create Timestamp protobuf.
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(d)

# Add the timestamp to the tasks.
task["schedule_time"] = timestamp

# Use the client to build and send the task.
response = client.create_task(request={"parent": parent, "task": task})

print("Created task {}".format(response.name))
return response
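
A minimal usage sketch for the relocated queuer, assuming the module is deployed with the environment variables it reads; every value below is a placeholder, and the call requires valid Google Cloud credentials and an existing Cloud Tasks queue:

    import os

    # Placeholder settings only; real values come from the deployment environment.
    os.environ["GCP_PROJECT"] = "my-project"
    os.environ["GCP_LOCATION"] = "us-central1"
    os.environ["QUEUE"] = "aaa-queue"
    os.environ["FUNCTION_URL"] = "https://example.com/ais-analysis"
    os.environ["API_KEY"] = "test-key"

    from cerulean_cloud.cloud_function_ais_analysis.queuer import add_to_aaa_queue

    # Creates one task per entry in ais_delays (0, 3, and 7 days out)
    # and returns the last Task created.
    task = add_to_aaa_queue("example-scene-id")  # hypothetical scene ID
    print(task.name)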
2 changes: 1 addition & 1 deletion cerulean_cloud/cloud_run_orchestrator/handler.py
@@ -27,7 +27,7 @@
from rasterio.merge import merge

from cerulean_cloud.auth import api_key_auth
-from cerulean_cloud.cloud_function_ais_analysis.main import add_to_aaa_queue
+from cerulean_cloud.cloud_function_ais_analysis.queuer import add_to_aaa_queue
from cerulean_cloud.cloud_run_offset_tiles.schema import (
InferenceResult,
InferenceResultStack,
4 changes: 3 additions & 1 deletion cerulean_cloud/cloud_run_orchestrator/requirements.txt
@@ -21,4 +21,6 @@ global-land-mask
libpysal
geopandas
pygeos
-networkx
+networkx
+google-cloud-tasks
+protobuf
