Skip to content

Commit

Permalink
feat: increase parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-milan committed Nov 15, 2023
1 parent c2133ef commit a539630
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 11 deletions.
7 changes: 2 additions & 5 deletions pipelines/rj_escritorio/flooding_detection/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
)

with Flow(
name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API (Dask)",
name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API",
code_owners=[
"gabriel",
"diego",
Expand Down Expand Up @@ -107,10 +107,7 @@


rj_escritorio__flooding_detection__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
# rj_escritorio__flooding_detection__flow.executor = DaskExecutor(
# address="tcp://prefect-support-cluster-scheduler.dask.svc.cluster.local:8786"
# )
rj_escritorio__flooding_detection__flow.executor = LocalDaskExecutor()
rj_escritorio__flooding_detection__flow.executor = LocalDaskExecutor(num_workers=10)
rj_escritorio__flooding_detection__flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value],
Expand Down
11 changes: 5 additions & 6 deletions pipelines/rj_escritorio/flooding_detection/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
# TODO: Make it resilient to camera failures
import base64
from datetime import datetime, timedelta
import io
Expand Down Expand Up @@ -60,15 +59,15 @@ def get_openai_api_key(secret_path: str) -> str:
return secret["api_key"]


@task(nout=3)
@task
def get_prediction(
camera_with_image: Dict[str, Union[str, float]],
flooding_prompt: str,
openai_api_key: str,
openai_api_model: str,
openai_api_max_tokens: int = 300,
openai_api_url: str = "https://api.openai.com/v1/chat/completions",
) -> Tuple[Dict[str, Union[str, float, bool]], str, Dict[str, Union[str, float]]]:
) -> Dict[str, Union[str, float, bool]]:
"""
Gets the flooding detection prediction from OpenAI API.
Expand Down Expand Up @@ -143,12 +142,12 @@ def get_prediction(


@task(
max_retries=3,
retry_delay=timedelta(seconds=5),
max_retries=2,
retry_delay=timedelta(seconds=1),
)
def get_snapshot(
camera: Dict[str, Union[str, float]],
) -> Tuple[str, Dict[str, Union[str, float]]]:
) -> Dict[str, Union[str, float]]:
"""
Gets a snapshot from a camera.
Expand Down

0 comments on commit a539630

Please sign in to comment.