Skip to content

Commit

Permalink
feat: multiple inputs to one function
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziyi Yang committed Aug 14, 2024
1 parent 734221b commit b83c3b3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 17 deletions.
3 changes: 3 additions & 0 deletions src/exploration/aws/aws_invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ def __init__(

def _invoke_with_retry(self, payload: str) -> dict:
sleeping_interval = 1
memory_size = None
timeout = None

for _ in range(self.max_invocation_attempts):
try:
# Invoking the function and getting back the response log to parse.
Expand Down
3 changes: 2 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def main():
exit(1)

if args.step_function:
StepFunction(configuration)
step_function = StepFunction(configuration)
pass

else:
parrotfish = Parrotfish(configuration)
Expand Down
41 changes: 25 additions & 16 deletions src/step_function/step_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, config: any):
aws_region (str): AWS region name.
"""
self.config = config
self.function_task_dict = {}
self.function_tasks_dict = {}

self.aws_session = boto3.Session(region_name=config.region)
self.definition = self._load_definition(config.arn)
Expand Down Expand Up @@ -88,8 +88,9 @@ def _create_state(name, state_def: dict) -> State:
function_name = state_def["Parameters"]["FunctionName"]
task = Task(name, function_name)

### TODO: Deal with different inputs to same function
self.function_task_dict[function_name] = task
if function_name not in self.function_tasks_dict:
self.function_tasks_dict[function_name] = []
self.function_tasks_dict[function_name].append(task)

return task

Expand Down Expand Up @@ -156,7 +157,7 @@ def _set_memory_for_function(function_name: str, memory_size: int):
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(_set_memory_for_function, function_name, 3008)
for function_name in self.function_task_dict
for function_name in self.function_tasks_dict
]

for future in as_completed(futures):
Expand Down Expand Up @@ -280,24 +281,31 @@ def run_parrotfish_in_parallel(self):
Optimizes all Lambda functions using Parrotfish in parallel.
"""

def _optimize_function(task: Task):
def _optimize_function(tasks: list[Task]):
"""
Optimizes a single Lambda function using Parrotfish.
Args:
task (Task): The task representing the Lambda function to optimize.
tasks: The tasks corresponding to the Lambda function to optimize.
Returns:
tuple: The function name and its minimum memory configuration.
Raises:
Exception: If an error occurs during the optimization of the function.
"""
function_name = tasks[0].function_name
weight = 1.0 / len(tasks)

payloads = []
for task in tasks:
payloads.append({"payload": json.loads(task.input), "weight": weight})

config = {
"function_name": task.function_name,
"function_name": function_name,
"vendor": "AWS",
"region": self.config.region,
"payload": json.loads(task.input),
"payloads": payloads,
"termination_threshold": self.config.termination_threshold,
"max_total_sample_count": self.config.max_total_sample_count,
"min_sample_per_config": self.config.min_sample_per_config,
Expand All @@ -306,29 +314,30 @@ def _optimize_function(task: Task):
}
parrotfish = Parrotfish(Configuration(config))
min_memory = parrotfish.optimize(apply=False)
task.parrotfish = parrotfish
return task.function_name, min_memory
return function_name, min_memory, parrotfish

error = None
results = {}
min_memories = {}
parrotfishs = {}
logger.info("Start optimizing all functions")

# Execute Parrotfish on functions in parallel
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(_optimize_function, task)
for task in self.function_task_dict.values()]
futures = [executor.submit(_optimize_function, tasks)
for tasks in self.function_tasks_dict.values()]

for future in as_completed(futures):
try:
function_name, min_memories = future.result()
results[function_name] = min_memories
function_name, min_memory, parrotfish = future.result()
min_memories[function_name] = min_memory
parrotfishs[function_name] = parrotfish
except Exception as e:
logger.debug(e)
if error is None:
error = e
continue
logger.info("Finish optimizing all functions")
print(f"Finish optimizing all functions, {results}")
print(f"Finish optimizing all functions, {min_memories}")

if error:
raise error

0 comments on commit b83c3b3

Please sign in to comment.