Skip to content

Commit

Permalink
Added a retry mechanism for the connection of flower clients to the flower server.
Browse files Browse the repository at this point in the history
  • Loading branch information
KFilippopolitis committed Jul 2, 2024
1 parent 1385732 commit 7b502bc
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 10 deletions.
25 changes: 22 additions & 3 deletions exareme2/algorithms/flower/logistic_regression/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
import time
import warnings
from math import log2

import flwr as fl
from flwr.common.logger import FLOWER_LOGGER
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
from utils import get_model_parameters
Expand Down Expand Up @@ -44,6 +47,22 @@ def evaluate(self, parameters, config):
set_initial_params(model, X_train, full_data, inputdata)

client = LogisticRegressionClient(model, X_train, y_train)
fl.client.start_client(
server_address=os.environ["SERVER_ADDRESS"], client=client.to_client()
)

# Start the Flower client, retrying with exponential backoff (1s, 2s, 4s, ...)
# when the call fails. The attempt budget is floor(log2(TIMEOUT)), so the sum
# of all backoff delays stays below TIMEOUT (read from the environment).
# NOTE(review): the try block wraps the whole start_client call, so any
# exception it raises (not only connection errors) triggers a retry -- confirm
# this is intended.
attempts = 0
max_attempts = int(log2(int(os.environ["TIMEOUT"])))
while True:
    try:
        fl.client.start_client(
            server_address=os.environ["SERVER_ADDRESS"], client=client.to_client()
        )
        # Lazy %-style argument: the original passed the attempt number
        # without a placeholder, which made the log record fail to format.
        FLOWER_LOGGER.debug("Connection successful on attempt %d", attempts + 1)
        break
    except Exception as e:
        attempts += 1
        # Report 1-based attempt numbers, consistent with the success message.
        FLOWER_LOGGER.warning(
            f"Connection with the server failed. Attempt {attempts} failed: {e}"
        )
        if attempts >= max_attempts:
            FLOWER_LOGGER.error("Could not establish connection to the server.")
            # Bare raise keeps the original traceback intact.
            raise
        # Back off only when another attempt will follow; sleeping before
        # giving up would just delay the failure report.
        time.sleep(pow(2, attempts - 1))
3 changes: 2 additions & 1 deletion exareme2/controller/celery/tasks_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,14 @@ def queue_healthcheck_task(
)

def start_flower_client(
self, request_id, algorithm_name, server_address
self, request_id, algorithm_name, server_address, execution_timeout
) -> WorkerTaskResult:
return self._queue_task(
task_signature=TASK_SIGNATURES["start_flower_client"],
request_id=request_id,
algorithm_name=algorithm_name,
server_address=server_address,
execution_timeout=execution_timeout,
)

def start_flower_server(
Expand Down
5 changes: 4 additions & 1 deletion exareme2/controller/services/flower/controller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from typing import List

from exareme2.controller import config as ctrl_config
from exareme2.controller import logger as ctrl_logger
from exareme2.controller.federation_info_logs import log_experiment_execution
from exareme2.controller.services.flower.tasks_handler import FlowerTasksHandler
Expand Down Expand Up @@ -87,7 +88,9 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto):
)
clients_pids = {
handler.start_flower_client(
algorithm_name, str(server_address)
algorithm_name,
str(server_address),
ctrl_config.flower_execution_timeout,
): handler
for handler in task_handlers
}
Expand Down
6 changes: 4 additions & 2 deletions exareme2/controller/services/flower/tasks_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ def worker_id(self) -> str:
def worker_data_address(self) -> str:
return self._db_address

def start_flower_client(self, algorithm_name, server_address) -> int:
def start_flower_client(
self, algorithm_name, server_address, execution_timeout
) -> int:
return self._worker_tasks_handler.start_flower_client(
self._request_id, algorithm_name, server_address
self._request_id, algorithm_name, server_address, execution_timeout
).get(timeout=self._tasks_timeout)

def start_flower_server(
Expand Down
6 changes: 4 additions & 2 deletions exareme2/worker/flower/starter/starter_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@


@shared_task
def start_flower_client(request_id: str, algorithm_name, server_address) -> int:
def start_flower_client(
request_id: str, algorithm_name, server_address, execution_timeout
) -> int:
return starter_service.start_flower_client(
request_id, algorithm_name, server_address
request_id, algorithm_name, server_address, execution_timeout
)


Expand Down
5 changes: 4 additions & 1 deletion exareme2/worker/flower/starter/starter_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@


@initialise_logger
def start_flower_client(request_id: str, algorithm_name, server_address) -> int:
def start_flower_client(
request_id: str, algorithm_name, server_address, execution_timeout
) -> int:
env_vars = {
"MONETDB_IP": worker_config.monetdb.ip,
"MONETDB_PORT": worker_config.monetdb.port,
Expand All @@ -20,6 +22,7 @@ def start_flower_client(request_id: str, algorithm_name, server_address) -> int:
"CONTROLLER_IP": worker_config.controller.ip,
"CONTROLLER_PORT": worker_config.controller.port,
"DATA_PATH": worker_config.data_path,
"TIMEOUT": execution_timeout,
}
process = FlowerProcess(f"{algorithm_name}/client.py", env_vars=env_vars)
logger = get_logger()
Expand Down

0 comments on commit 7b502bc

Please sign in to comment.