Skip to content

Commit

Permalink
[CoreEngine] refactor to report the client status with server id so t…
Browse files Browse the repository at this point in the history
…hat the public server can quit when running on the Falcon platform.
  • Loading branch information
fedml-alex committed Aug 16, 2023
1 parent 5726bb2 commit 80d0331
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 11 deletions.
2 changes: 1 addition & 1 deletion python/fedml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
_global_training_type = None
_global_comm_backend = None

__version__ = "0.8.8a16"
__version__ = "0.8.8a17"


def init(args=None):
Expand Down
11 changes: 8 additions & 3 deletions python/fedml/cli/edge_deployment/client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def __init__(self, args, edge_id=0, request_json=None, agent_config=None, run_id
self.mlops_metrics = None
self.client_active_list = dict()
self.ntp_offset = MLOpsUtils.get_ntp_offset()
self.server_id = None
# logging.info("Current directory of client agent: " + self.cur_dir)

def build_dynamic_constrain_variables(self, run_id, run_config):
Expand Down Expand Up @@ -344,7 +345,8 @@ def run(self, process_event):
except Exception as e:
logging.error("Runner exits with exceptions. {}".format(traceback.format_exc()))
self.mlops_metrics.common_report_client_id_status(self.run_id, self.edge_id,
ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED)
ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
server_id=self.server_id)
self.reset_devices_status(self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED)
finally:
logging.info("Release resources.")
Expand Down Expand Up @@ -475,7 +477,8 @@ def run_impl(self):
sys_utils.log_return_info(entry_file, ret_code)

self.mlops_metrics.report_client_id_status(run_id, self.edge_id,
ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED)
ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
server_id=self.server_id)

self.mlops_metrics.client_send_exit_train_msg(run_id, self.edge_id,
ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED)
Expand Down Expand Up @@ -599,7 +602,8 @@ def exit_run_with_exception(self):
ClientConstants.cleanup_bootstrap_process(self.run_id)

self.mlops_metrics.report_client_id_status(self.run_id, self.edge_id,
ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED)
ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
server_id=self.server_id)

time.sleep(1)

Expand Down Expand Up @@ -813,6 +817,7 @@ def callback_start_train(self, topic, payload):
self.run_process_event = multiprocessing.Event()
self.run_process_event.clear()
client_runner.run_process_event = self.run_process_event
client_runner.server_id = request_json.get("server_id", "0")
logging.info("start the runner process.")
self.run_process = Process(target=client_runner.run, args=(self.run_process_event,))
self.run_process.start()
Expand Down
11 changes: 10 additions & 1 deletion python/fedml/cli/server_deployment/server_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,13 @@ def callback_start_train(self, topic=None, payload=None):
self.run_process_event.clear()
self.run(self.run_process_event)

def simulate_as_cloud_server(self):
message_bytes = json.dumps(self.request_json).encode("ascii")
base64_bytes = base64.b64encode(message_bytes)
runner_cmd_encoded = base64_bytes.decode("ascii")

# fedml login ${ACCOUNT_ID} -v ${FEDML_VERSION} -s -r cloud_server -rc ${FEDML_RUNNER_CMD} -id ${SERVER_DEVICE_ID};

def start_cloud_server_process_entry(self):
try:
self.start_cloud_server_process()
Expand Down Expand Up @@ -1095,7 +1102,9 @@ def send_exit_train_with_exception_request_to_edges(self, edge_id_list, payload)
for edge_id in edge_id_list:
topic_exit_train = "flserver_agent/" + str(edge_id) + "/exit_train_with_exception"
logging.error("exit_train_with_exception: send topic " + topic_exit_train)
self.client_mqtt_mgr.send_message(topic_exit_train, payload)
payload_obj = json.loads(payload)
payload_obj["server_id"] = self.edge_id
self.client_mqtt_mgr.send_message(topic_exit_train, json.dumps(payload_obj))
self.mlops_metrics.common_broadcast_client_training_status(edge_id,
ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED)

Expand Down
11 changes: 6 additions & 5 deletions python/fedml/core/mlops/mlops_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,14 @@ def client_send_exit_train_msg(self, run_id, edge_id, status, msg=None):
logging.info("client_send_exit_train_msg.")
self.messenger.send_message_json(topic_exit_train_with_exception, message_json)

def report_client_id_status(self, run_id, edge_id, status, running_json=None, is_from_model=False):
def report_client_id_status(self, run_id, edge_id, status, running_json=None,
is_from_model=False, server_id="0"):
# if not self.comm_sanity_check():
# return
"""
this is used for communication between client agent (FedML cli module) and client
"""
self.common_report_client_id_status(run_id, edge_id, status)
self.common_report_client_id_status(run_id, edge_id, status, server_id)

self.report_client_device_status_to_web_ui(edge_id, status)

Expand All @@ -201,14 +202,14 @@ def report_client_id_status(self, run_id, edge_id, status, running_json=None, is
from ...cli.edge_deployment.client_data_interface import FedMLClientDataInterface
FedMLClientDataInterface.get_instance().save_job(run_id, edge_id, status, running_json)

def common_report_client_id_status(self, run_id, edge_id, status):
def common_report_client_id_status(self, run_id, edge_id, status, server_id="0"):
# if not self.comm_sanity_check():
# return
"""
this is used for communication between client agent (FedML cli module) and client
"""
topic_name = "fl_client/flclient_agent_" + str(edge_id) + "/status"
msg = {"run_id": run_id, "edge_id": edge_id, "status": status}
msg = {"run_id": run_id, "edge_id": edge_id, "status": status, "server_id": server_id}
message_json = json.dumps(msg)
# logging.info("report_client_id_status. message_json = %s" % message_json)
self.messenger.send_message_json(topic_name, message_json)
Expand Down Expand Up @@ -300,7 +301,7 @@ def report_server_id_status(self, run_id, status):
msg = {"run_id": run_id, "edge_id": self.edge_id, "status": status}
message_json = json.dumps(msg)
# logging.info("report_server_id_status server id {}".format(server_agent_id))
# logging.info("report_server_id_status. message_json = %s" % message_json)
logging.info("report_server_id_status. message_json = %s" % message_json)
self.messenger.send_message_json(topic_name, message_json)

self.report_server_device_status_to_web_ui(run_id, status)
Expand Down
2 changes: 1 addition & 1 deletion python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def finalize_options(self):

setup(
name="fedml",
version="0.8.8a16",
version="0.8.8a17",
author="FedML Team",
author_email="[email protected]",
description="A research and production integrated edge-cloud library for "
Expand Down

0 comments on commit 80d0331

Please sign in to comment.