Skip to content

Commit

Permalink
Add size partitioners to FDS (#2533)
Browse files Browse the repository at this point in the history
Co-authored-by: yan-gao-GY <[email protected]>
  • Loading branch information
adam-narozniak and yan-gao-GY authored Nov 7, 2023
1 parent dcad379 commit 2a67348
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 0 deletions.
8 changes: 8 additions & 0 deletions datasets/flwr_datasets/partitioner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@
"""Flower Datasets Partitioner package."""


from .exponential_partitioner import ExponentialPartitioner
from .iid_partitioner import IidPartitioner
from .linear_partitioner import LinearPartitioner
from .partitioner import Partitioner
from .size_partitioner import SizePartitioner
from .square_partitioner import SquarePartitioner

__all__ = [
"IidPartitioner",
"Partitioner",
"SizePartitioner",
"LinearPartitioner",
"SquarePartitioner",
"ExponentialPartitioner",
]
43 changes: 43 additions & 0 deletions datasets/flwr_datasets/partitioner/exponential_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""ExponentialPartitioner class."""


import numpy as np

from flwr_datasets.partitioner.size_partitioner import SizePartitioner


class ExponentialPartitioner(SizePartitioner):
"""Partitioner creates partitions of size that are correlated with exp(idx).
The amount of data each client gets is correlated with the exponent of partition ID.
For instance, if the IDs range from 1 to M, client with ID 1 gets e units of
data, client 2 gets e^2 units, and so on, up to client M which gets e^M units.
The floor operation is applied on each of these numbers, it means floor(2.71...)
= 2; e^2 ~ 7.39 floor(7.39) = 7. The number is rounded down = the fraction is
always cut. The remainders of theses unassigned (fraction) samples is added to the
biggest partition (the one with the biggest idx).
Parameters
----------
num_partitions : int
The total number of partitions that the data will be divided into.
"""

def __init__(self, num_partitions: int) -> None:
super().__init__(num_partitions=num_partitions, node_id_to_size_fn=np.exp)
if num_partitions <= 0:
raise ValueError("The number of partitions must be greater than zero.")
37 changes: 37 additions & 0 deletions datasets/flwr_datasets/partitioner/linear_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""LinearPartitioner class."""


from flwr_datasets.partitioner.size_partitioner import SizePartitioner


class LinearPartitioner(SizePartitioner):
"""Partitioner creates partitions of size that are linearly correlated with idx.
The amount of data each client gets is linearly correlated with the partition ID.
For instance, if the IDs range from 1 to M, client with ID 1 gets 1 unit of data,
client 2 gets 2 units, and so on, up to client M which gets M units.
Parameters
----------
num_partitions : int
The total number of partitions that the data will be divided into.
"""

def __init__(self, num_partitions: int) -> None:
super().__init__(num_partitions=num_partitions, node_id_to_size_fn=lambda x: x)
if num_partitions <= 0:
raise ValueError("The number of partitions must be greater than zero.")
139 changes: 139 additions & 0 deletions datasets/flwr_datasets/partitioner/size_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SizePartitioner class."""


from typing import Callable, Dict, List, Union

import numpy as np

import datasets
from flwr_datasets.partitioner.partitioner import Partitioner


class SizePartitioner(Partitioner):
"""Base class for the deterministic size partitioning based on the `node_id`.
The client with `node_id` has the following relationship regarding the number of
samples.
`node_id_to_size_fn(node_id)` ~ number of samples for `node_id`
If the function doesn't transform the `node_id` it's a linear correlation between
the number of sample for the node and the value of `node_id`. For instance, if the
node ids range from 1 to M, node with id 1 gets 1 unit of data, client 2 gets 2
units, and so on, up to node M which gets M units.
Note that size corresponding to the `node_id` is deterministic, yet in case of
different dataset shuffling the assignment of samples to `node_id` will vary.
Parameters
----------
num_partitions : int
The total number of partitions that the data will be divided into.
node_id_to_size_fn : Callable
Function that defines the relationship between node id and the number of
samples.
"""

def __init__(
self,
num_partitions: int,
node_id_to_size_fn: Callable, # type: ignore[type-arg]
) -> None:
super().__init__()
if num_partitions <= 0:
raise ValueError("The number of partitions must be greater than zero.")
self._num_partitions = num_partitions
self._node_id_to_size_fn = node_id_to_size_fn

self._node_id_to_size: Dict[int, int] = {}
self._node_id_to_indices: Dict[int, List[int]] = {}
# A flag to perform only a single compute to determine the indices
self._node_id_to_indices_determined = False

def load_partition(self, idx: int) -> datasets.Dataset:
"""Load a single partition based on the partition index.
For this partitioner the number of samples is dependent on the partition idx.
Parameters
----------
idx : int
the index that corresponds to the requested partition
Returns
-------
dataset_partition: Dataset
single dataset partition
"""
# The partitioning is done lazily - only when the first partition is requested.
# A single run creates the indices assignments for all the partition indices.
self._determine_node_id_to_indices_if_needed()
return self.dataset.select(self._node_id_to_indices[idx])

@property
def node_id_to_size(self) -> Dict[int, int]:
"""Node id to the number of samples."""
return self._node_id_to_size

@property
def node_id_to_indices(self) -> Dict[int, List[int]]:
"""Node id to the list of indices."""
return self._node_id_to_indices

def _determine_node_id_to_size(self) -> None:
"""Determine data quantity associated with partition indices."""
data_division_in_units = self._node_id_to_size_fn(
np.linspace(start=1, stop=self._num_partitions, num=self._num_partitions)
)
total_units: Union[int, float] = data_division_in_units.sum()
# Normalize the units to get the fraction total dataset
partition_sizes_as_fraction = data_division_in_units / total_units
# Calculate the number of samples
partition_sizes_as_num_of_samples = np.array(
partition_sizes_as_fraction * len(self.dataset), dtype=np.int64
)
# Check if any sample is not allocated because of multiplication with fractions.
assigned_samples = np.sum(partition_sizes_as_num_of_samples)
left_unassigned_samples = len(self.dataset) - assigned_samples
# If there is any sample(s) left unassigned, assign it to the largest partition.
partition_sizes_as_num_of_samples[-1] += left_unassigned_samples
for idx, partition_size in enumerate(partition_sizes_as_num_of_samples):
self._node_id_to_size[idx] = partition_size

self._check_if_node_id_to_size_possible()

def _determine_node_id_to_indices_if_needed(self) -> None:
"""Create an assignment of indices to the partition indices.."""
if self._node_id_to_indices_determined is True:
return
self._determine_node_id_to_size()
total_samples_assigned = 0
for idx, quantity in self._node_id_to_size.items():
self._node_id_to_indices[idx] = list(
range(total_samples_assigned, total_samples_assigned + quantity)
)
total_samples_assigned += quantity
self._node_id_to_indices_determined = True

def _check_if_node_id_to_size_possible(self) -> None:
all_positive = all(value >= 1 for value in self.node_id_to_size.values())
if not all_positive:
raise ValueError(
f"The given specification of the parameter num_partitions"
f"={self._num_partitions} for the given dataset results "
f"in the partitions sizes that are not greater than 0."
)
104 changes: 104 additions & 0 deletions datasets/flwr_datasets/partitioner/size_partitioner_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SizePartitioner tests."""


import unittest

from parameterized import parameterized

from datasets import Dataset
from flwr_datasets.partitioner.linear_partitioner import LinearPartitioner


def _dummy_dataset(num_rows: int) -> Dataset:
data = {
"features": list(range(num_rows)),
"labels": [i % 2 for i in range(num_rows)],
}
dataset = Dataset.from_dict(data)
return dataset


class TestLinearPartitioner(unittest.TestCase):
"""Test LinearPartitioner."""

@parameterized.expand( # type: ignore
[
(1, 100),
(10, 100),
(5, 55), # This will leave some undivided samples
]
)
def test_linear_distribution(self, num_partitions: int, num_rows: int) -> None:
"""Test the linear distribution of samples."""
dataset = _dummy_dataset(num_rows)
partitioner = LinearPartitioner(num_partitions=num_partitions)
partitioner.dataset = dataset
# Run a single partition loading to trigger the division
_ = partitioner.load_partition(0)
total_samples = sum(partitioner.node_id_to_size.values())
self.assertEqual(total_samples, num_rows)

# Testing if each partition is getting more than the previous one
last_count = 0
for i in range(num_partitions):
current_count = partitioner.node_id_to_size[i]
self.assertGreaterEqual(current_count, last_count)
last_count = current_count

@parameterized.expand( # type: ignore
[
(10, 100),
(5, 55), # This will leave some undivided samples
(7, 77), # This will leave some undivided samples
]
)
def test_undivided_samples(self, num_partitions: int, num_rows: int) -> None:
"""Test the logic for distributing undivided samples."""
dataset = _dummy_dataset(num_rows)
partitioner = LinearPartitioner(num_partitions=num_partitions)
partitioner.dataset = dataset
# If there are any undivided samples, they should be added to the largest
# partition
last_partition_id = num_partitions - 1
actual_samples_in_last_partition = len(
partitioner.load_partition(last_partition_id)
)
expected_samples_in_last_partition = partitioner.node_id_to_size[
last_partition_id
]
self.assertEqual(
expected_samples_in_last_partition, actual_samples_in_last_partition
)

def test_meaningless_params(self) -> None:
"""Test if the params leading to partition size not greater than zero raises."""
num_rows = 10
num_partitions = 100
dataset = _dummy_dataset(num_rows)
partitioner = LinearPartitioner(num_partitions=num_partitions)
partitioner.dataset = dataset
with self.assertRaises(ValueError) as context:
partitioner.load_partition(1)
self.assertIn(
"The given specification of the parameter num_partitions=100 for the given "
"dataset results in the partitions sizes that are not greater than 0.",
str(context.exception),
)


if __name__ == "__main__":
unittest.main()
39 changes: 39 additions & 0 deletions datasets/flwr_datasets/partitioner/square_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SquarePartitioner class."""


import numpy as np

from flwr_datasets.partitioner.size_partitioner import SizePartitioner


class SquarePartitioner(SizePartitioner):
"""Partitioner creates partitions of size that are correlated with squared idx.
The amount of data each client gets is correlated with the squared partition ID.
For instance, if the IDs range from 1 to M, client with ID 1 gets 1 unit of data,
client 2 gets 4 units, and so on, up to client M which gets M^2 units.
Parameters
----------
num_partitions : int
The total number of partitions that the data will be divided into.
"""

def __init__(self, num_partitions: int) -> None:
super().__init__(num_partitions=num_partitions, node_id_to_size_fn=np.square)
if num_partitions <= 0:
raise ValueError("The number of partitions must be greater than zero.")

0 comments on commit 2a67348

Please sign in to comment.