Skip to content

Commit

Permalink
refactor(datasets) Make naming consistent in VerticalSizePartitioner (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-narozniak authored Dec 20, 2024
1 parent c34c7da commit 621de32
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class VerticalEvenPartitioner(Partitioner):
- `"create_as_last"`: Create a new partition at the end containing only these columns.
- `"add_to_all"`: Append active party columns to all partitions.
- int: Append active party columns to the specified partition index.
drop_columns : Optional[list[str]]
drop_columns : Optional[Union[str, list[str]]]
Columns to remove entirely from the dataset before partitioning.
shared_columns : Optional[list[str]]
shared_columns : Optional[Union[str, list[str]]]
Columns to duplicate into every partition after initial partitioning.
shuffle : bool
Whether to shuffle the order of columns before partitioning.
Expand Down
45 changes: 23 additions & 22 deletions datasets/flwr_datasets/partitioner/vertical_size_partitioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from flwr_datasets.partitioner.partitioner import Partitioner
from flwr_datasets.partitioner.vertical_partitioner_utils import (
_add_active_party_columns,
_init_optional_str_or_list_str,
)


Expand Down Expand Up @@ -51,7 +52,7 @@ class VerticalSizePartitioner(Partitioner):
toward the partition sizes.
In case of list[int]: sum(partition_sizes) == len(columns) - len(drop_columns) -
len(shared_columns) - len(active_party_columns)
active_party_column : Optional[Union[str, list[str]]]
active_party_columns : Optional[Union[str, list[str]]]
Column(s) (typically representing labels) associated with the
"active party" (which can be the server).
active_party_columns_mode : Union[Literal["add_to_first", "add_to_last", "create_as_first", "create_as_last", "add_to_all"], int]
Expand All @@ -63,9 +64,9 @@ class VerticalSizePartitioner(Partitioner):
- `"create_as_last"`: Create a new partition at the end containing only these columns.
- `"add_to_all"`: Append active party columns to all partitions.
- int: Append active party columns to the specified partition index.
drop_columns : Optional[list[str]]
drop_columns : Optional[Union[str, list[str]]]
Columns to remove entirely from the dataset before partitioning.
shared_columns : Optional[list[str]]
shared_columns : Optional[Union[str, list[str]]]
Columns to duplicate into every partition after initial partitioning.
shuffle : bool
Whether to shuffle the order of columns before partitioning.
Expand All @@ -79,7 +80,7 @@ class VerticalSizePartitioner(Partitioner):
>>>
>>> partitioner = VerticalSizePartitioner(
... partition_sizes=[8, 4, 2],
... active_party_column="income",
... active_party_columns="income",
... active_party_columns_mode="create_as_last"
... )
>>> fds = FederatedDataset(
Expand All @@ -93,7 +94,7 @@ class VerticalSizePartitioner(Partitioner):
def __init__(
self,
partition_sizes: Union[list[int], list[float]],
active_party_column: Optional[Union[str, list[str]]] = None,
active_party_columns: Optional[Union[str, list[str]]] = None,
active_party_columns_mode: Union[
Literal[
"add_to_first",
Expand All @@ -104,18 +105,20 @@ def __init__(
],
int,
] = "add_to_last",
drop_columns: Optional[list[str]] = None,
shared_columns: Optional[list[str]] = None,
drop_columns: Optional[Union[str, list[str]]] = None,
shared_columns: Optional[Union[str, list[str]]] = None,
shuffle: bool = True,
seed: Optional[int] = 42,
) -> None:
super().__init__()

self._partition_sizes = partition_sizes
self._active_party_columns = self._init_active_party_column(active_party_column)
self._active_party_columns = _init_optional_str_or_list_str(
active_party_columns
)
self._active_party_columns_mode = active_party_columns_mode
self._drop_columns = drop_columns or []
self._shared_columns = shared_columns or []
self._drop_columns = _init_optional_str_or_list_str(drop_columns)
self._shared_columns = _init_optional_str_or_list_str(shared_columns)
self._shuffle = shuffle
self._seed = seed
self._rng = np.random.default_rng(seed=self._seed)
Expand Down Expand Up @@ -201,8 +204,17 @@ def _validate_parameters_in_init(self) -> None:
raise ValueError("partition_sizes must be a list.")
if all(isinstance(fraction, float) for fraction in self._partition_sizes):
fraction_sum = sum(self._partition_sizes)
# Tolerance 0.01 for the floating point numerical problems
if 0.99 < fraction_sum < 1.01:
self._partition_sizes = self._partition_sizes[:-1] + [
1.0 - self._partition_sizes[-1]
]
fraction_sum = 1.0
if fraction_sum != 1.0:
raise ValueError("Float ratios in `partition_sizes` must sum to 1.0.")
raise ValueError(
"Float ratios in `partition_sizes` must sum to 1.0. "
f"Instead got: {fraction_sum}."
)
if any(
fraction < 0.0 or fraction > 1.0 for fraction in self._partition_sizes
):
Expand Down Expand Up @@ -276,17 +288,6 @@ def _validate_parameters_while_partitioning(
"active_party_columns are not included in the division."
)

def _init_active_party_column(
self, active_party_column: Optional[Union[str, list[str]]]
) -> list[str]:
if active_party_column is None:
return []
if isinstance(active_party_column, str):
return [active_party_column]
if isinstance(active_party_column, list):
return active_party_column
raise ValueError("active_party_column must be a string or a list of strings.")


def _count_split(columns: list[str], counts: list[int]) -> list[list[str]]:
partition_columns = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def test_init_invalid_mode(self) -> None:

def test_init_active_party_column_invalid_type(self) -> None:
"""Check TypeError if active_party_columns is not str/list."""
with self.assertRaises(ValueError):
VerticalSizePartitioner(partition_sizes=[2, 2], active_party_column=123)
with self.assertRaises(TypeError):
VerticalSizePartitioner(partition_sizes=[2, 2], active_party_columns=123)

def test_partitioning_with_int_sizes(self) -> None:
"""Check correct partitioning with integer sizes."""
Expand Down Expand Up @@ -124,7 +124,7 @@ def test_partitioning_with_active_party_add_to_last(self) -> None:
dataset = _create_dummy_dataset(columns)
partitioner = VerticalSizePartitioner(
partition_sizes=[2],
active_party_column="label",
active_party_columns="label",
active_party_columns_mode="add_to_last",
shuffle=False,
)
Expand All @@ -138,7 +138,7 @@ def test_partitioning_with_active_party_create_as_first(self) -> None:
dataset = _create_dummy_dataset(columns)
partitioner = VerticalSizePartitioner(
partition_sizes=[2],
active_party_column="label",
active_party_columns="label",
active_party_columns_mode="create_as_first",
shuffle=False,
)
Expand Down Expand Up @@ -166,7 +166,7 @@ def test_partitioning_with_nonexistent_active_party_column(self) -> None:
columns = ["f1", "f2"]
dataset = _create_dummy_dataset(columns)
partitioner = VerticalSizePartitioner(
partition_sizes=[1], active_party_column="missing_label", shuffle=False
partition_sizes=[1], active_party_columns="missing_label", shuffle=False
)
partitioner.dataset = dataset
with self.assertRaises(ValueError):
Expand Down

0 comments on commit 621de32

Please sign in to comment.