From a5396308baba07127345caa2a242869d2f1499e7 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Wed, 15 Nov 2023 10:42:30 -0300 Subject: [PATCH] feat: increase parallelism --- pipelines/rj_escritorio/flooding_detection/flows.py | 7 ++----- pipelines/rj_escritorio/flooding_detection/tasks.py | 11 +++++------ 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index fcbf815ac..179985df4 100644 --- a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -28,7 +28,7 @@ ) with Flow( - name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API (Dask)", + name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API", code_owners=[ "gabriel", "diego", @@ -107,10 +107,7 @@ rj_escritorio__flooding_detection__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# rj_escritorio__flooding_detection__flow.executor = DaskExecutor( -# address="tcp://prefect-support-cluster-scheduler.dask.svc.cluster.local:8786" -# ) -rj_escritorio__flooding_detection__flow.executor = LocalDaskExecutor() +rj_escritorio__flooding_detection__flow.executor = LocalDaskExecutor(num_workers=10) rj_escritorio__flooding_detection__flow.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value], diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 4fd6af7a3..cd0ecb66c 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# TODO: Make it resilient to camera failures import base64 from datetime import datetime, timedelta import io @@ -60,7 +59,7 @@ def get_openai_api_key(secret_path: str) -> str: return secret["api_key"] -@task(nout=3) +@task def get_prediction( camera_with_image: Dict[str, Union[str, float]], flooding_prompt: str, @@ -68,7 +67,7 @@ def get_prediction( openai_api_model: str, openai_api_max_tokens: int = 300, openai_api_url: str = "https://api.openai.com/v1/chat/completions", -) -> Tuple[Dict[str, Union[str, float, bool]], str, Dict[str, Union[str, float]]]: +) -> Dict[str, Union[str, float, bool]]: """ Gets the flooding detection prediction from OpenAI API. @@ -143,12 +142,12 @@ def get_prediction( @task( - max_retries=3, - retry_delay=timedelta(seconds=5), + max_retries=2, + retry_delay=timedelta(seconds=1), ) def get_snapshot( camera: Dict[str, Union[str, float]], -) -> Tuple[str, Dict[str, Union[str, float]]]: +) -> Dict[str, Union[str, float]]: """ Gets a snapshot from a camera.