Skip to content

Commit

Permalink
Merge branch 'main' into make-https-default
Browse files Browse the repository at this point in the history
  • Loading branch information
panh99 authored Nov 16, 2023
2 parents 80760ed + c3cf430 commit a872d5f
Show file tree
Hide file tree
Showing 7 changed files with 558 additions and 4 deletions.
10 changes: 10 additions & 0 deletions doc/source/ref-api-flwr.rst
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ server.strategy.Krum

.. automethod:: __init__

.. _flwr-server-strategy-Bulyan-apiref:

server.strategy.Bulyan
^^^^^^^^^^^^^^^^^^^^^^

.. autoclass:: flwr.server.strategy.Bulyan
:members:

.. automethod:: __init__


.. _flwr-server-strategy-FedXgbNnAvg-apiref:

Expand Down
4 changes: 4 additions & 0 deletions doc/source/ref-changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@

Flower received many improvements under the hood, too many to list here.

- **Add new** `Bulyan` **strategy** ([#1817](https://github.com/adap/flower/pull/1817), [#1891](https://github.com/adap/flower/pull/1891))

The new `Bulyan` strategy implements the Bulyan aggregation rule by [El Mhamdi et al., 2018](https://arxiv.org/abs/1802.07927).

### Incompatible changes

- **Remove support for Python 3.7** ([#2280](https://github.com/adap/flower/pull/2280), [#2299](https://github.com/adap/flower/pull/2299), [#2304](https://github.com/adap/flower/pull/2304), [#2306](https://github.com/adap/flower/pull/2306), [#2355](https://github.com/adap/flower/pull/2355), [#2356](https://github.com/adap/flower/pull/2356))
Expand Down
2 changes: 2 additions & 0 deletions src/py/flwr/server/strategy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Contains the strategy abstraction and different implementations."""


from .bulyan import Bulyan as Bulyan
from .dpfedavg_adaptive import DPFedAvgAdaptive as DPFedAvgAdaptive
from .dpfedavg_fixed import DPFedAvgFixed as DPFedAvgFixed
from .fault_tolerant_fedavg import FaultTolerantFedAvg as FaultTolerantFedAvg
Expand Down Expand Up @@ -48,6 +49,7 @@
"FedMedian",
"FedTrimmedAvg",
"Krum",
"Bulyan",
"DPFedAvgAdaptive",
"DPFedAvgFixed",
"Strategy",
Expand Down
176 changes: 173 additions & 3 deletions src/py/flwr/server/strategy/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
# limitations under the License.
# ==============================================================================
"""Aggregation functions for strategy implementations."""

# mypy: disallow_untyped_calls=False

from functools import reduce
from typing import List, Tuple
from typing import Any, Callable, List, Tuple

import numpy as np

Expand Down Expand Up @@ -56,7 +56,7 @@ def aggregate_median(results: List[Tuple[NDArrays, int]]) -> NDArrays:
def aggregate_krum(
results: List[Tuple[NDArrays, int]], num_malicious: int, to_keep: int
) -> NDArrays:
"""Choose one parameter vector according to the Krum fucntion.
"""Choose one parameter vector according to the Krum function.
If to_keep is not None, then MultiKrum is applied.
"""
Expand Down Expand Up @@ -91,6 +91,89 @@ def aggregate_krum(
return weights[np.argmin(scores)]


# pylint: disable=too-many-locals
def aggregate_bulyan(
results: List[Tuple[NDArrays, int]],
num_malicious: int,
aggregation_rule: Callable, # type: ignore
**aggregation_rule_kwargs: Any,
) -> NDArrays:
"""Perform Bulyan aggregation.
Parameters
----------
results: List[Tuple[NDArrays, int]]
Weights and number of samples for each of the client.
num_malicious: int
The maximum number of malicious clients.
aggregation_rule: Callable
Byzantine resilient aggregation rule used as the first step of the Bulyan
aggregation_rule_kwargs: Any
The arguments to the aggregation rule.
Returns
-------
aggregated_parameters: NDArrays
Aggregated parameters according to the Bulyan strategy.
"""
byzantine_resilient_single_ret_model_aggregation = [aggregate_krum]
# also GeoMed (but not implemented yet)
byzantine_resilient_many_return_models_aggregation = [] # type: ignore
# Brute, Medoid (but not implemented yet)

num_clients = len(results)
if num_clients < 4 * num_malicious + 3:
raise ValueError(
"The Bulyan aggregation requires then number of clients to be greater or "
"equal to the 4 * num_malicious + 3. This is the assumption of this method."
"It is needed to ensure that the method reduces the attacker's leeway to "
"the one proved in the paper."
)
selected_models_set: List[Tuple[NDArrays, int]] = []

theta = len(results) - 2 * num_malicious
beta = theta - 2 * num_malicious

for _ in range(theta):
best_model = aggregation_rule(
results=results, num_malicious=num_malicious, **aggregation_rule_kwargs
)
list_of_weights = [weights for weights, num_samples in results]
# This group gives exact result
if aggregation_rule in byzantine_resilient_single_ret_model_aggregation:
best_idx = _find_reference_weights(best_model, list_of_weights)
# This group requires finding the closest model to the returned one
# (weights distance wise)
elif aggregation_rule in byzantine_resilient_many_return_models_aggregation:
# when different aggregation strategies available
# write a function to find the closest model
raise NotImplementedError(
"aggregate_bulyan currently does not support the aggregation rules that"
" return many models as results. "
"Such aggregation rules are currently not available in Flower."
)
else:
raise ValueError(
"The given aggregation rule is not added as Byzantine resilient. "
"Please choose from Byzantine resilient rules."
)

selected_models_set.append(results[best_idx])

# remove idx from tracker and weights_results
results.pop(best_idx)

# Compute median parameter vector across selected_models_set
median_vect = aggregate_median(selected_models_set)

# Take the averaged beta parameters of the closest distance to the median
# (coordinate-wise)
parameters_aggregated = _aggregate_n_closest_weights(
median_vect, selected_models_set, beta_closest=beta
)
return parameters_aggregated


def weighted_loss_avg(results: List[Tuple[int, float]]) -> float:
"""Aggregate evaluation results obtained from multiple clients."""
num_total_evaluation_examples = sum([num_examples for num_examples, _ in results])
Expand Down Expand Up @@ -168,3 +251,90 @@ def aggregate_trimmed_avg(
]

return trimmed_w


def _check_weights_equality(weights1: NDArrays, weights2: NDArrays) -> bool:
    """Return True iff both weight lists contain identical layers.

    Equality requires the same number of layers and element-wise identical
    arrays at every position.
    """
    if len(weights1) != len(weights2):
        return False
    for layer_a, layer_b in zip(weights1, weights2):
        if not np.array_equal(layer_a, layer_b):
            return False
    return True


def _find_reference_weights(
    reference_weights: NDArrays, list_of_weights: List[NDArrays]
) -> int:
    """Locate `reference_weights` inside `list_of_weights`.

    Parameters
    ----------
    reference_weights: NDArrays
        Weights that will be searched for.
    list_of_weights: List[NDArrays]
        List of weights that will be searched through.

    Returns
    -------
    index: int
        The index of `reference_weights` in the `list_of_weights`.

    Raises
    ------
    ValueError
        If `reference_weights` is not found in `list_of_weights`.
    """
    match_idx = next(
        (
            idx
            for idx, candidate in enumerate(list_of_weights)
            if _check_weights_equality(reference_weights, candidate)
        ),
        None,
    )
    if match_idx is None:
        raise ValueError("The reference weights not found in list_of_weights.")
    return match_idx


def _aggregate_n_closest_weights(
    reference_weights: NDArrays, results: List[Tuple[NDArrays, int]], beta_closest: int
) -> NDArrays:
    """Average, per coordinate, the `beta_closest` values nearest the reference.

    Each coordinate of every returned layer is the mean of the `beta_closest`
    candidate values with the smallest absolute distance to the corresponding
    reference coordinate.

    Parameters
    ----------
    reference_weights: NDArrays
        The weights from which the distances will be computed
    results: List[Tuple[NDArrays, int]]
        The weights from models
    beta_closest: int
        The number of the closest distance weights that will be averaged

    Returns
    -------
    aggregated_weights: NDArrays
        Averaged (element-wise) beta weights that have the closest distance to
        reference weights
    """
    all_weights = [weights for weights, _ in results]
    averaged_layers = []

    for layer_idx, reference_layer in enumerate(reference_weights):
        # Stack this layer from every model: shape (num_models, *layer_shape).
        stacked = np.array([model[layer_idx] for model in all_weights])
        distances = np.abs(reference_layer - stacked)
        # Only membership in the beta_closest set matters, not the exact
        # order, so argpartition is enough (cheaper than a full argsort).
        closest_order = np.argpartition(distances, kth=beta_closest - 1, axis=0)
        closest_values = np.take_along_axis(stacked, indices=closest_order, axis=0)[
            :beta_closest
        ]
        averaged_layers.append(np.mean(closest_values, axis=0))
    return averaged_layers
77 changes: 76 additions & 1 deletion src/py/flwr/server/strategy/aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@

import numpy as np

from .aggregate import aggregate, weighted_loss_avg
from .aggregate import (
_aggregate_n_closest_weights,
_check_weights_equality,
_find_reference_weights,
aggregate,
weighted_loss_avg,
)


def test_aggregate() -> None:
Expand Down Expand Up @@ -64,3 +70,72 @@ def test_weighted_loss_avg_multiple_values() -> None:

# Assert
assert expected == actual


def test_check_weights_equality_true() -> None:
    """Equality check should pass for two identical weight lists."""
    first = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]
    second = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]

    outcome = _check_weights_equality(first, second)

    assert outcome is True


def test_check_weights_equality_numeric_false() -> None:
    """Equality check should fail when values differ but lengths match."""
    first = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]
    second = [np.array([2, 2]), np.array([[1, 2], [3, 4]])]

    outcome = _check_weights_equality(first, second)

    assert outcome is False


def test_check_weights_equality_various_length_false() -> None:
    """Equality check should fail when the lists have different lengths."""
    longer = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]
    shorter = [np.array([1, 2])]

    outcome = _check_weights_equality(longer, shorter)

    assert outcome is False


def test_find_reference_weights() -> None:
    """Finding reference weights in a list of weights should return its index."""
    needle = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]
    haystack = [
        [np.array([2, 2]), np.array([[1, 2], [3, 4]])],
        [np.array([3, 2]), np.array([[1, 2], [3, 4]])],
        [np.array([3, 2]), np.array([[1, 2], [10, 4]])],
        [np.array([1, 2]), np.array([[1, 2], [3, 4]])],
    ]

    found_index = _find_reference_weights(needle, haystack)

    assert found_index == 3


def test_aggregate_n_closest_weights_mean() -> None:
    """Averaging the n closest weights to the reference should be coordinate-wise."""
    beta_closest = 2
    reference = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]
    candidates = [
        [np.array([1, 2]), np.array([[1, 2], [3, 4]])],
        [np.array([1.1, 2.1]), np.array([[1.1, 2.1], [3.1, 4.1]])],
        [np.array([1.2, 2.2]), np.array([[1.2, 2.2], [3.2, 4.2]])],
        [np.array([1.3, 2.3]), np.array([[0.9, 2.5], [3.4, 3.8]])],
    ]
    weighted_candidates = [(weights, 0) for weights in candidates]

    aggregated = _aggregate_n_closest_weights(
        reference, weighted_candidates, beta_closest=beta_closest
    )

    expected = [np.array([1.05, 2.05]), np.array([[0.95, 2.05], [3.05, 4.05]])]
    for want, got in zip(expected, aggregated):
        assert np.array_equal(want, got)
Loading

0 comments on commit a872d5f

Please sign in to comment.