Skip to content

Commit

Permalink
Update assigner functions docstring for fix lint warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
manuelhsantana committed Jul 4, 2024
1 parent ef296ae commit 2fc449d
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 97 deletions.
2 changes: 0 additions & 2 deletions openfl/component/assigner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Assigner package."""

from .assigner import Assigner
from .random_grouped_assigner import RandomGroupedAssigner
from .static_grouped_assigner import StaticGroupedAssigner


__all__ = [
'Assigner',
'RandomGroupedAssigner',
Expand Down
13 changes: 6 additions & 7 deletions openfl/component/assigner/assigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ class Assigner:
r"""The task assigner maintains a list of tasks.
Also it decides the policy for which collaborator should run those tasks.
There may be many types of policies implemented, but a natural place to start
There may be many types of policies implemented, but a natural place to start
is with a:
- RandomGroupedTaskAssigner :
- RandomGroupedTaskAssigner :
Given a set of task groups, and a percentage,
assign that task group to that percentage of collaborators in the federation.
After assigning the tasks to collaborator, those tasks should be carried
out each round (no reassignment between rounds).
- GroupedTaskAssigner :
Given task groups and a list of collaborators that belong to that task group,
- GroupedTaskAssigner :
Given task groups and a list of collaborators that belong to that task group,
carry out tasks for each round of experiment.
Attributes:
Expand All @@ -32,8 +32,7 @@ class Assigner:
\* - ``tasks`` argument is taken from ``tasks`` section of FL plan YAML file.
"""

def __init__(self, tasks, authorized_cols,
rounds_to_train, **kwargs):
def __init__(self, tasks, authorized_cols, rounds_to_train, **kwargs):
"""Initializes the Assigner.
Args:
Expand Down Expand Up @@ -81,7 +80,7 @@ def get_all_tasks_for_round(self, round_number):

def get_aggregation_type_for_task(self, task_name):
"""Extract aggregation type from self.tasks.
Args:
task_name (str): Name of the task.
Expand Down
56 changes: 31 additions & 25 deletions openfl/component/assigner/custom_assigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# SPDX-License-Identifier: Apache-2.0
"""Custom Assigner module."""


import logging
from collections import defaultdict

Expand All @@ -13,31 +12,33 @@

class Assigner:
"""Custom assigner class.
Attributes:
agg_functions_by_task (dict): Dictionary mapping tasks to their respective aggregation functions.
agg_functions_by_task_name (dict): Dictionary mapping task names to their respective aggregation functions.
agg_functions_by_task (dict): Dictionary mapping tasks to their
respective aggregation functions.
agg_functions_by_task_name (dict): Dictionary mapping task names to
their respective aggregation functions.
authorized_cols (list of str): List of authorized collaborators.
rounds_to_train (int): Number of rounds to train.
all_tasks_for_round (defaultdict): Dictionary mapping round numbers to tasks.
collaborators_for_task (defaultdict): Dictionary mapping round numbers to collaborators for each task.
collaborator_tasks (defaultdict): Dictionary mapping round numbers to tasks for each collaborator.
assigner_function (function): Function to assign tasks to collaborators.
all_tasks_for_round (defaultdict): Dictionary mapping round numbers
to tasks.
collaborators_for_task (defaultdict): Dictionary mapping round numbers
to collaborators for each task.
collaborator_tasks (defaultdict): Dictionary mapping round numbers
to tasks for each collaborator.
assigner_function (function): Function to assign tasks to
collaborators.
"""

def __init__(
self,
*,
assigner_function,
aggregation_functions_by_task,
authorized_cols,
rounds_to_train
):
def __init__(self, *, assigner_function, aggregation_functions_by_task,
authorized_cols, rounds_to_train):
"""Initialize the Custom assigner object.
Args:
assigner_function (function): Function to assign tasks to collaborators.
aggregation_functions_by_task (dict): Dictionary mapping tasks to their respective aggregation functions.
assigner_function (function): Function to assign tasks to
collaborators.
aggregation_functions_by_task (dict): Dictionary mapping tasks to
their respective aggregation functions.
authorized_cols (list of str): List of authorized collaborators.
rounds_to_train (int): Number of rounds to train.
"""
Expand All @@ -55,8 +56,9 @@ def __init__(
def define_task_assignments(self):
"""Define task assignments for each round and collaborator.
This method uses the assigner function to assign tasks to collaborators for
each round. It also maps tasks to their respective aggregation functions.
This method uses the assigner function to assign tasks to
collaborators for each round. It also maps tasks to their respective
aggregation functions.
Abstract method.
Expand Down Expand Up @@ -84,7 +86,7 @@ def define_task_assignments(self):

def get_tasks_for_collaborator(self, collaborator_name, round_number):
"""Get tasks for a specific collaborator in a specific round.
Abstract method.
Args:
Expand Down Expand Up @@ -115,14 +117,17 @@ def get_all_tasks_for_round(self, round_number):
Currently all tasks are performed on each round,
But there may be a reason to change this.
Args:
round_number (int): Round number.
Returns:
list: List of all tasks for the specified round.
"""
return [task.name for task in self.all_tasks_for_round[round_number].values()]
return [
task.name
for task in self.all_tasks_for_round[round_number].values()
]

def get_aggregation_type_for_task(self, task_name):
"""Get the aggregation type for a specific task (from self.tasks).
Expand All @@ -133,5 +138,6 @@ def get_aggregation_type_for_task(self, task_name):
Returns:
function: Aggregation function for the task.
"""
agg_fn = self.agg_functions_by_task_name.get(task_name, WeightedAverage())
agg_fn = self.agg_functions_by_task_name.get(task_name,
WeightedAverage())
return agg_fn
62 changes: 32 additions & 30 deletions openfl/component/assigner/random_grouped_assigner.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Random grouped assigner module."""


import numpy as np

from .assigner import Assigner
Expand All @@ -13,17 +11,18 @@ class RandomGroupedAssigner(Assigner):
r"""The task assigner maintains a list of tasks.
Also it decides the policy for which collaborator should run those tasks
There may be many types of policies implemented, but a natural place to start
is with a:
There may be many types of policies implemented, but a natural place to
start is with a:
- RandomGroupedAssigner :
- RandomGroupedAssigner :
Given a set of task groups, and a percentage,
assign that task group to that percentage of collaborators in the federation.
After assigning the tasks to collaborator, those tasks should be carried
out each round (no reassignment between rounds).
- GroupedAssigner :
Given task groups and a list of collaborators that belong to that task group,
carry out tasks for each round of experiment.
assign that task group to that percentage of collaborators in the
federation.
After assigning the tasks to collaborator, those tasks should be
carried out each round (no reassignment between rounds).
- GroupedAssigner :
Given task groups and a list of collaborators that belong to that
task group, carry out tasks for each round of experiment.
Attributes:
task_groups* (list of object): Task groups to assign.
Expand All @@ -45,7 +44,8 @@ def __init__(self, task_groups, **kwargs):
def define_task_assignments(self):
"""Define task assignments for each round and collaborator.
This method uses the assigner function to assign tasks to collaborators for each round.
This method uses the assigner function to assign tasks to
collaborators for each round.
It also maps tasks to their respective aggregation functions.
Args:
Expand All @@ -54,16 +54,16 @@ def define_task_assignments(self):
Returns:
None
"""
assert (np.abs(1.0 - np.sum([group['percentage']
for group in self.task_groups])) < 0.01), (
'Task group percentages must sum to 100%')
assert (
np.abs(1.0 -
np.sum([group['percentage'] for group in self.task_groups]))
< 0.01), ('Task group percentages must sum to 100%')

# Start by finding all of the tasks in all specified groups
self.all_tasks_in_groups = list({
task
for group in self.task_groups
for task in group['tasks']
})
self.all_tasks_in_groups = list(
{task
for group in self.task_groups
for task in group['tasks']})

# Initialize the map of collaborators for a given task on a given round
for task in self.all_tasks_in_groups:
Expand All @@ -76,28 +76,30 @@ def define_task_assignments(self):

col_list_size = len(self.authorized_cols)
for round_num in range(self.rounds):
randomized_col_idx = np.random.choice(
len(self.authorized_cols),
len(self.authorized_cols),
replace=False
)
randomized_col_idx = np.random.choice(len(self.authorized_cols),
len(self.authorized_cols),
replace=False)
col_idx = 0
for group in self.task_groups:
num_col_in_group = int(group['percentage'] * col_list_size)
rand_col_group_list = [
self.authorized_cols[i] for i in
randomized_col_idx[col_idx:col_idx + num_col_in_group]
self.authorized_cols[i]
for i in randomized_col_idx[col_idx:col_idx +
num_col_in_group]
]
self.task_group_collaborators[group['name']] = rand_col_group_list
self.task_group_collaborators[
group['name']] = rand_col_group_list
for col in rand_col_group_list:
self.collaborator_tasks[col][round_num] = group['tasks']
# Now populate reverse lookup of tasks->group
for task in group['tasks']:
# This should append the list of collaborators performing
# that task
self.collaborators_for_task[task][round_num] += rand_col_group_list
self.collaborators_for_task[task][
round_num] += rand_col_group_list
col_idx += num_col_in_group
assert (col_idx == col_list_size), 'Task groups were not divided properly'
assert (col_idx == col_list_size
), 'Task groups were not divided properly'

def get_tasks_for_collaborator(self, collaborator_name, round_number):
"""Get tasks for a specific collaborator in a specific round.
Expand Down
46 changes: 24 additions & 22 deletions openfl/component/assigner/static_grouped_assigner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Static grouped assigner module."""

from .assigner import Assigner
Expand All @@ -10,16 +9,19 @@ class StaticGroupedAssigner(Assigner):
r"""The task assigner maintains a list of tasks.
Also it decides the policy for which collaborator should run those tasks
There may be many types of policies implemented, but a natural place to start is with a:
There may be many types of policies implemented, but a natural place to
start is with a:
- StaticGroupedAssigner :
- StaticGroupedAssigner :
Given a set of task groups, and a list of
collaborators for that group, assign tasks for of collaborators in the federation.
After assigning the tasks to collaborator, those tasks should be carried
out each round (no reassignment between rounds).
- GroupedAssigner :
collaborators for that group, assign tasks for of collaborators in
the federation.
After assigning the tasks to collaborator, those tasks should be
carried out each round (no reassignment between rounds).
- GroupedAssigner :
Given task groups and a list of collaborators that
belong to that task group, carry out tasks for each round of experiment.
belong to that task group, carry out tasks for each round of
experiment.
Attributes:
task_groups* (list of object): Task groups to assign.
Expand All @@ -41,7 +43,8 @@ def __init__(self, task_groups, **kwargs):
def define_task_assignments(self):
"""Define task assignments for each round and collaborator.
This method uses the assigner function to assign tasks to collaborators for each round.
This method uses the assigner function to assign tasks to
collaborators for each round.
It also maps tasks to their respective aggregation functions.
Args:
Expand All @@ -50,9 +53,8 @@ def define_task_assignments(self):
Returns:
None
"""
cols_amount = sum([
len(group['collaborators']) for group in self.task_groups
])
cols_amount = sum(
[len(group['collaborators']) for group in self.task_groups])
authorized_cols_amount = len(self.authorized_cols)

unique_cols = {
Expand All @@ -62,22 +64,22 @@ def define_task_assignments(self):
}
unique_authorized_cols = set(self.authorized_cols)

assert (cols_amount == authorized_cols_amount and unique_cols == unique_authorized_cols), (
f'Collaborators in each group must be distinct: '
f'{unique_cols}, {unique_authorized_cols}'
)
assert (cols_amount == authorized_cols_amount
and unique_cols == unique_authorized_cols), (
f'Collaborators in each group must be distinct: '
f'{unique_cols}, {unique_authorized_cols}')

# Start by finding all of the tasks in all specified groups
self.all_tasks_in_groups = list({
task
for group in self.task_groups
for task in group['tasks']
})
self.all_tasks_in_groups = list(
{task
for group in self.task_groups
for task in group['tasks']})

# Initialize the map of collaborators for a given task on a given round
for task in self.all_tasks_in_groups:
self.collaborators_for_task[task] = {
i: [] for i in range(self.rounds)
i: []
for i in range(self.rounds)
}

for group in self.task_groups:
Expand Down
Loading

0 comments on commit 2fc449d

Please sign in to comment.