Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Size Partitioners to FDS #2533

Merged
merged 22 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
dd02021
Add size partitioner
adam-narozniak Oct 23, 2023
60f47fc
Add size partitioner tests
adam-narozniak Oct 23, 2023
c595d2c
Add linear partitioner
adam-narozniak Oct 23, 2023
0c5ee27
Add square partitioner
adam-narozniak Oct 23, 2023
3e05d5c
Add exponential partitioner
adam-narozniak Oct 23, 2023
a37529a
Remove print
adam-narozniak Oct 23, 2023
c393c1f
Fix tests
adam-narozniak Oct 23, 2023
614c6fc
Add new partitioners to init for easy imports
adam-narozniak Oct 24, 2023
c2772ee
Merge branch 'main' into fds-size-partitioner
tanertopal Oct 26, 2023
3038081
Add jxie/higgs as tested_datasets
Oct 26, 2023
d0f2cfc
Remove jxie/higgs from tested_datasets list
adam-narozniak Nov 7, 2023
05bba2d
Clarify _check_if_cid_to_size_possible by using >= 1 instead >0
adam-narozniak Nov 7, 2023
b5d456e
Rename cid to id (in each method)
adam-narozniak Nov 7, 2023
55691d9
Clarify documentation
adam-narozniak Nov 7, 2023
982155c
Merge branch 'main' into fds-size-partitioner
danieljanes Nov 7, 2023
32c7117
Apply review suggestion
adam-narozniak Nov 7, 2023
e048cc6
Rename cid to node_id
adam-narozniak Nov 7, 2023
d54b059
Merge remote-tracking branch 'origin/fds-size-partitioner' into fds-s…
adam-narozniak Nov 7, 2023
620924f
Apply suggestions from code review
adam-narozniak Nov 7, 2023
def2d43
Change SizePartitioner doc to make line width requirement
adam-narozniak Nov 7, 2023
b3c020f
Merge branch 'main' into fds-size-partitioner
danieljanes Nov 7, 2023
8de3c3f
Merge branch 'main' into fds-size-partitioner
danieljanes Nov 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datasets/flwr_datasets/partitioner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@
"""Flower Datasets Partitioner package."""


from .exponential_partitioner import ExponentialPartitioner
from .iid_partitioner import IidPartitioner
from .linear_partitioner import LinearPartitioner
from .partitioner import Partitioner
from .size_partitioner import SizePartitioner
from .square_partitioner import SquarePartitioner

__all__ = [
"IidPartitioner",
"Partitioner",
"SizePartitioner",
"LinearPartitioner",
"SquarePartitioner",
"ExponentialPartitioner",
]
43 changes: 43 additions & 0 deletions datasets/flwr_datasets/partitioner/exponential_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""ExponentialPartitioner class."""


import numpy as np

from flwr_datasets.partitioner.size_partitioner import SizePartitioner


class ExponentialPartitioner(SizePartitioner):
"""Partitioner creates partitions of size that are correlated with exp(idx).

The amount of data each client gets is correlated with the exponent of partition ID.
For instance, if the IDs range from 1 to M, client with ID 1 gets e units of
data, client 2 gets e^2 units, and so on, up to client M which gets e^M units.
The floor operation is applied on each of these numbers, it means floor(2.71...)
= 2; e^2 ~ 7.39 floor(7.39) = 7. The number is rounded down = the fraction is
always cut. The remainders of theses unassigned (fraction) samples is added to the
biggest partition (the one with the biggest idx).

Parameters
----------
num_partitions : int
The total number of partitions that the data will be divided into.
"""

def __init__(self, num_partitions: int) -> None:
super().__init__(num_partitions=num_partitions, node_id_to_size_fn=np.exp)
if num_partitions <= 0:
raise ValueError("The number of partitions must be greater than zero.")
37 changes: 37 additions & 0 deletions datasets/flwr_datasets/partitioner/linear_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""LinearPartitioner class."""


from flwr_datasets.partitioner.size_partitioner import SizePartitioner


class LinearPartitioner(SizePartitioner):
"""Partitioner creates partitions of size that are linearly correlated with idx.

The amount of data each client gets is linearly correlated with the partition ID.
For instance, if the IDs range from 1 to M, client with ID 1 gets 1 unit of data,
client 2 gets 2 units, and so on, up to client M which gets M units.

Parameters
----------
num_partitions : int
The total number of partitions that the data will be divided into.
"""

def __init__(self, num_partitions: int) -> None:
super().__init__(num_partitions=num_partitions, node_id_to_size_fn=lambda x: x)
if num_partitions <= 0:
raise ValueError("The number of partitions must be greater than zero.")
139 changes: 139 additions & 0 deletions datasets/flwr_datasets/partitioner/size_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SizePartitioner class."""


from typing import Callable, Dict, List, Union

import numpy as np

import datasets
from flwr_datasets.partitioner.partitioner import Partitioner


class SizePartitioner(Partitioner):
"""Base class for the deterministic size partitioning based on the `node_id`.

The client with `node_id` has the following relationship regarding the number of
samples.

`node_id_to_size_fn(node_id)` ~ number of samples for `node_id`

If the function doesn't transform the `node_id` it's a linear correlation between
the number of sample for the node and the value of `node_id`. For instance, if the
node ids range from 1 to M, node with id 1 gets 1 unit of data, client 2 gets 2
units, and so on, up to node M which gets M units.

Note that size corresponding to the `node_id` is deterministic, yet in case of
different dataset shuffling the assignment of samples to `node_id` will vary.

Parameters
----------
num_partitions : int
The total number of partitions that the data will be divided into.
node_id_to_size_fn : Callable
Function that defines the relationship between node id and the number of
samples.
"""

def __init__(
self,
num_partitions: int,
node_id_to_size_fn: Callable, # type: ignore[type-arg]
) -> None:
super().__init__()
if num_partitions <= 0:
raise ValueError("The number of partitions must be greater than zero.")
self._num_partitions = num_partitions
self._node_id_to_size_fn = node_id_to_size_fn

self._node_id_to_size: Dict[int, int] = {}
self._node_id_to_indices: Dict[int, List[int]] = {}
# A flag to perform only a single compute to determine the indices
self._node_id_to_indices_determined = False

def load_partition(self, idx: int) -> datasets.Dataset:
"""Load a single partition based on the partition index.

For this partitioner the number of samples is dependent on the partition idx.

Parameters
----------
idx : int
the index that corresponds to the requested partition

Returns
-------
dataset_partition: Dataset
single dataset partition
"""
# The partitioning is done lazily - only when the first partition is requested.
# A single run creates the indices assignments for all the partition indices.
self._determine_node_id_to_indices_if_needed()
return self.dataset.select(self._node_id_to_indices[idx])

@property
def node_id_to_size(self) -> Dict[int, int]:
"""Node id to the number of samples."""
return self._node_id_to_size

@property
def node_id_to_indices(self) -> Dict[int, List[int]]:
"""Node id to the list of indices."""
return self._node_id_to_indices

def _determine_node_id_to_size(self) -> None:
"""Determine data quantity associated with partition indices."""
data_division_in_units = self._node_id_to_size_fn(
np.linspace(start=1, stop=self._num_partitions, num=self._num_partitions)
)
total_units: Union[int, float] = data_division_in_units.sum()
# Normalize the units to get the fraction total dataset
partition_sizes_as_fraction = data_division_in_units / total_units
# Calculate the number of samples
partition_sizes_as_num_of_samples = np.array(
partition_sizes_as_fraction * len(self.dataset), dtype=np.int64
)
# Check if any sample is not allocated because of multiplication with fractions.
assigned_samples = np.sum(partition_sizes_as_num_of_samples)
left_unassigned_samples = len(self.dataset) - assigned_samples
# If there is any sample(s) left unassigned, assign it to the largest partition.
partition_sizes_as_num_of_samples[-1] += left_unassigned_samples
for idx, partition_size in enumerate(partition_sizes_as_num_of_samples):
self._node_id_to_size[idx] = partition_size

self._check_if_node_id_to_size_possible()

def _determine_node_id_to_indices_if_needed(self) -> None:
"""Create an assignment of indices to the partition indices.."""
if self._node_id_to_indices_determined is True:
return
self._determine_node_id_to_size()
total_samples_assigned = 0
for idx, quantity in self._node_id_to_size.items():
self._node_id_to_indices[idx] = list(
range(total_samples_assigned, total_samples_assigned + quantity)
)
total_samples_assigned += quantity
self._node_id_to_indices_determined = True

def _check_if_node_id_to_size_possible(self) -> None:
all_positive = all(value >= 1 for value in self.node_id_to_size.values())
if not all_positive:
raise ValueError(
f"The given specification of the parameter num_partitions"
f"={self._num_partitions} for the given dataset results "
f"in the partitions sizes that are not greater than 0."
)
104 changes: 104 additions & 0 deletions datasets/flwr_datasets/partitioner/size_partitioner_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SizePartitioner tests."""


import unittest

from parameterized import parameterized

from datasets import Dataset
from flwr_datasets.partitioner.linear_partitioner import LinearPartitioner


def _dummy_dataset(num_rows: int) -> Dataset:
data = {
"features": list(range(num_rows)),
"labels": [i % 2 for i in range(num_rows)],
}
dataset = Dataset.from_dict(data)
return dataset


class TestLinearPartitioner(unittest.TestCase):
"""Test LinearPartitioner."""

@parameterized.expand( # type: ignore
[
(1, 100),
(10, 100),
(5, 55), # This will leave some undivided samples
]
)
def test_linear_distribution(self, num_partitions: int, num_rows: int) -> None:
"""Test the linear distribution of samples."""
dataset = _dummy_dataset(num_rows)
partitioner = LinearPartitioner(num_partitions=num_partitions)
partitioner.dataset = dataset
# Run a single partition loading to trigger the division
_ = partitioner.load_partition(0)
total_samples = sum(partitioner.node_id_to_size.values())
self.assertEqual(total_samples, num_rows)

# Testing if each partition is getting more than the previous one
last_count = 0
for i in range(num_partitions):
current_count = partitioner.node_id_to_size[i]
self.assertGreaterEqual(current_count, last_count)
last_count = current_count

@parameterized.expand( # type: ignore
[
(10, 100),
(5, 55), # This will leave some undivided samples
(7, 77), # This will leave some undivided samples
]
)
def test_undivided_samples(self, num_partitions: int, num_rows: int) -> None:
"""Test the logic for distributing undivided samples."""
dataset = _dummy_dataset(num_rows)
partitioner = LinearPartitioner(num_partitions=num_partitions)
partitioner.dataset = dataset
# If there are any undivided samples, they should be added to the largest
# partition
last_partition_id = num_partitions - 1
actual_samples_in_last_partition = len(
partitioner.load_partition(last_partition_id)
)
expected_samples_in_last_partition = partitioner.node_id_to_size[
last_partition_id
]
self.assertEqual(
expected_samples_in_last_partition, actual_samples_in_last_partition
)

def test_meaningless_params(self) -> None:
"""Test if the params leading to partition size not greater than zero raises."""
num_rows = 10
num_partitions = 100
dataset = _dummy_dataset(num_rows)
partitioner = LinearPartitioner(num_partitions=num_partitions)
partitioner.dataset = dataset
with self.assertRaises(ValueError) as context:
partitioner.load_partition(1)
self.assertIn(
"The given specification of the parameter num_partitions=100 for the given "
"dataset results in the partitions sizes that are not greater than 0.",
str(context.exception),
)


if __name__ == "__main__":
unittest.main()
39 changes: 39 additions & 0 deletions datasets/flwr_datasets/partitioner/square_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SquarePartitioner class."""


import numpy as np

from flwr_datasets.partitioner.size_partitioner import SizePartitioner


class SquarePartitioner(SizePartitioner):
"""Partitioner creates partitions of size that are correlated with squared idx.

The amount of data each client gets is correlated with the squared partition ID.
For instance, if the IDs range from 1 to M, client with ID 1 gets 1 unit of data,
client 2 gets 4 units, and so on, up to client M which gets M^2 units.

Parameters
----------
num_partitions : int
The total number of partitions that the data will be divided into.
"""

def __init__(self, num_partitions: int) -> None:
super().__init__(num_partitions=num_partitions, node_id_to_size_fn=np.square)
if num_partitions <= 0:
raise ValueError("The number of partitions must be greater than zero.")