diff --git a/src/step_function/execution_time_optimizer.py b/src/step_function/execution_time_optimizer.py new file mode 100644 index 0000000..c1a9596 --- /dev/null +++ b/src/step_function/execution_time_optimizer.py @@ -0,0 +1,99 @@ +from src.exception.step_function_error import StepFunctionError +from src.logger import logger + + +class ExecutionTimeOptimizer: + def __init__(self, workflow, function_tasks_dict, config): + self.workflow = workflow + self.function_tasks_dict = function_tasks_dict + self.memory_increment = config.memory_size_increment + self.execution_time_threshold = config.constraint_execution_time_threshold + + def optimize_for_execution_time_constraint(self): + """Optimize the step function for execution time constraints.""" + if self.execution_time_threshold is None: + logger.warning("No execution time threshold.") + return + + critical_path_tasks, critical_path_time = self.workflow.get_critical_path() + logger.info( + f"Start optimizing step function for execution time, time: {critical_path_time}ms, threshold: {self.execution_time_threshold}ms, cost: {self.workflow.get_cost()}." + ) + + cost_increases = self._initialize_cost_increases() + + while critical_path_time > self.execution_time_threshold: + time_reductions = self._calculate_time_reductions(critical_path_tasks) + best_function = self._find_best_function_to_optimize(cost_increases, time_reductions) + + if best_function: + self._update_memory_size_and_cost(best_function, cost_increases) + else: + raise StepFunctionError("Execution time threshold too low.") + + critical_path_tasks, critical_path_time = self.workflow.get_critical_path() + logger.debug( + f"Optimized function {best_function}, time: {critical_path_time}ms, cost: {self.workflow.get_cost()}.\n" + ) + + logger.info( + f"Finish optimizing step function for execution time, time: {critical_path_time}ms, threshold: {self.execution_time_threshold}ms, cost: {self.workflow.get_cost()}.\n" + ) + self._print_memory_sizes() + + def _initialize_cost_increases(self): + """Initialize the cost increases for each function.""" + cost_increases = {} + for function in self.function_tasks_dict: + cost_increases[function] = 0.0 + for task in self.function_tasks_dict[function]: + original_cost = task.get_cost(task.memory_size) + new_cost = task.get_cost(task.memory_size + self.memory_increment) + cost_increases[function] += new_cost - original_cost + return cost_increases + + def _calculate_time_reductions(self, critical_path_tasks): + """Calculate time reductions for tasks on the critical path.""" + time_reductions = {} + for task in critical_path_tasks: + if task.memory_size + self.memory_increment > task.max_memory_size: + continue + + original_time = task.get_execution_time() + new_time = task.get_execution_time(task.memory_size + self.memory_increment) + + if task.function_name not in time_reductions: + time_reductions[task.function_name] = 0.0 + time_reductions[task.function_name] += original_time - new_time + return time_reductions + + def _find_best_function_to_optimize(self, cost_increases, time_reductions): + """Find the function with the lowest cost to time reduction ratio.""" + best_function = None + lowest_ratio = float('inf') + for function_name in time_reductions: + if time_reductions[function_name] > 0: + ratio = cost_increases[function_name] / time_reductions[function_name] + logger.debug( + f"ratio: {ratio}, {function_name}, {self.function_tasks_dict[function_name][0].memory_size}MB, {cost_increases[function_name]}, {time_reductions[function_name]}" + ) + + if ratio < lowest_ratio: + lowest_ratio = ratio + best_function = function_name + return best_function + + def _update_memory_size_and_cost(self, best_function, cost_increases): + """Increase memory size of the best function and update cost increases.""" + cost_increases[best_function] = 0.0 + for task in self.function_tasks_dict[best_function]: + task.increase_memory_size(self.memory_increment) + original_cost = task.get_cost() + new_cost = task.get_cost(task.memory_size + self.memory_increment) + cost_increases[best_function] += new_cost - original_cost + + def _print_memory_sizes(self): + """Print memory sizes after optimization.""" + print("Finish optimizing step function for execution time, optimized memory sizes:") + for function in self.function_tasks_dict: + print(f"{function}: {self.function_tasks_dict[function][0].memory_size}MB") diff --git a/src/step_function/step_function.py b/src/step_function/step_function.py index e3e9de1..7d6db6f 100644 --- a/src/step_function/step_function.py +++ b/src/step_function/step_function.py @@ -10,6 +10,7 @@ from src.exploration.aws.aws_config_manager import AWSConfigManager from src.logger import logger from src.parrotfish import Parrotfish +from .execution_time_optimizer import ExecutionTimeOptimizer from .states import State, Task, Parallel, Map, Workflow @@ -31,7 +32,8 @@ def optimize(self): self._optimize_functions_in_parallel(self.function_tasks_dict) # optimize for execution time constraint - self._optimize_for_execution_time_constraint() + execution_time_optimizer = ExecutionTimeOptimizer(self.workflow, self.function_tasks_dict, self.config) + execution_time_optimizer.optimize_for_execution_time_constraint() def _load_definition(self, arn: str) -> dict: """ @@ -283,77 +285,77 @@ def _optimize_one_function(function_name: str, tasks: list[Task]) -> int: logger.info("Finish optimizing all functions\n\n") - def _optimize_for_execution_time_constraint(self): - workflow = self.workflow - function_tasks_dict = self.function_tasks_dict - memory_increment = self.config.memory_size_increment - constraint_execution_time_threshold = self.config.constraint_execution_time_threshold - - if constraint_execution_time_threshold is None: - logger.warning("No execution time threshold.") - return - - critical_path_tasks, critical_path_time = workflow.get_critical_path() - logger.info( - f"Start optimizing step function for execution time, time: {critical_path_time}ms, threshold: {constraint_execution_time_threshold}ms, cost: {workflow.get_cost()}.") - - # Initialize cost increase dict - cost_increases = {} - for function in function_tasks_dict: - cost_increases[function] = 0.0 - for task in function_tasks_dict[function]: - original_cost = task.get_cost(task.memory_size) - new_cost = task.get_cost(task.memory_size + memory_increment) - cost_increases[function] += new_cost - original_cost - - # Update memory sizes until execution time threshold is reached - while critical_path_time > constraint_execution_time_threshold: - time_reductions = {} - - # Iterate over tasks on critical path and calculate time reductions for each function - for task in critical_path_tasks: - if task.memory_size + memory_increment > task.max_memory_size: - continue - - original_time = task.get_execution_time() - new_time = task.get_execution_time(task.memory_size + memory_increment) - - if task.function_name not in time_reductions: - time_reductions[task.function_name] = 0.0 - time_reductions[task.function_name] += original_time - new_time - - # Find the function with the lowest cost to time reduction ratio - best_function = None - lowest_ratio = float('inf') - for function_name in time_reductions: - if time_reductions[function_name] > 0: - ratio = cost_increases[function_name] / time_reductions[function_name] - logger.debug( - f"ratio: {ratio}, {function_name}, {function_tasks_dict[function_name][0].memory_size}MB, {cost_increases[function_name]}, {time_reductions[function_name]}") - - if ratio < lowest_ratio: - lowest_ratio = ratio - best_function = function_name - - # Increase memory size of best function, update cost increases - if best_function: - cost_increases[best_function] = 0.0 - for task in function_tasks_dict[best_function]: - task.increase_memory_size(memory_increment) - original_cost = task.get_cost() - new_cost = task.get_cost(task.memory_size + memory_increment) - cost_increases[best_function] += new_cost - original_cost - else: - raise StepFunctionError("Execution time threshold too low.") - - # Update critical path and time - critical_path_tasks, critical_path_time = workflow.get_critical_path() - logger.debug( - f"Optimized function {best_function}, {task.memory_size}MB, time: {critical_path_time}ms, cost: {workflow.get_cost()}.\n") - - logger.info( - f"Finish optimizing step function for execution time, time: {critical_path_time}ms, threshold: {constraint_execution_time_threshold}ms, cost: {workflow.get_cost()}.\n") - - print("Finish optimizing step function for execution time, optimized memory sizes:") - for function in function_tasks_dict: - print(f"{function}: {function_tasks_dict[function][0].memory_size}MB") + # def _optimize_for_execution_time_constraint(self): + # workflow = self.workflow + # function_tasks_dict = self.function_tasks_dict + # memory_increment = self.config.memory_size_increment + # constraint_execution_time_threshold = self.config.constraint_execution_time_threshold + # + # if constraint_execution_time_threshold is None: + # logger.warning("No execution time threshold.") + # return + # + # critical_path_tasks, critical_path_time = workflow.get_critical_path() + # logger.info( + # f"Start optimizing step function for execution time, time: {critical_path_time}ms, threshold: {constraint_execution_time_threshold}ms, cost: {workflow.get_cost()}.") + # + # # Initialize cost increase dict + # cost_increases = {} + # for function in function_tasks_dict: + # cost_increases[function] = 0.0 + # for task in function_tasks_dict[function]: + # original_cost = task.get_cost(task.memory_size) + # new_cost = task.get_cost(task.memory_size + memory_increment) + # cost_increases[function] += new_cost - original_cost + # + # # Update memory sizes until execution time threshold is reached + # while critical_path_time > constraint_execution_time_threshold: + # time_reductions = {} + # + # # Iterate over tasks on critical path and calculate time reductions for each function + # for task in critical_path_tasks: + # if task.memory_size + memory_increment > task.max_memory_size: + # continue + # + # original_time = task.get_execution_time() + # new_time = task.get_execution_time(task.memory_size + memory_increment) + # + # if task.function_name not in time_reductions: + # time_reductions[task.function_name] = 0.0 + # time_reductions[task.function_name] += original_time - new_time + # + # # Find the function with the lowest cost to time reduction ratio + # best_function = None + # lowest_ratio = float('inf') + # for function_name in time_reductions: + # if time_reductions[function_name] > 0: + # ratio = cost_increases[function_name] / time_reductions[function_name] + # logger.debug( + # f"ratio: {ratio}, {function_name}, {function_tasks_dict[function_name][0].memory_size}MB, {cost_increases[function_name]}, {time_reductions[function_name]}") + # + # if ratio < lowest_ratio: + # lowest_ratio = ratio + # best_function = function_name + # + # # Increase memory size of best function, update cost increases + # if best_function: + # cost_increases[best_function] = 0.0 + # for task in function_tasks_dict[best_function]: + # task.increase_memory_size(memory_increment) + # original_cost = task.get_cost() + # new_cost = task.get_cost(task.memory_size + memory_increment) + # cost_increases[best_function] += new_cost - original_cost + # else: + # raise StepFunctionError("Execution time threshold too low.") + # + # # Update critical path and time + # critical_path_tasks, critical_path_time = workflow.get_critical_path() + # logger.debug( + # f"Optimized function {best_function}, {task.memory_size}MB, time: {critical_path_time}ms, cost: {workflow.get_cost()}.\n") + # + # logger.info( + # f"Finish optimizing step function for execution time, time: {critical_path_time}ms, threshold: {constraint_execution_time_threshold}ms, cost: {workflow.get_cost()}.\n") + # + # print("Finish optimizing step function for execution time, optimized memory sizes:") + # for function in function_tasks_dict: + # print(f"{function}: {function_tasks_dict[function][0].memory_size}MB") diff --git a/tests/step_function/test_execution_time_optimizer.py b/tests/step_function/test_execution_time_optimizer.py new file mode 100644 index 0000000..c767815 --- /dev/null +++ b/tests/step_function/test_execution_time_optimizer.py @@ -0,0 +1,110 @@ +import pytest +from unittest.mock import MagicMock + +from src.step_function.execution_time_optimizer import ExecutionTimeOptimizer + + +class TestInitializeCostIncreases: + @pytest.fixture + def optimizer(self): + workflow = MagicMock() + config = MagicMock(memory_size_increment=128) + + function_tasks_dict = { + 'func1': [MagicMock(memory_size=128, get_cost=MagicMock(side_effect=lambda mem_size: 0.02 if mem_size == 128 else 0.025))], + 'func2': [MagicMock(memory_size=256, get_cost=MagicMock(side_effect=lambda mem_size: 0.04 if mem_size == 256 else 0.05))] + } + + optimizer = ExecutionTimeOptimizer(workflow, function_tasks_dict, config) + return optimizer + + def test_initialize_cost_increases(self, optimizer): + # Action + cost_increases = optimizer._initialize_cost_increases() + + # Assert + assert cost_increases == {'func1': 0.025 - 0.02, 'func2': 0.05 - 0.04} + + +class TestCalculateTimeReductions: + @pytest.fixture + def optimizer(self): + workflow = MagicMock() + config = MagicMock(memory_size_increment=128) + + task1 = MagicMock() + task1.function_name = 'function1' + task1.memory_size = 128 + task1.max_memory_size = 256 + task1.get_execution_time.side_effect = [100, 80] + + task2 = MagicMock() + task2.function_name = 'function2' + task2.memory_size = 256 + task2.max_memory_size = 512 + task2.get_execution_time.side_effect = [200, 150] + + optimizer = ExecutionTimeOptimizer(workflow, {}, config) + return optimizer, [task1, task2] + + def test_calculate_time_reductions(self, optimizer): + # Arrange + optimizer_instance, critical_path_tasks = optimizer + + # Action + time_reductions = optimizer_instance._calculate_time_reductions(critical_path_tasks) + + # Assert + assert time_reductions['function1'] == 20 + assert time_reductions['function2'] == 50 + + +class TestFindBestFunctionToOptimize: + @pytest.fixture + def optimizer(self): + workflow = MagicMock() + config = MagicMock(memory_size_increment=128) + function_tasks_dict = MagicMock(memory_size_increment=128) + + optimizer = ExecutionTimeOptimizer(workflow, function_tasks_dict, config) + return optimizer + + def test_find_best_function_to_optimize(self, optimizer): + # Arrange + cost_increases = {'func1': 5, 'func2': 1} + time_reductions = {'func1': 20, 'func2': 50} + + # Action + best_function = optimizer._find_best_function_to_optimize(cost_increases, time_reductions) + + # Assert + assert best_function == 'func2' # 5 / 20 > 1 / 50 + +class TestUpdateMemorySizeAndCost: + @pytest.fixture + def optimizer(self): + workflow = MagicMock() + config = MagicMock(memory_size_increment=128) + + task1 = MagicMock() + task1.memory_size = 128 + task1.get_cost.side_effect = [0.02, 0.025] + task1.increase_memory_size = MagicMock() + + function_tasks_dict = {'func1': [task1]} + + optimizer = ExecutionTimeOptimizer(workflow, function_tasks_dict, config) + return optimizer, task1 + + def test_update_memory_size_and_cost(self, optimizer): + # Arrange + optimizer_instance, task1 = optimizer + cost_increases = {'func1': 0.0} + + # Action + optimizer_instance._update_memory_size_and_cost('func1', cost_increases) + + # Assert + task1.increase_memory_size.assert_called_once_with(128) # Check memory increment + assert cost_increases['func1'] == 0.025 - 0.02 + diff --git a/tests/step_function/test_step_function.py b/tests/step_function/test_step_function.py index 08c1c13..8595e28 100644 --- a/tests/step_function/test_step_function.py +++ b/tests/step_function/test_step_function.py @@ -108,51 +108,4 @@ def test_create_workflow_task_state(self, mock_config_manager, step_function): assert isinstance(workflow.states[2], Parallel) assert len(workflow.states[2].branches) == 2 -class TestExecutionTimeConstraintOptimization: - def create_step_function(self): - step_function = StepFunction() # Replace with your actual class - step_function.config = MagicMock() - step_function.workflow = MagicMock() - step_function.function_tasks_dict = {} - step_function.config.memory_size_increment = 128 - return step_function - - def create_mock_task(self, memory_size, max_memory_size, execution_time, cost): - """Helper function to create a mock task with specified properties.""" - task = MagicMock() - task.memory_size = memory_size - task.max_memory_size = max_memory_size - task.get_execution_time = MagicMock(return_value=execution_time) - task.get_cost = MagicMock(side_effect=lambda memory: cost) - task.function_name = 'function1' - return task - - def test_threshold_high(self): - # Arrange - step_function = self.create_step_function() - step_function.config.constraint_execution_time_threshold = 300 - task = self.create_mock_task(128, 512, 250, 0.01) - step_function.function_tasks_dict = {'function1': [task]} - step_function.workflow.get_critical_path = MagicMock(return_value=([task], 250)) - - # Action - step_function._optimize_for_execution_time_constraint() - - # Assert - task.increase_memory_size.assert_not_called() - - def test_threshold_low(self): - # Arrange - step_function = self.create_step_function() - step_function.config.constraint_execution_time_threshold = 50 - task = self.create_mock_task(128, 512, 250, 0.01) - step_function.function_tasks_dict = {'function1': [task]} - - # Action - step_function.workflow.get_critical_path = MagicMock(return_value=([task], 250)) - - # Assert - with pytest.raises(StepFunctionError) as e: - step_function._optimize_for_execution_time_constraint() - assert e.type == StepFunctionError