Skip to content

Commit

Permalink
Merge branch 'main' into patch-2
Browse files Browse the repository at this point in the history
  • Loading branch information
tanertopal authored Oct 29, 2024
2 parents f75653c + c1f0a24 commit 0f58d42
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 22 deletions.
1 change: 1 addition & 0 deletions examples/quickstart-pytorch/pytorchexample/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def train(net, trainloader, valloader, epochs, learning_rate, device):

def test(net, testloader, device):
"""Validate the model on the test set."""
net.to(device) # move model to GPU if available
criterion = torch.nn.CrossEntropyLoss()
correct, loss = 0, 0.0
with torch.no_grad():
Expand Down
38 changes: 38 additions & 0 deletions glossary/heterogeneity-in-federated-learning.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
---
title: "Heterogeneity in Federated Learning"
description: "Heterogeneity is a core challenge in FL, and countering the problems that result from it is an active field of study. We distinguish statistical and structural heterogeneity."
date: "2024-05-24"
author:
name: "Adam Narożniak"
position: "ML Engineer at Flower Labs"
website: "https://discuss.flower.ai/u/adam.narozniak/summary"
---

Heterogeneity is a core challenge in federated learning (FL), and countering the problems that result from it is an active field of study in FL. We can distinguish the following categories:
* statistical heterogeneity (related to data),
* structural heterogeneity (related to resources and infrastructure).

Real-world FL training can exhibit any combination of the problems described below.

### Statistical Heterogeneity
Statistical heterogeneity is the situation in which the clients' distributions are not equal, which can be a result of the following:
* feature distribution skew (covariate shift),
* label distribution skew (prior probability shift),
* same label, different features (concept drift),
* same features, different label (concept shift),
* quantity skew.

### Structural Heterogeneity
Structural heterogeneity results from the different types of devices that can be part of the same federation. These devices can differ in the following:
* computational resources (different chips), which lead to different training times,
* storage resources (available disk space), which can imply, e.g., not enough resources to store the results (which can also indicate a different number of samples),
* energy levels/charging status, current resource consumption, which can change over time and can imply the lack of willingness/capabilities to join the training,
* network connection, e.g., unstable network connection, can lead to more frequent dropouts and lack of availability.

### Simulating Heterogeneity - Flower Datasets
Flower Datasets is a library that enables you to simulate statistical heterogeneity according to the various partitioning schemes (see all [here](https://flower.ai/docs/datasets/ref-api/flwr_datasets.partitioner.html)).
It provides ways of simulating quantity skew, label distribution skew, and a mix of the two, depending on the partitioner object used. It also enables working with datasets that naturally exhibit different types of heterogeneity.

### Countering Heterogeneity - Strategies in Flower
Flower is a library that enables you to perform federated learning in deployment (real-life scenario) and simulation. It provides out-of-the-box weight aggregation
strategies (see them [here](https://flower.ai/docs/framework/ref-api/flwr.server.strategy.html)), which are used as the core measures to mitigate problems in heterogeneous environments.
18 changes: 10 additions & 8 deletions src/py/flwr/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,13 @@
TRANSPORT_TYPE_GRPC_ADAPTER,
TRANSPORT_TYPE_GRPC_RERE,
TRANSPORT_TYPE_REST,
Status,
)
from flwr.common.exit_handlers import register_exit_handlers
from flwr.common.logger import log
from flwr.common.secure_aggregation.crypto.symmetric_encryption import (
private_key_to_bytes,
public_key_to_bytes,
)
from flwr.common.typing import RunStatus
from flwr.proto.fleet_pb2_grpc import ( # pylint: disable=E0611
add_FleetServicer_to_server,
)
Expand Down Expand Up @@ -345,7 +343,7 @@ def run_superlink() -> None:
# Scheduler thread
scheduler_th = threading.Thread(
target=_flwr_serverapp_scheduler,
args=(state_factory, args.driver_api_address),
args=(state_factory, args.driver_api_address, args.ssl_ca_certfile),
)
scheduler_th.start()
bckg_threads.append(scheduler_th)
Expand All @@ -367,7 +365,9 @@ def run_superlink() -> None:


def _flwr_serverapp_scheduler(
state_factory: LinkStateFactory, driver_api_address: str
state_factory: LinkStateFactory,
driver_api_address: str,
ssl_ca_certfile: Optional[str],
) -> None:
log(DEBUG, "Started flwr-serverapp scheduler thread.")

Expand All @@ -380,10 +380,6 @@ def _flwr_serverapp_scheduler(

if pending_run_id:

# Set run as starting
state.update_run_status(
run_id=pending_run_id, new_status=RunStatus(Status.STARTING, "", "")
)
log(
INFO,
"Launching `flwr-serverapp` subprocess with run-id %d. "
Expand All @@ -399,6 +395,12 @@ def _flwr_serverapp_scheduler(
"--run-id",
str(pending_run_id),
]
if ssl_ca_certfile:
command.append("--root-certificates")
command.append(ssl_ca_certfile)
else:
command.append("--insecure")

subprocess.run(
command,
stdout=None,
Expand Down
89 changes: 76 additions & 13 deletions src/py/flwr/server/serverapp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
"""Flower ServerApp process."""

import argparse
from logging import DEBUG, INFO
import sys
from logging import DEBUG, INFO, WARN
from os.path import isfile
from pathlib import Path
from typing import Optional

from flwr.common.logger import log
Expand All @@ -41,38 +44,98 @@ def flwr_serverapp() -> None:
help="Id of the Run this process should start. If not supplied, this "
"function will request a pending run to the LinkState.",
)
parser.add_argument(
"--flwr-dir",
default=None,
help="""The path containing installed Flower Apps.
By default, this value is equal to:
- `$FLWR_HOME/` if `$FLWR_HOME` is defined
- `$XDG_DATA_HOME/.flwr/` if `$XDG_DATA_HOME` is defined
- `$HOME/.flwr/` in all other cases
""",
)
parser.add_argument(
"--insecure",
action="store_true",
help="Run the server without HTTPS, regardless of whether certificate "
"paths are provided. By default, the server runs with HTTPS enabled. "
"Use this flag only if you understand the risks.",
)
parser.add_argument(
"--root-certificates",
metavar="ROOT_CERT",
type=str,
help="Specifies the path to the PEM-encoded root certificate file for "
"establishing secure HTTPS connections.",
)
args = parser.parse_args()

certificates = _try_obtain_certificates(args)

log(
DEBUG,
"Staring isolated `ServerApp` connected to SuperLink DriverAPI at %s "
"for run-id %s",
args.superlink,
args.run_id,
)
run_serverapp(superlink=args.superlink, run_id=args.run_id)
run_serverapp(
superlink=args.superlink,
run_id=args.run_id,
flwr_dir_=args.flwr_dir,
certificates=certificates,
)


def _try_obtain_certificates(
    args: argparse.Namespace,
) -> Optional[bytes]:
    """Resolve the root certificates requested on the command line.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed command-line arguments. This function reads `insecure`,
        `root_certificates` and `superlink` (the latter only for logging).

    Returns
    -------
    Optional[bytes]
        `None` when `--insecure` is set; otherwise the raw bytes of the file
        passed via `--root-certificates`.

    Raises
    ------
    SystemExit
        If `--insecure` and `--root-certificates` are combined, if neither of
        them is supplied, or if the certificate path is not an existing file.
    """
    cert_path: Optional[str] = args.root_certificates

    if args.insecure:
        if cert_path is not None:
            sys.exit(
                "Conflicting options: The '--insecure' flag disables HTTPS, "
                "but '--root-certificates' was also specified. Please remove "
                "the '--root-certificates' option when running in insecure mode, "
                "or omit '--insecure' to use HTTPS."
            )
        log(
            WARN,
            "Option `--insecure` was set. Starting insecure HTTP channel to %s.",
            args.superlink,
        )
        return None

    # Without `--insecure` a certificate path is required. Previously a missing
    # `--root-certificates` reached the file check/read as `None` and crashed
    # with a TypeError. We exit with an actionable message instead of returning
    # `None`, because `None` is this function's signal for the insecure path and
    # must not be produced implicitly.
    if cert_path is None:
        sys.exit(
            "Missing option: provide '--root-certificates' to use HTTPS, "
            "or pass '--insecure' to explicitly disable it."
        )
    if not isfile(cert_path):
        sys.exit("Path argument `--root-certificates` does not point to a file.")

    root_certificates = Path(cert_path).read_bytes()
    log(
        DEBUG,
        "Starting secure HTTPS channel to %s "
        "with the following certificates: %s.",
        args.superlink,
        cert_path,
    )
    return root_certificates


def run_serverapp( # pylint: disable=R0914
superlink: str,
run_id: Optional[int] = None,
flwr_dir_: Optional[str] = None,
certificates: Optional[bytes] = None,
) -> None:
"""Run Flower ServerApp process.
Parameters
----------
superlink : str
Address of SuperLink
run_id : Optional[int] (default: None)
Unique identifier of a Run registered at the LinkState. If not supplied,
this function will request a pending run to the LinkState.
"""
"""Run Flower ServerApp process."""
_ = GrpcDriver(
run_id=run_id if run_id else 0,
driver_service_address=superlink,
root_certificates=None,
root_certificates=certificates,
)

log(INFO, "%s", flwr_dir_)

# Then, GetServerInputs

# Then, run ServerApp
8 changes: 7 additions & 1 deletion src/py/flwr/server/superlink/driver/driver_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from uuid import UUID

import grpc
from google.protobuf.message import Message as GrpcMessage

from flwr.common.constant import Status
from flwr.common.logger import log
Expand Down Expand Up @@ -212,7 +213,7 @@ def PullServerAppInputs(
# Lock access to LinkState, preventing obtaining the same pending run_id
with self.lock:
# If run_id is provided, use it, otherwise use the pending run_id
if request.HasField("run_id"):
if _has_field(request, "run_id"):
run_id: Optional[int] = request.run_id
else:
run_id = state.get_pending_run_id()
Expand Down Expand Up @@ -256,3 +257,8 @@ def PushServerAppOutputs(
def _raise_if(validation_error: bool, detail: str) -> None:
    """Raise a `ValueError` carrying `detail` when `validation_error` is truthy.

    Does nothing (returns `None`) when `validation_error` is falsy.
    """
    if not validation_error:
        return
    message = f"Malformed PushTaskInsRequest: {detail}"
    raise ValueError(message)


def _has_field(message: GrpcMessage, field_name: str) -> bool:
    """Report whether `field_name` appears among the message's listed fields.

    The check compares `field_name` against the `name` of every field
    descriptor returned by `message.ListFields()`; it is meant to work for
    scalar fields as well.
    """
    return any(
        descriptor.name == field_name for descriptor, _ in message.ListFields()
    )

0 comments on commit 0f58d42

Please sign in to comment.