Parrotfish for Step Function, Execution Time Constraint Optimization #164

Merged
2 changes: 0 additions & 2 deletions .gitignore
@@ -30,5 +30,3 @@ venv/
.coverage

dump/

src/step_function/test.py
@@ -1,5 +1,5 @@
{
"arn": "arn:aws:states:us-west-2:898429789601:stateMachine:ImageProcessing",
"region": "us-west-2",
"payload": {}
"arn": "arn:aws:states:us-west-2:898429789601:stateMachine:ImageProcessing",
"region": "us-west-2",
"payload": {}
}
6 changes: 5 additions & 1 deletion src/configuration/README.md
@@ -112,6 +112,9 @@ dynamic_sampling_params.max_sample_count=5
and when the calculated coefficient of variation reaches this threshold we terminate the dynamic sampling (Default is 0.05),
} (Optional),
"max_number_of_invocation_attempts": The maximum number of attempts per invocation when this number is reached an error is raised. (Optional, Default is 5)
"constraint_execution_time_threshold": The step function execution time threshold constraint. We leverages the execution time model and step function workflow structure
to recommend a configuration that minimizes cost while adhering to the specified execution time constraint. (Optional, Default is +infinity)
"memory_size_increment": The step size by which memory size is increased to meet execution time threshold. (Optional, Default is 10)
}
```

@@ -121,7 +124,8 @@ dynamic_sampling_params.max_sample_count=5
{
"arn": "example_step_function_arn",
"region": "example_region",
"payload": "payload"
"payload": "payload",
"constraint_execution_time_threshold": 5000
}
```
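
The two new options describe a constraint-aware search: while the modeled execution time of the workflow exceeds `constraint_execution_time_threshold`, memory sizes of tasks on the critical path are raised in steps of `memory_size_increment`. The sketch below only illustrates that idea against the `Task` API added later in this PR; the function name `satisfy_constraint` and the greedy selection rule are hypothetical, not code from the PR.

```python
# Illustrative only: a hypothetical greedy loop over the critical-path tasks,
# using the Task methods (get_execution_time, get_cost, increase_memory_size)
# introduced in src/step_function/states.py in this PR.
def satisfy_constraint(tasks, threshold_ms, increment):
    while sum(t.get_execution_time() for t in tasks) > threshold_ms:
        # Only consider tasks that can still grow within their memory limits.
        candidates = [t for t in tasks if t.memory_size + increment <= t.max_memory_size]
        if not candidates:
            break  # Constraint cannot be met within the allowed memory range.
        # Bump the task that buys the most time reduction per unit of added cost.
        best = max(
            candidates,
            key=lambda t: (t.get_execution_time() - t.get_execution_time(t.memory_size + increment))
            / max(t.get_cost(t.memory_size + increment) - t.get_cost(), 1e-9),
        )
        best.increase_memory_size(increment)
    return {t.function_name: t.memory_size for t in tasks}
```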

1 change: 1 addition & 0 deletions src/configuration/defaults.py
@@ -8,5 +8,6 @@
MAX_TOTAL_SAMPLE_COUNT = 20
MIN_SAMPLE_PER_CONFIG = 4
TERMINATION_THRESHOLD = 3
MEMORY_SIZE_INCREMENT = 10

LOG_LEVEL = logging.WARNING
6 changes: 5 additions & 1 deletion src/configuration/step_function_configuration.py
@@ -7,7 +7,7 @@


class StepFunctionConfiguration:
def __init__(self, config_file: TextIO):
def __init__(self, config_file: Union[TextIO, dict]):
self._load_config_schema()

# Setup default values
@@ -16,6 +16,8 @@ def __init__(self, config_file: TextIO):
self.max_total_sample_count = MAX_TOTAL_SAMPLE_COUNT
self.min_sample_per_config = MIN_SAMPLE_PER_CONFIG
self.max_number_of_invocation_attempts = MAX_NUMBER_OF_INVOCATION_ATTEMPTS
self.memory_size_increment = MEMORY_SIZE_INCREMENT
self.constraint_execution_time_threshold = None

# Parse the configuration file
self._deserialize(config_file)
@@ -58,6 +60,8 @@ def _load_config_schema(self):
},
},
"max_number_of_invocation_attempts": {"type": "integer", "minimum": 0},
"constraint_execution_time_threshold": {"type": "integer", "minimum": 1},
"memory_size_increment": {"type": "integer", "minimum": 1},
},
"required": ["arn", "region", "payload"],
"if": {"not": {"required": ["payload"]}},
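
A minimal usage sketch for the widened constructor above, assuming the repository's `src` layout is importable; the ARN, file name, and payload values are placeholders.

```python
from src.configuration.step_function_configuration import StepFunctionConfiguration

# Build the configuration from a dict -- enabled by the Union[TextIO, dict] signature.
config = StepFunctionConfiguration({
    "arn": "example_step_function_arn",
    "region": "us-west-2",
    "payload": {},
    "constraint_execution_time_threshold": 5000,
    "memory_size_increment": 10,
})

# Or load it from a JSON configuration file, as before.
with open("step_function_config.json") as f:
    config = StepFunctionConfiguration(f)
```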
3 changes: 3 additions & 0 deletions src/main.py
@@ -53,7 +53,10 @@ def main():
exit(1)

if args.step_function:
# Create step function
step_function = StepFunction(configuration)

# Run cost and execution time optimization
step_function.optimize()

else:
2 changes: 1 addition & 1 deletion src/objective/parametric_function.py
@@ -18,7 +18,7 @@ class ParametricFunction:
bounds (tuple): Lower and upper bounds on parameters.
"""

function: callable = lambda x, a0, a1, a2: a0 + a1 * np.exp(-x / a2)
function: callable = lambda x, a0, a1, a2: (a0 + a1 * np.exp(-x / a2)) if a2 != 0 else a0
bounds: tuple = ([-np.inf, -np.inf, -np.inf], [np.inf, np.inf, np.inf])
params: any = None

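
For context on the model above: execution time is fit as `a0 + a1 * exp(-x / a2)`, where `x` is the memory size. The sketch below shows how such a model could be fit to sampled (memory, duration) points with `scipy.optimize.curve_fit`; the sample data are made up and this is not necessarily how Parrotfish performs the fit internally.

```python
import numpy as np
from scipy.optimize import curve_fit

# Execution time model used above: a0 + a1 * exp(-x / a2), x = memory size (MB).
def model(x, a0, a1, a2):
    return a0 + a1 * np.exp(-x / a2)

# Made-up samples: average duration (ms) observed at a few memory sizes.
memories = np.array([128.0, 256.0, 512.0, 1024.0, 2048.0])
durations = np.array([5200.0, 2900.0, 1700.0, 1100.0, 900.0])

# A rough initial guess keeps the exponential term well scaled during fitting.
params, _ = curve_fit(model, memories, durations, p0=[800.0, 6000.0, 400.0])
print("fitted params:", params)
print("predicted duration at 1536 MB:", model(1536.0, *params))
```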
86 changes: 82 additions & 4 deletions src/step_function/states.py
@@ -1,4 +1,5 @@
from abc import ABC
from abc import ABC, abstractmethod
from typing import Tuple

import boto3

@@ -12,14 +13,22 @@ class State(ABC):
def __init__(self, name: str):
self.name = name

@abstractmethod
def get_cost(self) -> float:
pass


class Task(State):
"""Task: a single Lambda function """

def __init__(self, name: str, function_name: str):
super().__init__(name)
self.function_name = function_name
self.input = None
self.param_function = None
self.memory_size = None
self.initial_memory_size = None
self.max_memory_size = None

def set_input(self, input: str):
self.input = input
@@ -37,6 +46,29 @@ def get_output(self, aws_session: boto3.Session) -> str:
logger.debug(f"Finish invoking {self.function_name}, output: {output}")
return output

def increase_memory_size(self, increment: int):
self.memory_size += increment

def decrease_memory_size(self, decrement: int):
self.memory_size -= decrement

def reset_memory_size(self):
self.memory_size = self.initial_memory_size

def get_execution_time(self, memory_size: int = None):
if memory_size is not None:
execution_time = self.param_function(memory_size)
else:
execution_time = self.param_function(self.memory_size)
return execution_time

def get_cost(self, memory_size: int = None):
# Cost is modeled as memory size multiplied by the modeled execution time.
if memory_size is not None:
cost = memory_size * self.param_function(memory_size)
else:
cost = self.memory_size * self.param_function(self.memory_size)
return cost


class Parallel(State):
"""Parallel: parallel workflows (branches) with same input."""
@@ -48,19 +80,46 @@ def __init__(self, name: str):
def add_branch(self, workflow: "Workflow"):
self.branches.append(workflow)

def get_critical_path(self) -> Tuple[list[Task], float]:
"""Get tasks on critical path and execution time."""
max_time = 0.0
critical_path: list[Task] = []
for workflow in self.branches:
states, time = workflow.get_critical_path()
if time > max_time:
max_time = time
critical_path = states
return critical_path, max_time

def get_cost(self) -> float:
return sum(workflow.get_cost() for workflow in self.branches)


class Map(State):
"""Map: multiple same workflows with different inputs"""
"""Map: multiple same workflows with different inputs."""

def __init__(self, name: str):
super().__init__(name)
self.iterations: list[Workflow] = []
self.items_path = None
self.workflow_def = None
self.items_path = ""
self.iterations: list[Workflow] = []

def add_iteration(self, workflow: "Workflow"):
self.iterations.append(workflow)

def get_critical_path(self) -> Tuple[list[Task], float]:
max_time = 0.0
critical_path: list[Task] = []
for workflow in self.iterations:
states, time = workflow.get_critical_path()
if time > max_time:
max_time = time
critical_path = states
return critical_path, max_time

def get_cost(self) -> float:
return sum(workflow.get_cost() for workflow in self.iterations)


class Workflow:
"""A workflow, containing a sequence of states."""
Expand All @@ -70,3 +129,22 @@ def __init__(self):

def add_state(self, state: State):
self.states.append(state)

def get_critical_path(self) -> Tuple[list[Task], float]:
critical_path: list[Task] = []
total_time = 0.0

for state in self.states:
if isinstance(state, Task):
critical_path.append(state)
time = state.get_execution_time()
total_time += time
elif isinstance(state, (Parallel, Map)):
states, time = state.get_critical_path()
critical_path.extend(states)
total_time += time

return critical_path, total_time

def get_cost(self) -> float:
return sum(state.get_cost() for state in self.states)
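
To make the critical-path logic concrete, here is a small usage sketch that wires the new classes together by hand, assuming the repository's `src` layout is importable; the task names and `param_function` lambdas are made-up stand-ins for fitted execution time models.

```python
from src.step_function.states import Parallel, Task, Workflow

def make_task(name: str, base_ms: float, memory_mb: int) -> Task:
    task = Task(name, function_name=name)
    # Stub of a fitted model: execution time (ms) shrinks as memory grows.
    task.param_function = lambda mem, base=base_ms: base * 256 / mem
    task.memory_size = task.initial_memory_size = memory_mb
    task.max_memory_size = 3008
    return task

resize = make_task("Resize", base_ms=4000, memory_mb=256)
tag = make_task("Tag", base_ms=1000, memory_mb=256)
store = make_task("Store", base_ms=500, memory_mb=512)

# Resize and Tag run in parallel branches, then Store runs.
parallel = Parallel("ResizeAndTag")
for task in (resize, tag):
    branch = Workflow()
    branch.add_state(task)
    parallel.add_branch(branch)

workflow = Workflow()
workflow.add_state(parallel)
workflow.add_state(store)

path, total_ms = workflow.get_critical_path()
print([t.name for t in path], total_ms)        # Resize dominates the parallel stage.
print("modeled cost:", workflow.get_cost())
```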