From 71c9e014d9e63c24cb29c7f3c69b375f7c6cbb4c Mon Sep 17 00:00:00 2001 From: JMGaljaard Date: Sun, 18 Sep 2022 16:26:37 +0200 Subject: [PATCH] Fix launch for federated experiments --- fltk/core/client.py | 35 ++++++++++++++++------------------- fltk/launch.py | 15 +++++++++++---- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/fltk/core/client.py b/fltk/core/client.py index f0aaef2f..a97068c1 100644 --- a/fltk/core/client.py +++ b/fltk/core/client.py @@ -49,8 +49,23 @@ def remote_registration(self): self.logger.info('Sending registration') self.message('federator', 'ping', 'new_sender') self.message('federator', 'register_client', self.id, self.rank) + + def run(self): + """ + Function to start running the Client after registration. This allows for processing requests by the main thread, + while the RPC requests can be made asynchronously. + + Returns: None + + """ self.running = True - # self._event_loop() + event = multiprocessing.Event() + while self.running: + if not self.request_queue.empty(): + self.logger.info("Got request, running synchronously") + request = self.request_queue.get() + self.result_queue.put(self.exec_round(*request)) + event.wait(1) def stop_client(self): """ @@ -62,15 +77,6 @@ def stop_client(self): self.logger.info('Got call to stop event loop') self.running = False - def _event_loop(self): - return - self.logger.info('Starting event loop') - while self.running: - if not self.request_queue.empty(): - self.result_queue.put(self.exec_round(*self.request_queue.get())) - time.sleep(5) - self.logger.info('Exiting node') - def train(self, num_epochs: int, round_id: int): """ Function implementing federated learning training loop, allowing to run for a configurable number of epochs @@ -170,15 +176,6 @@ def test(self) -> Tuple[float, float, np.array]: def get_client_datasize(self): # pylint: disable=missing-function-docstring return len(self.dataset.get_train_sampler()) - def run(self): - event = multiprocessing.Event() - while self.running: - if not self.request_queue.empty(): - self.logger.info("Got request, running synchronously") - request = self.request_queue.get() - self.result_queue.put(self.exec_round(*request)) - event.wait(1) - def request_round(self, num_epochs: int, round_id:int): event = multiprocessing.Event() self.request_queue.put([num_epochs, round_id]) diff --git a/fltk/launch.py b/fltk/launch.py index a64632ad..ed1fb034 100644 --- a/fltk/launch.py +++ b/fltk/launch.py @@ -287,6 +287,7 @@ def launch_remote(arg_path: Path, conf_path: Path, rank: Rank, nic: Optional[NIC ) client_node = Client(f'client{rank}', rank, r_conf.world_size, r_conf) client_node.remote_registration() + client_node.run() else: print(f'Starting the PS (Fed) with world size={r_conf.world_size}') rpc.init_rpc( @@ -295,8 +296,13 @@ def launch_remote(arg_path: Path, conf_path: Path, rank: Rank, nic: Optional[NIC world_size=r_conf.world_size, rpc_backend_options=options ) + federator_node = Federator('federator', 0, r_conf.world_size, r_conf) - federator_node.run() + try: + federator_node.run() + except Exception as e: + logging.critical(f"Federator failed execution with reason: {e}." + f"{e.with_traceback()}") federator_node.stop_all_clients() print('Ending program') exit(0) @@ -336,10 +342,11 @@ def launch_cluster(arg_path: Path, conf_path: Path, rank: Rank, nic: Optional[NI # Set the seed for arrivals, torch seed is mostly ignored. Set the `arrival_seed` to a different value # for each repetition that you want to run an experiment with. for replication, experiment_seed in enumerate(conf.execution_config.reproducibility.seeds): + logging.info(f"Starting with experiment replication: {replication} with seed: {experiment_seed}") + init_reproducibility(conf.execution_config) + exec_orchestrator(args=args, conf=conf, replication=replication) try: - logging.info(f"Starting with experiment replication: {replication} with seed: {experiment_seed}") - init_reproducibility(conf.execution_config) - exec_orchestrator(args=args, conf=conf, replication=replication) + pass except Exception as e: logging.info(f"Execution of replication {replication} with seed {experiment_seed} failed." f"Reason: {e}")