Skip to content

Commit

Permalink
Merge pull request #563 from prefeitura-rio/feat/deteccao-alagamento-…
Browse files Browse the repository at this point in the history
…dask

[ENHANCEMENT] Usar Dask para execução da pipeline de detecção de alagamentos
  • Loading branch information
gabriel-milan authored Nov 15, 2023
2 parents fc92660 + a539630 commit cd5fb63
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 107 deletions.
22 changes: 16 additions & 6 deletions pipelines/rj_escritorio/flooding_detection/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
Flow definition for flooding detection using AI.
"""
from prefect import Parameter
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.control_flow.filter import FilterTask
from prefect.utilities.edges import unmapped

from pipelines.constants import constants
Expand All @@ -21,12 +23,17 @@
)
from pipelines.utils.decorators import Flow

filter_results = FilterTask(
filter_func=lambda x: not isinstance(x, (BaseException, type(None)))
)

with Flow(
name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API",
code_owners=[
"gabriel",
"diego",
],
skip_if_running=True,
) as rj_escritorio__flooding_detection__flow:
# Parameters
cameras_geodf_url = Parameter(
Expand Down Expand Up @@ -76,28 +83,31 @@
number_mock_rain_cameras=mocked_cameras_number,
)
openai_api_key = get_openai_api_key(secret_path=openai_api_key_secret_path)
images = get_snapshot.map(
cameras_with_image = get_snapshot.map(
camera=cameras,
)
predictions = get_prediction.map(
image=images,
cameras_with_image = filter_results(cameras_with_image)
cameras_with_image_and_classification = get_prediction.map(
camera_with_image=cameras_with_image,
flooding_prompt=unmapped(openai_flooding_detection_prompt),
openai_api_key=unmapped(openai_api_key),
openai_api_model=unmapped(openai_api_model),
openai_api_max_tokens=unmapped(openai_api_max_tokens),
openai_api_url=unmapped(openai_api_url),
)
cameras_with_image_and_classification = filter_results(
cameras_with_image_and_classification
)
update_flooding_api_data(
predictions=predictions,
cameras=cameras,
images=images,
cameras_with_image_and_classification=cameras_with_image_and_classification,
data_key=redis_key_flooding_detection_data,
last_update_key=redis_key_flooding_detection_last_update,
predictions_buffer_key=redis_key_predictions_buffer,
)


rj_escritorio__flooding_detection__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
rj_escritorio__flooding_detection__flow.executor = LocalDaskExecutor(num_workers=10)
rj_escritorio__flooding_detection__flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value],
Expand Down
106 changes: 59 additions & 47 deletions pipelines/rj_escritorio/flooding_detection/tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
# -*- coding: utf-8 -*-
# TODO: Make it resilient to camera failures
import base64
from datetime import datetime, timedelta
import io
import json
from pathlib import Path
import random
from typing import Dict, List, Union
from typing import Dict, List, Tuple, Union

import cv2
import geopandas as gpd
Expand Down Expand Up @@ -62,7 +61,7 @@ def get_openai_api_key(secret_path: str) -> str:

@task
def get_prediction(
image: str,
camera_with_image: Dict[str, Union[str, float]],
flooding_prompt: str,
openai_api_key: str,
openai_api_model: str,
Expand All @@ -73,7 +72,14 @@ def get_prediction(
Gets the flooding detection prediction from OpenAI API.
Args:
image: The image in base64 format.
camera_with_image: The camera with image in the following format:
{
"id_camera": "1",
"url_camera": "rtsp://...",
"latitude": -22.912,
"longitude": -43.230,
"image_base64": "base64...",
}
flooding_prompt: The flooding prompt.
openai_api_key: The OpenAI API key.
openai_api_model: The OpenAI API model.
Expand Down Expand Up @@ -107,7 +113,9 @@ def get_prediction(
},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image}"},
"image_url": {
"url": f"data:image/jpeg;base64,{camera_with_image['image_base64']}"
},
},
],
}
Expand All @@ -122,20 +130,24 @@ def get_prediction(
json_string = content.replace("```json\n", "").replace("\n```", "")
json_object = json.loads(json_string)
flooding_detected = json_object["flooding_detected"]
return {
"object": "alagamento",
"label": flooding_detected,
"confidence": 0.7,
}
log(f"Successfully got prediction: {flooding_detected}")
camera_with_image["ai_classification"] = [
{
"object": "alagamento",
"label": flooding_detected,
"confidence": 0.7,
}
]
return camera_with_image


@task(
max_retries=3,
retry_delay=timedelta(seconds=5),
max_retries=2,
retry_delay=timedelta(seconds=1),
)
def get_snapshot(
camera: Dict[str, Union[str, float]],
) -> str:
) -> Dict[str, Union[str, float]]:
"""
Gets a snapshot from a camera.
Expand All @@ -162,7 +174,8 @@ def get_snapshot(
img.save(buffer, format="JPEG")
img_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
log(f"Successfully got snapshot from URL {rtsp_url}.")
return img_b64
camera["image_base64"] = img_b64
return camera


@task
Expand Down Expand Up @@ -261,9 +274,7 @@ def pick_cameras(

@task
def update_flooding_api_data(
predictions: List[Dict[str, Union[str, float, bool]]],
cameras: List[Dict[str, Union[str, float]]],
images: List[str],
cameras_with_image_and_classification: List[Dict[str, Union[str, float, bool]]],
data_key: str,
last_update_key: str,
predictions_buffer_key: str,
Expand All @@ -272,40 +283,39 @@ def update_flooding_api_data(
Updates Redis keys with flooding detection data and last update datetime (now).
Args:
predictions: The AI predictions in the following format:
[
{
"object": "alagamento",
"label": True,
"confidence": 0.7,
},
...
]
cameras: A list of cameras in the following format:
[
{
"id_camera": "1",
"url_camera": "rtsp://...",
"latitude": -22.912,
"longitude": -43.230,
},
...
]
images: A list of images in base64 format.
cameras_with_image_and_classification: The cameras with image and classification
in the following format:
[
{
"id_camera": "1",
"url_camera": "rtsp://...",
"latitude": -22.912,
"longitude": -43.230,
"image_base64": "base64...",
"ai_classification": [
{
"object": "alagamento",
"label": True,
"confidence": 0.7,
}
],
},
...
]
data_key: The Redis key for the flooding detection data.
last_update_key: The Redis key for the last update datetime.
predictions_buffer_key: The Redis key for the predictions buffer.
"""
# Build API data
last_update = pendulum.now(tz="America/Sao_Paulo")
api_data = []
for prediction, camera, image in zip(predictions, cameras, images):
for camera_with_image_and_classification in cameras_with_image_and_classification:
# Get AI classifications
ai_classification = []
current_prediction = prediction["label"]
predictions_buffer_camera_key = (
f"{predictions_buffer_key}_{camera['id_camera']}"
)
current_prediction = camera_with_image_and_classification["ai_classification"][
0
]["label"]
predictions_buffer_camera_key = f"{predictions_buffer_key}_{camera_with_image_and_classification['id_camera']}" # noqa
predictions_buffer = redis_add_to_prediction_buffer(
predictions_buffer_camera_key, current_prediction
)
Expand All @@ -325,11 +335,13 @@ def update_flooding_api_data(
api_data.append(
{
"datetime": last_update.to_datetime_string(),
"id_camera": camera["id_camera"],
"url_camera": camera["url_camera"],
"latitude": camera["latitude"],
"longitude": camera["longitude"],
"image_base64": image,
"id_camera": cameras_with_image_and_classification["id_camera"],
"url_camera": cameras_with_image_and_classification["url_camera"],
"latitude": cameras_with_image_and_classification["latitude"],
"longitude": cameras_with_image_and_classification["longitude"],
"image_base64": cameras_with_image_and_classification[
"image_base64"
],
"ai_classification": ai_classification,
}
)
Expand Down
Loading

0 comments on commit cd5fb63

Please sign in to comment.