Skip to content

Commit

Permalink
Fix launch for federated experiments
Browse files Browse the repository at this point in the history
  • Loading branch information
JMGaljaard committed Sep 18, 2022
1 parent 4830779 commit 71c9e01
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
35 changes: 16 additions & 19 deletions fltk/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,23 @@ def remote_registration(self):
self.logger.info('Sending registration')
self.message('federator', 'ping', 'new_sender')
self.message('federator', 'register_client', self.id, self.rank)

def run(self):
"""
Function to start running the Client after registration. This allows for processing requests by the main thread,
while the RPC requests can be made asynchronously.
Returns: None
"""
self.running = True
# self._event_loop()
event = multiprocessing.Event()
while self.running:
if not self.request_queue.empty():
self.logger.info("Got request, running synchronously")
request = self.request_queue.get()
self.result_queue.put(self.exec_round(*request))
event.wait(1)

def stop_client(self):
"""
Expand All @@ -62,15 +77,6 @@ def stop_client(self):
self.logger.info('Got call to stop event loop')
self.running = False

def _event_loop(self):
return
self.logger.info('Starting event loop')
while self.running:
if not self.request_queue.empty():
self.result_queue.put(self.exec_round(*self.request_queue.get()))
time.sleep(5)
self.logger.info('Exiting node')

def train(self, num_epochs: int, round_id: int):
"""
Function implementing federated learning training loop, allowing to run for a configurable number of epochs
Expand Down Expand Up @@ -170,15 +176,6 @@ def test(self) -> Tuple[float, float, np.array]:
def get_client_datasize(self): # pylint: disable=missing-function-docstring
return len(self.dataset.get_train_sampler())

def run(self):
event = multiprocessing.Event()
while self.running:
if not self.request_queue.empty():
self.logger.info("Got request, running synchronously")
request = self.request_queue.get()
self.result_queue.put(self.exec_round(*request))
event.wait(1)

def request_round(self, num_epochs: int, round_id:int):
event = multiprocessing.Event()
self.request_queue.put([num_epochs, round_id])
Expand Down
15 changes: 11 additions & 4 deletions fltk/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ def launch_remote(arg_path: Path, conf_path: Path, rank: Rank, nic: Optional[NIC
)
client_node = Client(f'client{rank}', rank, r_conf.world_size, r_conf)
client_node.remote_registration()
client_node.run()
else:
print(f'Starting the PS (Fed) with world size={r_conf.world_size}')
rpc.init_rpc(
Expand All @@ -295,8 +296,13 @@ def launch_remote(arg_path: Path, conf_path: Path, rank: Rank, nic: Optional[NIC
world_size=r_conf.world_size,
rpc_backend_options=options
)

federator_node = Federator('federator', 0, r_conf.world_size, r_conf)
federator_node.run()
try:
federator_node.run()
except Exception as e:
logging.critical(f"Federator failed execution with reason: {e}."
f"{e.with_traceback()}")
federator_node.stop_all_clients()
print('Ending program')
exit(0)
Expand Down Expand Up @@ -336,10 +342,11 @@ def launch_cluster(arg_path: Path, conf_path: Path, rank: Rank, nic: Optional[NI
# Set the seed for arrivals, torch seed is mostly ignored. Set the `arrival_seed` to a different value
# for each repetition that you want to run an experiment with.
for replication, experiment_seed in enumerate(conf.execution_config.reproducibility.seeds):
logging.info(f"Starting with experiment replication: {replication} with seed: {experiment_seed}")
init_reproducibility(conf.execution_config)
exec_orchestrator(args=args, conf=conf, replication=replication)
try:
logging.info(f"Starting with experiment replication: {replication} with seed: {experiment_seed}")
init_reproducibility(conf.execution_config)
exec_orchestrator(args=args, conf=conf, replication=replication)
pass
except Exception as e:
logging.info(f"Execution of replication {replication} with seed {experiment_seed} failed."
f"Reason: {e}")

0 comments on commit 71c9e01

Please sign in to comment.