Skip to content

Commit

Permalink
Deployment Solver Timeout (#316)
Browse files Browse the repository at this point in the history
* added timeout functionality + unit tests

* linting

* added timeout to stochastic algo

* linting
  • Loading branch information
pjavanrood authored Sep 11, 2024
1 parent 1ee692d commit 9995f32
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 15 deletions.
3 changes: 3 additions & 0 deletions caribou/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,6 @@

# Caribou Go Path
GO_PATH = pathlib.Path(__file__).parents[2].resolve() / "caribou-go"

# AWS Lambda timeout budget, in seconds
AWS_TIMEOUT_SECONDS = 850 # Lambda functions must terminate within 900 seconds; we leave 50 seconds as buffer time
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import time
from typing import Optional

from caribou.deployment_solver.deployment_algorithms.deployment_algorithm import DeploymentAlgorithm


class CoarseGrainedDeploymentAlgorithm(DeploymentAlgorithm):
def _run_algorithm(self) -> list[tuple[list[int], dict[str, float]]]:
deployments = self._generate_all_possible_coarse_deployments()
def _run_algorithm(self, timeout: float = float("inf")) -> list[tuple[list[int], dict[str, float]]]:
deployments = self._generate_all_possible_coarse_deployments(timeout)
return deployments

def _generate_all_possible_coarse_deployments(self) -> list[tuple[list[int], dict[str, float]]]:
def _generate_all_possible_coarse_deployments(
self, timeout: float = float("inf")
) -> list[tuple[list[int], dict[str, float]]]:
deployments = []
start_time = time.time()
for index_value in self._region_indexer.get_value_indices().values():
if (time.time() - start_time) >= timeout:
break
deployment = self._generate_and_check_deployment(index_value)
if deployment is not None:
deployments.append(deployment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Optional

from caribou.common.constants import (
AWS_TIMEOUT_SECONDS,
DEFAULT_MONITOR_COOLDOWN,
GLOBAL_TIME_ZONE,
TIME_FORMAT,
Expand Down Expand Up @@ -35,6 +36,7 @@ def __init__(
n_workers: int = 1,
record_transmission_execution_carbon: bool = False,
deployment_metrics_calculator_type: str = "simple",
lambda_timeout: bool = False,
):
self._workflow_config = workflow_config

Expand Down Expand Up @@ -85,13 +87,17 @@ def __init__(
for instance in range(self._number_of_instances)
]

self._timeout = AWS_TIMEOUT_SECONDS if lambda_timeout else float("inf")

def run(self, hours_to_run: Optional[list[str]] = None) -> None:
hour_to_run_to_result: dict[str, Any] = {"time_keys_to_staging_area_data": {}, "deployment_metrics": {}}
if hours_to_run is None:
hours_to_run = [None] # type: ignore
# The solver for every hour must terminate in `timeout_per_hour` seconds
timeout_per_hour = self._timeout / len(hours_to_run)
for hour_to_run in hours_to_run:
self._update_data_for_new_hour(hour_to_run)
deployments = self._run_algorithm()
deployments = self._run_algorithm(timeout=timeout_per_hour)
ranked_deployments = self._ranker.rank(deployments)
selected_deployment = self._select_deployment(ranked_deployments)
formatted_deployment = self._formatter.format(
Expand Down Expand Up @@ -128,7 +134,7 @@ def _add_expiry_date_to_results(self, hour_to_run_to_result: dict[str, Any]) ->
hour_to_run_to_result["expiry_time"] = expiry_date_str

@abstractmethod
def _run_algorithm(self) -> list[tuple[list[int], dict[str, float]]]:
def _run_algorithm(self, timeout: float = float("inf")) -> list[tuple[list[int], dict[str, float]]]:
# Abstract hook for the concrete search strategy. `run` calls it once per hour with
# `timeout=timeout_per_hour` and passes the returned list of (deployment, metrics)
# tuples on to the ranker.
# NOTE(review): two signatures appear here (without/with `timeout`) - presumably the
# before/after sides of a diff hunk; confirm only the `timeout` variant is live.
raise NotImplementedError

def _select_deployment(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
import time
from itertools import product
from typing import Optional

from caribou.deployment_solver.deployment_algorithms.deployment_algorithm import DeploymentAlgorithm


class FineGrainedDeploymentAlgorithm(DeploymentAlgorithm):
def _run_algorithm(self) -> list[tuple[list[int], dict[str, float]]]:
deployments = self._generate_all_possible_fine_deployments()
def _run_algorithm(self, timeout: float = float("inf")) -> list[tuple[list[int], dict[str, float]]]:
deployments = self._generate_all_possible_fine_deployments(timeout)
return deployments

def _generate_all_possible_fine_deployments(self) -> list[tuple[list[int], dict[str, float]]]:
def _generate_all_possible_fine_deployments(
self, timeout: float = float("inf")
) -> list[tuple[list[int], dict[str, float]]]:
deployments = []
all_combinations = product(
self._region_indexer.get_value_indices().values(),
repeat=self._number_of_instances,
)
start_time = time.time()
for deployment_tuple in all_combinations:
if (time.time() - start_time) > timeout:
break
deployment = self._generate_and_check_deployment(deployment_tuple)
if deployment is not None:
deployments.append(deployment)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import random
import time
from copy import deepcopy
from typing import Optional

Expand All @@ -14,13 +15,15 @@ def __init__(
n_workers: int = 1,
record_transmission_execution_carbon: bool = False,
deployment_metrics_calculator_type: str = "simple",
lambda_timeout: bool = False,
) -> None:
super().__init__(
workflow_config,
expiry_time_delta_seconds,
n_workers,
record_transmission_execution_carbon,
deployment_metrics_calculator_type,
lambda_timeout=lambda_timeout,
)
self._setup()

Expand All @@ -40,22 +43,31 @@ def _setup(self) -> None:
for instance in range(self._number_of_instances):
self._max_number_combinations *= len(self._per_instance_permitted_regions[instance])

# Two-phase solve bounded by `timeout` seconds: a coarse pass first, then the
# stochastic search with whatever budget the coarse pass left over.
# NOTE(review): the signature and both helper calls each appear twice (without/with a
# timeout argument) - presumably the before/after lines of a diff hunk; confirm.
def _run_algorithm(self) -> list[tuple[list[int], dict[str, float]]]:
def _run_algorithm(self, timeout: float = float("inf")) -> list[tuple[list[int], dict[str, float]]]:
start_time = time.time()
remaining_time = timeout
self._best_deployment_metrics = deepcopy( # pylint: disable=attribute-defined-outside-init
self._home_deployment_metrics
)
deployments = self._generate_all_possible_coarse_deployments()
deployments = self._generate_all_possible_coarse_deployments(timeout=remaining_time)
# Fall back to the home deployment so the result is never empty.
if len(deployments) == 0:
deployments.append((self._home_deployment, self._home_deployment_metrics))
self._generate_stochastic_heuristic_deployments(deployments)
# Deduct the wall-clock time spent so far; with no budget left, skip the stochastic phase.
remaining_time -= time.time() - start_time
if remaining_time <= 0:
return deployments
self._generate_stochastic_heuristic_deployments(deployments, timeout=remaining_time)
return deployments

def _generate_stochastic_heuristic_deployments(self, deployments: list[tuple[list[int], dict[str, float]]]) -> None:
def _generate_stochastic_heuristic_deployments(
self, deployments: list[tuple[list[int], dict[str, float]]], timeout: float = float("inf")
) -> None:
start_time = time.time()

current_deployment = deepcopy(self._home_deployment)

generated_deployments: set[tuple[int, ...]] = {tuple(deployment) for deployment, _ in deployments}
for _ in range(self._num_iterations):
if len(generated_deployments) >= self._max_number_combinations:
if len(generated_deployments) >= self._max_number_combinations or (time.time() - start_time) >= timeout:
break

new_deployment = self._generate_new_deployment(current_deployment)
Expand All @@ -76,9 +88,14 @@ def _generate_stochastic_heuristic_deployments(self, deployments: list[tuple[lis

self._temperature *= 0.99

def _generate_all_possible_coarse_deployments(self) -> list[tuple[list[int], dict[str, float]]]:
def _generate_all_possible_coarse_deployments(
self, timeout: float = float("inf")
) -> list[tuple[list[int], dict[str, float]]]:
deployments = []
start_time = time.time()
for index_value in self._region_indexer.get_value_indices().values():
if (time.time() - start_time) > timeout:
break
deployment = self._generate_and_check_deployment(index_value)
if deployment is not None:
deployments.append(deployment)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import unittest
from unittest.mock import MagicMock, patch
from caribou.deployment_solver.deployment_algorithms.coarse_grained_deployment_algorithm import (
Expand Down Expand Up @@ -35,6 +36,34 @@ def test_run_algorithm(self, mock_pool):
expected_result = [([1, 1, 1], {"metric1": 1.0, "metric2": 2.0})]
self.assertEqual(result, expected_result)

@patch("multiprocessing.Pool")
def test_run_algorithm_timeout(self, mock_pool):
    """The coarse-grained search stops once the time budget is exhausted.

    The clock seen by the module under test is simulated, so the test neither
    sleeps nor depends on real wall-clock timing: every deployment check
    advances the fake clock by two seconds against a one-second budget.
    """
    # Simulated clock read by the algorithm through its module-level `time` reference
    clock = {"now": 0.0}

    candidates = [
        ([1, 1, 1], {"metric1": 1.0, "metric2": 2.0}),
        ([0, 0, 0], {"metric1": 2.0, "metric2": 1.0}),
    ]

    def slow_check(*args, **kwargs):
        # Each check "takes" two simulated seconds, overshooting the budget
        clock["now"] += 2.0
        return candidates.pop(0)

    # Create a mock for the _region_indexer attribute
    mock_region_indexer = MagicMock()
    mock_region_indexer.get_value_indices.return_value = {1: 1, 0: 0}

    # Set the mocks on the instance
    self._algorithm._generate_and_check_deployment = MagicMock(side_effect=slow_check)
    self._algorithm._region_indexer = mock_region_indexer

    # Call the _run_algorithm method with the simulated clock in place
    with patch(
        "caribou.deployment_solver.deployment_algorithms.coarse_grained_deployment_algorithm.time"
    ) as mock_time:
        mock_time.time.side_effect = lambda: clock["now"]
        result = self._algorithm._run_algorithm(timeout=1)

    # Only the first candidate fits into the budget
    expected_result = [([1, 1, 1], {"metric1": 1.0, "metric2": 2.0})]
    self.assertEqual(result, expected_result)
    self._algorithm._generate_and_check_deployment.assert_called_once()

def test_generate_and_check_deployment(self):
# Arrange
self._algorithm._per_instance_permitted_regions = [[0, 1, 2], [0, 1, 2]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
class ConcreteDeploymentAlgorithm(DeploymentAlgorithm):
def __init__(self, workflow_config):
    """Build a minimal test double without running the base-class initializer.

    Only the attributes assigned here are set: a mocked input manager and an
    unbounded solver timeout. `workflow_config` is accepted but not used.
    """
    self._input_manager = MagicMock()
    self._timeout = float("inf")

def _run_algorithm(self):
def _run_algorithm(self, timeout: float):
# Example implementation for testing
return [(["r1"], {"cost": 100})]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import unittest
from unittest.mock import MagicMock, patch
from caribou.deployment_solver.deployment_algorithms.fine_grained_deployment_algorithm import (
Expand Down Expand Up @@ -48,6 +49,34 @@ def test_generate_all_possible_fine_deployments(self):
expected_result = [([1, 1], {"metric1": 1.0, "metric2": 2.0}), ([2, 1], {"metric1": 1.0, "metric2": 2.0})]
self.assertEqual(result, expected_result)

def test_generate_all_possible_fine_deployments_timeout(self):
    """The fine-grained enumeration stops once the time budget is exhausted.

    The clock seen by the module under test is simulated, so the test neither
    sleeps nor depends on real wall-clock timing: every deployment check
    advances the fake clock by two seconds against a one-second budget.
    """
    # Arrange
    clock = {"now": 0.0}  # simulated clock read by the algorithm

    outcomes = [
        ([1, 1], {"metric1": 1.0, "metric2": 2.0}),
        None,
        ([2, 1], {"metric1": 1.0, "metric2": 2.0}),
        None,
    ]

    def slow_check(*args, **kwargs):
        # Each check "takes" two simulated seconds, overshooting the budget
        clock["now"] += 2.0
        return outcomes.pop(0)

    self._algorithm._region_indexer = MagicMock()
    self._algorithm._region_indexer.get_value_indices.return_value = {1: 1, 2: 2}
    self._algorithm._number_of_instances = 2
    self._algorithm._generate_and_check_deployment = MagicMock(side_effect=slow_check)

    # Act
    with patch(
        "caribou.deployment_solver.deployment_algorithms.fine_grained_deployment_algorithm.time"
    ) as mock_time:
        mock_time.time.side_effect = lambda: clock["now"]
        result = self._algorithm._generate_all_possible_fine_deployments(timeout=1)

    # Assert
    expected_result = [([1, 1], {"metric1": 1.0, "metric2": 2.0})]
    self.assertEqual(result, expected_result)
    self._algorithm._generate_and_check_deployment.assert_called_once()

def test_generate_and_check_deployment_positive(self):
# Arrange
self._algorithm._per_instance_permitted_regions = [[0, 1, 2], [0, 1, 2]]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import unittest
from unittest.mock import MagicMock, patch, PropertyMock
from caribou.deployment_solver.deployment_algorithms.stochastic_heuristic_deployment_algorithm import (
Expand Down Expand Up @@ -81,6 +82,48 @@ def test_generate_stochastic_heuristic_deployments(self):
expected_result = [([2, 2, 2], {"metric1": 2.0, "metric2": 3.0})]
self.assertEqual(result, expected_result)

def test_generate_stochastic_heuristic_deployments_timeout(self):
    """The stochastic search stops iterating once the time budget is exhausted.

    The clock seen by the module under test is simulated, so the test neither
    sleeps nor depends on real wall-clock timing: every metrics calculation
    advances the fake clock by two seconds against a one-second budget.
    """
    # Arrange
    clock = {"now": 0.0}  # simulated clock read by the algorithm

    self._algorithm._home_deployment_metrics = {"metric1": 1.0, "metric2": 2.0}
    self._algorithm._home_deployment = [1, 1, 1]
    self._algorithm._num_iterations = 2
    self._algorithm._generate_new_deployment = MagicMock()
    self._algorithm._generate_new_deployment.side_effect = [[2, 2, 2], [1, 1, 1]]
    self._algorithm._deployment_metrics_calculator = MagicMock()

    metrics_queue = [
        {
            "metric1": 2.0,
            "metric2": 3.0,
        }
    ]

    def slow_metrics(*args, **kwargs):
        # Each calculation "takes" two simulated seconds, overshooting the budget
        clock["now"] += 2.0
        if not metrics_queue:
            # A second calculation means the loop ran past the budget
            raise AssertionError("Timeout was ignored!")
        return metrics_queue.pop(0)

    self._algorithm._deployment_metrics_calculator.calculate_deployment_metrics.side_effect = slow_metrics
    self._algorithm._is_hard_constraint_failed = MagicMock()
    self._algorithm._is_hard_constraint_failed.return_value = False
    self._algorithm._is_improvement = MagicMock()
    self._algorithm._is_improvement.return_value = True
    self._algorithm._number_of_instances = 3
    self._algorithm._temperature = 0.99
    self._algorithm._max_number_combinations = 10
    self._algorithm._per_instance_permitted_regions = [[0, 1, 2], [0, 1, 2], [0, 1, 2]]

    result = []

    # Act
    with patch(
        "caribou.deployment_solver.deployment_algorithms.stochastic_heuristic_deployment_algorithm.time"
    ) as mock_time:
        mock_time.time.side_effect = lambda: clock["now"]
        self._algorithm._generate_stochastic_heuristic_deployments(result, timeout=1)

    # Assert
    expected_result = [([2, 2, 2], {"metric1": 2.0, "metric2": 3.0})]
    self.assertEqual(result, expected_result)

def test_is_improvement(self):
# Arrange
self._algorithm._ranker = MagicMock()
Expand Down

0 comments on commit 9995f32

Please sign in to comment.