diff --git a/datasets/flwr_datasets/partitioner/__init__.py b/datasets/flwr_datasets/partitioner/__init__.py index d1eba7895702..83186a6d24ee 100644 --- a/datasets/flwr_datasets/partitioner/__init__.py +++ b/datasets/flwr_datasets/partitioner/__init__.py @@ -15,10 +15,18 @@ """Flower Datasets Partitioner package.""" +from .exponential_partitioner import ExponentialPartitioner from .iid_partitioner import IidPartitioner +from .linear_partitioner import LinearPartitioner from .partitioner import Partitioner +from .size_partitioner import SizePartitioner +from .square_partitioner import SquarePartitioner __all__ = [ "IidPartitioner", "Partitioner", + "SizePartitioner", + "LinearPartitioner", + "SquarePartitioner", + "ExponentialPartitioner", ] diff --git a/datasets/flwr_datasets/partitioner/exponential_partitioner.py b/datasets/flwr_datasets/partitioner/exponential_partitioner.py new file mode 100644 index 000000000000..07bb07cea9a5 --- /dev/null +++ b/datasets/flwr_datasets/partitioner/exponential_partitioner.py @@ -0,0 +1,43 @@ +# Copyright 2023 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""ExponentialPartitioner class.""" + + +import numpy as np + +from flwr_datasets.partitioner.size_partitioner import SizePartitioner + + +class ExponentialPartitioner(SizePartitioner): + """Partitioner creates partitions of size that are correlated with exp(idx). + + The amount of data each client gets is correlated with the exponent of partition ID. + For instance, if the IDs range from 1 to M, client with ID 1 gets e units of + data, client 2 gets e^2 units, and so on, up to client M which gets e^M units. + The floor operation is applied on each of these numbers, it means floor(2.71...) + = 2; e^2 ~ 7.39 floor(7.39) = 7. The number is rounded down = the fraction is + always cut. The remainders of theses unassigned (fraction) samples is added to the + biggest partition (the one with the biggest idx). + + Parameters + ---------- + num_partitions : int + The total number of partitions that the data will be divided into. + """ + + def __init__(self, num_partitions: int) -> None: + super().__init__(num_partitions=num_partitions, node_id_to_size_fn=np.exp) + if num_partitions <= 0: + raise ValueError("The number of partitions must be greater than zero.") diff --git a/datasets/flwr_datasets/partitioner/linear_partitioner.py b/datasets/flwr_datasets/partitioner/linear_partitioner.py new file mode 100644 index 000000000000..a9dc8b020c08 --- /dev/null +++ b/datasets/flwr_datasets/partitioner/linear_partitioner.py @@ -0,0 +1,37 @@ +# Copyright 2023 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""LinearPartitioner class.""" + + +from flwr_datasets.partitioner.size_partitioner import SizePartitioner + + +class LinearPartitioner(SizePartitioner): + """Partitioner creates partitions of size that are linearly correlated with idx. + + The amount of data each client gets is linearly correlated with the partition ID. + For instance, if the IDs range from 1 to M, client with ID 1 gets 1 unit of data, + client 2 gets 2 units, and so on, up to client M which gets M units. + + Parameters + ---------- + num_partitions : int + The total number of partitions that the data will be divided into. + """ + + def __init__(self, num_partitions: int) -> None: + super().__init__(num_partitions=num_partitions, node_id_to_size_fn=lambda x: x) + if num_partitions <= 0: + raise ValueError("The number of partitions must be greater than zero.") diff --git a/datasets/flwr_datasets/partitioner/size_partitioner.py b/datasets/flwr_datasets/partitioner/size_partitioner.py new file mode 100644 index 000000000000..27e1f528dc52 --- /dev/null +++ b/datasets/flwr_datasets/partitioner/size_partitioner.py @@ -0,0 +1,139 @@ +# Copyright 2023 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""SizePartitioner class.""" + + +from typing import Callable, Dict, List, Union + +import numpy as np + +import datasets +from flwr_datasets.partitioner.partitioner import Partitioner + + +class SizePartitioner(Partitioner): + """Base class for the deterministic size partitioning based on the `node_id`. + + The client with `node_id` has the following relationship regarding the number of + samples. + + `node_id_to_size_fn(node_id)` ~ number of samples for `node_id` + + If the function doesn't transform the `node_id` it's a linear correlation between + the number of sample for the node and the value of `node_id`. For instance, if the + node ids range from 1 to M, node with id 1 gets 1 unit of data, client 2 gets 2 + units, and so on, up to node M which gets M units. + + Note that size corresponding to the `node_id` is deterministic, yet in case of + different dataset shuffling the assignment of samples to `node_id` will vary. + + Parameters + ---------- + num_partitions : int + The total number of partitions that the data will be divided into. + node_id_to_size_fn : Callable + Function that defines the relationship between node id and the number of + samples. + """ + + def __init__( + self, + num_partitions: int, + node_id_to_size_fn: Callable, # type: ignore[type-arg] + ) -> None: + super().__init__() + if num_partitions <= 0: + raise ValueError("The number of partitions must be greater than zero.") + self._num_partitions = num_partitions + self._node_id_to_size_fn = node_id_to_size_fn + + self._node_id_to_size: Dict[int, int] = {} + self._node_id_to_indices: Dict[int, List[int]] = {} + # A flag to perform only a single compute to determine the indices + self._node_id_to_indices_determined = False + + def load_partition(self, idx: int) -> datasets.Dataset: + """Load a single partition based on the partition index. + + For this partitioner the number of samples is dependent on the partition idx. + + Parameters + ---------- + idx : int + the index that corresponds to the requested partition + + Returns + ------- + dataset_partition: Dataset + single dataset partition + """ + # The partitioning is done lazily - only when the first partition is requested. + # A single run creates the indices assignments for all the partition indices. + self._determine_node_id_to_indices_if_needed() + return self.dataset.select(self._node_id_to_indices[idx]) + + @property + def node_id_to_size(self) -> Dict[int, int]: + """Node id to the number of samples.""" + return self._node_id_to_size + + @property + def node_id_to_indices(self) -> Dict[int, List[int]]: + """Node id to the list of indices.""" + return self._node_id_to_indices + + def _determine_node_id_to_size(self) -> None: + """Determine data quantity associated with partition indices.""" + data_division_in_units = self._node_id_to_size_fn( + np.linspace(start=1, stop=self._num_partitions, num=self._num_partitions) + ) + total_units: Union[int, float] = data_division_in_units.sum() + # Normalize the units to get the fraction total dataset + partition_sizes_as_fraction = data_division_in_units / total_units + # Calculate the number of samples + partition_sizes_as_num_of_samples = np.array( + partition_sizes_as_fraction * len(self.dataset), dtype=np.int64 + ) + # Check if any sample is not allocated because of multiplication with fractions. + assigned_samples = np.sum(partition_sizes_as_num_of_samples) + left_unassigned_samples = len(self.dataset) - assigned_samples + # If there is any sample(s) left unassigned, assign it to the largest partition. + partition_sizes_as_num_of_samples[-1] += left_unassigned_samples + for idx, partition_size in enumerate(partition_sizes_as_num_of_samples): + self._node_id_to_size[idx] = partition_size + + self._check_if_node_id_to_size_possible() + + def _determine_node_id_to_indices_if_needed(self) -> None: + """Create an assignment of indices to the partition indices..""" + if self._node_id_to_indices_determined is True: + return + self._determine_node_id_to_size() + total_samples_assigned = 0 + for idx, quantity in self._node_id_to_size.items(): + self._node_id_to_indices[idx] = list( + range(total_samples_assigned, total_samples_assigned + quantity) + ) + total_samples_assigned += quantity + self._node_id_to_indices_determined = True + + def _check_if_node_id_to_size_possible(self) -> None: + all_positive = all(value >= 1 for value in self.node_id_to_size.values()) + if not all_positive: + raise ValueError( + f"The given specification of the parameter num_partitions" + f"={self._num_partitions} for the given dataset results " + f"in the partitions sizes that are not greater than 0." + ) diff --git a/datasets/flwr_datasets/partitioner/size_partitioner_test.py b/datasets/flwr_datasets/partitioner/size_partitioner_test.py new file mode 100644 index 000000000000..390f6a613fce --- /dev/null +++ b/datasets/flwr_datasets/partitioner/size_partitioner_test.py @@ -0,0 +1,104 @@ +# Copyright 2023 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""SizePartitioner tests.""" + + +import unittest + +from parameterized import parameterized + +from datasets import Dataset +from flwr_datasets.partitioner.linear_partitioner import LinearPartitioner + + +def _dummy_dataset(num_rows: int) -> Dataset: + data = { + "features": list(range(num_rows)), + "labels": [i % 2 for i in range(num_rows)], + } + dataset = Dataset.from_dict(data) + return dataset + + +class TestLinearPartitioner(unittest.TestCase): + """Test LinearPartitioner.""" + + @parameterized.expand( # type: ignore + [ + (1, 100), + (10, 100), + (5, 55), # This will leave some undivided samples + ] + ) + def test_linear_distribution(self, num_partitions: int, num_rows: int) -> None: + """Test the linear distribution of samples.""" + dataset = _dummy_dataset(num_rows) + partitioner = LinearPartitioner(num_partitions=num_partitions) + partitioner.dataset = dataset + # Run a single partition loading to trigger the division + _ = partitioner.load_partition(0) + total_samples = sum(partitioner.node_id_to_size.values()) + self.assertEqual(total_samples, num_rows) + + # Testing if each partition is getting more than the previous one + last_count = 0 + for i in range(num_partitions): + current_count = partitioner.node_id_to_size[i] + self.assertGreaterEqual(current_count, last_count) + last_count = current_count + + @parameterized.expand( # type: ignore + [ + (10, 100), + (5, 55), # This will leave some undivided samples + (7, 77), # This will leave some undivided samples + ] + ) + def test_undivided_samples(self, num_partitions: int, num_rows: int) -> None: + """Test the logic for distributing undivided samples.""" + dataset = _dummy_dataset(num_rows) + partitioner = LinearPartitioner(num_partitions=num_partitions) + partitioner.dataset = dataset + # If there are any undivided samples, they should be added to the largest + # partition + last_partition_id = num_partitions - 1 + actual_samples_in_last_partition = len( + partitioner.load_partition(last_partition_id) + ) + expected_samples_in_last_partition = partitioner.node_id_to_size[ + last_partition_id + ] + self.assertEqual( + expected_samples_in_last_partition, actual_samples_in_last_partition + ) + + def test_meaningless_params(self) -> None: + """Test if the params leading to partition size not greater than zero raises.""" + num_rows = 10 + num_partitions = 100 + dataset = _dummy_dataset(num_rows) + partitioner = LinearPartitioner(num_partitions=num_partitions) + partitioner.dataset = dataset + with self.assertRaises(ValueError) as context: + partitioner.load_partition(1) + self.assertIn( + "The given specification of the parameter num_partitions=100 for the given " + "dataset results in the partitions sizes that are not greater than 0.", + str(context.exception), + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/datasets/flwr_datasets/partitioner/square_partitioner.py b/datasets/flwr_datasets/partitioner/square_partitioner.py new file mode 100644 index 000000000000..25affaf304ef --- /dev/null +++ b/datasets/flwr_datasets/partitioner/square_partitioner.py @@ -0,0 +1,39 @@ +# Copyright 2023 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""SquarePartitioner class.""" + + +import numpy as np + +from flwr_datasets.partitioner.size_partitioner import SizePartitioner + + +class SquarePartitioner(SizePartitioner): + """Partitioner creates partitions of size that are correlated with squared idx. + + The amount of data each client gets is correlated with the squared partition ID. + For instance, if the IDs range from 1 to M, client with ID 1 gets 1 unit of data, + client 2 gets 4 units, and so on, up to client M which gets M^2 units. + + Parameters + ---------- + num_partitions : int + The total number of partitions that the data will be divided into. + """ + + def __init__(self, num_partitions: int) -> None: + super().__init__(num_partitions=num_partitions, node_id_to_size_fn=np.square) + if num_partitions <= 0: + raise ValueError("The number of partitions must be greater than zero.")