From 643abc34672bf6eae3fc897b4a1b8ed0cab4998d Mon Sep 17 00:00:00 2001 From: Edoardo Gabrielli Date: Tue, 23 May 2023 12:47:37 +0200 Subject: [PATCH 01/36] Implement Bulyan (#1817) * Initial commit * Initial commit * Fix bugs * Add tests and fix bugs * Remove prints * Formatting --------- Co-authored-by: Daniel J. Beutel --- src/py/flwr/server/strategy/aggregate.py | 60 ++++++ src/py/flwr/server/strategy/bulyan.py | 162 ++++++++++++++ src/py/flwr/server/strategy/bulyan_test.py | 240 +++++++++++++++++++++ 3 files changed, 462 insertions(+) create mode 100644 src/py/flwr/server/strategy/bulyan.py create mode 100644 src/py/flwr/server/strategy/bulyan_test.py diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 1d14f26ed8c1..b3789fff766e 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -91,6 +91,66 @@ def aggregate_krum( return weights[np.argmin(scores)] +def aggregate_bulyan( + results: List[Tuple[NDArrays, int]], num_malicious: int +) -> NDArrays: + # S must be Dict[int, Tuple[NDArrays, int]] (selection set) + S = {} + + # List of idx to keep track of the order of clients + tracker = np.arange(len(results)) + + # Create a list of weights and ignore the number of examples + weights = [weights for weights, _ in results] + + theta = len(weights) - 2 * num_malicious + if theta <= 0: + theta = 1 + + beta = theta - 2 * num_malicious + if beta <= 0: + beta = 1 + + for _ in range(theta): + best_model = aggregate_krum(results, num_malicious, to_keep=0) + + best_idx = None + for idx, el in enumerate(results): + if list(el[0][0]) == list(best_model[0]): + best_idx = idx + break + + S[tracker[best_idx]] = results[best_idx] + + # remove idx from tracker and weights_results + tracker = np.delete(tracker, best_idx) + results.pop(best_idx) + + # Compute median parameter vector across S + median_vect = aggregate_median(S.values()) + + # Take the beta closest params to the median + 
distances = {} + for i in S.keys(): + dist = [ + np.abs(S[i][0][0][j] - median_vect[0][j]) for j in range(len(weights[0][0])) + ] + norm_sums = 0 + for k in dist: + norm_sums += np.linalg.norm(k) + distances[i] = norm_sums + + closest_idx = sorted(distances, key=distances.get)[:beta] + M = [S[i][0] for i in closest_idx] + + # Apply FevAvg on M + parameters_aggregated: NDArrays = [ + reduce(np.add, layers) / beta for layers in zip(*M) + ] + + return parameters_aggregated + + def weighted_loss_avg(results: List[Tuple[int, float]]) -> float: """Aggregate evaluation results obtained from multiple clients.""" num_total_evaluation_examples = sum([num_examples for num_examples, _ in results]) diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py new file mode 100644 index 000000000000..3a8271dd2c03 --- /dev/null +++ b/src/py/flwr/server/strategy/bulyan.py @@ -0,0 +1,162 @@ +# Copyright 2020 Adap GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Bulyan [El Mhamdi et al., 2018] strategy. 
+ +Paper: https://arxiv.org/pdf/1802.07927.pdf +""" + + +from logging import WARNING +from typing import Callable, Dict, List, Optional, Tuple, Union + +from flwr.common import ( + FitRes, + MetricsAggregationFn, + NDArrays, + Parameters, + Scalar, + ndarrays_to_parameters, + parameters_to_ndarrays, +) +from flwr.common.logger import log +from flwr.server.client_proxy import ClientProxy + +from .aggregate import aggregate_bulyan +from .fedavg import FedAvg + +WARNING_MIN_AVAILABLE_CLIENTS_TOO_LOW = """ +Setting `min_available_clients` lower than `min_fit_clients` or +`min_evaluate_clients` can cause the server to fail when there are too few clients +connected to the server. `min_available_clients` must be set to a value larger +than or equal to the values of `min_fit_clients` and `min_evaluate_clients`. +""" + + +# flake8: noqa: E501 +class Bulyan(FedAvg): + """Configurable Bulyan strategy implementation.""" + + # pylint: disable=too-many-arguments,too-many-instance-attributes,line-too-long + def __init__( + self, + *, + fraction_fit: float = 1.0, + fraction_evaluate: float = 1.0, + min_fit_clients: int = 2, + min_evaluate_clients: int = 2, + min_available_clients: int = 2, + num_malicious_clients: int = 0, + evaluate_fn: Optional[ + Callable[ + [int, NDArrays, Dict[str, Scalar]], + Optional[Tuple[float, Dict[str, Scalar]]], + ] + ] = None, + on_fit_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None, + on_evaluate_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None, + accept_failures: bool = True, + initial_parameters: Optional[Parameters] = None, + fit_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None, + evaluate_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None, + ) -> None: + """Configurable FedMedian strategy. + + Implementation based on https://arxiv.org/pdf/1803.01498v1.pdf + + Parameters + ---------- + fraction_fit : float, optional + Fraction of clients used during training. Defaults to 0.1. 
+ fraction_evaluate : float, optional + Fraction of clients used during validation. Defaults to 0.1. + min_fit_clients : int, optional + Minimum number of clients used during training. Defaults to 2. + min_evaluate_clients : int, optional + Minimum number of clients used during validation. Defaults to 2. + min_available_clients : int, optional + Minimum number of total clients in the system. Defaults to 2. + num_malicious_clients : int, optional + Number of malicious clients in the system. Defaults to 0. + evaluate_fn : Optional[Callable[[int, NDArrays, Dict[str, Scalar]], Optional[Tuple[float, Dict[str, Scalar]]]]] + Optional function used for validation. Defaults to None. + on_fit_config_fn : Callable[[int], Dict[str, Scalar]], optional + Function used to configure training. Defaults to None. + on_evaluate_config_fn : Callable[[int], Dict[str, Scalar]], optional + Function used to configure validation. Defaults to None. + accept_failures : bool, optional + Whether or not accept rounds containing failures. Defaults to True. + initial_parameters : Parameters, optional + Initial global model parameters. 
+ """ + + if ( + min_fit_clients > min_available_clients + or min_evaluate_clients > min_available_clients + ): + log(WARNING, WARNING_MIN_AVAILABLE_CLIENTS_TOO_LOW) + + super().__init__( + fraction_fit=fraction_fit, + fraction_evaluate=fraction_evaluate, + min_fit_clients=min_fit_clients, + min_evaluate_clients=min_evaluate_clients, + min_available_clients=min_available_clients, + evaluate_fn=evaluate_fn, + on_fit_config_fn=on_fit_config_fn, + on_evaluate_config_fn=on_evaluate_config_fn, + accept_failures=accept_failures, + initial_parameters=initial_parameters, + fit_metrics_aggregation_fn=fit_metrics_aggregation_fn, + evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation_fn, + ) + self.num_malicious_clients = num_malicious_clients + + def __repr__(self) -> str: + rep = f"Bulyan(accept_failures={self.accept_failures})" + return rep + + def aggregate_fit( + self, + server_round: int, + results: List[Tuple[ClientProxy, FitRes]], + failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]], + ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]: + """Aggregate fit results using Bulyan.""" + if not results: + return None, {} + # Do not aggregate if there are failures and failures are not accepted + if not self.accept_failures and failures: + return None, {} + + # Convert results + weights_results = [ + (parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples) + for _, fit_res in results + ] + + # Aggregate weights + parameters_aggregated = ndarrays_to_parameters( + aggregate_bulyan(weights_results, self.num_malicious_clients) + ) + + # Aggregate custom metrics if aggregation fn was provided + metrics_aggregated = {} + if self.fit_metrics_aggregation_fn: + fit_metrics = [(res.num_examples, res.metrics) for _, res in results] + metrics_aggregated = self.fit_metrics_aggregation_fn(fit_metrics) + elif server_round == 1: # Only log this warning once + log(WARNING, "No fit_metrics_aggregation_fn provided") + + return parameters_aggregated, 
metrics_aggregated diff --git a/src/py/flwr/server/strategy/bulyan_test.py b/src/py/flwr/server/strategy/bulyan_test.py new file mode 100644 index 000000000000..147081a00782 --- /dev/null +++ b/src/py/flwr/server/strategy/bulyan_test.py @@ -0,0 +1,240 @@ +# Copyright 2020 Adap GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Bulyan tests.""" + + +from typing import List, Tuple +from unittest.mock import MagicMock + +from numpy import array, float32 + +from flwr.common import ( + Code, + FitRes, + NDArrays, + Parameters, + Status, + ndarrays_to_parameters, + parameters_to_ndarrays, +) +from flwr.server.client_proxy import ClientProxy +from flwr.server.fleet.grpc_bidi.grpc_client_proxy import GrpcClientProxy + +from .bulyan import Bulyan + + +def test_krum_num_fit_clients_20_available() -> None: + """Test num_fit_clients function.""" + # Prepare + strategy = Bulyan() + expected = 20 + + # Execute + actual, _ = strategy.num_fit_clients(num_available_clients=20) + + # Assert + assert expected == actual + + +def test_krum_num_fit_clients_19_available() -> None: + """Test num_fit_clients function.""" + # Prepare + strategy = Bulyan() + expected = 19 + + # Execute + actual, _ = strategy.num_fit_clients(num_available_clients=19) + + # Assert + assert expected == actual + + +def test_krum_num_fit_clients_10_available() -> None: + """Test num_fit_clients function.""" + 
# Prepare + strategy = Bulyan() + expected = 10 + + # Execute + actual, _ = strategy.num_fit_clients(num_available_clients=10) + + # Assert + assert expected == actual + + +def test_krum_num_fit_clients_minimum() -> None: + """Test num_fit_clients function.""" + # Prepare + strategy = Bulyan() + expected = 9 + + # Execute + actual, _ = strategy.num_fit_clients(num_available_clients=9) + + # Assert + assert expected == actual + + +def test_krum_num_evaluation_clients_40_available() -> None: + """Test num_evaluation_clients function.""" + # Prepare + strategy = Bulyan(fraction_evaluate=0.05) + expected = 2 + + # Execute + actual, _ = strategy.num_evaluation_clients(num_available_clients=40) + + # Assert + assert expected == actual + + +def test_krum_num_evaluation_clients_39_available() -> None: + """Test num_evaluation_clients function.""" + # Prepare + strategy = Bulyan(fraction_evaluate=0.05) + expected = 2 + + # Execute + actual, _ = strategy.num_evaluation_clients(num_available_clients=39) + + # Assert + assert expected == actual + + +def test_krum_num_evaluation_clients_20_available() -> None: + """Test num_evaluation_clients function.""" + # Prepare + strategy = Bulyan(fraction_evaluate=0.05) + expected = 2 + + # Execute + actual, _ = strategy.num_evaluation_clients(num_available_clients=20) + + # Assert + assert expected == actual + + +def test_krum_num_evaluation_clients_minimum() -> None: + """Test num_evaluation_clients function.""" + # Prepare + strategy = Bulyan(fraction_evaluate=0.05) + expected = 2 + + # Execute + actual, _ = strategy.num_evaluation_clients(num_available_clients=19) + + # Assert + assert expected == actual + + +def test_aggregate_fit() -> None: + """Tests if Bulyan is aggregating correctly.""" + # Prepare + previous_weights: NDArrays = [array([0.1, 0.1, 0.1, 0.1], dtype=float32)] + strategy = Bulyan( + initial_parameters=ndarrays_to_parameters(previous_weights), + num_malicious_clients=1, + ) + param_0: Parameters = 
ndarrays_to_parameters( + [array([0.2, 0.2, 0.2, 0.2], dtype=float32)] + ) + param_1: Parameters = ndarrays_to_parameters( + [array([0.5, 0.5, 0.5, 0.5], dtype=float32)] + ) + param_2: Parameters = ndarrays_to_parameters( + [array([0.7, 0.7, 0.7, 0.7], dtype=float32)] + ) + param_3: Parameters = ndarrays_to_parameters( + [array([12.0, 12.0, 12.0, 12.0], dtype=float32)] + ) + param_4: Parameters = ndarrays_to_parameters( + [array([0.1, 0.1, 0.1, 0.1], dtype=float32)] + ) + param_5: Parameters = ndarrays_to_parameters( + [array([0.1, 0.1, 0.1, 0.1], dtype=float32)] + ) + bridge = MagicMock() + client_0 = GrpcClientProxy(cid="0", bridge=bridge) + client_1 = GrpcClientProxy(cid="1", bridge=bridge) + client_2 = GrpcClientProxy(cid="2", bridge=bridge) + client_3 = GrpcClientProxy(cid="3", bridge=bridge) + client_4 = GrpcClientProxy(cid="4", bridge=bridge) + client_5 = GrpcClientProxy(cid="5", bridge=bridge) + results: List[Tuple[ClientProxy, FitRes]] = [ + ( + client_0, + FitRes( + status=Status(code=Code.OK, message="Success"), + parameters=param_0, + num_examples=5, + metrics={}, + ), + ), + ( + client_1, + FitRes( + status=Status(code=Code.OK, message="Success"), + parameters=param_1, + num_examples=5, + metrics={}, + ), + ), + ( + client_2, + FitRes( + status=Status(code=Code.OK, message="Success"), + parameters=param_2, + num_examples=5, + metrics={}, + ), + ), + ( + client_3, + FitRes( + status=Status(code=Code.OK, message="Success"), + parameters=param_3, + num_examples=5, + metrics={}, + ), + ), + ( + client_4, + FitRes( + status=Status(code=Code.OK, message="Success"), + parameters=param_4, + num_examples=5, + metrics={}, + ), + ), + ( + client_5, + FitRes( + status=Status(code=Code.OK, message="Success"), + parameters=param_5, + num_examples=5, + metrics={}, + ), + ), + ] + expected: NDArrays = [array([0.35, 0.35, 0.35, 0.35], dtype=float32)] + + # Execute + actual_aggregated, _ = strategy.aggregate_fit( + server_round=1, results=results, failures=[] + ) + if 
actual_aggregated: + actual_list = parameters_to_ndarrays(actual_aggregated) + actual = actual_list[0] + assert (actual == expected[0]).all() From a9390ae31d5e67a5e081cb8f304d0563e18d1b1c Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Tue, 23 May 2023 13:18:57 +0200 Subject: [PATCH 02/36] Fix tests --- src/py/flwr/server/strategy/aggregate.py | 32 ++++++++++++---------- src/py/flwr/server/strategy/bulyan_test.py | 1 + 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index b3789fff766e..b2a0bc4743b4 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -16,7 +16,7 @@ from functools import reduce -from typing import List, Tuple +from typing import Dict, List, Tuple import numpy as np @@ -56,7 +56,7 @@ def aggregate_median(results: List[Tuple[NDArrays, int]]) -> NDArrays: def aggregate_krum( results: List[Tuple[NDArrays, int]], num_malicious: int, to_keep: int ) -> NDArrays: - """Choose one parameter vector according to the Krum fucntion. + """Choose one parameter vector according to the Krum function. If to_keep is not None, then MultiKrum is applied. 
""" @@ -91,11 +91,12 @@ def aggregate_krum( return weights[np.argmin(scores)] +# pylint: disable=too-many-locals def aggregate_bulyan( results: List[Tuple[NDArrays, int]], num_malicious: int ) -> NDArrays: - # S must be Dict[int, Tuple[NDArrays, int]] (selection set) - S = {} + """Perform Bulyan aggregation.""" + selected_models_set: Dict[int, Tuple[NDArrays, int]] = {} # List of idx to keep track of the order of clients tracker = np.arange(len(results)) @@ -115,37 +116,38 @@ def aggregate_bulyan( best_model = aggregate_krum(results, num_malicious, to_keep=0) best_idx = None - for idx, el in enumerate(results): - if list(el[0][0]) == list(best_model[0]): + for idx, result in enumerate(results): + if list(result[0][0]) == list(best_model[0]): best_idx = idx break - S[tracker[best_idx]] = results[best_idx] + selected_models_set[tracker[best_idx]] = results[best_idx] # remove idx from tracker and weights_results tracker = np.delete(tracker, best_idx) results.pop(best_idx) - # Compute median parameter vector across S - median_vect = aggregate_median(S.values()) + # Compute median parameter vector across selected_models_set + median_vect = aggregate_median(list(selected_models_set.values())) # Take the beta closest params to the median distances = {} - for i in S.keys(): + for idx, result in selected_models_set.items(): dist = [ - np.abs(S[i][0][0][j] - median_vect[0][j]) for j in range(len(weights[0][0])) + np.abs(result[0][0][j] - median_vect[0][j]) + for j in range(len(weights[0][0])) ] norm_sums = 0 for k in dist: norm_sums += np.linalg.norm(k) - distances[i] = norm_sums + distances[idx] = norm_sums closest_idx = sorted(distances, key=distances.get)[:beta] - M = [S[i][0] for i in closest_idx] + closest_models_to_median = [selected_models_set[i][0] for i in closest_idx] - # Apply FevAvg on M + # Apply FevAvg on closest_models_to_median parameters_aggregated: NDArrays = [ - reduce(np.add, layers) / beta for layers in zip(*M) + reduce(np.add, layers) / beta for layers 
in zip(*closest_models_to_median) ] return parameters_aggregated diff --git a/src/py/flwr/server/strategy/bulyan_test.py b/src/py/flwr/server/strategy/bulyan_test.py index 147081a00782..e54d93a0b37b 100644 --- a/src/py/flwr/server/strategy/bulyan_test.py +++ b/src/py/flwr/server/strategy/bulyan_test.py @@ -139,6 +139,7 @@ def test_krum_num_evaluation_clients_minimum() -> None: assert expected == actual +# pylint: disable=too-many-locals def test_aggregate_fit() -> None: """Tests if Bulyan is aggregating correctly.""" # Prepare From 21b4829112c8965dac20bf27b1f8b558d44eb5ab Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Tue, 23 May 2023 13:29:00 +0200 Subject: [PATCH 03/36] Remove redundant warning --- src/py/flwr/server/strategy/bulyan.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index 3a8271dd2c03..49694a0bb40b 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -36,13 +36,6 @@ from .aggregate import aggregate_bulyan from .fedavg import FedAvg -WARNING_MIN_AVAILABLE_CLIENTS_TOO_LOW = """ -Setting `min_available_clients` lower than `min_fit_clients` or -`min_evaluate_clients` can cause the server to fail when there are too few clients -connected to the server. `min_available_clients` must be set to a value larger -than or equal to the values of `min_fit_clients` and `min_evaluate_clients`. -""" - # flake8: noqa: E501 class Bulyan(FedAvg): @@ -100,13 +93,6 @@ def __init__( initial_parameters : Parameters, optional Initial global model parameters. 
""" - - if ( - min_fit_clients > min_available_clients - or min_evaluate_clients > min_available_clients - ): - log(WARNING, WARNING_MIN_AVAILABLE_CLIENTS_TOO_LOW) - super().__init__( fraction_fit=fraction_fit, fraction_evaluate=fraction_evaluate, From 1ef78a0db42f54ea0a33087c059a0b43e5bfc509 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Tue, 23 May 2023 13:29:34 +0200 Subject: [PATCH 04/36] Reference the arxiv page not the pdf directly --- src/py/flwr/server/strategy/bulyan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index 49694a0bb40b..33cf9d64e9d6 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -14,7 +14,7 @@ # ============================================================================== """Bulyan [El Mhamdi et al., 2018] strategy. -Paper: https://arxiv.org/pdf/1802.07927.pdf +Paper: https://arxiv.org/abs/1802.07927 """ From 0cdd39b79146809f1bdb350491768e3633dbded4 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Tue, 23 May 2023 13:38:26 +0200 Subject: [PATCH 05/36] Add Bulyan strategy to changelog --- doc/source/changelog.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/doc/source/changelog.md b/doc/source/changelog.md index e279a0f42882..b10684427cf7 100644 --- a/doc/source/changelog.md +++ b/doc/source/changelog.md @@ -4,6 +4,10 @@ ### What's new? 
+- **Add new** `Bulyan` **strategy** ([#1817](https://github.com/adap/flower/pull/1817), [#1891](https://github.com/adap/flower/pull/1891)) + + The new `Bulyan` strategy implements Bulyan by [El Mhamdi et al., 2018](https://arxiv.org/abs/1802.07927) + - **Add new** `FedTrimmedAvg` **strategy** ([#1769](https://github.com/adap/flower/pull/1769), [#1853](https://github.com/adap/flower/pull/1853)) The new `FedTrimmedAvg` strategy implements Trimmed Mean by [Dong Yin, 2018](https://arxiv.org/abs/1803.01498) From 23dcf0a229ee83eacef4815e6dd7649d5ac064dd Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Tue, 23 May 2023 13:45:49 +0200 Subject: [PATCH 06/36] Fix mypy tests --- src/py/flwr/server/strategy/aggregate.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index b2a0bc4743b4..f89daca01be9 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -120,11 +120,13 @@ def aggregate_bulyan( if list(result[0][0]) == list(best_model[0]): best_idx = idx break + if best_idx is None: + raise ValueError("No matching model found in the results.") selected_models_set[tracker[best_idx]] = results[best_idx] # remove idx from tracker and weights_results - tracker = np.delete(tracker, best_idx) + tracker: NDArray = np.delete(tracker, best_idx) results.pop(best_idx) # Compute median parameter vector across selected_models_set From 3e5d3f9cd90da1e71a28fe9dfd33368ac7d5160d Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Tue, 23 May 2023 13:59:00 +0200 Subject: [PATCH 07/36] Fix mypy tests --- src/py/flwr/server/strategy/aggregate.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index f89daca01be9..4e254df58f2d 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -99,7 
+99,7 @@ def aggregate_bulyan( selected_models_set: Dict[int, Tuple[NDArrays, int]] = {} # List of idx to keep track of the order of clients - tracker = np.arange(len(results)) + tracker: NDArray = np.arange(len(results)) # Create a list of weights and ignore the number of examples weights = [weights for weights, _ in results] @@ -126,7 +126,7 @@ def aggregate_bulyan( selected_models_set[tracker[best_idx]] = results[best_idx] # remove idx from tracker and weights_results - tracker: NDArray = np.delete(tracker, best_idx) + tracker = np.delete(tracker, best_idx) # type: ignore results.pop(best_idx) # Compute median parameter vector across selected_models_set @@ -141,10 +141,10 @@ def aggregate_bulyan( ] norm_sums = 0 for k in dist: - norm_sums += np.linalg.norm(k) + norm_sums += np.linalg.norm(k) # type: ignore distances[idx] = norm_sums - closest_idx = sorted(distances, key=distances.get)[:beta] + closest_idx = sorted(distances, key= lambda idx: distances[idx])[:beta] closest_models_to_median = [selected_models_set[i][0] for i in closest_idx] # Apply FevAvg on closest_models_to_median From 4aaddfbe19c5ae9964e63ef883c7a4541e1575a2 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Tue, 23 May 2023 13:59:21 +0200 Subject: [PATCH 08/36] Fix formatting --- src/py/flwr/server/strategy/aggregate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 4e254df58f2d..5e24838dbd11 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -126,7 +126,7 @@ def aggregate_bulyan( selected_models_set[tracker[best_idx]] = results[best_idx] # remove idx from tracker and weights_results - tracker = np.delete(tracker, best_idx) # type: ignore + tracker = np.delete(tracker, best_idx) # type: ignore results.pop(best_idx) # Compute median parameter vector across selected_models_set @@ -141,10 +141,10 @@ def aggregate_bulyan( ] 
norm_sums = 0 for k in dist: - norm_sums += np.linalg.norm(k) # type: ignore + norm_sums += np.linalg.norm(k) # type: ignore distances[idx] = norm_sums - closest_idx = sorted(distances, key= lambda idx: distances[idx])[:beta] + closest_idx = sorted(distances, key=lambda idx: distances[idx])[:beta] closest_models_to_median = [selected_models_set[i][0] for i in closest_idx] # Apply FevAvg on closest_models_to_median From f41abd707e2c81d77f4597a7a7058dca6ee46b85 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Tue, 23 May 2023 14:26:47 +0200 Subject: [PATCH 09/36] Fix spacing in changelog --- doc/source/changelog.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/source/changelog.md b/doc/source/changelog.md index b10684427cf7..0c88a237a555 100644 --- a/doc/source/changelog.md +++ b/doc/source/changelog.md @@ -4,11 +4,11 @@ ### What's new? -- **Add new** `Bulyan` **strategy** ([#1817](https://github.com/adap/flower/pull/1817), [#1891](https://github.com/adap/flower/pull/1891)) +- **Add new** `Bulyan` **strategy** ([#1817](https://github.com/adap/flower/pull/1817), [#1891](https://github.com/adap/flower/pull/1891)) The new `Bulyan` strategy implements Bulyan by [El Mhamdi et al., 2018](https://arxiv.org/abs/1802.07927) -- **Add new** `FedTrimmedAvg` **strategy** ([#1769](https://github.com/adap/flower/pull/1769), [#1853](https://github.com/adap/flower/pull/1853)) +- **Add new** `FedTrimmedAvg` **strategy** ([#1769](https://github.com/adap/flower/pull/1769), [#1853](https://github.com/adap/flower/pull/1853)) The new `FedTrimmedAvg` strategy implements Trimmed Mean by [Dong Yin, 2018](https://arxiv.org/abs/1803.01498) From d121e822008d3fbb44e5c3c7cf501b48480229c6 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Thu, 25 May 2023 10:14:32 +0200 Subject: [PATCH 10/36] Remove redundant test, fix mocking --- src/py/flwr/server/strategy/bulyan_test.py | 33 ++++------------------ 1 file changed, 6 insertions(+), 27 deletions(-) diff --git 
a/src/py/flwr/server/strategy/bulyan_test.py b/src/py/flwr/server/strategy/bulyan_test.py index e54d93a0b37b..c2a2c5ab6c8b 100644 --- a/src/py/flwr/server/strategy/bulyan_test.py +++ b/src/py/flwr/server/strategy/bulyan_test.py @@ -30,24 +30,10 @@ parameters_to_ndarrays, ) from flwr.server.client_proxy import ClientProxy -from flwr.server.fleet.grpc_bidi.grpc_client_proxy import GrpcClientProxy from .bulyan import Bulyan -def test_krum_num_fit_clients_20_available() -> None: - """Test num_fit_clients function.""" - # Prepare - strategy = Bulyan() - expected = 20 - - # Execute - actual, _ = strategy.num_fit_clients(num_available_clients=20) - - # Assert - assert expected == actual - - def test_krum_num_fit_clients_19_available() -> None: """Test num_fit_clients function.""" # Prepare @@ -166,16 +152,9 @@ def test_aggregate_fit() -> None: param_5: Parameters = ndarrays_to_parameters( [array([0.1, 0.1, 0.1, 0.1], dtype=float32)] ) - bridge = MagicMock() - client_0 = GrpcClientProxy(cid="0", bridge=bridge) - client_1 = GrpcClientProxy(cid="1", bridge=bridge) - client_2 = GrpcClientProxy(cid="2", bridge=bridge) - client_3 = GrpcClientProxy(cid="3", bridge=bridge) - client_4 = GrpcClientProxy(cid="4", bridge=bridge) - client_5 = GrpcClientProxy(cid="5", bridge=bridge) results: List[Tuple[ClientProxy, FitRes]] = [ ( - client_0, + MagicMock(), FitRes( status=Status(code=Code.OK, message="Success"), parameters=param_0, @@ -184,7 +163,7 @@ def test_aggregate_fit() -> None: ), ), ( - client_1, + MagicMock(), FitRes( status=Status(code=Code.OK, message="Success"), parameters=param_1, @@ -193,7 +172,7 @@ def test_aggregate_fit() -> None: ), ), ( - client_2, + MagicMock(), FitRes( status=Status(code=Code.OK, message="Success"), parameters=param_2, @@ -202,7 +181,7 @@ def test_aggregate_fit() -> None: ), ), ( - client_3, + MagicMock(), FitRes( status=Status(code=Code.OK, message="Success"), parameters=param_3, @@ -211,7 +190,7 @@ def test_aggregate_fit() -> None: ), ), ( - 
client_4, + MagicMock(), FitRes( status=Status(code=Code.OK, message="Success"), parameters=param_4, @@ -220,7 +199,7 @@ def test_aggregate_fit() -> None: ), ), ( - client_5, + MagicMock(), FitRes( status=Status(code=Code.OK, message="Success"), parameters=param_5, From 3eb4c0a5eb692027f78213fb7f37dfc5a5465ea6 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Thu, 25 May 2023 10:47:27 +0200 Subject: [PATCH 11/36] Remove redundant tests --- src/py/flwr/server/strategy/bulyan_test.py | 91 ---------------------- 1 file changed, 91 deletions(-) diff --git a/src/py/flwr/server/strategy/bulyan_test.py b/src/py/flwr/server/strategy/bulyan_test.py index c2a2c5ab6c8b..b07c859d5d42 100644 --- a/src/py/flwr/server/strategy/bulyan_test.py +++ b/src/py/flwr/server/strategy/bulyan_test.py @@ -34,97 +34,6 @@ from .bulyan import Bulyan -def test_krum_num_fit_clients_19_available() -> None: - """Test num_fit_clients function.""" - # Prepare - strategy = Bulyan() - expected = 19 - - # Execute - actual, _ = strategy.num_fit_clients(num_available_clients=19) - - # Assert - assert expected == actual - - -def test_krum_num_fit_clients_10_available() -> None: - """Test num_fit_clients function.""" - # Prepare - strategy = Bulyan() - expected = 10 - - # Execute - actual, _ = strategy.num_fit_clients(num_available_clients=10) - - # Assert - assert expected == actual - - -def test_krum_num_fit_clients_minimum() -> None: - """Test num_fit_clients function.""" - # Prepare - strategy = Bulyan() - expected = 9 - - # Execute - actual, _ = strategy.num_fit_clients(num_available_clients=9) - - # Assert - assert expected == actual - - -def test_krum_num_evaluation_clients_40_available() -> None: - """Test num_evaluation_clients function.""" - # Prepare - strategy = Bulyan(fraction_evaluate=0.05) - expected = 2 - - # Execute - actual, _ = strategy.num_evaluation_clients(num_available_clients=40) - - # Assert - assert expected == actual - - -def test_krum_num_evaluation_clients_39_available() -> 
None: - """Test num_evaluation_clients function.""" - # Prepare - strategy = Bulyan(fraction_evaluate=0.05) - expected = 2 - - # Execute - actual, _ = strategy.num_evaluation_clients(num_available_clients=39) - - # Assert - assert expected == actual - - -def test_krum_num_evaluation_clients_20_available() -> None: - """Test num_evaluation_clients function.""" - # Prepare - strategy = Bulyan(fraction_evaluate=0.05) - expected = 2 - - # Execute - actual, _ = strategy.num_evaluation_clients(num_available_clients=20) - - # Assert - assert expected == actual - - -def test_krum_num_evaluation_clients_minimum() -> None: - """Test num_evaluation_clients function.""" - # Prepare - strategy = Bulyan(fraction_evaluate=0.05) - expected = 2 - - # Execute - actual, _ = strategy.num_evaluation_clients(num_available_clients=19) - - # Assert - assert expected == actual - - # pylint: disable=too-many-locals def test_aggregate_fit() -> None: """Tests if Bulyan is aggregating correctly.""" From db8d167c244da587c7caa17abf4a0bcf1f0dee3d Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 26 May 2023 11:19:53 +0200 Subject: [PATCH 12/36] Fix weights comparison for determining Byzantine-resilient model --- src/py/flwr/server/strategy/aggregate.py | 47 ++++++++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 5e24838dbd11..683edacb1b74 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -114,14 +114,8 @@ def aggregate_bulyan( for _ in range(theta): best_model = aggregate_krum(results, num_malicious, to_keep=0) - - best_idx = None - for idx, result in enumerate(results): - if list(result[0][0]) == list(best_model[0]): - best_idx = idx - break - if best_idx is None: - raise ValueError("No matching model found in the results.") + list_of_weights = [weights for weights, num_samples in results] + best_idx = 
_find_reference_weights(best_model, list_of_weights) selected_models_set[tracker[best_idx]] = results[best_idx] @@ -234,3 +228,40 @@ def aggregate_trimmed_avg( ] return trimmed_w + + +def _check_weights_equality(weights1: NDArrays, weights2: NDArrays) -> bool: + """Check if weights are the same.""" + return all( + np.array_equal(layer_weights1, layer_weights2) + for layer_weights1, layer_weights2 in zip(weights1, weights2) + ) + + +def _find_reference_weights( + reference_weights: NDArrays, list_of_weights: List[NDArrays] +) -> int: + """Loop through the `list_of_weights` to find the reference weights, raise + Error if not found. + + Parameters + ---------- + reference_weights: NDArrays + Weights that will be searched for. + list_of_weights: List[NDArrays] + List of weights that will be searched through. + + Returns + ------- + index: int + The index of `reference_weights` in the `list_of_weights`. + + Raises + ------ + ValueError + If `reference_weights` is not found in `list_of_weights`. 
+ """ + for idx, weights in enumerate(list_of_weights): + if _check_weights_equality(reference_weights, weights): + return idx + raise ValueError("The reference weights not found in list_of_weights.") From fe54b0f6f58c4875a92c2ec8b1a990eaec74a7b9 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 26 May 2023 11:26:49 +0200 Subject: [PATCH 13/36] Add length check in _check_weights_equality --- src/py/flwr/server/strategy/aggregate.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 683edacb1b74..ca2d85570a84 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -232,6 +232,8 @@ def aggregate_trimmed_avg( def _check_weights_equality(weights1: NDArrays, weights2: NDArrays) -> bool: """Check if weights are the same.""" + if len(weights1) != len(weights2): + return False return all( np.array_equal(layer_weights1, layer_weights2) for layer_weights1, layer_weights2 in zip(weights1, weights2) From 0732031eb489d66bf7002ed637cd30ace734db9a Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 26 May 2023 11:27:12 +0200 Subject: [PATCH 14/36] Add tests for _check_weights_equality correctness --- src/py/flwr/server/strategy/aggregate_test.py | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/server/strategy/aggregate_test.py b/src/py/flwr/server/strategy/aggregate_test.py index 633ec56632ed..23868ffe6a34 100644 --- a/src/py/flwr/server/strategy/aggregate_test.py +++ b/src/py/flwr/server/strategy/aggregate_test.py @@ -19,7 +19,7 @@ import numpy as np -from .aggregate import aggregate, weighted_loss_avg +from .aggregate import _check_weights_equality, aggregate, weighted_loss_avg def test_aggregate() -> None: @@ -65,3 +65,27 @@ def test_weighted_loss_avg_multiple_values() -> None: # Assert assert expected == actual + + +def test_check_weights_equality_true() -> None: + w1 = 
[np.array([1, 2]), np.array([[1, 2], [3, 4]])] + w2 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] + results = _check_weights_equality(w1, w2) + expected = True + assert expected == results + + +def test_check_weights_equality_numeric_false() -> None: + w1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] + w2 = [np.array([2, 2]), np.array([[1, 2], [3, 4]])] + results = _check_weights_equality(w1, w2) + expected = False + assert expected == results + + +def test_check_weights_equality_various_legnth_false() -> None: + w1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] + w2 = [np.array([1, 2])] + results = _check_weights_equality(w1, w2) + expected = False + assert expected == results From 5c420d3677f6e921a07e7e7937e353b82d943c9e Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 26 May 2023 11:30:18 +0200 Subject: [PATCH 15/36] Add tests for _find_reference_weights --- src/py/flwr/server/strategy/aggregate_test.py | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate_test.py b/src/py/flwr/server/strategy/aggregate_test.py index 23868ffe6a34..e7d8549eb98c 100644 --- a/src/py/flwr/server/strategy/aggregate_test.py +++ b/src/py/flwr/server/strategy/aggregate_test.py @@ -19,7 +19,12 @@ import numpy as np -from .aggregate import _check_weights_equality, aggregate, weighted_loss_avg +from .aggregate import ( + _check_weights_equality, + _find_reference_weights, + aggregate, + weighted_loss_avg, +) def test_aggregate() -> None: @@ -83,9 +88,24 @@ def test_check_weights_equality_numeric_false() -> None: assert expected == results -def test_check_weights_equality_various_legnth_false() -> None: +def test_check_weights_equality_various_length_false() -> None: w1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] w2 = [np.array([1, 2])] results = _check_weights_equality(w1, w2) expected = False assert expected == results + + +def test_find_reference_weights() -> None: + reference_weights = 
[np.array([1, 2]), np.array([[1, 2], [3, 4]])] + list_of_weights = [ + [np.array([2, 2]), np.array([[1, 2], [3, 4]])], + [np.array([3, 2]), np.array([[1, 2], [3, 4]])], + [np.array([3, 2]), np.array([[1, 2], [10, 4]])], + [np.array([1, 2]), np.array([[1, 2], [3, 4]])], + ] + + result = _find_reference_weights(reference_weights, list_of_weights) + + expected = 3 + assert result == expected From 4eaccd17cf8ada434e540d74a4cc5544a2a28e73 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 26 May 2023 14:06:12 +0200 Subject: [PATCH 16/36] Add correct beta closest averaging --- src/py/flwr/server/strategy/aggregate.py | 69 +++++++++++++++++------- 1 file changed, 49 insertions(+), 20 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index ca2d85570a84..c915adcc3062 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -97,6 +97,7 @@ def aggregate_bulyan( ) -> NDArrays: """Perform Bulyan aggregation.""" selected_models_set: Dict[int, Tuple[NDArrays, int]] = {} + # selected_models_set: List[Tuple[NDArrays, int]] = [] # List of idx to keep track of the order of clients tracker: NDArray = np.arange(len(results)) @@ -126,26 +127,10 @@ def aggregate_bulyan( # Compute median parameter vector across selected_models_set median_vect = aggregate_median(list(selected_models_set.values())) - # Take the beta closest params to the median - distances = {} - for idx, result in selected_models_set.items(): - dist = [ - np.abs(result[0][0][j] - median_vect[0][j]) - for j in range(len(weights[0][0])) - ] - norm_sums = 0 - for k in dist: - norm_sums += np.linalg.norm(k) # type: ignore - distances[idx] = norm_sums - - closest_idx = sorted(distances, key=lambda idx: distances[idx])[:beta] - closest_models_to_median = [selected_models_set[i][0] for i in closest_idx] - - # Apply FevAvg on closest_models_to_median - parameters_aggregated: NDArrays = [ - reduce(np.add, layers) / beta 
for layers in zip(*closest_models_to_median) - ] - + # Take the averaged beta parameters of the closest distance to the median (coordinate-wise) + parameters_aggregated = _aggregate_n_closest_weights( + median_vect, list(selected_models_set.values()), beta_closest=beta + ) return parameters_aggregated @@ -267,3 +252,47 @@ def _find_reference_weights( if _check_weights_equality(reference_weights, weights): return idx raise ValueError("The reference weights not found in list_of_weights.") + + +def _aggregate_n_closest_weights( + reference_weights: NDArrays, results: List[Tuple[NDArrays, int]], beta_closest: int +) -> NDArrays: + """It calculates element-wise mean of the `N` the closest values. + + Note, each i-th coordinate of the result weight is the average of the beta_closest -ith + coordinates to the reference weights + + + Parameters + ---------- + reference_weights: NDArrays + The weights from which the distances will be computed + results: List[Tuple[NDArrays, int]] + The weights from models + beta_closest: int + The number of the closest distance weights that will be averaged + Returns + ------- + aggregated_weights: NDArrays + Averaged (element-wise) beta weights that have the closest distance to reference weights + """ + list_of_weights = [weights for weights, num_examples in results] + aggregated_weights = [] + + for layer_id, layer_weights in enumerate(reference_weights): + other_weights_layer_list = [] + for other_w in list_of_weights: + other_weights_layer = other_w[layer_id] + other_weights_layer_list.append(other_weights_layer) + other_weights_layer_np = np.array(other_weights_layer_list) + diff_np = np.abs(layer_weights - other_weights_layer_np) + # Create indices of the smallest differences + # We do not need the exact order but just the beta closest weights + # therefore np.argpartition is used instead of np.argsort + indices = np.argpartition(diff_np, kth=beta_closest - 1, axis=0) + # Take the weights (coordinate-wise) corresponding to the beta of 
the closest distances + beta_closest_weights = np.take_along_axis( + other_weights_layer_np, indices=indices, axis=0 + )[:beta_closest] + aggregated_weights.append(np.mean(beta_closest_weights, axis=0)) + return aggregated_weights From 38c63067c0d768908534dc91d8f8f71bd060a05e Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 26 May 2023 14:06:24 +0200 Subject: [PATCH 17/36] Add test for beta closest averaging --- src/py/flwr/server/strategy/aggregate_test.py | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/py/flwr/server/strategy/aggregate_test.py b/src/py/flwr/server/strategy/aggregate_test.py index e7d8549eb98c..052df354330d 100644 --- a/src/py/flwr/server/strategy/aggregate_test.py +++ b/src/py/flwr/server/strategy/aggregate_test.py @@ -20,6 +20,7 @@ import numpy as np from .aggregate import ( + _aggregate_n_closest_weights, _check_weights_equality, _find_reference_weights, aggregate, @@ -109,3 +110,28 @@ def test_find_reference_weights() -> None: expected = 3 assert result == expected + + +def test_aggregate_n_closest_weights_mean() -> None: + beta_closest = 2 + reference_weights = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] + + list_of_weights = [ + [np.array([1, 2]), np.array([[1, 2], [3, 4]])], + [np.array([1.1, 2.1]), np.array([[1.1, 2.1], [3.1, 4.1]])], + [np.array([1.2, 2.2]), np.array([[1.2, 2.2], [3.2, 4.2]])], + [np.array([1.3, 2.3]), np.array([[0.9, 2.5], [3.4, 3.8]])], + ] + list_of_weights = [(weights, 0) for weights in list_of_weights] + + beta_closest_weights = _aggregate_n_closest_weights( + reference_weights, list_of_weights, beta_closest=beta_closest + ) + expected_averaged = [np.array([1.05, 2.05]), np.array([[0.95, 2.05], [3.05, 4.05]])] + + assert all( + [ + np.array_equal(expected, result) + for expected, result in zip(expected_averaged, beta_closest_weights) + ] + ) From ce01dc7b2414cad319e3b4768f298f044c413400 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 26 May 2023 14:29:18 +0200 Subject: 
[PATCH 18/36] Simplify aggregate_bulyan * Change the data type of selected_model_set * Remove redundant tracker --- src/py/flwr/server/strategy/aggregate.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index c915adcc3062..b7c82fd7d33d 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -96,11 +96,7 @@ def aggregate_bulyan( results: List[Tuple[NDArrays, int]], num_malicious: int ) -> NDArrays: """Perform Bulyan aggregation.""" - selected_models_set: Dict[int, Tuple[NDArrays, int]] = {} - # selected_models_set: List[Tuple[NDArrays, int]] = [] - - # List of idx to keep track of the order of clients - tracker: NDArray = np.arange(len(results)) + selected_models_set: List[Tuple[NDArrays, int]] = [] # Create a list of weights and ignore the number of examples weights = [weights for weights, _ in results] @@ -118,18 +114,17 @@ def aggregate_bulyan( list_of_weights = [weights for weights, num_samples in results] best_idx = _find_reference_weights(best_model, list_of_weights) - selected_models_set[tracker[best_idx]] = results[best_idx] + selected_models_set.append(results[best_idx]) # remove idx from tracker and weights_results - tracker = np.delete(tracker, best_idx) # type: ignore results.pop(best_idx) # Compute median parameter vector across selected_models_set - median_vect = aggregate_median(list(selected_models_set.values())) + median_vect = aggregate_median(selected_models_set) # Take the averaged beta parameters of the closest distance to the median (coordinate-wise) parameters_aggregated = _aggregate_n_closest_weights( - median_vect, list(selected_models_set.values()), beta_closest=beta + median_vect, selected_models_set, beta_closest=beta ) return parameters_aggregated From 3f30afa96ff423e59c7a47939afec3610d2f088b Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 9 Jun 2023 11:19:45 +0200 
Subject: [PATCH 19/36] =?UTF-8?q?Add=20support=20for=20different=20Byzanti?= =?UTF-8?q?ne=E2=80=93resilient=20first=20step=20aggregations?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/py/flwr/server/strategy/aggregate.py | 40 ++++++++++++++++++---- src/py/flwr/server/strategy/bulyan.py | 15 ++++++-- src/py/flwr/server/strategy/bulyan_test.py | 1 + 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index b7c82fd7d33d..d08abea6c36d 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -16,7 +16,7 @@ from functools import reduce -from typing import Dict, List, Tuple +from typing import Dict, List, Tuple, Callable, Any import numpy as np @@ -90,12 +90,30 @@ def aggregate_krum( # Return the model parameters that minimize the score (Krum) return weights[np.argmin(scores)] - +BYZANTINE_RESILIENT_SINGLE_RET_MODEL_AGGREGATION = [aggregate_krum] # also GeoMed (but not implemented yet) +BYZANTINE_RESILIENT_MANY_RETURN_MODELS_AGGREGATION = [] # Brute, Medoid (but not implemented yet) # pylint: disable=too-many-locals def aggregate_bulyan( - results: List[Tuple[NDArrays, int]], num_malicious: int + results: List[Tuple[NDArrays, int]], num_malicious: int, aggregation_rule: Callable, **aggregation_rule_kwargs: Any ) -> NDArrays: - """Perform Bulyan aggregation.""" + """ + Perform Bulyan aggregation. + + Parameters + ---------- + results: List[Tuple[NDArrays, int]] + Weights and number of samples for each of the client. + num_malicious: int + The maximum number of malicious clients. + aggregation_rule: Callable + Byzantine resilient aggregation rule that is used as the first step of the Bulyan + aggregation_rule_kwargs: Any + The arguments to the aggregation rule. + Returns + ------- + aggregated_parameters: NDArrays + Aggregated parameters according to the Bulyan strategy. 
+ """ selected_models_set: List[Tuple[NDArrays, int]] = [] # Create a list of weights and ignore the number of examples @@ -110,9 +128,19 @@ def aggregate_bulyan( beta = 1 for _ in range(theta): - best_model = aggregate_krum(results, num_malicious, to_keep=0) + + best_model = aggregation_rule(results=results, num_malicious=num_malicious, **aggregation_rule_kwargs) list_of_weights = [weights for weights, num_samples in results] - best_idx = _find_reference_weights(best_model, list_of_weights) + # This group gives exact result + if aggregation_rule in BYZANTINE_RESILIENT_SINGLE_RET_MODEL_AGGREGATION: + best_idx = _find_reference_weights(best_model, list_of_weights) + # This group requires finding the closest model to the returned one (weights distance wise) + elif aggregation_rule in BYZANTINE_RESILIENT_MANY_RETURN_MODELS_AGGREGATION: + # TODO: write a function to find the closest model + best_idx = 0 + else: + raise ValueError("The given aggregation rule is not added as Byzantine resilient. 
" + "Please choose from Byzantine resilient rules.") selected_models_set.append(results[best_idx]) diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index 33cf9d64e9d6..09913013a269 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -19,7 +19,7 @@ from logging import WARNING -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import Callable, Dict, List, Optional, Tuple, Union, Any from flwr.common import ( FitRes, @@ -33,7 +33,7 @@ from flwr.common.logger import log from flwr.server.client_proxy import ClientProxy -from .aggregate import aggregate_bulyan +from .aggregate import aggregate_bulyan, aggregate_krum from .fedavg import FedAvg @@ -63,6 +63,8 @@ def __init__( initial_parameters: Optional[Parameters] = None, fit_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None, evaluate_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None, + first_aggregation_rule: Callable = aggregate_krum, + **aggregation_rule_kwargs: Any, ) -> None: """Configurable FedMedian strategy. @@ -92,6 +94,11 @@ def __init__( Whether or not accept rounds containing failures. Defaults to True. initial_parameters : Parameters, optional Initial global model parameters. 
+ first_aggregation_rule: Callable + Byzantine resilient aggregation rule that is used as the first step of the Bulyan e.g.krum + **aggregation_rule_kwargs: Any + arguments to the first_aggregation rule + """ super().__init__( fraction_fit=fraction_fit, @@ -108,6 +115,8 @@ def __init__( evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation_fn, ) self.num_malicious_clients = num_malicious_clients + self.first_aggregation_rule = first_aggregation_rule + self.aggregation_rule_kwargs = aggregation_rule_kwargs def __repr__(self) -> str: rep = f"Bulyan(accept_failures={self.accept_failures})" @@ -134,7 +143,7 @@ def aggregate_fit( # Aggregate weights parameters_aggregated = ndarrays_to_parameters( - aggregate_bulyan(weights_results, self.num_malicious_clients) + aggregate_bulyan(weights_results, self.num_malicious_clients, self.first_aggregation_rule, **self.aggregation_rule_kwargs) ) # Aggregate custom metrics if aggregation fn was provided diff --git a/src/py/flwr/server/strategy/bulyan_test.py b/src/py/flwr/server/strategy/bulyan_test.py index b07c859d5d42..97d172bb98a0 100644 --- a/src/py/flwr/server/strategy/bulyan_test.py +++ b/src/py/flwr/server/strategy/bulyan_test.py @@ -42,6 +42,7 @@ def test_aggregate_fit() -> None: strategy = Bulyan( initial_parameters=ndarrays_to_parameters(previous_weights), num_malicious_clients=1, + to_keep=0 ) param_0: Parameters = ndarrays_to_parameters( [array([0.2, 0.2, 0.2, 0.2], dtype=float32)] From 18e5d1ce633cc80b249dfc0af0abe6cca957a0bb Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 9 Jun 2023 11:32:04 +0200 Subject: [PATCH 20/36] Add the check for assumptions needed to use the aggregation strategy --- src/py/flwr/server/strategy/aggregate.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index d08abea6c36d..0b9176e0b308 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ 
b/src/py/flwr/server/strategy/aggregate.py @@ -114,21 +114,22 @@ def aggregate_bulyan( aggregated_parameters: NDArrays Aggregated parameters according to the Bulyan strategy. """ + num_clients = len(results) + if num_clients < 4 * num_malicious + 3: + raise ValueError( + "The Bulyan aggregation requires then number of clients to be greater or equal to the " + "4 * num_malicious + 3. This is the assumption of this method. " + "It is needed to ensure that the method reduces the attacker's leeway to the one " + "proved in the paper.") selected_models_set: List[Tuple[NDArrays, int]] = [] # Create a list of weights and ignore the number of examples weights = [weights for weights, _ in results] theta = len(weights) - 2 * num_malicious - if theta <= 0: - theta = 1 - beta = theta - 2 * num_malicious - if beta <= 0: - beta = 1 for _ in range(theta): - best_model = aggregation_rule(results=results, num_malicious=num_malicious, **aggregation_rule_kwargs) list_of_weights = [weights for weights, num_samples in results] # This group gives exact result From 68712ffee7ffd192c15761142f773d1512e431f9 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 9 Jun 2023 12:06:45 +0200 Subject: [PATCH 21/36] Fix formatting --- src/py/flwr/server/strategy/aggregate.py | 32 ++++++++++++++++-------- src/py/flwr/server/strategy/bulyan.py | 10 +++++--- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 0b9176e0b308..5c7c0071cb98 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -16,7 +16,7 @@ from functools import reduce -from typing import Dict, List, Tuple, Callable, Any +from typing import Any, Callable, List, Tuple import numpy as np @@ -90,14 +90,21 @@ def aggregate_krum( # Return the model parameters that minimize the score (Krum) return weights[np.argmin(scores)] -BYZANTINE_RESILIENT_SINGLE_RET_MODEL_AGGREGATION = [aggregate_krum] 
# also GeoMed (but not implemented yet) -BYZANTINE_RESILIENT_MANY_RETURN_MODELS_AGGREGATION = [] # Brute, Medoid (but not implemented yet) + +BYZANTINE_RESILIENT_SINGLE_RET_MODEL_AGGREGATION = [ + aggregate_krum +] # also GeoMed (but not implemented yet) +BYZANTINE_RESILIENT_MANY_RETURN_MODELS_AGGREGATION = ( + [] +) # Brute, Medoid (but not implemented yet) # pylint: disable=too-many-locals def aggregate_bulyan( - results: List[Tuple[NDArrays, int]], num_malicious: int, aggregation_rule: Callable, **aggregation_rule_kwargs: Any + results: List[Tuple[NDArrays, int]], + num_malicious: int, + aggregation_rule: Callable, + **aggregation_rule_kwargs: Any, ) -> NDArrays: - """ - Perform Bulyan aggregation. + """Perform Bulyan aggregation. Parameters ---------- @@ -120,7 +127,8 @@ def aggregate_bulyan( "The Bulyan aggregation requires then number of clients to be greater or equal to the " "4 * num_malicious + 3. This is the assumption of this method. " "It is needed to ensure that the method reduces the attacker's leeway to the one " - "proved in the paper.") + "proved in the paper." + ) selected_models_set: List[Tuple[NDArrays, int]] = [] # Create a list of weights and ignore the number of examples @@ -130,7 +138,9 @@ def aggregate_bulyan( beta = theta - 2 * num_malicious for _ in range(theta): - best_model = aggregation_rule(results=results, num_malicious=num_malicious, **aggregation_rule_kwargs) + best_model = aggregation_rule( + results=results, num_malicious=num_malicious, **aggregation_rule_kwargs + ) list_of_weights = [weights for weights, num_samples in results] # This group gives exact result if aggregation_rule in BYZANTINE_RESILIENT_SINGLE_RET_MODEL_AGGREGATION: @@ -140,8 +150,10 @@ def aggregate_bulyan( # TODO: write a function to find the closest model best_idx = 0 else: - raise ValueError("The given aggregation rule is not added as Byzantine resilient. 
" - "Please choose from Byzantine resilient rules.") + raise ValueError( + "The given aggregation rule is not added as Byzantine resilient. " + "Please choose from Byzantine resilient rules." + ) selected_models_set.append(results[best_idx]) diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index 09913013a269..62ed9a694238 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -19,7 +19,7 @@ from logging import WARNING -from typing import Callable, Dict, List, Optional, Tuple, Union, Any +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from flwr.common import ( FitRes, @@ -98,7 +98,6 @@ def __init__( Byzantine resilient aggregation rule that is used as the first step of the Bulyan e.g.krum **aggregation_rule_kwargs: Any arguments to the first_aggregation rule - """ super().__init__( fraction_fit=fraction_fit, @@ -143,7 +142,12 @@ def aggregate_fit( # Aggregate weights parameters_aggregated = ndarrays_to_parameters( - aggregate_bulyan(weights_results, self.num_malicious_clients, self.first_aggregation_rule, **self.aggregation_rule_kwargs) + aggregate_bulyan( + weights_results, + self.num_malicious_clients, + self.first_aggregation_rule, + **self.aggregation_rule_kwargs, + ) ) # Aggregate custom metrics if aggregation fn was provided From 7de96161183f3248e1f7609496e5af961eb22fd0 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 9 Jun 2023 12:29:36 +0200 Subject: [PATCH 22/36] Fix formatting and tests --- src/py/flwr/server/strategy/aggregate.py | 29 +++++++++++-------- src/py/flwr/server/strategy/aggregate_test.py | 27 ++++++++++------- src/py/flwr/server/strategy/bulyan.py | 2 +- src/py/flwr/server/strategy/bulyan_test.py | 7 +++-- 4 files changed, 38 insertions(+), 27 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 5c7c0071cb98..5c24d937e807 100644 --- 
a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -113,7 +113,7 @@ def aggregate_bulyan( num_malicious: int The maximum number of malicious clients. aggregation_rule: Callable - Byzantine resilient aggregation rule that is used as the first step of the Bulyan + Byzantine resilient aggregation rule used as the first step of the Bulyan aggregation_rule_kwargs: Any The arguments to the aggregation rule. Returns @@ -124,10 +124,10 @@ def aggregate_bulyan( num_clients = len(results) if num_clients < 4 * num_malicious + 3: raise ValueError( - "The Bulyan aggregation requires then number of clients to be greater or equal to the " - "4 * num_malicious + 3. This is the assumption of this method. " - "It is needed to ensure that the method reduces the attacker's leeway to the one " - "proved in the paper." + "The Bulyan aggregation requires then number of clients to be greater or " + "equal to the 4 * num_malicious + 3. This is the assumption of this method." + "It is needed to ensure that the method reduces the attacker's leeway to " + "the one proved in the paper." 
) selected_models_set: List[Tuple[NDArrays, int]] = [] @@ -145,9 +145,11 @@ def aggregate_bulyan( # This group gives exact result if aggregation_rule in BYZANTINE_RESILIENT_SINGLE_RET_MODEL_AGGREGATION: best_idx = _find_reference_weights(best_model, list_of_weights) - # This group requires finding the closest model to the returned one (weights distance wise) + # This group requires finding the closest model to the returned one + # (weights distance wise) elif aggregation_rule in BYZANTINE_RESILIENT_MANY_RETURN_MODELS_AGGREGATION: - # TODO: write a function to find the closest model + # when different aggregation strategies available + # write a function to find the closest model best_idx = 0 else: raise ValueError( @@ -163,7 +165,8 @@ def aggregate_bulyan( # Compute median parameter vector across selected_models_set median_vect = aggregate_median(selected_models_set) - # Take the averaged beta parameters of the closest distance to the median (coordinate-wise) + # Take the averaged beta parameters of the closest distance to the median + # (coordinate-wise) parameters_aggregated = _aggregate_n_closest_weights( median_vect, selected_models_set, beta_closest=beta ) @@ -295,8 +298,8 @@ def _aggregate_n_closest_weights( ) -> NDArrays: """It calculates element-wise mean of the `N` the closest values. 
- Note, each i-th coordinate of the result weight is the average of the beta_closest -ith - coordinates to the reference weights + Note, each i-th coordinate of the result weight is the average of the beta_closest + -ith coordinates to the reference weights Parameters @@ -310,7 +313,8 @@ def _aggregate_n_closest_weights( Returns ------- aggregated_weights: NDArrays - Averaged (element-wise) beta weights that have the closest distance to reference weights + Averaged (element-wise) beta weights that have the closest distance to + reference weights """ list_of_weights = [weights for weights, num_examples in results] aggregated_weights = [] @@ -326,7 +330,8 @@ def _aggregate_n_closest_weights( # We do not need the exact order but just the beta closest weights # therefore np.argpartition is used instead of np.argsort indices = np.argpartition(diff_np, kth=beta_closest - 1, axis=0) - # Take the weights (coordinate-wise) corresponding to the beta of the closest distances + # Take the weights (coordinate-wise) corresponding to the beta of the + # closest distances beta_closest_weights = np.take_along_axis( other_weights_layer_np, indices=indices, axis=0 )[:beta_closest] diff --git a/src/py/flwr/server/strategy/aggregate_test.py b/src/py/flwr/server/strategy/aggregate_test.py index 052df354330d..9b0b383f9215 100644 --- a/src/py/flwr/server/strategy/aggregate_test.py +++ b/src/py/flwr/server/strategy/aggregate_test.py @@ -74,30 +74,34 @@ def test_weighted_loss_avg_multiple_values() -> None: def test_check_weights_equality_true() -> None: - w1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] - w2 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] - results = _check_weights_equality(w1, w2) + """Check weights equality - the same weights.""" + weights1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] + weights2 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] + results = _check_weights_equality(weights1, weights2) expected = True assert expected == results def 
test_check_weights_equality_numeric_false() -> None: - w1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] - w2 = [np.array([2, 2]), np.array([[1, 2], [3, 4]])] - results = _check_weights_equality(w1, w2) + """Check weights equality - different weights, same length.""" + weights1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] + weights2 = [np.array([2, 2]), np.array([[1, 2], [3, 4]])] + results = _check_weights_equality(weights1, weights2) expected = False assert expected == results def test_check_weights_equality_various_length_false() -> None: - w1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] - w2 = [np.array([1, 2])] - results = _check_weights_equality(w1, w2) + """Check weights equality - the same first layer weights, different length.""" + weights1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] + weights2 = [np.array([1, 2])] + results = _check_weights_equality(weights1, weights2) expected = False assert expected == results def test_find_reference_weights() -> None: + """Check if the finding weights from list of weigths work.""" reference_weights = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] list_of_weights = [ [np.array([2, 2]), np.array([[1, 2], [3, 4]])], @@ -113,6 +117,7 @@ def test_find_reference_weights() -> None: def test_aggregate_n_closest_weights_mean() -> None: + """Check if aggregation of n closest weights to the reference works.""" beta_closest = 2 reference_weights = [np.array([1, 2]), np.array([[1, 2], [3, 4]])] @@ -130,8 +135,8 @@ def test_aggregate_n_closest_weights_mean() -> None: expected_averaged = [np.array([1.05, 2.05]), np.array([[0.95, 2.05], [3.05, 4.05]])] assert all( - [ + ( np.array_equal(expected, result) for expected, result in zip(expected_averaged, beta_closest_weights) - ] + ) ) diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index 62ed9a694238..abd39dc5564e 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -41,7 +41,7 @@ class 
Bulyan(FedAvg): """Configurable Bulyan strategy implementation.""" - # pylint: disable=too-many-arguments,too-many-instance-attributes,line-too-long + # pylint: disable=too-many-arguments,too-many-instance-attributes,line-too-long, too-many-locals def __init__( self, *, diff --git a/src/py/flwr/server/strategy/bulyan_test.py b/src/py/flwr/server/strategy/bulyan_test.py index 97d172bb98a0..a1c75479d801 100644 --- a/src/py/flwr/server/strategy/bulyan_test.py +++ b/src/py/flwr/server/strategy/bulyan_test.py @@ -41,8 +41,8 @@ def test_aggregate_fit() -> None: previous_weights: NDArrays = [array([0.1, 0.1, 0.1, 0.1], dtype=float32)] strategy = Bulyan( initial_parameters=ndarrays_to_parameters(previous_weights), - num_malicious_clients=1, - to_keep=0 + num_malicious_clients=0, + to_keep=0, ) param_0: Parameters = ndarrays_to_parameters( [array([0.2, 0.2, 0.2, 0.2], dtype=float32)] @@ -118,7 +118,8 @@ def test_aggregate_fit() -> None: ), ), ] - expected: NDArrays = [array([0.35, 0.35, 0.35, 0.35], dtype=float32)] + coordinate = (0.2 + 0.5 + 0.7 + 12.0 + 0.1 + 0.1) / 6 + expected: NDArrays = [array([coordinate] * 4, dtype=float32)] # Execute actual_aggregated, _ = strategy.aggregate_fit( From c9c9f7b3b2607c31f2fbed851f6ff0f62f10d7a3 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 9 Jun 2023 12:55:11 +0200 Subject: [PATCH 23/36] Fix formatting --- src/py/flwr/server/strategy/aggregate.py | 2 ++ src/py/flwr/server/strategy/bulyan.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 4639d6e2b4a0..643946fbd41e 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -97,6 +97,8 @@ def aggregate_krum( BYZANTINE_RESILIENT_MANY_RETURN_MODELS_AGGREGATION = ( [] ) # Brute, Medoid (but not implemented yet) + + # pylint: disable=too-many-locals def aggregate_bulyan( results: List[Tuple[NDArrays, int]], diff --git 
a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index abd39dc5564e..76c207e1df3a 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -14,7 +14,7 @@ # ============================================================================== """Bulyan [El Mhamdi et al., 2018] strategy. -Paper: https://arxiv.org/abs/1802.07927 +Paper: arxiv.org/abs/1802.07927 """ From 6dc62615e39db77a5b1bb9a9a35b218160a09e8f Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 9 Jun 2023 14:33:51 +0200 Subject: [PATCH 24/36] Apply suggestions --- src/py/flwr/server/strategy/aggregate.py | 1 + src/py/flwr/server/strategy/bulyan.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 643946fbd41e..08ccb98b11a0 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -118,6 +118,7 @@ def aggregate_bulyan( Byzantine resilient aggregation rule used as the first step of the Bulyan aggregation_rule_kwargs: Any The arguments to the aggregation rule. + Returns ------- aggregated_parameters: NDArrays diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index 76c207e1df3a..d0ef455f079b 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -66,7 +66,7 @@ def __init__( first_aggregation_rule: Callable = aggregate_krum, **aggregation_rule_kwargs: Any, ) -> None: - """Configurable FedMedian strategy. + """Configurable Bulyan strategy. Implementation based on https://arxiv.org/pdf/1803.01498v1.pdf @@ -95,7 +95,7 @@ def __init__( initial_parameters : Parameters, optional Initial global model parameters. 
first_aggregation_rule: Callable - Byzantine resilient aggregation rule that is used as the first step of the Bulyan e.g.krum + Byzantine resilient aggregation rule that is used as the first step of the Bulyan (e.g., Krum) **aggregation_rule_kwargs: Any arguments to the first_aggregation rule """ From 330058b8e0607fcc2852fb7485b89b50ccf0ceea Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 9 Jun 2023 14:36:58 +0200 Subject: [PATCH 25/36] Move list constants to aggregate_bulyan --- src/py/flwr/server/strategy/aggregate.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 08ccb98b11a0..73d0118bed22 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -91,14 +91,6 @@ def aggregate_krum( return weights[np.argmin(scores)] -BYZANTINE_RESILIENT_SINGLE_RET_MODEL_AGGREGATION = [ - aggregate_krum -] # also GeoMed (but not implemented yet) -BYZANTINE_RESILIENT_MANY_RETURN_MODELS_AGGREGATION = ( - [] -) # Brute, Medoid (but not implemented yet) - - # pylint: disable=too-many-locals def aggregate_bulyan( results: List[Tuple[NDArrays, int]], @@ -124,6 +116,13 @@ def aggregate_bulyan( aggregated_parameters: NDArrays Aggregated parameters according to the Bulyan strategy. 
""" + byzantine_resilient_single_ret_model_aggregation = [ + aggregate_krum + ] # also GeoMed (but not implemented yet) + byzantine_resilient_many_return_models_aggregation = ( + [] + ) # Brute, Medoid (but not implemented yet) + num_clients = len(results) if num_clients < 4 * num_malicious + 3: raise ValueError( @@ -146,11 +145,11 @@ def aggregate_bulyan( ) list_of_weights = [weights for weights, num_samples in results] # This group gives exact result - if aggregation_rule in BYZANTINE_RESILIENT_SINGLE_RET_MODEL_AGGREGATION: + if aggregation_rule in byzantine_resilient_single_ret_model_aggregation: best_idx = _find_reference_weights(best_model, list_of_weights) # This group requires finding the closest model to the returned one # (weights distance wise) - elif aggregation_rule in BYZANTINE_RESILIENT_MANY_RETURN_MODELS_AGGREGATION: + elif aggregation_rule in byzantine_resilient_many_return_models_aggregation: # when different aggregation strategies available # write a function to find the closest model best_idx = 0 From aa7e7352fbcc4a42154dbb379cc4a4cbd6f6423d Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Fri, 9 Jun 2023 14:52:31 +0200 Subject: [PATCH 26/36] Fix mypy tests --- src/py/flwr/server/strategy/aggregate.py | 16 ++++++---------- src/py/flwr/server/strategy/aggregate_test.py | 4 ++-- src/py/flwr/server/strategy/bulyan.py | 2 +- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 73d0118bed22..a50e9f6f9de0 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -95,7 +95,7 @@ def aggregate_krum( def aggregate_bulyan( results: List[Tuple[NDArrays, int]], num_malicious: int, - aggregation_rule: Callable, + aggregation_rule: Callable, # type: ignore **aggregation_rule_kwargs: Any, ) -> NDArrays: """Perform Bulyan aggregation. 
@@ -116,12 +116,10 @@ def aggregate_bulyan( aggregated_parameters: NDArrays Aggregated parameters according to the Bulyan strategy. """ - byzantine_resilient_single_ret_model_aggregation = [ - aggregate_krum - ] # also GeoMed (but not implemented yet) - byzantine_resilient_many_return_models_aggregation = ( - [] - ) # Brute, Medoid (but not implemented yet) + byzantine_resilient_single_ret_model_aggregation = [aggregate_krum] + # also GeoMed (but not implemented yet) + byzantine_resilient_many_return_models_aggregation = [] # type: ignore + # Brute, Medoid (but not implemented yet) num_clients = len(results) if num_clients < 4 * num_malicious + 3: @@ -334,8 +332,6 @@ def _aggregate_n_closest_weights( indices = np.argpartition(diff_np, kth=beta_closest - 1, axis=0) # Take the weights (coordinate-wise) corresponding to the beta of the # closest distances - beta_closest_weights = np.take_along_axis( - other_weights_layer_np, indices=indices, axis=0 - )[:beta_closest] + beta_closest_weights = np.take_along_axis(other_weights_layer_np, indices=indices, axis=0)[:beta_closest] # type: ignore[no-untyped-call] aggregated_weights.append(np.mean(beta_closest_weights, axis=0)) return aggregated_weights diff --git a/src/py/flwr/server/strategy/aggregate_test.py b/src/py/flwr/server/strategy/aggregate_test.py index 9b0b383f9215..bbb1057cf256 100644 --- a/src/py/flwr/server/strategy/aggregate_test.py +++ b/src/py/flwr/server/strategy/aggregate_test.py @@ -127,10 +127,10 @@ def test_aggregate_n_closest_weights_mean() -> None: [np.array([1.2, 2.2]), np.array([[1.2, 2.2], [3.2, 4.2]])], [np.array([1.3, 2.3]), np.array([[0.9, 2.5], [3.4, 3.8]])], ] - list_of_weights = [(weights, 0) for weights in list_of_weights] + list_of_weights_and_samples = [(weights, 0) for weights in list_of_weights] beta_closest_weights = _aggregate_n_closest_weights( - reference_weights, list_of_weights, beta_closest=beta_closest + reference_weights, list_of_weights_and_samples, beta_closest=beta_closest ) 
expected_averaged = [np.array([1.05, 2.05]), np.array([[0.95, 2.05], [3.05, 4.05]])] diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index d0ef455f079b..676a9d7b3d2d 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -63,7 +63,7 @@ def __init__( initial_parameters: Optional[Parameters] = None, fit_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None, evaluate_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None, - first_aggregation_rule: Callable = aggregate_krum, + first_aggregation_rule: Callable = aggregate_krum, # type: ignore **aggregation_rule_kwargs: Any, ) -> None: """Configurable Bulyan strategy. From 23680ea100ac20aa43c104d59188de9852e868a3 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Mon, 12 Jun 2023 10:55:00 +0200 Subject: [PATCH 27/36] Apply suggestions --- src/py/flwr/server/strategy/aggregate.py | 13 +++++++------ src/py/flwr/server/strategy/bulyan.py | 6 +++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index a50e9f6f9de0..63d89dd25e35 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -131,10 +131,7 @@ def aggregate_bulyan( ) selected_models_set: List[Tuple[NDArrays, int]] = [] - # Create a list of weights and ignore the number of examples - weights = [weights for weights, _ in results] - - theta = len(weights) - 2 * num_malicious + theta = len(results) - 2 * num_malicious beta = theta - 2 * num_malicious for _ in range(theta): @@ -150,7 +147,11 @@ def aggregate_bulyan( elif aggregation_rule in byzantine_resilient_many_return_models_aggregation: # when different aggregation strategies available # write a function to find the closest model - best_idx = 0 + raise NotImplementedError( + "aggregate_bulyan currently does not support the aggregation rules that" + " return many models as 
results. " + "Such aggregation rules are currently not available in Flower." + ) else: raise ValueError( "The given aggregation rule is not added as Byzantine resilient. " @@ -296,7 +297,7 @@ def _find_reference_weights( def _aggregate_n_closest_weights( reference_weights: NDArrays, results: List[Tuple[NDArrays, int]], beta_closest: int ) -> NDArrays: - """It calculates element-wise mean of the `N` the closest values. + """Calculate element-wise mean of the `N` closest values. Note, each i-th coordinate of the result weight is the average of the beta_closest -ith coordinates to the reference weights diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index 676a9d7b3d2d..bb76a5076016 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -68,14 +68,14 @@ def __init__( ) -> None: """Configurable Bulyan strategy. - Implementation based on https://arxiv.org/pdf/1803.01498v1.pdf + Implementation based on arxiv.org/abs/1802.07927. Parameters ---------- fraction_fit : float, optional - Fraction of clients used during training. Defaults to 0.1. + Fraction of clients used during training. Defaults to 1.0. fraction_evaluate : float, optional - Fraction of clients used during validation. Defaults to 0.1. + Fraction of clients used during validation. Defaults to 1.0. min_fit_clients : int, optional Minimum number of clients used during training. Defaults to 2. 
min_evaluate_clients : int, optional From 2be7e9d48bb18e67f8ec1d083d39322ba1e0f19a Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Tue, 1 Aug 2023 15:28:08 +0200 Subject: [PATCH 28/36] Fix tests in aggregate --- src/py/flwr/server/strategy/aggregate.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 63d89dd25e35..6de08dead4dc 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -268,8 +268,9 @@ def _check_weights_equality(weights1: NDArrays, weights2: NDArrays) -> bool: def _find_reference_weights( reference_weights: NDArrays, list_of_weights: List[NDArrays] ) -> int: - """Loop through the `list_of_weights` to find the reference weights, raise - Error if not found. + """Find the reference weights by looping through the `list_of_weights`. + + Raise Error if the reference weights are not found. Parameters ---------- @@ -311,6 +312,7 @@ def _aggregate_n_closest_weights( The weights from models beta_closest: int The number of the closest distance weights that will be averaged + Returns ------- aggregated_weights: NDArrays From a1171a0bca7820fdbf5c1e3c8119a8425f63e362 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Wed, 2 Aug 2023 13:25:03 +0200 Subject: [PATCH 29/36] Fix pylint errors in aggregate --- src/py/flwr/server/strategy/aggregate.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 6de08dead4dc..300f29192c77 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -335,6 +335,10 @@ def _aggregate_n_closest_weights( indices = np.argpartition(diff_np, kth=beta_closest - 1, axis=0) # Take the weights (coordinate-wise) corresponding to the beta of the # closest distances - beta_closest_weights = np.take_along_axis(other_weights_layer_np, 
indices=indices, axis=0)[:beta_closest] # type: ignore[no-untyped-call] + beta_closest_weights = np.take_along_axis( + other_weights_layer_np, indices=indices, axis=0 + )[ + :beta_closest + ] # type: ignore[no-untyped-call] aggregated_weights.append(np.mean(beta_closest_weights, axis=0)) return aggregated_weights From bd23813d837cf5efa6553fd7f487e33599ceba1f Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Wed, 2 Aug 2023 14:33:21 +0200 Subject: [PATCH 30/36] Fix pylint errors in aggregate --- src/py/flwr/server/strategy/aggregate.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 300f29192c77..cb8e122af5e2 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -13,7 +13,7 @@ # limitations under the License. # ============================================================================== """Aggregation functions for strategy implementations.""" - +# mypy: disallow_untyped_calls=False from functools import reduce from typing import Any, Callable, List, Tuple @@ -48,7 +48,7 @@ def aggregate_median(results: List[Tuple[NDArrays, int]]) -> NDArrays: # Compute median weight of each layer median_w: NDArrays = [ - np.median(np.asarray(layer), axis=0) for layer in zip(*weights) # type: ignore + np.median(np.asarray(layer), axis=0) for layer in zip(*weights) ] return median_w @@ -205,14 +205,12 @@ def _compute_distances(weights: List[NDArrays]) -> NDArray: Input: weights - list of weights vectors Output: distances - matrix distance_matrix of squared distances between the vectors """ - flat_w = np.array( - [np.concatenate(p, axis=None).ravel() for p in weights] # type: ignore - ) + flat_w = np.array([np.concatenate(p, axis=None).ravel() for p in weights]) distance_matrix = np.zeros((len(weights), len(weights))) for i, _ in enumerate(flat_w): for j, _ in enumerate(flat_w): delta = flat_w[i] - flat_w[j] - 
norm = np.linalg.norm(delta) # type: ignore + norm = np.linalg.norm(delta) distance_matrix[i, j] = norm**2 return distance_matrix @@ -337,8 +335,6 @@ def _aggregate_n_closest_weights( # closest distances beta_closest_weights = np.take_along_axis( other_weights_layer_np, indices=indices, axis=0 - )[ - :beta_closest - ] # type: ignore[no-untyped-call] + )[:beta_closest] aggregated_weights.append(np.mean(beta_closest_weights, axis=0)) return aggregated_weights From 2f0a75d34d88b9d83a89c061d49aaf6e424b5563 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Tue, 12 Sep 2023 13:03:39 +0200 Subject: [PATCH 31/36] Fix the ruff linter errors --- src/py/flwr/server/strategy/bulyan.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index bb76a5076016..775f0c9686f3 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -39,7 +39,7 @@ # flake8: noqa: E501 class Bulyan(FedAvg): - """Configurable Bulyan strategy implementation.""" + """Bulyan strategy implementation.""" # pylint: disable=too-many-arguments,too-many-instance-attributes,line-too-long, too-many-locals def __init__( @@ -66,7 +66,7 @@ def __init__( first_aggregation_rule: Callable = aggregate_krum, # type: ignore **aggregation_rule_kwargs: Any, ) -> None: - """Configurable Bulyan strategy. + """Bulyan strategy. Implementation based on arxiv.org/abs/1802.07927. 
@@ -118,6 +118,7 @@ def __init__( self.aggregation_rule_kwargs = aggregation_rule_kwargs def __repr__(self) -> str: + """Compute a string representation of the strategy.""" rep = f"Bulyan(accept_failures={self.accept_failures})" return rep From e5a4ad816c97332ee40124c02934d5eb59eef87e Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Mon, 2 Oct 2023 14:28:14 +0200 Subject: [PATCH 32/36] Add Bulyan to the docs --- doc/source/ref-api-flwr.rst | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/doc/source/ref-api-flwr.rst b/doc/source/ref-api-flwr.rst index 07936f117444..e1983cd92c90 100644 --- a/doc/source/ref-api-flwr.rst +++ b/doc/source/ref-api-flwr.rst @@ -214,6 +214,16 @@ server.strategy.Krum .. automethod:: __init__ +.. _flwr-server-strategy-Bulyan-apiref: + +server.strategy.Bulyan +^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: flwr.server.strategy.Bulyan + :members: + + .. automethod:: __init__ + .. _flwr-server-strategy-FedXgbNnAvg-apiref: From 1df361a5e2e235965dfddcf61e6978e34df82784 Mon Sep 17 00:00:00 2001 From: Adam Narozniak Date: Mon, 2 Oct 2023 14:28:33 +0200 Subject: [PATCH 33/36] Enable import from strategy --- src/py/flwr/server/strategy/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/py/flwr/server/strategy/__init__.py b/src/py/flwr/server/strategy/__init__.py index f14fd0540893..cc395b4f5bb3 100644 --- a/src/py/flwr/server/strategy/__init__.py +++ b/src/py/flwr/server/strategy/__init__.py @@ -15,6 +15,7 @@ """Contains the strategy abstraction and different implementations.""" +from .bulyan import Bulyan as Bulyan from .dpfedavg_adaptive import DPFedAvgAdaptive as DPFedAvgAdaptive from .dpfedavg_fixed import DPFedAvgFixed as DPFedAvgFixed from .fault_tolerant_fedavg import FaultTolerantFedAvg as FaultTolerantFedAvg @@ -48,6 +49,7 @@ "FedMedian", "FedTrimmedAvg", "Krum", + "Bulyan", "DPFedAvgAdaptive", "DPFedAvgFixed", "Strategy", From e0c30ccc4236a6d8108d1235ea3634121c24ea82 Mon Sep 17 00:00:00 2001 From: jafermarq Date: 
Thu, 16 Nov 2023 13:51:24 +0000 Subject: [PATCH 34/36] updated copyright note --- src/py/flwr/server/strategy/bulyan.py | 2 +- src/py/flwr/server/strategy/bulyan_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index 775f0c9686f3..1a268e3ac078 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -1,4 +1,4 @@ -# Copyright 2020 Adap GmbH. All Rights Reserved. +# Copyright 2020 Flower Labs GmbH. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/src/py/flwr/server/strategy/bulyan_test.py b/src/py/flwr/server/strategy/bulyan_test.py index a1c75479d801..299ed49066fb 100644 --- a/src/py/flwr/server/strategy/bulyan_test.py +++ b/src/py/flwr/server/strategy/bulyan_test.py @@ -1,4 +1,4 @@ -# Copyright 2020 Adap GmbH. All Rights Reserved. +# Copyright 2020 Flower Labs GmbH. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From dfb3d5bece0a48470eb2488ac2fd11f14b287493 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 16 Nov 2023 13:58:58 +0000 Subject: [PATCH 35/36] url clickable in docs --- src/py/flwr/server/strategy/bulyan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/server/strategy/bulyan.py b/src/py/flwr/server/strategy/bulyan.py index 1a268e3ac078..0243f4e6546f 100644 --- a/src/py/flwr/server/strategy/bulyan.py +++ b/src/py/flwr/server/strategy/bulyan.py @@ -68,7 +68,7 @@ def __init__( ) -> None: """Bulyan strategy. - Implementation based on arxiv.org/abs/1802.07927. + Implementation based on https://arxiv.org/abs/1802.07927. 
Parameters ---------- From 2d9fde8d9491e332b6c54e4a54e7503709099934 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Thu, 16 Nov 2023 14:16:08 +0000 Subject: [PATCH 36/36] minor fix --- src/py/flwr/server/strategy/aggregate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/server/strategy/aggregate.py b/src/py/flwr/server/strategy/aggregate.py index 34fd14ea08ea..63926f2eaa51 100644 --- a/src/py/flwr/server/strategy/aggregate.py +++ b/src/py/flwr/server/strategy/aggregate.py @@ -328,7 +328,7 @@ def _aggregate_n_closest_weights( other_weights_layer_np = np.array(other_weights_layer_list) diff_np = np.abs(layer_weights - other_weights_layer_np) # Create indices of the smallest differences - # We do not need the exact order but just the beta closest weights + # We do not need the exact order but just the beta closest weights # therefore np.argpartition is used instead of np.argsort indices = np.argpartition(diff_np, kth=beta_closest - 1, axis=0) # Take the weights (coordinate-wise) corresponding to the beta of the