separate execution_time_optimizer and add unit tests
Ziyi Yang committed Aug 27, 2024
1 parent 026c37c commit 41febdb
Showing 4 changed files with 286 additions and 122 deletions.
99 changes: 99 additions & 0 deletions src/step_function/execution_time_optimizer.py
@@ -0,0 +1,99 @@
from src.exception.step_function_error import StepFunctionError
from src.logger import logger


class ExecutionTimeOptimizer:
def __init__(self, workflow, function_tasks_dict, config):
self.workflow = workflow
self.function_tasks_dict = function_tasks_dict
self.memory_increment = config.memory_size_increment
self.execution_time_threshold = config.constraint_execution_time_threshold

def optimize_for_execution_time_constraint(self):
"""Optimize the step function for execution time constraints."""
if self.execution_time_threshold is None:
logger.warning("No execution time threshold.")
return

critical_path_tasks, critical_path_time = self.workflow.get_critical_path()
logger.info(
f"Start optimizing step function for execution time, time: {critical_path_time}ms, threshold: {self.execution_time_threshold}ms, cost: {self.workflow.get_cost()}."
)

cost_increases = self._initialize_cost_increases()

while critical_path_time > self.execution_time_threshold:
time_reductions = self._calculate_time_reductions(critical_path_tasks)
best_function = self._find_best_function_to_optimize(cost_increases, time_reductions)

if best_function:
self._update_memory_size_and_cost(best_function, cost_increases)
else:
raise StepFunctionError("Execution time threshold too low.")

critical_path_tasks, critical_path_time = self.workflow.get_critical_path()
logger.debug(
f"Optimized function {best_function}, time: {critical_path_time}ms, cost: {self.workflow.get_cost()}.\n"
)

logger.info(
f"Finish optimizing step function for execution time, time: {critical_path_time}ms, threshold: {self.execution_time_threshold}ms, cost: {self.workflow.get_cost()}.\n"
)
self._print_memory_sizes()

def _initialize_cost_increases(self):
"""Initialize the cost increases for each function."""
cost_increases = {}
for function in self.function_tasks_dict:
cost_increases[function] = 0.0
for task in self.function_tasks_dict[function]:
original_cost = task.get_cost(task.memory_size)
new_cost = task.get_cost(task.memory_size + self.memory_increment)
cost_increases[function] += new_cost - original_cost
return cost_increases

def _calculate_time_reductions(self, critical_path_tasks):
"""Calculate time reductions for tasks on the critical path."""
time_reductions = {}
for task in critical_path_tasks:
if task.memory_size + self.memory_increment > task.max_memory_size:
continue

original_time = task.get_execution_time()
new_time = task.get_execution_time(task.memory_size + self.memory_increment)

if task.function_name not in time_reductions:
time_reductions[task.function_name] = 0.0
time_reductions[task.function_name] += original_time - new_time
return time_reductions

def _find_best_function_to_optimize(self, cost_increases, time_reductions):
"""Find the function with the lowest cost to time reduction ratio."""
best_function = None
lowest_ratio = float('inf')
for function_name in time_reductions:
if time_reductions[function_name] > 0:
ratio = cost_increases[function_name] / time_reductions[function_name]
logger.debug(
f"ratio: {ratio}, {function_name}, {self.function_tasks_dict[function_name][0].memory_size}MB, {cost_increases[function_name]}, {time_reductions[function_name]}"
)

if ratio < lowest_ratio:
lowest_ratio = ratio
best_function = function_name
return best_function

def _update_memory_size_and_cost(self, best_function, cost_increases):
"""Increase memory size of the best function and update cost increases."""
cost_increases[best_function] = 0.0
for task in self.function_tasks_dict[best_function]:
task.increase_memory_size(self.memory_increment)
original_cost = task.get_cost()
new_cost = task.get_cost(task.memory_size + self.memory_increment)
cost_increases[best_function] += new_cost - original_cost

def _print_memory_sizes(self):
"""Print memory sizes after optimization."""
print("Finish optimizing step function for execution time, optimized memory sizes:")
for function in self.function_tasks_dict:
print(f"{function}: {self.function_tasks_dict[function][0].memory_size}MB")
152 changes: 77 additions & 75 deletions src/step_function/step_function.py
@@ -10,6 +10,7 @@
from src.exploration.aws.aws_config_manager import AWSConfigManager
from src.logger import logger
from src.parrotfish import Parrotfish
from .execution_time_optimizer import ExecutionTimeOptimizer
from .states import State, Task, Parallel, Map, Workflow


@@ -31,7 +32,8 @@ def optimize(self):
self._optimize_functions_in_parallel(self.function_tasks_dict)

# optimize for execution time constraint
self._optimize_for_execution_time_constraint()
execution_time_optimizer = ExecutionTimeOptimizer(self.workflow, self.function_tasks_dict, self.config)
execution_time_optimizer.optimize_for_execution_time_constraint()

def _load_definition(self, arn: str) -> dict:
"""
@@ -283,77 +285,77 @@ def _optimize_one_function(function_name: str, tasks: list[Task]) -> int:

logger.info("Finish optimizing all functions\n\n")

def _optimize_for_execution_time_constraint(self):
workflow = self.workflow
function_tasks_dict = self.function_tasks_dict
memory_increment = self.config.memory_size_increment
constraint_execution_time_threshold = self.config.constraint_execution_time_threshold

if constraint_execution_time_threshold is None:
logger.warning("No execution time threshold.")
return

critical_path_tasks, critical_path_time = workflow.get_critical_path()
logger.info(
f"Start optimizing step function for execution time, time: {critical_path_time}ms, threshold: {constraint_execution_time_threshold}ms, cost: {workflow.get_cost()}.")

# Initialize cost increase dict
cost_increases = {}
for function in function_tasks_dict:
cost_increases[function] = 0.0
for task in function_tasks_dict[function]:
original_cost = task.get_cost(task.memory_size)
new_cost = task.get_cost(task.memory_size + memory_increment)
cost_increases[function] += new_cost - original_cost

# Update memory sizes until execution time threshold is reached
while critical_path_time > constraint_execution_time_threshold:
time_reductions = {}

# Iterate over tasks on critical path and calculate time reductions for each function
for task in critical_path_tasks:
if task.memory_size + memory_increment > task.max_memory_size:
continue

original_time = task.get_execution_time()
new_time = task.get_execution_time(task.memory_size + memory_increment)

if task.function_name not in time_reductions:
time_reductions[task.function_name] = 0.0
time_reductions[task.function_name] += original_time - new_time

# Find the function with the lowest cost to time reduction ratio
best_function = None
lowest_ratio = float('inf')
for function_name in time_reductions:
if time_reductions[function_name] > 0:
ratio = cost_increases[function_name] / time_reductions[function_name]
logger.debug(
f"ratio: {ratio}, {function_name}, {function_tasks_dict[function_name][0].memory_size}MB, {cost_increases[function_name]}, {time_reductions[function_name]}")

if ratio < lowest_ratio:
lowest_ratio = ratio
best_function = function_name

# Increase memory size of best function, update cost increases
if best_function:
cost_increases[best_function] = 0.0
for task in function_tasks_dict[best_function]:
task.increase_memory_size(memory_increment)
original_cost = task.get_cost()
new_cost = task.get_cost(task.memory_size + memory_increment)
cost_increases[best_function] += new_cost - original_cost
else:
raise StepFunctionError("Execution time threshold too low.")

# Update critical path and time
critical_path_tasks, critical_path_time = workflow.get_critical_path()
logger.debug(
f"Optimized function {best_function}, {task.memory_size}MB, time: {critical_path_time}ms, cost: {workflow.get_cost()}.\n")

logger.info(
f"Finish optimizing step function for execution time, time: {critical_path_time}ms, threshold: {constraint_execution_time_threshold}ms, cost: {workflow.get_cost()}.\n")

print("Finish optimizing step function for execution time, optimized memory sizes:")
for function in function_tasks_dict:
print(f"{function}: {function_tasks_dict[function][0].memory_size}MB")
# def _optimize_for_execution_time_constraint(self):
# workflow = self.workflow
# function_tasks_dict = self.function_tasks_dict
# memory_increment = self.config.memory_size_increment
# constraint_execution_time_threshold = self.config.constraint_execution_time_threshold
#
# if constraint_execution_time_threshold is None:
# logger.warning("No execution time threshold.")
# return
#
# critical_path_tasks, critical_path_time = workflow.get_critical_path()
# logger.info(
# f"Start optimizing step function for execution time, time: {critical_path_time}ms, threshold: {constraint_execution_time_threshold}ms, cost: {workflow.get_cost()}.")
#
# # Initialize cost increase dict
# cost_increases = {}
# for function in function_tasks_dict:
# cost_increases[function] = 0.0
# for task in function_tasks_dict[function]:
# original_cost = task.get_cost(task.memory_size)
# new_cost = task.get_cost(task.memory_size + memory_increment)
# cost_increases[function] += new_cost - original_cost
#
# # Update memory sizes until execution time threshold is reached
# while critical_path_time > constraint_execution_time_threshold:
# time_reductions = {}
#
# # Iterate over tasks on critical path and calculate time reductions for each function
# for task in critical_path_tasks:
# if task.memory_size + memory_increment > task.max_memory_size:
# continue
#
# original_time = task.get_execution_time()
# new_time = task.get_execution_time(task.memory_size + memory_increment)
#
# if task.function_name not in time_reductions:
# time_reductions[task.function_name] = 0.0
# time_reductions[task.function_name] += original_time - new_time
#
# # Find the function with the lowest cost to time reduction ratio
# best_function = None
# lowest_ratio = float('inf')
# for function_name in time_reductions:
# if time_reductions[function_name] > 0:
# ratio = cost_increases[function_name] / time_reductions[function_name]
# logger.debug(
# f"ratio: {ratio}, {function_name}, {function_tasks_dict[function_name][0].memory_size}MB, {cost_increases[function_name]}, {time_reductions[function_name]}")
#
# if ratio < lowest_ratio:
# lowest_ratio = ratio
# best_function = function_name
#
# # Increase memory size of best function, update cost increases
# if best_function:
# cost_increases[best_function] = 0.0
# for task in function_tasks_dict[best_function]:
# task.increase_memory_size(memory_increment)
# original_cost = task.get_cost()
# new_cost = task.get_cost(task.memory_size + memory_increment)
# cost_increases[best_function] += new_cost - original_cost
# else:
# raise StepFunctionError("Execution time threshold too low.")
#
# # Update critical path and time
# critical_path_tasks, critical_path_time = workflow.get_critical_path()
# logger.debug(
# f"Optimized function {best_function}, {task.memory_size}MB, time: {critical_path_time}ms, cost: {workflow.get_cost()}.\n")
#
# logger.info(
# f"Finish optimizing step function for execution time, time: {critical_path_time}ms, threshold: {constraint_execution_time_threshold}ms, cost: {workflow.get_cost()}.\n")
#
# print("Finish optimizing step function for execution time, optimized memory sizes:")
# for function in function_tasks_dict:
# print(f"{function}: {function_tasks_dict[function][0].memory_size}MB")
110 changes: 110 additions & 0 deletions tests/step_function/test_execution_time_optimizer.py
@@ -0,0 +1,110 @@
import pytest
from unittest.mock import MagicMock

from src.step_function.execution_time_optimizer import ExecutionTimeOptimizer


class TestInitializeCostIncreases:
@pytest.fixture
def optimizer(self):
workflow = MagicMock()
config = MagicMock(memory_size_increment=128)

function_tasks_dict = {
'func1': [MagicMock(memory_size=128, get_cost=MagicMock(side_effect=lambda mem_size: 0.02 if mem_size == 128 else 0.025))],
'func2': [MagicMock(memory_size=256, get_cost=MagicMock(side_effect=lambda mem_size: 0.04 if mem_size == 256 else 0.05))]
}

optimizer = ExecutionTimeOptimizer(workflow, function_tasks_dict, config)
return optimizer

def test_initialize_cost_increases(self, optimizer):
# Action
cost_increases = optimizer._initialize_cost_increases()

# Assert
assert cost_increases == {'func1': 0.025 - 0.02, 'func2': 0.05 - 0.04}


class TestCalculateTimeReductions:
@pytest.fixture
def optimizer(self):
workflow = MagicMock()
config = MagicMock(memory_size_increment=128)

task1 = MagicMock()
task1.function_name = 'function1'
task1.memory_size = 128
task1.max_memory_size = 256
task1.get_execution_time.side_effect = [100, 80]

task2 = MagicMock()
task2.function_name = 'function2'
task2.memory_size = 256
task2.max_memory_size = 512
task2.get_execution_time.side_effect = [200, 150]

optimizer = ExecutionTimeOptimizer(workflow, {}, config)
return optimizer, [task1, task2]

def test_calculate_time_reductions(self, optimizer):
# Arrange
optimizer_instance, critical_path_tasks = optimizer

# Action
time_reductions = optimizer_instance._calculate_time_reductions(critical_path_tasks)

# Assert
assert time_reductions['function1'] == 20
assert time_reductions['function2'] == 50


class TestFindBestFunctionToOptimize:
@pytest.fixture
def optimizer(self):
workflow = MagicMock()
config = MagicMock(memory_size_increment=128)
function_tasks_dict = MagicMock(memory_size_increment=128)

optimizer = ExecutionTimeOptimizer(workflow, function_tasks_dict, config)
return optimizer

def test_find_best_function_to_optimize(self, optimizer):
# Arrange
cost_increases = {'func1': 5, 'func2': 1}
time_reductions = {'func1': 20, 'func2': 50}

# Action
best_function = optimizer._find_best_function_to_optimize(cost_increases, time_reductions)

# Assert
assert best_function == 'func2' # 5 / 20 > 1 / 50

class TestUpdateMemorySizeAndCost:
@pytest.fixture
def optimizer(self):
workflow = MagicMock()
config = MagicMock(memory_size_increment=128)

task1 = MagicMock()
task1.memory_size = 128
task1.get_cost.side_effect = [0.02, 0.025]
task1.increase_memory_size = MagicMock()

function_tasks_dict = {'func1': [task1]}

optimizer = ExecutionTimeOptimizer(workflow, function_tasks_dict, config)
return optimizer, task1

def test_update_memory_size_and_cost(self, optimizer):
# Arrange
optimizer_instance, task1 = optimizer
cost_increases = {'func1': 0.0}

# Action
optimizer_instance._update_memory_size_and_cost('func1', cost_increases)

# Assert
task1.increase_memory_size.assert_called_once_with(128) # Check memory increment
assert cost_increases['func1'] == 0.025 - 0.02
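
Assuming the repository's existing pytest setup, the new tests can be run on their own; the file path comes from the diff header above, and the programmatic invocation below is just one convenient option:

# Run only the new unit tests for ExecutionTimeOptimizer.
import pytest
pytest.main(["tests/step_function/test_execution_time_optimizer.py", "-v"])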
