diff --git a/python/fedml/__init__.py b/python/fedml/__init__.py index 4deaf41c66..7d1b6f60c6 100644 --- a/python/fedml/__init__.py +++ b/python/fedml/__init__.py @@ -25,7 +25,7 @@ _global_training_type = None _global_comm_backend = None -__version__ = "0.8.8a16" +__version__ = "0.8.8a17" def init(args=None): diff --git a/python/fedml/cli/edge_deployment/client_runner.py b/python/fedml/cli/edge_deployment/client_runner.py index c3e20515cd..86d5453535 100755 --- a/python/fedml/cli/edge_deployment/client_runner.py +++ b/python/fedml/cli/edge_deployment/client_runner.py @@ -91,6 +91,7 @@ def __init__(self, args, edge_id=0, request_json=None, agent_config=None, run_id self.mlops_metrics = None self.client_active_list = dict() self.ntp_offset = MLOpsUtils.get_ntp_offset() + self.server_id = None # logging.info("Current directory of client agent: " + self.cur_dir) def build_dynamic_constrain_variables(self, run_id, run_config): @@ -344,7 +345,8 @@ def run(self, process_event): except Exception as e: logging.error("Runner exits with exceptions. {}".format(traceback.format_exc())) self.mlops_metrics.common_report_client_id_status(self.run_id, self.edge_id, - ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED) + ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED, + server_id=self.server_id) self.reset_devices_status(self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED) finally: logging.info("Release resources.") @@ -475,7 +477,8 @@ def run_impl(self): sys_utils.log_return_info(entry_file, ret_code) self.mlops_metrics.report_client_id_status(run_id, self.edge_id, - ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED) + ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED, + server_id=self.server_id) self.mlops_metrics.client_send_exit_train_msg(run_id, self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED) @@ -599,7 +602,8 @@ def exit_run_with_exception(self): ClientConstants.cleanup_bootstrap_process(self.run_id) self.mlops_metrics.report_client_id_status(self.run_id, self.edge_id, - ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED) + ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED, + server_id=self.server_id) time.sleep(1) @@ -813,6 +817,7 @@ def callback_start_train(self, topic, payload): self.run_process_event = multiprocessing.Event() self.run_process_event.clear() client_runner.run_process_event = self.run_process_event + client_runner.server_id = request_json.get("server_id", "0") logging.info("start the runner process.") self.run_process = Process(target=client_runner.run, args=(self.run_process_event,)) self.run_process.start() diff --git a/python/fedml/cli/server_deployment/server_runner.py b/python/fedml/cli/server_deployment/server_runner.py index 1b937fc53e..61b30f509e 100755 --- a/python/fedml/cli/server_deployment/server_runner.py +++ b/python/fedml/cli/server_deployment/server_runner.py @@ -863,6 +863,13 @@ def callback_start_train(self, topic=None, payload=None): self.run_process_event.clear() self.run(self.run_process_event) + def simulate_as_cloud_server(self): + message_bytes = json.dumps(self.request_json).encode("ascii") + base64_bytes = base64.b64encode(message_bytes) + runner_cmd_encoded = base64_bytes.decode("ascii") + + # fedml login ${ACCOUNT_ID} -v ${FEDML_VERSION} -s -r cloud_server -rc ${FEDML_RUNNER_CMD} -id ${SERVER_DEVICE_ID}; + def start_cloud_server_process_entry(self): try: self.start_cloud_server_process() @@ -1095,7 +1102,9 @@ def send_exit_train_with_exception_request_to_edges(self, edge_id_list, payload) for edge_id in edge_id_list: topic_exit_train = "flserver_agent/" + str(edge_id) + "/exit_train_with_exception" logging.error("exit_train_with_exception: send topic " + topic_exit_train) - self.client_mqtt_mgr.send_message(topic_exit_train, payload) + payload_obj = json.loads(payload) + payload_obj["server_id"] = self.edge_id + self.client_mqtt_mgr.send_message(topic_exit_train, json.dumps(payload_obj)) self.mlops_metrics.common_broadcast_client_training_status(edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED) diff --git a/python/fedml/core/mlops/mlops_metrics.py b/python/fedml/core/mlops/mlops_metrics.py index 0458b4d272..c65e803fdc 100644 --- a/python/fedml/core/mlops/mlops_metrics.py +++ b/python/fedml/core/mlops/mlops_metrics.py @@ -184,13 +184,14 @@ def client_send_exit_train_msg(self, run_id, edge_id, status, msg=None): logging.info("client_send_exit_train_msg.") self.messenger.send_message_json(topic_exit_train_with_exception, message_json) - def report_client_id_status(self, run_id, edge_id, status, running_json=None, is_from_model=False): + def report_client_id_status(self, run_id, edge_id, status, running_json=None, + is_from_model=False, server_id="0"): # if not self.comm_sanity_check(): # return """ this is used for communication between client agent (FedML cli module) and client """ - self.common_report_client_id_status(run_id, edge_id, status) + self.common_report_client_id_status(run_id, edge_id, status, server_id) self.report_client_device_status_to_web_ui(edge_id, status) @@ -201,14 +202,14 @@ def report_client_id_status(self, run_id, edge_id, status, running_json=None, is from ...cli.edge_deployment.client_data_interface import FedMLClientDataInterface FedMLClientDataInterface.get_instance().save_job(run_id, edge_id, status, running_json) - def common_report_client_id_status(self, run_id, edge_id, status): + def common_report_client_id_status(self, run_id, edge_id, status, server_id="0"): # if not self.comm_sanity_check(): # return """ this is used for communication between client agent (FedML cli module) and client """ topic_name = "fl_client/flclient_agent_" + str(edge_id) + "/status" - msg = {"run_id": run_id, "edge_id": edge_id, "status": status} + msg = {"run_id": run_id, "edge_id": edge_id, "status": status, "server_id": server_id} message_json = json.dumps(msg) # logging.info("report_client_id_status. message_json = %s" % message_json) self.messenger.send_message_json(topic_name, message_json) @@ -300,7 +301,7 @@ def report_server_id_status(self, run_id, status): msg = {"run_id": run_id, "edge_id": self.edge_id, "status": status} message_json = json.dumps(msg) # logging.info("report_server_id_status server id {}".format(server_agent_id)) - # logging.info("report_server_id_status. message_json = %s" % message_json) + logging.info("report_server_id_status. message_json = %s" % message_json) self.messenger.send_message_json(topic_name, message_json) self.report_server_device_status_to_web_ui(run_id, status) diff --git a/python/setup.py b/python/setup.py index 6e19cc09b6..ccce01bce9 100644 --- a/python/setup.py +++ b/python/setup.py @@ -91,7 +91,7 @@ def finalize_options(self): setup( name="fedml", - version="0.8.8a16", + version="0.8.8a17", author="FedML Team", author_email="ch@fedml.ai", description="A research and production integrated edge-cloud library for "