From 6c842a8db3b648351290db4d9779c448dbd49b54 Mon Sep 17 00:00:00 2001
From: yan-gao-GY
Date: Thu, 16 Nov 2023 19:59:03 +0000
Subject: [PATCH 1/8] Initialise xgboost-quickstart

---
 examples/xgboost-quickstart/README.md        |  85 ++++++++++
 examples/xgboost-quickstart/client.py        | 144 ++++++++++++++++
 examples/xgboost-quickstart/dataset.py       |  23 +++
 examples/xgboost-quickstart/pyproject.toml   |  15 ++
 examples/xgboost-quickstart/requirements.txt |   3 +
 examples/xgboost-quickstart/run.sh           |  17 ++
 examples/xgboost-quickstart/server.py        |  37 +++++
 examples/xgboost-quickstart/strategy.py      | 163 +++++++++++++++++++
 8 files changed, 487 insertions(+)
 create mode 100644 examples/xgboost-quickstart/README.md
 create mode 100644 examples/xgboost-quickstart/client.py
 create mode 100644 examples/xgboost-quickstart/dataset.py
 create mode 100644 examples/xgboost-quickstart/pyproject.toml
 create mode 100644 examples/xgboost-quickstart/requirements.txt
 create mode 100755 examples/xgboost-quickstart/run.sh
 create mode 100644 examples/xgboost-quickstart/server.py
 create mode 100644 examples/xgboost-quickstart/strategy.py

diff --git a/examples/xgboost-quickstart/README.md b/examples/xgboost-quickstart/README.md
new file mode 100644
index 000000000000..d2ef03ad478f
--- /dev/null
+++ b/examples/xgboost-quickstart/README.md
@@ -0,0 +1,85 @@
+# Flower Example using XGBoost
+
+This example demonstrates how to perform EXtreme Gradient Boosting (XGBoost) within Flower using the `xgboost` package.
+Tree-based bagging is used for aggregation on the server.
+
+## Project Setup
+
+Start by cloning the example project. We prepared a single-line command that you can copy into your shell which will checkout the example for you:
+
+```shell
+git clone --depth=1 https://github.com/adap/flower.git && mv flower/examples/quickstart-xgboost . && rm -rf flower && cd quickstart-xgboost
+```
+
+This will create a new directory called `quickstart-xgboost` containing the following files:
+
+```
+-- README.md <- You're reading this right now
+-- server.py <- Defines the server-side logic
+-- client.py <- Defines the client-side logic
+-- dataset.py <- Defines the functions of data loading and partitioning
+-- pyproject.toml <- Example dependencies (if you use Poetry)
+-- requirements.txt <- Example dependencies
+```
+
+### Installing Dependencies
+
+Project dependencies (such as `xgboost` and `flwr`) are defined in `pyproject.toml` and `requirements.txt`. We recommend [Poetry](https://python-poetry.org/docs/) to install those dependencies and manage your virtual environment ([Poetry installation](https://python-poetry.org/docs/#installation)) or [pip](https://pip.pypa.io/en/latest/development/), but feel free to use a different way of installing dependencies and managing virtual environments if you have other preferences.
+
+#### Poetry
+
+```shell
+poetry install
+poetry shell
+```
+
+Poetry will install all your dependencies in a newly created virtual environment. To verify that everything works correctly you can run the following command:
+
+```shell
+poetry run python3 -c "import flwr"
+```
+
+If you don't see any errors you're good to go!
+
+#### pip
+
+Run the command below in your terminal to install the dependencies listed in `requirements.txt`.
+
+```shell
+pip install -r requirements.txt
+```
+
+## Run Federated Learning with XGBoost and Flower
+
+Afterwards, you are ready to start the Flower server as well as the clients.
+You can simply start the server in a terminal as follows: + +```shell +python3 server.py +``` + +Now you are ready to start the Flower clients which will participate in the learning. +To do so simply open two more terminal windows and run the following commands. + +Start client 1 in the first terminal: + +```shell +python3 client.py +``` + +Start client 2 in the second terminal: + +```shell +python3 client.py +``` + +You will see that XGBoost is starting a federated training. + +Alternatively, you can use `run.sh` to run the same experiment in a single terminal as follows: + +```shell +bash run.sh +``` + +Look at the [code](https://github.com/adap/flower/tree/main/examples/quickstart-xgboost) +and [tutorial](https://flower.dev/docs/framework/tutorial-quickstart-xgboost.html) for a detailed explanation. diff --git a/examples/xgboost-quickstart/client.py b/examples/xgboost-quickstart/client.py new file mode 100644 index 000000000000..f6c6f2e34722 --- /dev/null +++ b/examples/xgboost-quickstart/client.py @@ -0,0 +1,144 @@ +import warnings +from logging import INFO +import xgboost as xgb + +import flwr as fl +from flwr_datasets import FederatedDataset +from flwr.common.logger import log +from flwr.common import ( + Code, + EvaluateIns, + EvaluateRes, + FitIns, + FitRes, + GetParametersIns, + GetParametersRes, + Parameters, + Status, +) +from flwr_datasets.partitioner import IidPartitioner + +from dataset import ( + train_test_split, + transform_dataset_to_dmatrix, +) + + +warnings.filterwarnings("ignore", category=UserWarning) + + +# Load (HIGGS) dataset and conduct partitioning +partitioner = IidPartitioner(num_partitions=10) +fds = FederatedDataset(dataset="jxie/higgs", partitioners={"train": partitioner}) + +# Let's use the first partition as an example +partition = fds.load_partition(idx=0, split="train") +partition.set_format("numpy") + +# Train/test splitting +train_data, valid_data, num_train, num_val = train_test_split( + partition, test_fraction=0.2, seed=42 +) + +# Reformat data to DMatrix for xgboost +train_dmatrix = transform_dataset_to_dmatrix(train_data) +valid_dmatrix = transform_dataset_to_dmatrix(valid_data) + +# Hyper-parameters for xgboost training +num_local_round = 1 +params = { + "objective": "binary:logistic", + "eta": 0.1, # Learning rate + "max_depth": 8, + "eval_metric": "auc", + "nthread": 16, + "num_parallel_tree": 1, + "subsample": 1, + "tree_method": "hist", +} + + +# Define Flower client +class FlowerClient(fl.client.Client): + def __init__(self): + self.bst = None + self.config = None + + def get_parameters(self, ins: GetParametersIns) -> GetParametersRes: + _ = (self, ins) + return GetParametersRes( + status=Status( + code=Code.OK, + message="OK", + ), + parameters=Parameters(tensor_type="", tensors=[]), + ) + + def _local_boost(self): + # Update trees based on local training data. 
+        for i in range(num_local_round):
+            self.bst.update(train_dmatrix, self.bst.num_boosted_rounds())
+
+        # Extract the last N=num_local_round trees for server aggregation
+        bst = self.bst[
+            self.bst.num_boosted_rounds()
+            - num_local_round : self.bst.num_boosted_rounds()
+        ]
+
+        return bst
+
+    def fit(self, ins: FitIns) -> FitRes:
+        if not self.bst:
+            # First round local training
+            log(INFO, "Start training at round 1")
+            bst = xgb.train(
+                params,
+                train_dmatrix,
+                num_boost_round=num_local_round,
+                evals=[(valid_dmatrix, "validate"), (train_dmatrix, "train")],
+            )
+            self.config = bst.save_config()
+            self.bst = bst
+        else:
+            for item in ins.parameters.tensors:
+                global_model = bytearray(item)
+
+            # Load global model into booster
+            self.bst.load_model(global_model)
+            self.bst.load_config(self.config)
+
+            bst = self._local_boost()
+
+        local_model = bst.save_raw("json")
+        local_model_bytes = bytes(local_model)
+
+        return FitRes(
+            status=Status(
+                code=Code.OK,
+                message="OK",
+            ),
+            parameters=Parameters(tensor_type="", tensors=[local_model_bytes]),
+            num_examples=num_train,
+            metrics={},
+        )
+
+    def evaluate(self, ins: EvaluateIns) -> EvaluateRes:
+        eval_results = self.bst.eval_set(
+            evals=[(valid_dmatrix, "valid")],
+            iteration=self.bst.num_boosted_rounds() - 1,
+        )
+        auc = round(float(eval_results.split("\t")[1].split(":")[1]), 4)
+
+        return EvaluateRes(
+            status=Status(
+                code=Code.OK,
+                message="OK",
+            ),
+            loss=0.0,
+            num_examples=num_val,
+            metrics={"AUC": auc},
+        )
+
+
+# Start Flower client
+fl.client.start_client(server_address="127.0.0.1:8080", client=FlowerClient())
diff --git a/examples/xgboost-quickstart/dataset.py b/examples/xgboost-quickstart/dataset.py
new file mode 100644
index 000000000000..95455618937b
--- /dev/null
+++ b/examples/xgboost-quickstart/dataset.py
@@ -0,0 +1,23 @@
+import xgboost as xgb
+from typing import Union
+from datasets import Dataset, DatasetDict
+
+
+def train_test_split(partition: Dataset, test_fraction: float, seed: int):
+    """Split the data into train and validation set given split rate."""
+    train_test = partition.train_test_split(test_size=test_fraction, seed=seed)
+    partition_train = train_test["train"]
+    partition_test = train_test["test"]
+
+    num_train = len(partition_train)
+    num_test = len(partition_test)
+
+    return partition_train, partition_test, num_train, num_test
+
+
+def transform_dataset_to_dmatrix(data: Union[Dataset, DatasetDict]) -> xgb.core.DMatrix:
+    """Transform dataset to DMatrix format for xgboost."""
+    x = data["inputs"]
+    y = data["label"]
+    new_data = xgb.DMatrix(x, label=y)
+    return new_data
diff --git a/examples/xgboost-quickstart/pyproject.toml b/examples/xgboost-quickstart/pyproject.toml
new file mode 100644
index 000000000000..74256846c693
--- /dev/null
+++ b/examples/xgboost-quickstart/pyproject.toml
@@ -0,0 +1,15 @@
+[build-system]
+requires = ["poetry-core>=1.4.0"]
+build-backend = "poetry.core.masonry.api"
+
+[tool.poetry]
+name = "xgboost-quickstart"
+version = "0.1.0"
+description = "Federated XGBoost with Flower (quickstart)"
+authors = ["The Flower Authors "]
+
+[tool.poetry.dependencies]
+python = ">=3.8,<3.11"
+flwr = ">=1.0,<2.0"
+flwr-datasets = ">=0.0.1,<1.0.0"
+xgboost = ">=2.0.0,<3.0.0"
diff --git a/examples/xgboost-quickstart/requirements.txt b/examples/xgboost-quickstart/requirements.txt
new file mode 100644
index 000000000000..9596a8d6cd02
--- /dev/null
+++ b/examples/xgboost-quickstart/requirements.txt
@@ -0,0 +1,3 @@
+flwr>=1.0, <2.0
+flwr-datasets>=0.0.1, <1.0.0
+xgboost>=2.0.0, <3.0.0
diff --git 
a/examples/xgboost-quickstart/run.sh b/examples/xgboost-quickstart/run.sh new file mode 100755 index 000000000000..b3ab546022d8 --- /dev/null +++ b/examples/xgboost-quickstart/run.sh @@ -0,0 +1,17 @@ +#!/bin/bash +set -e +cd "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"/ + +echo "Starting server" +python server.py & +sleep 5 # Sleep for 15s to give the server enough time to start + +for i in `seq 0 1`; do + echo "Starting client $i" + python3 client.py & +done + +# Enable CTRL+C to stop all background processes +trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM +# Wait for all background processes to complete +wait diff --git a/examples/xgboost-quickstart/server.py b/examples/xgboost-quickstart/server.py new file mode 100644 index 000000000000..d287eae596e7 --- /dev/null +++ b/examples/xgboost-quickstart/server.py @@ -0,0 +1,37 @@ +import flwr as fl +from strategy import FedXgbBagging + + +# FL experimental settings +pool_size = 2 +num_rounds = 5 +num_clients_per_round = 2 +num_evaluate_clients = 2 + + +def evaluate_metrics_aggregation(eval_metrics): + """Return an aggregated metric (AUC) for evaluation.""" + total_num = sum([num for num, _ in eval_metrics]) + auc_aggregated = ( + sum([metrics["AUC"] * num for num, metrics in eval_metrics]) / total_num + ) + metrics_aggregated = {"AUC": auc_aggregated} + return metrics_aggregated + + +# Define strategy +strategy = FedXgbBagging( + fraction_fit=(float(num_clients_per_round) / pool_size), + min_fit_clients=num_clients_per_round, + min_available_clients=pool_size, + min_evaluate_clients=num_evaluate_clients, + fraction_evaluate=1.0, + evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation, +) + +# Start Flower server +fl.server.start_server( + server_address="0.0.0.0:8080", + config=fl.server.ServerConfig(num_rounds=num_rounds), + strategy=strategy, +) diff --git a/examples/xgboost-quickstart/strategy.py b/examples/xgboost-quickstart/strategy.py new file mode 100644 index 000000000000..578328ca701b --- /dev/null +++ b/examples/xgboost-quickstart/strategy.py @@ -0,0 +1,163 @@ +# Copyright 2020 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== +"""Federated XGBoost bagging aggregation strategy.""" + + +import json +from logging import WARNING +from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast + +import flwr as fl +from flwr.common import EvaluateRes, FitRes, Parameters, Scalar +from flwr.common.logger import log +from flwr.server.client_proxy import ClientProxy + + +class FedXgbBagging(fl.server.strategy.FedAvg): + """Configurable FedXgbBagging strategy implementation.""" + + # pylint: disable=too-many-arguments,too-many-instance-attributes, line-too-long + def __init__( + self, + evaluate_function: Optional[ + Callable[ + [int, Parameters, Dict[str, Scalar]], + Optional[Tuple[float, Dict[str, Scalar]]], + ] + ] = None, + **kwargs: Any, + ): + self.evaluate_function = evaluate_function + self.global_model: Optional[bytes] = None + super().__init__(**kwargs) + + def aggregate_fit( + self, + server_round: int, + results: List[Tuple[ClientProxy, FitRes]], + failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]], + ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]: + """Aggregate fit results using bagging.""" + if not results: + return None, {} + # Do not aggregate if there are failures and failures are not accepted + if not self.accept_failures and failures: + return None, {} + + # Aggregate all the client trees + global_model = self.global_model + for _, fit_res in results: + update = fit_res.parameters.tensors + for bst in update: + global_model = aggregate(global_model, bst) + + self.global_model = global_model + + return ( + Parameters(tensor_type="", tensors=[cast(bytes, global_model)]), + {}, + ) + + def aggregate_evaluate( + self, + server_round: int, + results: List[Tuple[ClientProxy, EvaluateRes]], + failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]], + ) -> Tuple[Optional[float], Dict[str, Scalar]]: + """Aggregate evaluation metrics using average.""" + if not results: + return None, {} + # Do not aggregate if there are failures and failures are not accepted + if not self.accept_failures and failures: + return None, {} + + # Aggregate custom metrics if aggregation fn was provided + metrics_aggregated = {} + if self.evaluate_metrics_aggregation_fn: + eval_metrics = [(res.num_examples, res.metrics) for _, res in results] + metrics_aggregated = self.evaluate_metrics_aggregation_fn(eval_metrics) + elif server_round == 1: # Only log this warning once + log(WARNING, "No evaluate_metrics_aggregation_fn provided") + + return 0, metrics_aggregated + + def evaluate( + self, server_round: int, parameters: Parameters + ) -> Optional[Tuple[float, Dict[str, Scalar]]]: + """Evaluate model parameters using an evaluation function.""" + if self.evaluate_function is None: + # No evaluation function provided + return None + eval_res = self.evaluate_function(server_round, parameters, {}) + if eval_res is None: + return None + loss, metrics = eval_res + return loss, metrics + + +def aggregate( + bst_prev_org: Optional[bytes], + bst_curr_org: bytes, +) -> bytes: + """Conduct bagging aggregation for given trees.""" + if not bst_prev_org: + return bst_curr_org + + # Get the tree numbers + tree_num_prev, _ = _get_tree_nums(bst_prev_org) + _, paral_tree_num_curr = _get_tree_nums(bst_curr_org) + + bst_prev = json.loads(bytearray(bst_prev_org)) + bst_curr = json.loads(bytearray(bst_curr_org)) + + bst_prev["learner"]["gradient_booster"]["model"]["gbtree_model_param"][ + "num_trees" + ] = str(tree_num_prev + 
paral_tree_num_curr) + iteration_indptr = bst_prev["learner"]["gradient_booster"]["model"][ + "iteration_indptr" + ] + bst_prev["learner"]["gradient_booster"]["model"]["iteration_indptr"].append( + iteration_indptr[-1] + paral_tree_num_curr + ) + + # Aggregate new trees + trees_curr = bst_curr["learner"]["gradient_booster"]["model"]["trees"] + for tree_count in range(paral_tree_num_curr): + trees_curr[tree_count]["id"] = tree_num_prev + tree_count + bst_prev["learner"]["gradient_booster"]["model"]["trees"].append( + trees_curr[tree_count] + ) + bst_prev["learner"]["gradient_booster"]["model"]["tree_info"].append(0) + + bst_prev_bytes = bytes(json.dumps(bst_prev), "utf-8") + + return bst_prev_bytes + + +def _get_tree_nums(xgb_model_org: bytes) -> Tuple[int, int]: + xgb_model = json.loads(bytearray(xgb_model_org)) + # Get the number of trees + tree_num = int( + xgb_model["learner"]["gradient_booster"]["model"]["gbtree_model_param"][ + "num_trees" + ] + ) + # Get the number of parallel trees + paral_tree_num = int( + xgb_model["learner"]["gradient_booster"]["model"]["gbtree_model_param"][ + "num_parallel_tree" + ] + ) + return tree_num, paral_tree_num From bf901a3b19bf1a8fb1e92d85512bd888bbe0f907 Mon Sep 17 00:00:00 2001 From: yan-gao-GY Date: Fri, 17 Nov 2023 09:52:29 +0000 Subject: [PATCH 2/8] Update server code to use strategy from core framework --- examples/xgboost-quickstart/server.py | 2 +- examples/xgboost-quickstart/strategy.py | 163 ------------------------ 2 files changed, 1 insertion(+), 164 deletions(-) delete mode 100644 examples/xgboost-quickstart/strategy.py diff --git a/examples/xgboost-quickstart/server.py b/examples/xgboost-quickstart/server.py index d287eae596e7..b45a375ce94f 100644 --- a/examples/xgboost-quickstart/server.py +++ b/examples/xgboost-quickstart/server.py @@ -1,5 +1,5 @@ import flwr as fl -from strategy import FedXgbBagging +from flwr.server.strategy import FedXgbBagging # FL experimental settings diff --git a/examples/xgboost-quickstart/strategy.py b/examples/xgboost-quickstart/strategy.py deleted file mode 100644 index 578328ca701b..000000000000 --- a/examples/xgboost-quickstart/strategy.py +++ /dev/null @@ -1,163 +0,0 @@ -# Copyright 2020 Flower Labs GmbH. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ============================================================================== -"""Federated XGBoost bagging aggregation strategy.""" - - -import json -from logging import WARNING -from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast - -import flwr as fl -from flwr.common import EvaluateRes, FitRes, Parameters, Scalar -from flwr.common.logger import log -from flwr.server.client_proxy import ClientProxy - - -class FedXgbBagging(fl.server.strategy.FedAvg): - """Configurable FedXgbBagging strategy implementation.""" - - # pylint: disable=too-many-arguments,too-many-instance-attributes, line-too-long - def __init__( - self, - evaluate_function: Optional[ - Callable[ - [int, Parameters, Dict[str, Scalar]], - Optional[Tuple[float, Dict[str, Scalar]]], - ] - ] = None, - **kwargs: Any, - ): - self.evaluate_function = evaluate_function - self.global_model: Optional[bytes] = None - super().__init__(**kwargs) - - def aggregate_fit( - self, - server_round: int, - results: List[Tuple[ClientProxy, FitRes]], - failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]], - ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]: - """Aggregate fit results using bagging.""" - if not results: - return None, {} - # Do not aggregate if there are failures and failures are not accepted - if not self.accept_failures and failures: - return None, {} - - # Aggregate all the client trees - global_model = self.global_model - for _, fit_res in results: - update = fit_res.parameters.tensors - for bst in update: - global_model = aggregate(global_model, bst) - - self.global_model = global_model - - return ( - Parameters(tensor_type="", tensors=[cast(bytes, global_model)]), - {}, - ) - - def aggregate_evaluate( - self, - server_round: int, - results: List[Tuple[ClientProxy, EvaluateRes]], - failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]], - ) -> Tuple[Optional[float], Dict[str, Scalar]]: - """Aggregate evaluation metrics using average.""" - if not results: - return None, {} - # Do not aggregate if there are failures and failures are not accepted - if not self.accept_failures and failures: - return None, {} - - # Aggregate custom metrics if aggregation fn was provided - metrics_aggregated = {} - if self.evaluate_metrics_aggregation_fn: - eval_metrics = [(res.num_examples, res.metrics) for _, res in results] - metrics_aggregated = self.evaluate_metrics_aggregation_fn(eval_metrics) - elif server_round == 1: # Only log this warning once - log(WARNING, "No evaluate_metrics_aggregation_fn provided") - - return 0, metrics_aggregated - - def evaluate( - self, server_round: int, parameters: Parameters - ) -> Optional[Tuple[float, Dict[str, Scalar]]]: - """Evaluate model parameters using an evaluation function.""" - if self.evaluate_function is None: - # No evaluation function provided - return None - eval_res = self.evaluate_function(server_round, parameters, {}) - if eval_res is None: - return None - loss, metrics = eval_res - return loss, metrics - - -def aggregate( - bst_prev_org: Optional[bytes], - bst_curr_org: bytes, -) -> bytes: - """Conduct bagging aggregation for given trees.""" - if not bst_prev_org: - return bst_curr_org - - # Get the tree numbers - tree_num_prev, _ = _get_tree_nums(bst_prev_org) - _, paral_tree_num_curr = _get_tree_nums(bst_curr_org) - - bst_prev = json.loads(bytearray(bst_prev_org)) - bst_curr = json.loads(bytearray(bst_curr_org)) - - bst_prev["learner"]["gradient_booster"]["model"]["gbtree_model_param"][ - "num_trees" - ] = str(tree_num_prev + 
paral_tree_num_curr)
-    iteration_indptr = bst_prev["learner"]["gradient_booster"]["model"][
-        "iteration_indptr"
-    ]
-    bst_prev["learner"]["gradient_booster"]["model"]["iteration_indptr"].append(
-        iteration_indptr[-1] + paral_tree_num_curr
-    )
-
-    # Aggregate new trees
-    trees_curr = bst_curr["learner"]["gradient_booster"]["model"]["trees"]
-    for tree_count in range(paral_tree_num_curr):
-        trees_curr[tree_count]["id"] = tree_num_prev + tree_count
-        bst_prev["learner"]["gradient_booster"]["model"]["trees"].append(
-            trees_curr[tree_count]
-        )
-        bst_prev["learner"]["gradient_booster"]["model"]["tree_info"].append(0)
-
-    bst_prev_bytes = bytes(json.dumps(bst_prev), "utf-8")
-
-    return bst_prev_bytes
-
-
-def _get_tree_nums(xgb_model_org: bytes) -> Tuple[int, int]:
-    xgb_model = json.loads(bytearray(xgb_model_org))
-    # Get the number of trees
-    tree_num = int(
-        xgb_model["learner"]["gradient_booster"]["model"]["gbtree_model_param"][
-            "num_trees"
-        ]
-    )
-    # Get the number of parallel trees
-    paral_tree_num = int(
-        xgb_model["learner"]["gradient_booster"]["model"]["gbtree_model_param"][
-            "num_parallel_tree"
-        ]
-    )
-    return tree_num, paral_tree_num

From bcbceec77bf0bc317378a4dceeb3c93ff0b71e16 Mon Sep 17 00:00:00 2001
From: Yan Gao
Date: Fri, 17 Nov 2023 09:55:30 +0000
Subject: [PATCH 3/8] Update examples/xgboost-quickstart/README.md

Co-authored-by: Daniel J. Beutel
---
 examples/xgboost-quickstart/README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/examples/xgboost-quickstart/README.md b/examples/xgboost-quickstart/README.md
index d2ef03ad478f..1278436631eb 100644
--- a/examples/xgboost-quickstart/README.md
+++ b/examples/xgboost-quickstart/README.md
@@ -3,6 +3,7 @@
 This example demonstrates how to perform EXtreme Gradient Boosting (XGBoost) within Flower using the `xgboost` package.
 Tree-based bagging is used for aggregation on the server.
 
+This project provides a minimal code example to enable you to get started quickly. For a more comprehensive code example, take a look at [xgboost-comprehensive](https://github.com/adap/flower/tree/main/examples/xgboost-comprehensive).
 ## Project Setup
 
 Start by cloning the example project. We prepared a single-line command that you can copy into your shell which will checkout the example for you:

From bb248dda3cccbb84037d2669ba67ac5efb00130f Mon Sep 17 00:00:00 2001
From: Yan Gao
Date: Fri, 17 Nov 2023 09:55:42 +0000
Subject: [PATCH 4/8] Update examples/xgboost-quickstart/README.md

Co-authored-by: Daniel J. Beutel
---
 examples/xgboost-quickstart/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/xgboost-quickstart/README.md b/examples/xgboost-quickstart/README.md
index 1278436631eb..f31ce26df3a8 100644
--- a/examples/xgboost-quickstart/README.md
+++ b/examples/xgboost-quickstart/README.md
@@ -12,7 +12,7 @@ Start by cloning the example project. We prepared a single-line command that you
 git clone --depth=1 https://github.com/adap/flower.git && mv flower/examples/quickstart-xgboost . && rm -rf flower && cd quickstart-xgboost
 ```
 
-This will create a new directory called `quickstart-xgboost` containing the following files:
+This will create a new directory called `xgboost-quickstart` containing the following files:
 
 ```
 -- README.md <- You're reading this right now

From cf5ca611657a56a4eea0371fd21eb583c2f1023d Mon Sep 17 00:00:00 2001
From: yan-gao-GY
Date: Fri, 17 Nov 2023 10:08:01 +0000
Subject: [PATCH 5/8] Move dataset loading to client.py; update readme

---
 examples/xgboost-quickstart/README.md  |  5 ++---
 examples/xgboost-quickstart/client.py  | 28 +++++++++++++++++++++-----
 examples/xgboost-quickstart/dataset.py | 23 ---------------------
 3 files changed, 25 insertions(+), 31 deletions(-)
 delete mode 100644 examples/xgboost-quickstart/dataset.py

diff --git a/examples/xgboost-quickstart/README.md b/examples/xgboost-quickstart/README.md
index f31ce26df3a8..b689eb82fb42 100644
--- a/examples/xgboost-quickstart/README.md
+++ b/examples/xgboost-quickstart/README.md
@@ -9,7 +9,7 @@ This project provides a minimal code example to enable you to get started quickly
 Start by cloning the example project. We prepared a single-line command that you can copy into your shell which will checkout the example for you:
 
 ```shell
-git clone --depth=1 https://github.com/adap/flower.git && mv flower/examples/quickstart-xgboost . && rm -rf flower && cd quickstart-xgboost
+git clone --depth=1 https://github.com/adap/flower.git && mv flower/examples/xgboost-quickstart . && rm -rf flower && cd xgboost-quickstart
 ```
 
 This will create a new directory called `xgboost-quickstart` containing the following files:
@@ -18,7 +18,6 @@ This will create a new directory called `xgboost-quickstart` containing the foll
 -- README.md <- You're reading this right now
 -- server.py <- Defines the server-side logic
 -- client.py <- Defines the client-side logic
--- dataset.py <- Defines the functions of data loading and partitioning
 -- pyproject.toml <- Example dependencies (if you use Poetry)
 -- requirements.txt <- Example dependencies
 ```
@@ -82,5 +81,5 @@ Alternatively, you can use `run.sh` to run the same experiment in a single termi
 bash run.sh
 ```
 
-Look at the [code](https://github.com/adap/flower/tree/main/examples/quickstart-xgboost)
+Look at the [code](https://github.com/adap/flower/tree/main/examples/xgboost-quickstart)
 and [tutorial](https://flower.dev/docs/framework/tutorial-quickstart-xgboost.html) for a detailed explanation.
diff --git a/examples/xgboost-quickstart/client.py b/examples/xgboost-quickstart/client.py
index f6c6f2e34722..ddf878ae32c9 100644
--- a/examples/xgboost-quickstart/client.py
+++ b/examples/xgboost-quickstart/client.py
@@ -1,5 +1,7 @@
 import warnings
+from typing import Union
 from logging import INFO
+from datasets import Dataset, DatasetDict
 import xgboost as xgb
 
 import flwr as fl
@@ -18,15 +20,31 @@
 )
 from flwr_datasets.partitioner import IidPartitioner
 
-from dataset import (
-    train_test_split,
-    transform_dataset_to_dmatrix,
-)
-
 
 warnings.filterwarnings("ignore", category=UserWarning)
 
 
+# Define data partitioning related functions
+def train_test_split(partition: Dataset, test_fraction: float, seed: int):
+    """Split the data into train and validation set given split rate."""
+    train_test = partition.train_test_split(test_size=test_fraction, seed=seed)
+    partition_train = train_test["train"]
+    partition_test = train_test["test"]
+
+    num_train = len(partition_train)
+    num_test = len(partition_test)
+
+    return partition_train, partition_test, num_train, num_test
+
+
+def transform_dataset_to_dmatrix(data: Union[Dataset, DatasetDict]) -> xgb.core.DMatrix:
+    """Transform dataset to DMatrix format for xgboost."""
+    x = data["inputs"]
+    y = data["label"]
+    new_data = xgb.DMatrix(x, label=y)
+    return new_data
+
+
 # Load (HIGGS) dataset and conduct partitioning
 partitioner = IidPartitioner(num_partitions=10)
 fds = FederatedDataset(dataset="jxie/higgs", partitioners={"train": partitioner})
diff --git a/examples/xgboost-quickstart/dataset.py b/examples/xgboost-quickstart/dataset.py
deleted file mode 100644
index 95455618937b..000000000000
--- a/examples/xgboost-quickstart/dataset.py
+++ /dev/null
@@ -1,23 +0,0 @@
-import xgboost as xgb
-from typing import Union
-from datasets import Dataset, DatasetDict
-
-
-def train_test_split(partition: Dataset, test_fraction: float, seed: int):
-    """Split the data into train and validation set given split rate."""
-    train_test = partition.train_test_split(test_size=test_fraction, seed=seed)
-    partition_train = train_test["train"]
-    partition_test = train_test["test"]
-
-    num_train = len(partition_train)
-    num_test = len(partition_test)
-
-    return partition_train, partition_test, num_train, num_test
-
-
-def transform_dataset_to_dmatrix(data: Union[Dataset, DatasetDict]) -> xgb.core.DMatrix:
-    """Transform dataset to DMatrix format for xgboost."""
-    x = data["inputs"]
-    y = data["label"]
-    new_data = xgb.DMatrix(x, label=y)
-    return new_data

From 222efdd3972c259ceb8a11af55c43d6f08938cea Mon Sep 17 00:00:00 2001
From: yan-gao-GY
Date: Fri, 17 Nov 2023 10:09:21 +0000
Subject: [PATCH 6/8] Format readme

---
 examples/xgboost-quickstart/README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/examples/xgboost-quickstart/README.md b/examples/xgboost-quickstart/README.md
index b689eb82fb42..538534f8a35d 100644
--- a/examples/xgboost-quickstart/README.md
+++ b/examples/xgboost-quickstart/README.md
@@ -4,6 +4,7 @@ This example demonstrates how to perform EXtreme Gradient Boosting (XGBoost) wit
 Tree-based bagging is used for aggregation on the server.
 
 This project provides a minimal code example to enable you to get started quickly. For a more comprehensive code example, take a look at [xgboost-comprehensive](https://github.com/adap/flower/tree/main/examples/xgboost-comprehensive).
+
 ## Project Setup
 
 Start by cloning the example project. 
We prepared a single-line command that you can copy into your shell which will checkout the example for you: From 76a6521218a8a78cf8647a6713e7245b178a0291 Mon Sep 17 00:00:00 2001 From: yan-gao-GY Date: Fri, 17 Nov 2023 11:41:00 +0000 Subject: [PATCH 7/8] Add arguments parser for client ID --- examples/xgboost-quickstart/README.md | 4 ++-- examples/xgboost-quickstart/client.py | 15 +++++++++++++-- examples/xgboost-quickstart/run.sh | 4 ++-- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/examples/xgboost-quickstart/README.md b/examples/xgboost-quickstart/README.md index 538534f8a35d..53cd37e18aa3 100644 --- a/examples/xgboost-quickstart/README.md +++ b/examples/xgboost-quickstart/README.md @@ -65,13 +65,13 @@ To do so simply open two more terminal windows and run the following commands. Start client 1 in the first terminal: ```shell -python3 client.py +python3 client.py --node-id=0 ``` Start client 2 in the second terminal: ```shell -python3 client.py +python3 client.py --node-id=1 ``` You will see that XGBoost is starting a federated training. diff --git a/examples/xgboost-quickstart/client.py b/examples/xgboost-quickstart/client.py index ddf878ae32c9..08458f6d49cd 100644 --- a/examples/xgboost-quickstart/client.py +++ b/examples/xgboost-quickstart/client.py @@ -1,3 +1,4 @@ +import argparse import warnings from typing import Union from logging import INFO @@ -23,6 +24,16 @@ warnings.filterwarnings("ignore", category=UserWarning) +# Define arguments parser for the client/node ID. +parser = argparse.ArgumentParser() +parser.add_argument( + "--node-id", + default=0, + type=int, + help="Node ID used for the current client.", +) +args = parser.parse_args() + # Define data partitioning related functions def train_test_split(partition: Dataset, test_fraction: float, seed: int): @@ -46,11 +57,11 @@ def transform_dataset_to_dmatrix(data: Union[Dataset, DatasetDict]) -> xgb.core. # Load (HIGGS) dataset and conduct partitioning -partitioner = IidPartitioner(num_partitions=10) +partitioner = IidPartitioner(num_partitions=2) fds = FederatedDataset(dataset="jxie/higgs", partitioners={"train": partitioner}) # Let's use the first partition as an example -partition = fds.load_partition(idx=0, split="train") +partition = fds.load_partition(idx=args.node_id, split="train") partition.set_format("numpy") # Train/test splitting diff --git a/examples/xgboost-quickstart/run.sh b/examples/xgboost-quickstart/run.sh index b3ab546022d8..6287145bfb5f 100755 --- a/examples/xgboost-quickstart/run.sh +++ b/examples/xgboost-quickstart/run.sh @@ -4,11 +4,11 @@ cd "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"/ echo "Starting server" python server.py & -sleep 5 # Sleep for 15s to give the server enough time to start +sleep 5 # Sleep for 5s to give the server enough time to start for i in `seq 0 1`; do echo "Starting client $i" - python3 client.py & + python3 client.py --node-id=$i & done # Enable CTRL+C to stop all background processes From 410210bc27a34ae642c1cbdbecf4fa1b4c5ef953 Mon Sep 17 00:00:00 2001 From: "Daniel J. 
Beutel" Date: Fri, 17 Nov 2023 13:42:26 +0100 Subject: [PATCH 8/8] Update examples/xgboost-quickstart/client.py --- examples/xgboost-quickstart/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/xgboost-quickstart/client.py b/examples/xgboost-quickstart/client.py index 08458f6d49cd..ede4a2bba764 100644 --- a/examples/xgboost-quickstart/client.py +++ b/examples/xgboost-quickstart/client.py @@ -60,7 +60,7 @@ def transform_dataset_to_dmatrix(data: Union[Dataset, DatasetDict]) -> xgb.core. partitioner = IidPartitioner(num_partitions=2) fds = FederatedDataset(dataset="jxie/higgs", partitioners={"train": partitioner}) -# Let's use the first partition as an example +# Load the partition for this `node_id` partition = fds.load_partition(idx=args.node_id, split="train") partition.set_format("numpy")